Define environment-aware assets

A key challenge in multi-environment deployments is writing code that adapts to its environment without requiring separate codebases. Dagster+ injects a DAGSTER_CLOUD_DEPLOYMENT_NAME environment variable into every run, which you can use to drive environment-specific behavior.

Step 1: Read the deployment name

Create a helper function that reads the deployment name and falls back to "local" when running outside of Dagster+:

src/project_dagster_plus_deployment/defs/assets.py
import os


def get_deployment_name() -> str:
    """Return the current Dagster+ deployment name, falling back to 'local'."""
    return os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "local")

Because the assets below only check whether the deployment name is "prod", the "local" fallback takes the same non-prod code path as a dev deployment when you run dg dev on your laptop, with no special configuration needed.
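To make the fallback concrete, the following sketch calls the helper with the variable unset and then with it set by hand. Setting DAGSTER_CLOUD_DEPLOYMENT_NAME manually is only a way to simulate a Dagster+ run locally; in a real deployment the value comes from Dagster+.

```python
import os


def get_deployment_name() -> str:
    """Return the current Dagster+ deployment name, falling back to 'local'."""
    return os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "local")


# Outside Dagster+ the variable is unset, so the fallback applies.
os.environ.pop("DAGSTER_CLOUD_DEPLOYMENT_NAME", None)
local_name = get_deployment_name()

# Simulate a Dagster+ run by setting the variable by hand.
os.environ["DAGSTER_CLOUD_DEPLOYMENT_NAME"] = "prod"
prod_name = get_deployment_name()

# Clean up so the simulated value does not leak into later code.
del os.environ["DAGSTER_CLOUD_DEPLOYMENT_NAME"]

print(local_name, prod_name)
```

Note that os.getenv only uses the default when the variable is missing. If the variable is set to an empty string, the helper returns the empty string rather than "local".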

Step 2: Define assets with environment-aware behavior

Organize assets into three groups that represent the layers of a typical data pipeline: raw_data, staging, and analytics. Each asset uses the deployment name to adjust its behavior — for example, processing more rows in prod than in dev.

The raw_data group ingests from source systems:

src/project_dagster_plus_deployment/defs/assets.py
# Assumes `import os` and `import dagster as dg` at the top of the file.
@dg.asset(group_name="raw_data", tags={"layer": "raw"})
def raw_orders(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Ingest raw orders from source system.

    In a real pipeline this would read from a database or API.
    The source connection varies by deployment environment.
    """
    deployment = get_deployment_name()
    source_db = os.getenv("SOURCE_DATABASE_URL", "sqlite:///local.db")
    context.log.info(f"Reading orders from {source_db} (deployment={deployment})")

    row_count = 1000 if deployment == "prod" else 100
    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(row_count),
            "deployment": dg.MetadataValue.text(deployment),
            "source": dg.MetadataValue.text(source_db),
        }
    )


@dg.asset(group_name="raw_data", tags={"layer": "raw"})
def raw_customers(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Ingest raw customers from source system."""
    deployment = get_deployment_name()
    context.log.info(f"Reading customers (deployment={deployment})")

    row_count = 5000 if deployment == "prod" else 50
    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(row_count),
            "deployment": dg.MetadataValue.text(deployment),
        }
    )

The staging group cleans and validates the raw data:

src/project_dagster_plus_deployment/defs/assets.py
@dg.asset(group_name="staging", tags={"layer": "staging"}, deps=[raw_orders])
def cleaned_orders(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Clean and validate raw orders."""
    deployment = get_deployment_name()
    context.log.info(f"Cleaning orders (deployment={deployment})")
    return dg.MaterializeResult(metadata={"deployment": dg.MetadataValue.text(deployment)})


@dg.asset(group_name="staging", tags={"layer": "staging"}, deps=[raw_customers])
def cleaned_customers(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Clean and validate raw customers."""
    deployment = get_deployment_name()
    context.log.info(f"Cleaning customers (deployment={deployment})")
    return dg.MaterializeResult(metadata={"deployment": dg.MetadataValue.text(deployment)})

The analytics group computes downstream metrics:

src/project_dagster_plus_deployment/defs/assets.py
@dg.asset(
    group_name="analytics",
    tags={"layer": "analytics", "domain": "revenue"},
    deps=[cleaned_orders, cleaned_customers],
)
def daily_revenue(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    """Compute daily revenue metrics by joining orders and customers."""
    deployment = get_deployment_name()
    warehouse = os.getenv("WAREHOUSE_URL", "duckdb:///local_warehouse.db")
    context.log.info(f"Computing revenue → {warehouse} (deployment={deployment})")
    return dg.MaterializeResult(
        metadata={
            "deployment": dg.MetadataValue.text(deployment),
            "warehouse": dg.MetadataValue.text(warehouse),
        }
    )

The asset metadata records which deployment materialized the asset, making it easy to trace runs back to their environment in the Dagster+ UI.
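As the number of assets grows, the repeated `... if deployment == "prod" else ...` conditionals can be collected in one place. The sketch below is one possible way to do that and is not part of the example project: EnvSettings, PROD_SETTINGS, NON_PROD_SETTINGS, and get_settings are hypothetical names, and the values simply mirror the row counts used in raw_orders and raw_customers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvSettings:
    """Hypothetical per-environment settings; field names are illustrative."""

    orders_row_count: int
    customers_row_count: int


# Values mirror the conditionals in raw_orders and raw_customers.
PROD_SETTINGS = EnvSettings(orders_row_count=1000, customers_row_count=5000)
NON_PROD_SETTINGS = EnvSettings(orders_row_count=100, customers_row_count=50)


def get_settings(deployment: str) -> EnvSettings:
    """Return prod settings for 'prod' and non-prod settings for anything else."""
    return PROD_SETTINGS if deployment == "prod" else NON_PROD_SETTINGS


print(get_settings("prod"))
print(get_settings("local"))
```

An asset could then call get_settings(get_deployment_name()).orders_row_count instead of repeating the conditional. Treating every name other than "prod" as non-prod keeps the behavior identical to the assets above.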

Step 3: Define a schedule

Add a schedule in a separate file that runs the full analytics pipeline every morning:

src/project_dagster_plus_deployment/defs/schedules.py
import dagster as dg

# Select the analytics group plus everything upstream of it.
daily_revenue_job = dg.define_asset_job(
    name="daily_revenue_job",
    selection=dg.AssetSelection.groups("analytics").upstream(),
    tags={"team": "data-engineering"},
)

daily_revenue_schedule = dg.ScheduleDefinition(
    name="daily_revenue_schedule",
    job=daily_revenue_job,
    cron_schedule="0 6 * * *",  # 6 AM daily
    default_status=dg.DefaultScheduleStatus.STOPPED,
)

Setting default_status to dg.DefaultScheduleStatus.STOPPED means the schedule won't activate automatically in branch deployments or when the code location first loads. You enable it explicitly in each environment where it should run.
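To illustrate what the cron expression 0 6 * * * means in practice, the sketch below computes the next matching time with the standard library only. It assumes the schedule is evaluated in UTC, since the definition above does not set execution_timezone. The next_daily_tick helper is a hypothetical name used for illustration and is not a Dagster API.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now: datetime, hour: int = 6) -> datetime:
    """Return the first time strictly after `now` matching `0 <hour> * * *` in UTC.

    `now` must be timezone-aware.
    """
    now_utc = now.astimezone(timezone.utc)
    candidate = now_utc.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now_utc:
        # Today's tick has already passed, so the next one is tomorrow.
        candidate += timedelta(days=1)
    return candidate


before_tick = datetime(2024, 1, 1, 5, 30, tzinfo=timezone.utc)
at_tick = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)

print(next_daily_tick(before_tick))
print(next_daily_tick(at_tick))
```

If the pipeline should run at 6 AM in a local time zone rather than UTC, the time zone needs to be set on the schedule itself.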

Step 4: Configure the code location

The dagster_cloud.yaml file tells Dagster+ how to find your code and which container registry to pull images from:

dagster_cloud.yaml
locations:
  - location_name: dagster_deploy_demo
    code_source:
      package_name: project_dagster_plus_deployment
    build:
      directory: .
      registry: ${DAGSTER_CONTAINER_REGISTRY}

The ${DAGSTER_CONTAINER_REGISTRY} placeholder is substituted at deploy time by the CI/CD pipeline using the CONTAINER_REGISTRY GitHub variable.
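The page does not show how the CI/CD pipeline performs this substitution. As a rough illustration of the idea only, the sketch below fills the placeholder with Python's string.Template. The render_config function and the registry URL are made up for the example.

```python
from string import Template

DAGSTER_CLOUD_YAML = """\
locations:
  - location_name: dagster_deploy_demo
    code_source:
      package_name: project_dagster_plus_deployment
    build:
      directory: .
      registry: ${DAGSTER_CONTAINER_REGISTRY}
"""


def render_config(template_text: str, registry: str) -> str:
    """Replace the ${DAGSTER_CONTAINER_REGISTRY} placeholder with a registry URL."""
    return Template(template_text).substitute(DAGSTER_CONTAINER_REGISTRY=registry)


# "ghcr.io/example-org/dagster-demo" is a made-up registry used only for illustration.
rendered = render_config(DAGSTER_CLOUD_YAML, "ghcr.io/example-org/dagster-demo")
print(rendered)
```

Template.substitute raises KeyError when a placeholder has no value, which surfaces a missing registry setting early instead of leaving a literal ${...} in the file.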

Step 5: Verify the definitions load

Use dg check to confirm all assets and schedules are valid before moving on:

dg check defs

Next steps

Continue this example with containerizing the project.