Customizing eager

Tip: You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.

Ignoring missing upstream data

By default, AutomationCondition.eager() will not materialize a target if it has any missing upstream data.

If missing upstream data is expected, remove ~AutomationCondition.any_deps_missing() from the eager policy to allow execution:

src/<project_name>/defs/assets.py
import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    .without(~dg.AutomationCondition.any_deps_missing())
    .with_label("eager_allow_missing")
)

Updating older time partitions

By default, AutomationCondition.eager() will only update the latest time partition of an asset.

If updates to historical partitions should also trigger downstream updates, remove the AutomationCondition.in_latest_time_window() sub-condition:

src/<project_name>/defs/assets.py
import dagster as dg

condition = dg.AutomationCondition.eager().without(
    dg.AutomationCondition.in_latest_time_window(),
)

Waiting for all blocking asset checks to complete before executing

The AutomationCondition.all_deps_blocking_checks_passed() condition becomes true after all upstream blocking checks have passed.

This can be combined with AutomationCondition.eager() to ensure that your asset does not execute if upstream data is failing data quality checks:

src/<project_name>/defs/assets.py
import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    & dg.AutomationCondition.all_deps_blocking_checks_passed()
)

Ignoring materializations from manual runs

By default, AutomationCondition.eager() materializes a target whenever any upstream event occurs, regardless of the source of that event.

It can be useful to ignore runs of certain types when determining whether an upstream asset should be considered "updated". To do this, use AutomationCondition.any_new_update_has_run_tags() to count only updates from runs whose tags match particular keys or values:

src/<project_name>/defs/assets.py
import dagster as dg

# True if any new update of the target came from a run launched by an automation condition
executed_via_condition = dg.AutomationCondition.any_new_update_has_run_tags(
    tag_values={"dagster/from_automation_condition": "true"}
)

# Only treat an upstream asset as "updated" when the update came from such a run
condition = dg.AutomationCondition.eager().replace(
    "newly_updated",
    dg.AutomationCondition.newly_updated() & executed_via_condition,
)
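
To make the filtering rule concrete, here is a minimal plain-Python sketch of the check described above. It does not call Dagster and is not how Dagster implements the condition. The function name counts_as_updated and the representation of each new update as a dict of run tags are assumptions made for illustration only.

```python
# Illustrative model only: each new update is represented by the tags of the
# run that produced it (a hypothetical representation, not a Dagster type).
REQUIRED_TAGS = {"dagster/from_automation_condition": "true"}


def counts_as_updated(new_update_run_tags, required=REQUIRED_TAGS):
    """Return True if at least one new update came from a run carrying
    every required tag key with the required value."""
    return any(
        all(tags.get(key) == value for key, value in required.items())
        for tags in new_update_run_tags
    )


# A manual run (no automation tag) is ignored; an automated one is counted.
manual_only = [{"launched_by": "alice"}]
mixed = [{"launched_by": "alice"}, {"dagster/from_automation_condition": "true"}]
print(counts_as_updated(manual_only))
print(counts_as_updated(mixed))
```

Under this model, an upstream asset whose only new materializations came from manual runs would not trigger the downstream target.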

Ignoring dependencies

By default, AutomationCondition.eager() will trigger a target if any upstream dependencies are updated.

In some cases, it can be useful to ignore some upstream dependencies that should not trigger downstream compute. This can be done by passing in an AssetSelection to be ignored:

src/<project_name>/defs/assets.py
import dagster as dg

condition = dg.AutomationCondition.eager().ignore(dg.AssetSelection.assets("foo"))

Alternatively, you can pass in an AssetSelection to be allowed:

src/<project_name>/defs/assets.py
import dagster as dg

condition = dg.AutomationCondition.eager().allow(dg.AssetSelection.groups("abc"))
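
The effect of ignoring or allowing dependencies can be pictured as narrowing the set of upstream assets that are able to trigger the target. The sketch below is a plain-Python model of that idea under simplifying assumptions: asset selections are represented as sets of string names, and the helper names triggering_deps and any_deps_updated are hypothetical rather than part of the Dagster API.

```python
def triggering_deps(deps, ignored=frozenset(), allowed=None):
    """Return the upstream assets that may trigger the target.

    `ignored` removes assets from consideration; `allowed`, when given,
    restricts consideration to only those assets.
    """
    candidates = set(deps) - set(ignored)
    if allowed is not None:
        candidates &= set(allowed)
    return candidates


def any_deps_updated(deps, updated, ignored=frozenset(), allowed=None):
    """True if at least one considered upstream asset was updated."""
    return bool(triggering_deps(deps, ignored, allowed) & set(updated))


deps = {"foo", "bar", "baz"}

# With "foo" ignored, an update to "foo" alone does not trigger the target.
print(any_deps_updated(deps, updated={"foo"}, ignored={"foo"}))

# With only "bar" allowed, an update to "bar" does trigger it.
print(any_deps_updated(deps, updated={"bar"}, allowed={"bar"}))
```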

Respecting data versioning

By default, AutomationCondition.eager() will consider any upstream asset to be "updated" if it has been materialized, regardless of the data version of that materialization.

If you want to only consider upstream assets to be "updated" if the data version has changed, you can use AutomationCondition.data_version_changed():

src/<project_name>/defs/assets.py
import dagster as dg

condition = dg.AutomationCondition.eager().replace(
    "any_deps_updated",
    dg.AutomationCondition.any_deps_updated().replace(
        "newly_updated", dg.AutomationCondition.data_version_changed()
    ),
)

Combining scheduled and dependency-driven execution

For more complex automation patterns, you can combine scheduled execution with dependency-driven updates. This pattern ensures regular execution on a schedule while also allowing for more frequent updates when dependencies change, with additional safety checks:

src/<project_name>/defs/assets.py
import dagster as dg

# True once the asset has been updated since the most recent cron tick
# (the cron here fires every 5 minutes, despite the variable name)
daily_success_condition = dg.AutomationCondition.newly_updated().since(
    dg.AutomationCondition.on_cron("*/5 * * * *")
)

custom_condition = (
    dg.AutomationCondition.on_cron("*/5 * * * *")  # Runs every 5 minutes
    | (
        dg.AutomationCondition.any_deps_updated()  # When any dependency updates
        & daily_success_condition  # Only if the asset was updated since the last cron tick
        & ~dg.AutomationCondition.any_deps_missing()  # But not if deps are missing
        & ~dg.AutomationCondition.any_deps_in_progress()  # And not if deps are in progress
    )
)


@dg.asset
def upstream_asset_1(): ...


@dg.asset
def upstream_asset_2(): ...


@dg.asset(
    automation_condition=custom_condition,
    deps=["upstream_asset_1", "upstream_asset_2"],
)
def automation_condition_combination(context: dg.AssetExecutionContext): ...

The custom_condition in this example will execute the target asset in two scenarios:

  1. Scheduled execution: Runs on the specified cron schedule (every 5 minutes in this example)
  2. Dependency-driven execution: Runs when upstream dependencies are updated, but only if:
    • The asset was successfully updated since the last scheduled run
    • No upstream dependencies are missing
    • No upstream dependencies are currently in progress

This approach is particularly useful for assets that need guaranteed regular execution but should also respond quickly to upstream changes, while avoiding execution during problematic states.
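
The two scenarios listed above reduce to a small boolean expression. The following plain-Python sketch mirrors the structure of custom_condition so the combinations can be checked by hand. It is a simplified model, not Dagster's evaluation logic, and the function name should_execute and its boolean parameters are assumptions introduced for illustration.

```python
def should_execute(
    *,
    cron_tick,             # the cron schedule has just ticked
    any_deps_updated,      # at least one upstream dependency updated
    updated_since_cron,    # target was updated since the last cron tick
    any_deps_missing,      # some upstream dependency has no data
    any_deps_in_progress,  # some upstream dependency is currently running
):
    """Model of: on_cron | (deps_updated & updated_since_cron
    & ~deps_missing & ~deps_in_progress)."""
    dependency_driven = (
        any_deps_updated
        and updated_since_cron
        and not any_deps_missing
        and not any_deps_in_progress
    )
    return cron_tick or dependency_driven


# Scheduled execution fires regardless of the state of the dependencies.
print(should_execute(
    cron_tick=True,
    any_deps_updated=False,
    updated_since_cron=False,
    any_deps_missing=True,
    any_deps_in_progress=False,
))

# Dependency-driven execution is blocked while an upstream run is in progress.
print(should_execute(
    cron_tick=False,
    any_deps_updated=True,
    updated_since_cron=True,
    any_deps_missing=False,
    any_deps_in_progress=True,
))
```

Note that in this model the safety checks guard only the dependency-driven branch. The scheduled branch runs on every cron tick, which matches how the | operator combines the two sides of custom_condition.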