Declarative Automation includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but you can also customize conditions.
By the end of this guide, you'll understand how AutomationConditions work and how to create your own custom conditions.
Each AutomationCondition consists of a set of operands and various operators. To create conditions that suit your specific needs, you can combine the operators and operands listed below. For example:
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.failed()
)
This condition translates to: "Any upstream dependencies (parents) are part of an in-progress run or failed during the latest run."
AutomationCondition.any_deps_match(A)
True for any upstream partition. Can be used with .allow() and .ignore() to target specific upstream assets. Refer to the Targeting dependencies section for an example.
AutomationCondition.all_deps_match(A)
True for at least one partition of each upstream asset. Can be used with .allow() and .ignore() to target specific upstream assets. Refer to the Targeting dependencies section for an example.
Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the AutomationCondition.any_deps_match() operator. This operator takes an arbitrary AutomationCondition, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions.
This operator and AutomationCondition.all_deps_match() can be further customized to only target specific sets of upstream assets by using .allow() and .ignore().
For example, to target updates from a specific asset group, you can use any_deps_match with the newly_updated operand and tell it to target only the metrics asset group:
from dagster import AssetSelection, AutomationCondition

AutomationCondition.any_deps_match(
    AutomationCondition.newly_updated()
).allow(AssetSelection.groups("metrics"))
Or to ignore missing partitions from an upstream asset, you can use any_deps_match with the missing operand and tell it to ignore a specific asset:
When there are a large number of sub-conditions that make up an AutomationCondition, it can be difficult to understand and troubleshoot the condition. To make conditions easier to understand, you can attach labels to sub-conditions, which will then display in the Dagster UI.
Arbitrary string labels can be attached to any node in the AutomationCondition tree by using the with_label() method, allowing you to describe the purpose of a specific sub-condition. For example:
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.failed()
).with_label("Any parents in progress or failed")
Then, when viewing evaluation results in the UI, the label will display next to the condition.
Hovering over or expanding the label will display its sub-conditions.
In some cases, you may want to use statuses and events in your automation conditions:
Statuses are persistent states that remain true for some period of time. For example, the AutomationCondition.missing() condition will be true only if an asset partition has never been materialized or observed.
Events are transient and reflect something that may only be true for an instant. For example, the AutomationCondition.newly_updated() condition will be true only if an asset partition was materialized since the previous evaluation.
Using the <A>.since(<B>) operator, you can create conditions that detect if one event has happened more recently than another. Think of this as converting two events into a status (in this case, "A has occurred more recently than B"), because the result stays true for some period of time. The resulting condition becomes true whenever <A> is true and remains true until <B> becomes true.
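To make the latching behavior concrete, here is a toy model in plain Python. It is not Dagster's implementation: the since() function below is a hypothetical stand-in that works on one (A, B) pair of booleans per evaluation tick, and it assumes that B wins when both are true on the same tick.

```python
from typing import Iterable, List, Tuple


def since(ticks: Iterable[Tuple[bool, bool]]) -> List[bool]:
    """Toy model of A.since(B): one (a, b) pair per evaluation tick."""
    latched = False
    results = []
    for a, b in ticks:
        if a:
            latched = True  # A occurred: the status turns on
        if b:
            latched = False  # B occurred: the status resets (assumed to win ties)
        results.append(latched)
    return results


ticks = [(False, False), (True, False), (False, False), (False, True), (False, False)]
history = since(ticks)
print(history)  # [False, True, True, False, False]
```

The status turns on at the tick where A fires, stays on for the following tick even though A is no longer true, and turns off once B fires.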
Conversely, it can also be useful to convert statuses to events. For example, the default eager() condition ensures that Dagster only tries to materialize a missing asset partition once using the following sub-condition:
from dagster import AutomationCondition

AutomationCondition.missing().newly_true().since(
    AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
)
By using the <A>.newly_true() operator, you can turn the status of "being missing" into a single event, specifically the point in time where an asset partition entered the missing state. From there, you can ensure that an asset is materialized only once in response to detecting a missing partition.
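The status-to-event conversion can be pictured as edge detection. The sketch below is a toy model in plain Python rather than Dagster code: newly_true() here is a hypothetical helper that takes a status value per evaluation tick, and it assumes that a status which is already true on the very first tick counts as newly true.

```python
from typing import Iterable, List


def newly_true(statuses: Iterable[bool]) -> List[bool]:
    """Toy model of A.newly_true(): true only on ticks where A flips to true."""
    previous = False  # assumption: nothing was true before the first tick
    events = []
    for current in statuses:
        events.append(current and not previous)
        previous = current
    return events


# Whether a partition is missing on each of six consecutive ticks.
missing_by_tick = [True, True, True, False, False, True]
missing_events = newly_true(missing_by_tick)
print(missing_events)  # [True, False, False, False, False, True]
```

Although the partition is missing for three ticks in a row, the event fires only on the first of them, and fires again only after the partition has left and re-entered the missing state.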
Dagster can group the execution of multiple assets into a single, logical run. For example, imagine you have a series of dependent assets, each with an AutomationCondition.eager() condition. When you update the first asset in the chain, the desired behavior is typically to have all downstream assets grouped into a single run, rather than executing each asset in order in its own run.
To create this scenario, you can use AutomationCondition.will_be_requested(). Because each AutomationCondition is evaluated in order, you can query if an upstream asset will be requested on the current tick. For example:
from dagster import AutomationCondition

any_parent_missing = AutomationCondition.any_deps_match(
    AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
)
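The effect of in-order evaluation can be illustrated with a toy model in plain Python. This is a sketch under stated assumptions, not Dagster's evaluation engine: plan_tick() and the asset names are hypothetical, and the request rule (some parent was updated or will be requested, and no parent is missing without being requested) is a simplification loosely modeled on the eager behavior described above.

```python
from typing import Dict, List, Set


def plan_tick(
    parents: Dict[str, List[str]],
    newly_updated: Set[str],
    missing: Set[str],
) -> List[str]:
    """Toy model: decide which assets to request on a single tick.

    Assumes `parents` lists assets in dependency order (upstream first).
    """
    requested: List[str] = []
    for asset, deps in parents.items():
        # A parent counts as updated if it was just updated or is about to be.
        parent_updated = any(d in newly_updated or d in requested for d in deps)
        # Mirrors missing() & ~will_be_requested(): a missing parent only
        # blocks this asset if it is not already being requested this tick.
        parent_blocking = any(d in missing and d not in requested for d in deps)
        if parent_updated and not parent_blocking:
            requested.append(asset)
    return requested


# Hypothetical three-asset chain: raw -> cleaned -> report.
chain = {"raw": [], "cleaned": ["raw"], "report": ["cleaned"]}
same_tick = plan_tick(chain, newly_updated={"raw"}, missing={"cleaned", "report"})
print(same_tick)  # ['cleaned', 'report']
```

Because cleaned is evaluated before report, report can see that its missing parent is already being requested, so both assets are requested on the same tick and can be grouped into one run.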