Migrating from sensors to Declarative Automation
Asset sensors and multi-asset sensors provide imperative control over when to launch runs in response to asset materializations. Declarative Automation replaces much of this with a composable, condition-based model where you describe what should happen rather than how to detect it.
This guide covers common sensor patterns, their Declarative Automation equivalents, caveats, and strategies for handling scenarios like dynamic run configuration.
Advantages of Declarative Automation
Declarative Automation provides several advantages over imperative sensors for asset-centric workflows:
- Asset-level observability. With sensors, understanding why an asset did or didn't execute requires reading sensor logs and tracing cursor state. With Declarative Automation, every asset shows its automation condition evaluation directly in the Dagster UI — you can inspect the condition tree, see which sub-conditions are true or false, and understand exactly why an asset was or wasn't requested on any given tick.
- No separate orchestration code. Sensors require a separate sensor function definition and job definition. Declarative Automation conditions are declared directly on the asset, eliminating the need for standalone orchestration code that can drift out of sync with the assets it manages.
- Dependency-aware by default. Built-in conditions like eager() and on_cron() automatically respect the asset graph — they understand upstream/downstream relationships, partition mappings, and in-progress state. Sensors must manually implement this awareness through cursor management and event inspection.
- Composable and reusable. Conditions are built from small, composable operators (since, newly_true, all_deps_match) that can be combined, customized, and shared across assets. Sensor logic tends to be bespoke per sensor, making it harder to maintain consistency across a growing asset graph.
- Reduced boilerplate. Common patterns like "run after all dependencies update" or "run on a schedule when dependencies are fresh" are one-liners with on_cron(), replacing dozens of lines of sensor code with cursor management, event filtering, and job wiring.
When to migrate
Declarative Automation is a good fit if your sensor does one of the following:
- Triggers downstream assets when upstream assets update
- Waits for all (or any) of a set of dependencies before launching a run
- Schedules assets on a cron cadence after dependencies are fresh
- Fills in missing partitions when upstream data arrives
Keep your sensor if it:
- Is working as intended. If Declarative Automation does not help you achieve a concrete goal, we recommend keeping your existing sensor.
- Performs side effects (sends Slack notifications, writes audit logs, calls external APIs).
- Needs to inspect materialization metadata (for example, row count thresholds) to decide whether to trigger. However, this can often be handled with asset checks and an automation condition that gates on upstream checks passing.
- Produces RunRequest objects with dynamic run_config that varies per materialization event (though see Migrating run config for strategies to move this logic).
- Needs to accumulate multiple materialization events before triggering (for example, "wait for 5 materializations before firing"). Declarative Automation conditions are boolean per evaluation tick, not counters.
Migrating single-asset sensors
Single-asset sensors (@asset_sensor) monitor one upstream asset and trigger a job when it materializes. These are the simplest sensors to migrate because AutomationCondition.eager is a near-direct replacement.
Basic trigger on upstream update
- Sensor
- Declarative Automation
import dagster as dg
downstream_job = dg.define_asset_job("downstream_job", selection=["downstream"])
@dg.asset_sensor(asset_key=dg.AssetKey("raw_data"), job=downstream_job)
def raw_data_sensor(context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry):
return dg.RunRequest()
import dagster as dg
@dg.asset(
deps=["raw_data"],
automation_condition=dg.AutomationCondition.eager(),
)
def downstream(): ...
eager() triggers whenever any dependency updates, which for a single dependency is equivalent to "trigger when this asset updates." It also includes guards that prevent execution when dependencies are missing or in progress. See the docs to learn more.
If your @asset_sensor monitors an asset in a different code location, no special handling is needed, since Declarative Automation natively detects materializations across code locations. Declare the upstream asset key as a dependency and use eager() as shown above.
Conditional trigger based on metadata
- Sensor
- Declarative Automation
import dagster as dg
analytics_job = dg.define_asset_job("analytics_job", selection=["analytics"])
@dg.asset_sensor(asset_key=dg.AssetKey("daily_sales"), job=analytics_job)
def sales_sensor(context, asset_event):
metadata = asset_event.dagster_event.event_specific_data.materialization.metadata
row_count = metadata.get("row_count").value if "row_count" in metadata else 0
if row_count > 1000:
return dg.RunRequest()
return dg.SkipReason(f"Row count {row_count} below threshold")
Automation conditions don't have access to materialization metadata directly. Instead, use an asset check to inspect the metadata and gate downstream execution:
import dagster as dg
def load_sales_data() -> int:
return 1500
@dg.asset
def daily_sales() -> dg.MaterializeResult:
row_count = load_sales_data()
return dg.MaterializeResult(metadata={"row_count": row_count})
@dg.asset_check(asset="daily_sales")
def sales_row_count_check(
context: dg.AssetCheckExecutionContext,
) -> dg.AssetCheckResult:
event = context.instance.get_latest_materialization_event(
dg.AssetKey("daily_sales")
)
metadata = event.asset_materialization.metadata
row_count = metadata["row_count"].value
passed = row_count > 1000
return dg.AssetCheckResult(
passed=passed,
severity=dg.AssetCheckSeverity.ERROR,
metadata={"row_count": row_count},
)
all_deps_checks_passed = dg.AutomationCondition.all_deps_match(
dg.AutomationCondition.all_checks_match(
~dg.AutomationCondition.check_failed()
| dg.AutomationCondition.will_be_requested(),
)
)
@dg.asset(
deps=["daily_sales"],
automation_condition=dg.AutomationCondition.eager() & all_deps_checks_passed,
)
def analytics(): ...
How this works:
- daily_sales materializes and reports row_count in its metadata.
- sales_row_count_check is an asset check on daily_sales — it reads the materialization metadata from the event log and fails with ERROR severity if the row count is below the threshold.
- analytics uses eager() combined with a custom condition that gates on all upstream asset checks passing. The eager() condition detects that daily_sales updated, but the checks condition holds execution until no upstream checks are in a failed state. If any check fails, analytics does not execute.
Migrating multi-asset sensors
Pattern 1: All dependencies updated to trigger downstream
- Sensor
- Declarative Automation
import dagster as dg
downstream_job = dg.define_asset_job("downstream_job", selection=["downstream"])
@dg.multi_asset_sensor(
monitored_assets=[dg.AssetKey("asset_a"), dg.AssetKey("asset_b")],
job=downstream_job,
)
def all_deps_sensor(context):
asset_events = context.latest_materialization_records_by_key()
if all(asset_events.values()):
context.advance_all_cursors()
return dg.RunRequest()
import dagster as dg
@dg.asset(
deps=["asset_a", "asset_b"],
automation_condition=dg.AutomationCondition.on_cron("0 * * * *"),
)
def downstream(): ...
This sensor checks that every monitored asset has a new materialization before triggering. AutomationCondition.eager is not the right replacement, since it fires when any dep updates. Use AutomationCondition.on_cron instead.
Why on_cron is recommended
on_cron waits until all dependencies have updated since the latest cron tick before requesting execution. For more detail on how on_cron works, see the guide.
- Synchronization guarantee. The cron tick acts as a synchronization barrier. All dependencies must update within the same cron window, preventing desynchronization issues that plague event-driven "all dependencies" approaches (see why custom all-dependencies conditions are risky).
- Simple to reason about. "Run once per hour after all dependencies are fresh" is easy to understand and debug.
- Built-in deduplication. Only fires once per cron window, even if dependencies update multiple times.
The trade-off is that you need to pick a cron schedule. The interval must be wide enough that all dependencies complete within a single window. If your dependencies all complete within a 30-minute window, an hourly cron works. A very frequent cron like "* * * * *" (every minute) actually makes this worse, as it creates more cron ticks that can land between dependency updates and reset the tracking.
For advanced uses of on_cron, see the customizing on_cron guide.
Why custom "all dependencies updated" conditions are risky
If you want to trigger as soon as all dependencies have updated since the target's last materialization (no cron boundary), you might try building a custom condition using AutomationCondition.asset_matches:
all_deps_updated_since_target = (
dg.AutomationCondition.all_deps_match(
dg.AutomationCondition.newly_updated().since(
dg.AutomationCondition.asset_matches(
"my_downstream_asset",
dg.AutomationCondition.newly_requested()
| dg.AutomationCondition.newly_updated(),
)
)
)
& ~dg.AutomationCondition.in_progress()
)
This condition individually tracks each upstream dependency and checks whether it has been updated more recently than the target asset was last requested or materialized. Unlike the broken all_deps_match(newly_updated()) approach (where newly_updated() is only true for a single tick), this uses .since() to create persistent per-dep tracking.
However, this approach has serious desynchronization risks. Consider a target asset with two upstream dependencies that update on different cadences, one daily and one weekly:
- Normal operation works fine. Both dependencies update, the condition is satisfied, the target materializes. The .since() resets and waits for both dependencies to update again.
- Manual backfill breaks synchronization. If someone manually backfills the weekly asset mid-week, the daily asset has already updated since the target's last materialization. The condition is immediately satisfied and fires an unwanted run — the target materializes against the backfilled weekly data and the latest daily data, even though you probably wanted to wait for the next natural cycle.
- Desynchronization can persist. With two weekly dependencies, a mid-week backfill of one doesn't fire (the other weekly hasn't updated). But the next week, whichever weekly dependency materializes first now satisfies the condition for the backfilled dependency (which updated mid-week, more recently than the target). The target fires before the second weekly dependency has this week's data, and this off-by-one-week pattern continues indefinitely.
The root cause is that an event-driven "all dependencies updated since target" condition has no synchronization barrier. Unlike a cron tick that forces all dependencies into the same time window, the condition can be satisfied by dependencies that updated at different times for different reasons.
on_cron avoids this entirely because the cron tick resets tracking for every dependency simultaneously. A backfilled asset's update only counts if it happened after the latest cron tick, which is the same tick that every other dependency is measured against.
The asset_matches approach also requires hardcoding the target asset key in the condition because there is currently no built-in way for a condition to reference the asset it is applied to. This means the condition is not reusable across assets.
Pattern 2: Any dependency updated to trigger downstream
- Sensor
- Declarative Automation
import dagster as dg
downstream_job = dg.define_asset_job("downstream_job", selection=["downstream"])
@dg.multi_asset_sensor(
monitored_assets=[dg.AssetKey("asset_a"), dg.AssetKey("asset_b")],
job=downstream_job,
)
def any_dep_sensor(context):
asset_events = context.latest_materialization_records_by_key()
if any(asset_events.values()):
context.advance_all_cursors()
return dg.RunRequest()
import dagster as dg
@dg.asset(
deps=["asset_a", "asset_b"],
automation_condition=dg.AutomationCondition.eager(),
)
def downstream(): ...
eager() triggers when any dependency updates. It also includes guards: it won't fire if any dependency is missing or if the target is already in progress.
Pattern 3: Partitioned assets — map upstream to downstream partitions
- Sensor
- Declarative Automation
import dagster as dg
@dg.asset(partitions_def=dg.HourlyPartitionsDefinition("2024-01-01-00:00"))
def hourly_asset(): ...
@dg.asset(
deps=["hourly_asset"],
partitions_def=dg.WeeklyPartitionsDefinition(start_date="2024-01-01"),
)
def weekly_asset(): ...
weekly_job = dg.define_asset_job("weekly_job", selection=["weekly_asset"])
@dg.multi_asset_sensor(
monitored_assets=[hourly_asset.key],
job=weekly_job,
)
def hourly_to_weekly(context):
for partition, record in context.latest_materialization_records_by_partition(
hourly_asset.key
).items():
mapped = context.get_downstream_partition_keys(
partition,
from_asset_key=hourly_asset.key,
to_asset_key=weekly_asset.key,
)
for p in mapped:
yield weekly_job.run_request_for_partition(partition_key=p)
context.advance_cursor({hourly_asset.key: record})
When both assets have partition definitions with a defined mapping, Declarative Automation automatically handles the partition fan-in/fan-out. Choose between AutomationCondition.on_missing and eager() based on whether the downstream should re-materialize when upstream partitions re-run:
# on_missing: materialize each weekly partition ONCE when all hourly partitions exist.
# Does NOT re-trigger if hourly partitions re-run later.
@dg.asset(
deps=["hourly_asset"],
partitions_def=dg.WeeklyPartitionsDefinition(start_date="2024-01-01"),
automation_condition=dg.AutomationCondition.on_missing(),
)
def weekly_asset_on_missing(): ...
# eager: materialize when all hourly partitions exist, AND re-materialize whenever
# any hourly partition updates. This can produce many redundant downstream runs.
@dg.asset(
deps=["hourly_asset"],
partitions_def=dg.WeeklyPartitionsDefinition(start_date="2024-01-01"),
automation_condition=dg.AutomationCondition.eager(),
)
def weekly_asset_eager(): ...
For most partition fan-in use cases, on_missing is the better default. If you need re-materialization on upstream refreshes, use eager, but be aware of the volume of downstream runs it can produce.
Pattern 4: Only trigger when specific upstream assets update
- Sensor
- Declarative Automation
import dagster as dg
downstream_job = dg.define_asset_job("downstream_job", selection=["downstream"])
@dg.multi_asset_sensor(
monitored_assets=[dg.AssetKey("important_source")],
job=downstream_job,
)
def selective_sensor(context):
events = context.latest_materialization_records_by_key()
if events.get(dg.AssetKey("important_source")):
context.advance_all_cursors()
return dg.RunRequest()
import dagster as dg
@dg.asset(
deps=["important_source", "static_config"],
automation_condition=dg.AutomationCondition.eager().ignore(
dg.AssetSelection.keys("static_config")
),
)
def downstream(): ...
Use .ignore() to exclude specific dependencies from triggering automation, and .allow() to restrict to only specific dependencies.
Migrating sensors that poll external systems
A common pattern is a sensor that queries an external system (e.g., a relational database's information schema, or a set of files in blob storage) to detect when the data within that system has been updated, and then launches a Dagster job in response.
- Sensor
- Declarative Automation
import dagster as dg
@dg.asset
def sales_report(): ...
analytics_job = dg.define_asset_job("analytics_job", selection=["sales_report"])
@dg.sensor(job=analytics_job, minimum_interval_seconds=60)
def db_table_sensor(context: dg.SensorEvaluationContext):
last_checked = float(context.cursor) if context.cursor else 0
# Query the database's information schema for table update times
updated_tables = query_information_schema(last_checked)
if updated_tables:
context.update_cursor(str(max(t["last_altered"] for t in updated_tables)))
return dg.RunRequest()
def query_information_schema(since_timestamp: float) -> list:
# In practice, this would query your database, e.g.:
# SELECT table_name, last_altered
# FROM information_schema.tables
# WHERE last_altered > :since_timestamp
...
The sensor handles two responsibilities: detecting external changes and triggering downstream work. This couples detection logic to orchestration, and the downstream assets have no visibility into which upstream tables actually changed.
The recommended approach splits these responsibilities:
- Model the external tables as external assets. Each table gets an AssetSpec, making it visible in the asset graph and available as a dependency.
- Use a sensor to report materializations when the information schema shows a table has been updated. This sensor doesn't launch jobs — it just records that an external asset was updated.
- Downstream assets use Declarative Automation (eager(), on_cron(), etc.) to react to those updates through the normal asset graph.
from datetime import datetime, timezone
import dagster as dg
raw_orders = dg.AssetSpec("raw_orders", group_name="external_db")
raw_customers = dg.AssetSpec("raw_customers", group_name="external_db")
@dg.sensor(minimum_interval_seconds=60)
def db_table_update_sensor(
context: dg.SensorEvaluationContext,
) -> dg.SensorResult:
last_checked = float(context.cursor) if context.cursor else 0
updated_tables = query_information_schema(last_checked)
asset_events = [
dg.AssetMaterialization(
asset_key=table["table_name"],
metadata={
"last_altered": dg.MetadataValue.text(
datetime.fromtimestamp(
table["last_altered"], tz=timezone.utc
).isoformat()
),
},
)
for table in updated_tables
]
new_cursor = (
str(max(t["last_altered"] for t in updated_tables))
if updated_tables
else str(last_checked)
)
return dg.SensorResult(asset_events=asset_events, cursor=new_cursor)
@dg.asset(
deps=[raw_orders, raw_customers],
automation_condition=dg.AutomationCondition.on_cron("0 * * * *"),
)
def sales_report(): ...
@dg.asset(
deps=[raw_orders],
automation_condition=dg.AutomationCondition.eager(),
)
def order_metrics(): ...
def query_information_schema(since_timestamp: float) -> list:
# In practice, this would query your database, e.g.:
# SELECT table_name, last_altered
# FROM information_schema.tables
# WHERE last_altered > :since_timestamp
...
This approach gives you:
- Full lineage visibility. External tables appear in the asset graph alongside Dagster-managed assets, so you can trace data flow from source to downstream.
- Decoupled detection from orchestration. The sensor's only job is to report that external data changed. Downstream scheduling is handled by automation conditions, which can differ per asset — eager() for latency-sensitive assets, on_cron() for batch assets that need all dependencies fresh.
- Standard automation tooling. The same condition evaluation UI, tick history, and debugging tools that work for Dagster-native assets also work for assets downstream of external assets.
For full details on modeling external assets, reporting materializations with sensors or the REST API, and building dependency graphs of external assets, see the External assets guide.
Migrating run config
Sensors can build RunRequest objects with arbitrary run_config that varies per materialization event. Declarative Automation launches runs with a fixed configuration. This section covers strategies for handling this gap.
Strategy 1: Move config into the asset function
If your sensor passes configuration derived from upstream metadata, move that logic into the asset itself. The asset can read from the event log at execution time.
- Sensor
- Declarative Automation
import dagster as dg
processing_job = dg.define_asset_job("processing_job", selection=["processed_data"])
@dg.multi_asset_sensor(
monitored_assets=[dg.AssetKey("raw_data")],
job=processing_job,
)
def config_sensor(context):
events = context.latest_materialization_records_by_key()
record = events.get(dg.AssetKey("raw_data"))
if record:
metadata = record.asset_materialization.metadata
file_path = metadata["file_path"].value
context.advance_all_cursors()
return dg.RunRequest(
run_config={"ops": {"process": {"config": {"file_path": file_path}}}}
)
import dagster as dg
def process_file(file_path: str): ...
@dg.asset(
deps=["raw_data"],
automation_condition=dg.AutomationCondition.eager(),
)
def processed_data(context: dg.AssetExecutionContext):
instance = context.instance
event = instance.get_latest_materialization_event(dg.AssetKey("raw_data"))
metadata = event.asset_materialization.metadata
file_path = metadata["file_path"].value
return process_file(file_path)
This is the most common approach. The asset is self-contained and doesn't rely on external orchestration to pass it the right parameters.
Strategy 2: Use tags on the automation sensor
If you need to attach fixed metadata to all runs launched by automation, use run_tags on the AutomationConditionSensorDefinition:
import dagster as dg
automation_sensor = dg.AutomationConditionSensorDefinition(
"my_automation_sensor",
target=dg.AssetSelection.all(),
default_status=dg.DefaultSensorStatus.RUNNING,
run_tags={"team": "data-eng", "source": "automation"},
)
This doesn't replace dynamic run config, but it handles the common case of tagging runs for observability or routing.
When to keep the sensor
If your sensor's run config truly varies per materialization event in ways that can't be derived at execution time (e.g., the sensor receives webhook payloads with one-time tokens, or the config depends on which specific materialization event triggered the sensor out of many), keep the sensor. Not every sensor needs to be migrated.
Setting up the automation condition sensor
Declarative Automation conditions are evaluated by an AutomationConditionSensorDefinition. A default sensor is created automatically for any code location with automation conditions — you just need to toggle it on in the UI under Automation. For custom sensor configurations (evaluation frequency, run tags, asset selection scoping), see Automation condition sensors.
Caveats
Declarative Automation is job-less
Declarative Automation operates at the asset level, so there is no longer a job with a schedule that "contains" a set of assets. Jobs provide a helpful way to observe and reason about your data pipelines; if schedules and jobs are serving you well, keep using them rather than switching prematurely to a more complex and flexible system like Declarative Automation.
eager() fires on any dependency update, not all
If your multi-asset sensor waits for all monitored assets before triggering, eager() is not a replacement, since it fires when any dependency updates. For alternatives, see Pattern 1.
No side effects in conditions
Automation conditions only evaluate to true or false — they cannot perform side effects such as sending notifications. Move side effects to run status sensors or asset checks.
Cursor semantics are different
In Declarative Automation, cursor state is managed automatically via .since() and .since_last_handled(). You can't implement patterns like "skip every other materialization" or "batch N events."
Evaluation frequency
The automation condition sensor evaluates every 30 seconds by default, but complex conditions over many assets can take longer to evaluate. For large deployments, consider splitting into multiple sensors with targeted AssetSelections.
Run grouping behavior
Declarative Automation groups assets into runs automatically when downstream conditions depend on upstream assets being requested in the same tick. This means you have less control over run boundaries than with explicit RunRequest objects.
Partitioned assets with on_cron
For time-partitioned assets, on_cron only targets the latest time partition. For all partitions or specific historical partitions, use on_missing() or a custom condition with in_latest_time_window(lookback_delta=...).
Observable source assets
If your upstream is an @observable_source_asset, eager() only treats an observation as an "update" if the data version changes. Regular @asset materializations are always treated as updates.