Customizing eager
Ignoring missing upstream data
By default, AutomationCondition.eager() will not materialize a target if it has any missing upstream data.
If the target is expected to have missing upstream data, remove ~AutomationCondition.any_deps_missing() from the eager policy to allow execution:
import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    .without(~dg.AutomationCondition.any_deps_missing())
    .with_label("eager_allow_missing")
)
Updating older time partitions
By default, AutomationCondition.eager() will only update the latest time partition of an asset.
If updates to historical partitions should also result in downstream updates, remove the AutomationCondition.in_latest_time_window() sub-condition:
from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
    AutomationCondition.in_latest_time_window(),
)
Waiting for all blocking asset checks to complete before executing
The AutomationCondition.all_deps_blocking_checks_passed() condition becomes true after all upstream blocking checks have passed.
This can be combined with AutomationCondition.eager() to ensure that your asset does not execute if upstream data is failing data quality checks:
import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    & dg.AutomationCondition.all_deps_blocking_checks_passed()
)
Ignoring materializations from manual runs
By default, AutomationCondition.eager() materializes a target whenever any upstream event occurs, regardless of the source of that event.
It can be useful to ignore runs of certain types when determining whether an upstream asset should be considered "updated". To do this, use AutomationCondition.executed_with_tags() to count only updates from runs whose tags match particular keys and values:
import dagster as dg

# detects if the latest run of the target was executed via an automation condition
executed_via_condition = dg.AutomationCondition.executed_with_tags(
    tag_values={"dagster/from_automation_condition": "true"}
)

condition = dg.AutomationCondition.eager().replace(
    "newly_updated",
    dg.AutomationCondition.newly_updated() & executed_via_condition,
)
Ignoring dependencies
By default, AutomationCondition.eager() will trigger a target if any upstream dependencies are updated.
In some cases, it can be useful to ignore upstream dependencies that should not trigger downstream compute. To do this, pass in an AssetSelection of the dependencies to ignore:
import dagster as dg
condition = dg.AutomationCondition.eager().ignore(dg.AssetSelection.assets("foo"))
Alternatively, you can pass in an AssetSelection of the only dependencies that are allowed to trigger the target:
import dagster as dg
condition = dg.AutomationCondition.eager().allow(dg.AssetSelection.groups("abc"))
Respecting data versioning
By default, AutomationCondition.eager() will consider any upstream asset to be "updated" if it has been materialized, regardless of the data version of that materialization.
If you want to consider upstream assets "updated" only when their data version has changed, you can use AutomationCondition.data_version_changed():
import dagster as dg

condition = dg.AutomationCondition.eager().replace(
    "newly_updated", dg.AutomationCondition.data_version_changed()
)