Define environment-aware assets
A key challenge in multi-environment deployments is writing code that adapts to its environment without requiring separate codebases. Dagster+ injects a DAGSTER_CLOUD_DEPLOYMENT_NAME environment variable into every run, which you can use to drive environment-specific behavior.
Step 1: Read the deployment name
Create a helper function that reads the deployment name and falls back to "local" when running outside of Dagster+:
import os


def get_deployment_name() -> str:
    """Return the current Dagster+ deployment name, falling back to 'local'."""
    return os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "local")
Because of the fallback to "local", your assets take the same non-prod code paths as in dev when you run dg dev on your laptop, with no special configuration needed.
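To confirm the fallback without a Dagster+ deployment, you can call the helper with the variable unset and then set. The sketch below repeats get_deployment_name from Step 1 and adds a small hypothetical context manager, deployment_env, that sets or clears DAGSTER_CLOUD_DEPLOYMENT_NAME for the current process only and restores the previous value afterwards.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Optional

VAR = "DAGSTER_CLOUD_DEPLOYMENT_NAME"


def get_deployment_name() -> str:
    """Return the current Dagster+ deployment name, falling back to 'local'."""
    return os.getenv(VAR, "local")


@contextmanager
def deployment_env(name: Optional[str]) -> Iterator[None]:
    """Hypothetical test helper: set (or clear, if None) the variable, then restore it."""
    previous = os.environ.get(VAR)
    if name is None:
        os.environ.pop(VAR, None)
    else:
        os.environ[VAR] = name
    try:
        yield
    finally:
        # Put back whatever value (or absence of a value) was there before
        if previous is None:
            os.environ.pop(VAR, None)
        else:
            os.environ[VAR] = previous


with deployment_env(None):
    unset_name = get_deployment_name()
with deployment_env("prod"):
    prod_name = get_deployment_name()
print(unset_name, prod_name)  # local prod
```

Because the helper reads the variable at call time rather than at import time, each run sees the value of the environment it is actually running in.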
Step 2: Define assets with environment-aware behavior
Organize assets into three groups that represent the layers of a typical data pipeline: raw_data, staging, and analytics. Each asset reads the deployment name to adjust its behavior. For example, the raw ingestion assets report more rows in prod than in any other deployment (the row counts are hard-coded placeholders standing in for real ingestion).
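If more assets start branching on the deployment name, one option is to keep the per-environment values in one place instead of repeating `if deployment == "prod"` checks. The sketch below is a hypothetical pattern, not part of the example project: DeploymentSettings, SETTINGS, and settings_for are illustrative names, and the row counts are the placeholder values used by raw_orders and raw_customers. Any deployment without its own entry (local, dev, or a branch deployment) falls back to the non-prod values, matching the `else` branches in the assets.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DeploymentSettings:
    """Hypothetical per-deployment knobs used by the assets."""
    orders_row_count: int
    customers_row_count: int


# Non-prod values double as the default for local, dev, and branch deployments
DEFAULT_SETTINGS = DeploymentSettings(orders_row_count=100, customers_row_count=50)

SETTINGS = {
    "prod": DeploymentSettings(orders_row_count=1000, customers_row_count=5000),
}


def settings_for(deployment: str) -> DeploymentSettings:
    """Look up settings for a deployment, falling back to the non-prod defaults."""
    return SETTINGS.get(deployment, DEFAULT_SETTINGS)


for name in ("prod", "dev", "local"):
    s = settings_for(name)
    print(name, s.orders_row_count, s.customers_row_count)
# prod 1000 5000
# dev 100 50
# local 100 50
```

Inside an asset, `settings_for(get_deployment_name()).orders_row_count` would then replace the inline conditional.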
The raw_data group ingests from source systems:
import os

import dagster as dg

# Assumes get_deployment_name() from Step 1 is defined in, or imported into, this module


@dg.asset(group_name="raw_data", tags={"layer": "raw"})
def raw_orders(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Ingest raw orders from source system.

    In a real pipeline this would read from a database or API.
    The source connection varies by deployment environment.
    """
    deployment = get_deployment_name()
    # Each deployment can point SOURCE_DATABASE_URL at its own source system
    source_db = os.getenv("SOURCE_DATABASE_URL", "sqlite:///local.db")
    context.log.info(f"Reading orders from {source_db} (deployment={deployment})")
    # Placeholder row count: larger in prod than in dev, branch, or local runs
    row_count = 1000 if deployment == "prod" else 100
    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(row_count),
            "deployment": dg.MetadataValue.text(deployment),
            "source": dg.MetadataValue.text(source_db),
        }
    )
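Because raw_orders reads its environment through os.getenv, its deployment-dependent choices are easy to check outside Dagster if you pull them into a plain function. The sketch below is one hypothetical way to do that: orders_run_plan takes the environment as a Mapping argument, so a test can pass a dict instead of mutating os.environ, and returns the same deployment, source, and row-count values that raw_orders records as metadata. The PostgreSQL URL in the usage lines is made up.

```python
import os
from typing import Mapping, Optional


def orders_run_plan(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper mirroring raw_orders' environment-dependent choices."""
    env = os.environ if env is None else env
    deployment = env.get("DAGSTER_CLOUD_DEPLOYMENT_NAME", "local")
    return {
        "deployment": deployment,
        "source": env.get("SOURCE_DATABASE_URL", "sqlite:///local.db"),
        "row_count": 1000 if deployment == "prod" else 100,
    }


prod_plan = orders_run_plan(
    {
        "DAGSTER_CLOUD_DEPLOYMENT_NAME": "prod",
        "SOURCE_DATABASE_URL": "postgresql://orders-db.example.com/orders",
    }
)
local_plan = orders_run_plan({})  # nothing set: behaves like a laptop run
print(prod_plan)
print(local_plan)
```

The asset body could then call `orders_run_plan()` with no argument and turn the returned values into MetadataValue entries.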


@dg.asset(group_name="raw_data", tags={"layer": "raw"})
def raw_customers(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Ingest raw customers from source system."""
    deployment = get_deployment_name()
    context.log.info(f"Reading customers (deployment={deployment})")
    row_count = 5000 if deployment == "prod" else 50
    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(row_count),
            "deployment": dg.MetadataValue.text(deployment),
        }
    )
The staging group cleans and validates the raw data:
@dg.asset(group_name="staging", tags={"layer": "staging"}, deps=[raw_orders])
def cleaned_orders(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Clean and validate raw orders."""
    deployment = get_deployment_name()
    context.log.info(f"Cleaning orders (deployment={deployment})")
    return dg.MaterializeResult(metadata={"deployment": dg.MetadataValue.text(deployment)})


@dg.asset(group_name="staging", tags={"layer": "staging"}, deps=[raw_customers])
def cleaned_customers(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Clean and validate raw customers."""
    deployment = get_deployment_name()
    context.log.info(f"Cleaning customers (deployment={deployment})")
    return dg.MaterializeResult(metadata={"deployment": dg.MetadataValue.text(deployment)})
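The staging assets above only log a message. As a hypothetical illustration of what "clean and validate" could mean for orders, the stdlib-only sketch below drops rows without an order_id, drops rows whose amount is not a finite, non-negative number, and keeps the first valid row for each order_id. The record fields (order_id, customer_id, amount) are assumptions, not the example project's schema.

```python
import math
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]


def clean_orders(rows: Iterable[Record]) -> List[Record]:
    """Hypothetical cleaning step: validate required fields and de-duplicate by order_id."""
    seen = set()
    cleaned: List[Record] = []
    for row in rows:
        order_id = row.get("order_id")
        if order_id is None:
            continue  # missing key: the row cannot be joined or de-duplicated
        try:
            amount = float(row.get("amount"))
        except (TypeError, ValueError):
            continue  # amount missing or not numeric
        if not math.isfinite(amount) or amount < 0:
            continue  # NaN, infinite, or negative amounts are treated as invalid
        if order_id in seen:
            continue  # keep only the first valid occurrence of each order
        seen.add(order_id)
        cleaned.append({**row, "amount": amount})
    return cleaned


raw = [
    {"order_id": 1, "customer_id": "c1", "amount": "19.99"},
    {"order_id": 1, "customer_id": "c1", "amount": "19.99"},  # duplicate
    {"order_id": None, "customer_id": "c2", "amount": "5"},  # missing id
    {"order_id": 2, "customer_id": "c2", "amount": "oops"},  # not numeric
    {"order_id": 3, "customer_id": "c3", "amount": "-4"},  # negative
    {"order_id": 4, "customer_id": "c3", "amount": 12},
]
cleaned = clean_orders(raw)
print([r["order_id"] for r in cleaned])  # [1, 4]
```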
The analytics group computes downstream metrics:
@dg.asset(
    group_name="analytics",
    tags={"layer": "analytics", "domain": "revenue"},
    deps=[cleaned_orders, cleaned_customers],
)
def daily_revenue(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Compute daily revenue metrics by joining orders and customers."""
    deployment = get_deployment_name()
    warehouse = os.getenv("WAREHOUSE_URL", "duckdb:///local_warehouse.db")
    context.log.info(f"Computing revenue → {warehouse} (deployment={deployment})")
    return dg.MaterializeResult(
        metadata={
            "deployment": dg.MetadataValue.text(deployment),
            "warehouse": dg.MetadataValue.text(warehouse),
        }
    )
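daily_revenue only records where its output would go. For a sense of the join its docstring describes, here is a hypothetical sketch using Python's built-in sqlite3 module with an in-memory database; the table layouts, the region column, and the sample rows are invented for illustration, and a real implementation would write to the warehouse named by WAREHOUSE_URL instead. The inner join leaves out orders whose customer_id has no matching customer.

```python
import sqlite3
from typing import List, Tuple


def compute_daily_revenue(conn: sqlite3.Connection) -> List[Tuple[str, str, float]]:
    """Hypothetical revenue query: total order amount per day and customer region."""
    query = """
        SELECT o.order_date, c.region, SUM(o.amount) AS revenue
        FROM cleaned_orders AS o
        JOIN cleaned_customers AS c ON c.customer_id = o.customer_id
        GROUP BY o.order_date, c.region
        ORDER BY o.order_date, c.region
    """
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE cleaned_customers (customer_id TEXT PRIMARY KEY, region TEXT);
    CREATE TABLE cleaned_orders (order_id INTEGER PRIMARY KEY, customer_id TEXT,
                                 order_date TEXT, amount REAL);
    INSERT INTO cleaned_customers VALUES ('c1', 'EU'), ('c2', 'US');
    INSERT INTO cleaned_orders VALUES
        (1, 'c1', '2024-01-01', 10.0),
        (2, 'c2', '2024-01-01', 5.5),
        (3, 'c1', '2024-01-02', 7.25),
        (4, 'c9', '2024-01-02', 100.0);  -- unknown customer, dropped by the join
    """
)
rows = compute_daily_revenue(conn)
print(rows)
# [('2024-01-01', 'EU', 10.0), ('2024-01-01', 'US', 5.5), ('2024-01-02', 'EU', 7.25)]
```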
The asset metadata records which deployment materialized the asset, making it easy to trace runs back to their environment in the Dagster+ UI.
Step 3: Define a schedule
In a separate file, define a job that selects the analytics group and everything upstream of it, plus a schedule that runs the job every morning:
import dagster as dg

# Select the analytics assets plus everything upstream of them (staging and raw_data)
daily_revenue_job = dg.define_asset_job(
    name="daily_revenue_job",
    selection=dg.AssetSelection.groups("analytics").upstream(),
    tags={"team": "data-engineering"},
)

daily_revenue_schedule = dg.ScheduleDefinition(
    name="daily_revenue_schedule",
    job=daily_revenue_job,
    cron_schedule="0 6 * * *",  # 6:00 AM daily, in UTC unless execution_timezone is set
    default_status=dg.DefaultScheduleStatus.STOPPED,
)
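Setting default_status=dg.DefaultScheduleStatus.STOPPED (also Dagster's default, so the line mainly documents intent) means the schedule does not start running on its own when the code location first loads, in any deployment, including branch deployments. You turn it on explicitly in each deployment where it should run, for example from the Dagster+ UI.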
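To see which assets daily_revenue_job will materialize, you can trace `AssetSelection.groups("analytics").upstream()` by hand. The stdlib sketch below models this example's dependency graph as plain dicts, copied from the group_name and deps= arguments in Step 2, and walks it the way an upstream selection does, keeping the starting assets themselves. It illustrates the selection semantics only; it is not Dagster's implementation.

```python
from collections import deque
from typing import Dict, List, Set

# Dependencies from the deps= arguments in Step 2
DEPS: Dict[str, List[str]] = {
    "raw_orders": [],
    "raw_customers": [],
    "cleaned_orders": ["raw_orders"],
    "cleaned_customers": ["raw_customers"],
    "daily_revenue": ["cleaned_orders", "cleaned_customers"],
}
# Each asset's group_name from Step 2
GROUPS: Dict[str, str] = {
    "raw_orders": "raw_data",
    "raw_customers": "raw_data",
    "cleaned_orders": "staging",
    "cleaned_customers": "staging",
    "daily_revenue": "analytics",
}


def upstream_closure(start: Set[str]) -> Set[str]:
    """Return the starting assets plus every asset they depend on, transitively."""
    selected = set(start)
    queue = deque(start)
    while queue:
        asset = queue.popleft()
        for parent in DEPS[asset]:
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


analytics = {name for name, group in GROUPS.items() if group == "analytics"}
job_assets = upstream_closure(analytics)
print(sorted(job_assets))
# ['cleaned_customers', 'cleaned_orders', 'daily_revenue', 'raw_customers', 'raw_orders']
```

Every asset in the example ends up in the job, so each scheduled run refreshes the whole pipeline rather than daily_revenue alone.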
Step 4: Configure the code location
The dagster_cloud.yaml file tells Dagster+ how to find your code and which container registry to pull images from:
locations:
  - location_name: dagster_deploy_demo
    code_source:
      package_name: project_dagster_plus_deployment
    build:
      directory: .
      registry: ${DAGSTER_CONTAINER_REGISTRY}
The ${DAGSTER_CONTAINER_REGISTRY} placeholder is substituted at deploy time by the CI/CD pipeline using the CONTAINER_REGISTRY GitHub variable.
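The CI step itself is not shown here. As a hypothetical sketch of the substitution, assuming the workflow exposes the GitHub variable's value as DAGSTER_CONTAINER_REGISTRY, Python's string.Template understands the same ${NAME} placeholder syntax. Template.substitute raises KeyError when a variable is missing, so a misconfigured pipeline fails loudly instead of shipping a literal placeholder. The registry URL is made up, and the real pipeline may use a different tool (for example, envsubst).

```python
import os
from string import Template
from typing import Mapping, Optional

DAGSTER_CLOUD_YAML = """\
locations:
  - location_name: dagster_deploy_demo
    code_source:
      package_name: project_dagster_plus_deployment
    build:
      directory: .
      registry: ${DAGSTER_CONTAINER_REGISTRY}
"""


def render_config(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${NAME} placeholders; raises KeyError if a variable is missing."""
    return Template(text).substitute(os.environ if env is None else env)


registry = "123456789012.dkr.ecr.us-east-1.amazonaws.com/dagster-deploy-demo"
rendered = render_config(DAGSTER_CLOUD_YAML, {"DAGSTER_CONTAINER_REGISTRY": registry})
print(rendered)

# A missing variable should stop the deploy rather than leave the placeholder in place
try:
    render_config(DAGSTER_CLOUD_YAML, {})
    missing_fails = False
except KeyError:
    missing_fails = True
```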
Step 5: Verify the definitions load
Use dg check to confirm all assets and schedules are valid before moving on:
dg check defs
Next steps
Continue this example with containerizing the project.