Ask AI

Creating custom Declarative Automation conditions#

Declarative Automation is currently experimental.

Declarative Automation includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but you can also customize conditions.

By the end of this guide, you'll understand how AutomationConditions work and how to create your own custom conditions.


Prerequisites#

Before continuing, you should be familiar with:


How it works#

Each AutomationCondition consists of a set of operands and various operators. To create conditions that suit your specific needs, you can combine the operators and operands listed below. For example:

from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.failed()
)

This condition translates to Any upstream dependencies (parents) part of an in-progress run or failed during the latest run.

Operands#

Operands are base conditions which can be true or false about a given asset partition.

OperandDescription
AutomationCondition.missingThe asset partition has never been materialized or observed
AutomationCondition.in_progressThe asset partition is part of an in-progress run
AutomationCondition.failedThe asset partition failed to be materialized in its latest run
AutomationCondition.newly_updatedThe asset partition was materialized since the previous evaluation
AutomationCondition.newly_requestedThe asset partition was requested on the previous evaluation
AutomationCondition.code_version_changedThe asset has a new code version since the previous evaluation
AutomationCondition.cron_tick_passedA new tick of the provided cron schedule occurred since the previous evaluation
AutomationCondition.in_latest_time_windowThe asset partition falls within the latest time window of the asset’s PartitionsDefinition, if applicable.
AutomationCondition.will_be_requestedThe asset partition will be requested in this tick

Operators#

The above conditions can be built into more complex expressions using the following operators:

OperatorDescription
~ (tilde)NOT; condition is not true; ex: ~A
| (pipe)OR; either condition must be true; ex: A | B
& (ampersand)AND; both conditions must be true; ex: A & B
A.newly_true()Condition A was false on the previous evaluation and is now true.
A.since(B)Condition A became true more recently than Condition B.
AutomationCondition.any_deps_match(A)Condition A is true for any upstream partition. Can be used with .allow() and .ignore() to target specific upstream assets. Refer to the Targeting dependencies section for an example.
AutomationCondition.all_deps_match(A)Condition A is true for at least one partition of each upstream asset. Can be used with .allow() and .ignore() to target specific upstream assets. Refer to the Targeting dependencies section for an example.
AutomationCondition.any_downstream_condition()Any AutomationCondition on a downstream asset evaluates to true

Composite conditions#

Finally, there are a set of pre-built conditions which make it easier to construct common combinations of the above conditions.

ConditionDescription
AutomationCondition.any_deps_updatedAny dependencies have been updated since the previous evaluation
AutomationCondition.any_deps_missingAny dependencies have never been materialized or observed
AutomationCondition.any_deps_in_progressAny dependencies are part of an in-progress run
AutomationCondition.all_deps_updated_since_cronAll dependencies have been updated since the latest tick of the provided cron schedule

Modifying policies#

It's common to have use cases similar to pre-built policies but with minor differences. While it is always possible to copy the base implementation and modify it as needed, it can often be simpler to use the .without() method to remove the unwanted sub-conditions or add additional conditions with the & operator.

AutomationCondition.eager(): Ignoring missing dependencies#

By default, AutomationCondition.eager() will not materialize an asset partition if it has any missing dependencies. If it is expected to have missing upstream data, remove ~AutomationCondition.any_deps_missing() from the eager policy to allow execution:

from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
  ~AutomationCondition.any_deps_missing(),
)

AutomationCondition.eager(): Update older time partitions#

By default, AutomationCondition.eager() will only update the latest time partition of an asset. If updates to historical partitions should result in downstream updates, then this sub-condition can be removed:

from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
  AutomationCondition.in_latest_time_window(),
)

Targeting dependencies#

Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the AutomationCondition.any_deps_match() operator. This operator takes an arbitrary AutomationCondition, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions.

This operator and AutomationCondition.all_deps_match() can be further customized to only target specific sets of upstream assets by using .allow() and .ignore().

For example, to target updates from a specific asset group, you can use any_deps_match with the newly_updated operand and tell it to target only the metrics asset group:

from dagster import AssetSelection, AutomationCondition

AutomationCondition.any_deps_match(
    AutomationCondition.newly_updated()
).allow(AssetSelection.groups("metrics"))

Or to ignore missing partitions from an upstream asset, you can use any_deps_match with the missing operand and tell it to ignore a specific asset:

AutomationCondition.any_deps_match(
    AutomationCondition.missing()
).ignore(AssetSelection.keys("taxi_trips"))

Note that these ignore() and allow() methods also work for composite conditions such as AutomationCondition.any_deps_missing() or AutomationCondition.any_deps_updated().


Describing conditions with labels#

When there are a large number of sub-conditions that make up an AutomationCondition, it can be difficult to understand and troubleshoot the condition. To make conditions easier to understand, you can attach labels to sub-conditions, which will then display in the Dagster UI.

Arbitrary string labels can be attached to any node in the AutomationCondition tree by using the with_label() method, allowing you to describe the purpose of a specific sub-condition. For example:

from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.failed()
).with_label("Any parents in progress or failed")

Then, when viewing evaluation results in the UI, the label will display next to the condition:

Any parents in progress or failed condition label in the Dagster UI

Hovering over or expanding the label will display its sub-conditions:

Expanded Any parents in progress or failed condition label with a list of sub-conditions in the Dagster UI