Customizing eager
You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.
Ignoring missing upstream data
By default, AutomationCondition.eager() will not materialize a target if it has any missing upstream data.
If missing upstream data is expected, remove ~AutomationCondition.any_deps_missing() from the eager policy to allow execution:
import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    .without(~dg.AutomationCondition.any_deps_missing())
    .with_label("eager_allow_missing")
)
Updating older time partitions
By default, AutomationCondition.eager() will only update the latest time partition of an asset.
If updates to historical partitions should result in downstream updates, then the in_latest_time_window() sub-condition can be removed:
import dagster as dg

condition = dg.AutomationCondition.eager().without(
    dg.AutomationCondition.in_latest_time_window(),
)
Waiting for all blocking asset checks to complete before executing
The AutomationCondition.all_deps_blocking_checks_passed() condition becomes true after all upstream blocking checks have passed.
This can be combined with AutomationCondition.eager() to ensure that your asset does not execute if upstream data is failing data quality checks:
import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    & dg.AutomationCondition.all_deps_blocking_checks_passed()
)
Ignoring materializations from manual runs
By default, AutomationCondition.eager() materializes a target whenever any upstream event occurs, regardless of the source of that event.
It can be useful to ignore runs of certain types when determining if an upstream asset should be considered "updated". This can be done using AutomationCondition.any_new_update_has_run_tags() to filter updates for runs with tags matching particular keys:
import dagster as dg

# True if any new update of the target came from a run launched by an automation condition
executed_via_condition = dg.AutomationCondition.any_new_update_has_run_tags(
    tag_values={"dagster/from_automation_condition": "true"}
)
condition = dg.AutomationCondition.eager().replace(
    "newly_updated",
    dg.AutomationCondition.newly_updated() & executed_via_condition,
)
Ignoring dependencies
By default, AutomationCondition.eager() will trigger a target if any upstream dependencies are updated.
In some cases, it can be useful to ignore some upstream dependencies that should not trigger downstream compute. This can be done by passing in an AssetSelection to be ignored:
import dagster as dg
condition = dg.AutomationCondition.eager().ignore(dg.AssetSelection.assets("foo"))
Alternatively, you can pass in an AssetSelection to be allowed:
import dagster as dg
condition = dg.AutomationCondition.eager().allow(dg.AssetSelection.groups("abc"))
Respecting data versioning
By default, AutomationCondition.eager() will consider any upstream asset to be "updated" if it has been materialized, regardless of the data version of that materialization.
If you want to only consider upstream assets to be "updated" if the data version has changed, you can use AutomationCondition.data_version_changed():
import dagster as dg

condition = dg.AutomationCondition.eager().replace(
    "any_deps_updated",
    dg.AutomationCondition.any_deps_updated().replace(
        "newly_updated", dg.AutomationCondition.data_version_changed()
    ),
)
Combining scheduled and dependency-driven execution
For more complex automation patterns, you can combine scheduled execution with dependency-driven updates. This pattern ensures regular execution on a schedule while also allowing for more frequent updates when dependencies change, with additional safety checks:
import dagster as dg

# The asset has been updated more recently than the cron condition last became true
updated_since_cron = dg.AutomationCondition.newly_updated().since(
    dg.AutomationCondition.on_cron("*/5 * * * *")
)
custom_condition = (
    dg.AutomationCondition.on_cron("*/5 * * * *")  # Runs every 5 minutes
    | (
        dg.AutomationCondition.any_deps_updated()  # When any dependency updates
        & updated_since_cron  # Only if the asset was updated since the cron condition last fired
        & ~dg.AutomationCondition.any_deps_missing()  # But not if deps are missing
        & ~dg.AutomationCondition.any_deps_in_progress()  # And not if deps are in progress
    )
)


@dg.asset
def upstream_asset_1(): ...


@dg.asset
def upstream_asset_2(): ...


@dg.asset(
    automation_condition=custom_condition, deps=["upstream_asset_1", "upstream_asset_2"]
)
def automation_condition_combination(context: dg.AssetExecutionContext): ...
The custom_condition in this example will execute the target asset in two scenarios:
- Scheduled execution: Runs on the specified cron schedule (every 5 minutes in this example)
- Dependency-driven execution: Runs when upstream dependencies are updated, but only if:
  - The asset was successfully updated since the last scheduled run
  - No upstream dependencies are missing
  - No upstream dependencies are currently in progress
This approach is particularly useful for assets that need guaranteed regular execution but should also respond quickly to upstream changes, while avoiding execution during problematic states.