Declarative Automation includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but the core system is extremely flexible and can be tailored to your specific needs.
By the end of this guide, you'll understand how AutomationConditions work and how to create your own custom conditions.
Each AutomationCondition consists of a set of operands and various operators. To create conditions that suit your needs, you can combine the operators and operands listed below. For example:
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.execution_failed()
)
This condition translates to: "Any upstream dependencies (parents) are part of an in-progress run or failed during the latest run."
Operands are base conditions which can be true or false about a given target. For partitioned assets, the target will be a given partition of the asset.
| Operand | Description |
| --- | --- |
| AutomationCondition.missing | Target has not been executed |
| AutomationCondition.in_progress | Target is part of an in-progress run |
| AutomationCondition.execution_failed | Target failed to be executed in its latest run |
| AutomationCondition.newly_updated | Target was updated since the previous evaluation |
| AutomationCondition.newly_requested | Target was requested on the previous evaluation |
| AutomationCondition.code_version_changed | Target has a new code version since the previous evaluation |
| AutomationCondition.cron_tick_passed | A new tick of the provided cron schedule occurred since the previous evaluation |
| AutomationCondition.in_latest_time_window | Target falls within the latest time window of the asset’s PartitionsDefinition, if applicable |
The above conditions can be built into more complex expressions using the following operators:
| Operator | Description |
| --- | --- |
| ~ (tilde) | NOT; condition is not true; ex: ~A |
| \| (pipe) | OR; either condition is true; ex: A \| B |
| & (ampersand) | AND; both conditions are true; ex: A & B |
| A.newly_true() | Condition A was false on the previous evaluation and is now true. |
| A.since(B) | Condition A became true more recently than Condition B. |
| AutomationCondition.any_deps_match(A) | Condition A is true for any upstream partition. Can be used with .allow() and .ignore() to target specific upstream assets. Refer to the Targeting dependencies section for an example. |
| AutomationCondition.all_deps_match(A) | Condition A is true for at least one partition of each upstream asset. Can be used with .allow() and .ignore() to target specific upstream assets. Refer to the Targeting dependencies section for an example. |
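The two stateful operators, newly_true() and since(), depend on earlier evaluations, which makes them harder to picture than the boolean operators. The following is a toy model in plain Python, not Dagster's implementation: it assumes each condition's past results are available as a list of booleans (oldest first), and the function names simply mirror the operator names for illustration.

```python
from typing import List


def newly_true(history: List[bool]) -> bool:
    """True if the condition holds on the latest evaluation but not the one before.

    `history` holds one value per evaluation, oldest first. A condition with
    no previous evaluation is treated as previously false (an assumption of
    this toy model).
    """
    if not history:
        return False
    previously_true = len(history) > 1 and history[-2]
    return history[-1] and not previously_true


def since(a_history: List[bool], b_history: List[bool]) -> bool:
    """True if A was last true on a later evaluation than B was last true."""
    last_a = max((i for i, value in enumerate(a_history) if value), default=None)
    last_b = max((i for i, value in enumerate(b_history) if value), default=None)
    if last_a is None:
        return False  # A has never been true
    return last_b is None or last_a > last_b
```

Under this reading, A.since(B) stays true on evaluations after A fired, until B fires and resets it.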
Use cases often resemble a pre-built condition but differ in minor ways. You can always copy the base implementation and modify it as needed, but it is often simpler to remove unwanted sub-conditions with the .without() method or to add conditions with the & operator.
By default, AutomationCondition.eager() will not materialize a target if it has any missing upstream data. If missing upstream data is expected, remove ~AutomationCondition.any_deps_missing() from the eager condition to allow execution:
from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
    ~AutomationCondition.any_deps_missing(),
)
AutomationCondition.eager(): Update older time partitions
By default, AutomationCondition.eager() will only update the latest time partition of an asset. If updates to historical partitions should result in downstream updates, then this sub-condition can be removed:
from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
    AutomationCondition.in_latest_time_window(),
)
Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the AutomationCondition.any_deps_match() operator. This operator takes an arbitrary AutomationCondition, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions.
This operator and AutomationCondition.all_deps_match() can be further customized to only target specific sets of upstream assets by using .allow() and .ignore().
For example, to target updates from a specific asset group, you can use any_deps_match with the newly_updated operand and tell it to target only the metrics asset group:
from dagster import AssetSelection, AutomationCondition

AutomationCondition.any_deps_match(
    AutomationCondition.newly_updated()
).allow(AssetSelection.groups("metrics"))
Or to ignore missing partitions from an upstream asset, you can use any_deps_match with the missing operand and tell it to ignore a specific asset:
Note that these ignore() and allow() methods also work for composite conditions such as AutomationCondition.any_deps_missing() or AutomationCondition.any_deps_updated().
When there are a large number of sub-conditions that make up an AutomationCondition, it can be difficult to understand and troubleshoot the condition. To make conditions easier to understand, you can attach labels to sub-conditions, which will then be displayed in the Dagster UI.
Arbitrary string labels can be attached to any node in the AutomationCondition tree by using the with_label() method, allowing you to describe the purpose of a specific sub-condition. For example:
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.execution_failed()
).with_label("Any parents in progress or failed")
Then, when viewing evaluation results in the UI, the label will display next to the condition.
Hovering over or expanding the label will display its sub-conditions.
Some automation use cases require custom business logic that cannot be expressed with off-the-shelf components. In these cases, you can define AutomationConditions which execute arbitrary Python code, and compose them with the built-in conditions.
By default, Dagster executes AutomationConditionSensorDefinitions in a daemon process that does not have access to your user code. In order to execute arbitrary Python code, you'll need to update this to execute on your user code server. This is the same place that your @sensor methods are evaluated.
Automation condition evaluation can be more resource-intensive than a typical sensor. A limit of 500 assets or checks per sensor is enforced.
To run evaluations on your user code server, add an automation condition sensor to your definitions with the use_user_code_server flag set to True:
import dagster as dg

defs = dg.Definitions(
    sensors=[
        dg.AutomationConditionSensorDefinition(
            "automation_condition_sensor",
            target=dg.AssetSelection.all(),
            use_user_code_server=True,
        )
    ]
)
This will allow your sensor to target automation conditions containing custom Python code.
You can create your own subclass of AutomationCondition, defining the evaluate() method. For example, imagine you want to avoid executing anything on a company holiday. To do this, you can first define a condition which detects if it's currently a company holiday:
In this example, we build up a subset of the evaluated asset for which this condition is True. We use EntitySubsets, rather than a plain True/False value, to account for partitioned assets, for which individual partitions may have different results.
In our case, the condition applies the same way whether or not the asset is partitioned, so we don't need any special logic to differentiate between these cases. If it's not a company holiday, we return an empty subset (meaning that this condition is not true for any subset of the asset), and if it is a company holiday, we return the candidate_subset, which is the subset of the asset that we need to evaluate. This subset shrinks as partitions are filtered out by the & operator: in an expression A & B, if A returns the empty subset, then the candidate subset for B will be empty as well. This helps avoid expensive computation in cases where we know it won't impact the final output.
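The shrinking of the candidate subset can be pictured with a toy model in plain Python. This is only a sketch of the idea and not Dagster's implementation: it assumes subsets are frozensets of partition keys and conditions are plain functions, and the names and_, condition_a and condition_b are hypothetical.

```python
from typing import Callable, FrozenSet, List, Tuple

Subset = FrozenSet[str]
Condition = Callable[[Subset], Subset]


def and_(*conditions: Condition) -> Condition:
    """Evaluate operands left to right, handing each one only the survivors."""

    def evaluate(candidates: Subset) -> Subset:
        for condition in conditions:
            # A condition can only keep or drop candidates, never add new ones.
            candidates = candidates & condition(candidates)
        return candidates

    return evaluate


# Record which candidate subset each operand receives.
calls: List[Tuple[str, Subset]] = []


def condition_a(candidates: Subset) -> Subset:
    calls.append(("A", candidates))
    return frozenset()  # true for nothing


def condition_b(candidates: Subset) -> Subset:
    calls.append(("B", candidates))
    return candidates  # true for everything it is asked about


result = and_(condition_a, condition_b)(frozenset({"2024-01-01", "2024-01-02"}))
```

Because condition_a returns the empty subset, condition_b is handed an empty candidate subset and has nothing left to compute.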
Once this condition is defined, you can use this condition as part of a broader expression, for example:
import dagster as dg

condition = dg.AutomationCondition.eager() & ~IsCompanyHoliday()