
Creating custom Declarative Automation conditions#

Declarative Automation includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but the core system is extremely flexible and can be tailored to your specific needs.

By the end of this guide, you'll understand how to create AutomationConditions for a variety of scenarios.


Prerequisites#

Before continuing, you should be familiar with:


Examples#

Ignore missing upstream data when using AutomationCondition.eager()#

By default, AutomationCondition.eager() will not materialize a target if it has any missing upstream data. If missing upstream data is expected, remove ~AutomationCondition.any_deps_missing() from the eager condition to allow execution:

import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    .without(~dg.AutomationCondition.any_deps_missing())
    .with_label("eager_allow_missing")
)

Update older time partitions when using AutomationCondition.eager()#

By default, AutomationCondition.eager() will only update the latest time partition of an asset. If updates to historical partitions should result in downstream updates, then this sub-condition can be removed:

from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
    AutomationCondition.in_latest_time_window(),
)

Update an older time partition when using AutomationCondition.on_cron()#

By default, AutomationCondition.on_cron() will target the latest time partition of an asset. If you instead want to update partitions on a delay, then you can replace this condition with one that targets a partition that has a specific lag from the latest time window:

from datetime import timedelta

from dagster import AutomationCondition

five_days_ago_condition = AutomationCondition.in_latest_time_window(
    timedelta(days=5)
) & ~AutomationCondition.in_latest_time_window(timedelta(days=4))

# "@daily" is an illustrative schedule; substitute your own cron string.
condition = five_days_ago_condition & AutomationCondition.on_cron("@daily").without(
    AutomationCondition.in_latest_time_window(),
)
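
To see why combining a five-day window with the negation of a four-day window isolates a single partition, here is a toy model that uses only the standard library. It is not Dagster's implementation: `latest_partitions` is a hypothetical stand-in for in_latest_time_window(), and it assumes daily partitions where a lookback selects every partition whose window starts within that delta of the end of the latest window.

```python
from datetime import date, timedelta


def latest_partitions(partitions: list[date], lookback: timedelta) -> set[date]:
    """Toy stand-in for in_latest_time_window(lookback) on daily partitions.

    Each partition covers [start, start + 1 day). A partition is selected when
    its window starts within `lookback` of the end of the latest window.
    """
    latest_end = max(partitions) + timedelta(days=1)
    return {p for p in partitions if p >= latest_end - lookback}


# Ten hypothetical daily partitions: 2024-01-01 through 2024-01-10.
partitions = [date(2024, 1, 1) + timedelta(days=i) for i in range(10)]

within_5_days = latest_partitions(partitions, timedelta(days=5))
within_4_days = latest_partitions(partitions, timedelta(days=4))

# "in the latest 5 days" AND NOT "in the latest 4 days"
five_days_ago = within_5_days - within_4_days
print(sorted(five_days_ago))
```

Under these assumptions, the only partition left is the fifth most recent one, which is the partition the combined condition targets.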

Ignore dependencies when using AutomationCondition.on_cron()#

By default, AutomationCondition.on_cron() will wait for all upstream dependencies to be updated before executing the asset it's attached to. In some cases, it can be useful to ignore some upstream dependencies in this calculation. This can be done by passing in an AssetSelection to be ignored:

import dagster as dg

all_deps_except_foo_updated = dg.AutomationCondition.all_deps_updated_since_cron(
    "@hourly"
).ignore(dg.AssetSelection.assets("foo"))

condition = (
    dg.AutomationCondition.on_cron("@hourly").without(
        dg.AutomationCondition.all_deps_updated_since_cron("@hourly")
    )
) & all_deps_except_foo_updated

Alternatively, you can pass in an AssetSelection to be allowed:

import dagster as dg

group_abc_updated = dg.AutomationCondition.all_deps_updated_since_cron("@hourly").allow(
    dg.AssetSelection.groups("abc")
)

condition = (
    dg.AutomationCondition.on_cron("@hourly").without(
        dg.AutomationCondition.all_deps_updated_since_cron("@hourly")
    )
) & group_abc_updated

Wait for all blocking asset checks to complete before executing#

The AutomationCondition.all_deps_blocking_checks_passed() condition becomes true after all upstream blocking checks have passed. This can be combined with built-in conditions such as AutomationCondition.on_cron() and AutomationCondition.eager() to ensure that your asset does not execute if upstream data is in a bad state:

import dagster as dg

condition = (
    dg.AutomationCondition.eager()
    & dg.AutomationCondition.all_deps_blocking_checks_passed()
)

Describing conditions with labels#

When there are a large number of sub-conditions that make up an AutomationCondition, it can be difficult to understand and troubleshoot the condition. To make conditions easier to understand, you can attach labels to sub-conditions, which will then be displayed in the Dagster UI.

Arbitrary string labels can be attached to any node in the AutomationCondition tree by using the with_label() method, allowing you to describe the purpose of a specific sub-condition. For example:

from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.failed()
).with_label("Any parents in progress or failed")

Then, when viewing evaluation results in the UI, the label will display next to the condition:

Any parents in progress or failed condition label in the Dagster UI

Hovering over or expanding the label will display its sub-conditions:

Expanded Any parents in progress or failed condition label with a list of sub-conditions in the Dagster UI

Arbitrary Python AutomationConditions (Experimental)#

Some automation use cases require custom business logic that cannot be expressed with off-the-shelf components. In these cases, you can define AutomationConditions that execute arbitrary Python code and compose them with the built-in conditions.

Setup#

By default, Dagster executes AutomationConditionSensorDefinitions in a daemon process that does not have access to your user code. To execute arbitrary Python code, you'll need to configure evaluation to run on your user code server instead, which is the same place that your @sensor functions are evaluated.

Automation condition evaluation can be more resource-intensive than a typical sensor. A limit of 500 assets or checks per sensor is enforced.

To run evaluation on your user code server, add an automation condition sensor to your definitions with the use_user_code_server flag set to True:

import dagster as dg

sensor = dg.AutomationConditionSensorDefinition(
    "automation_condition_sensor",
    target=dg.AssetSelection.all(),
    use_user_code_server=True,
)
defs = dg.Definitions(sensors=[sensor])

This will allow your sensor to target automation conditions containing custom Python code.

Defining a custom condition#

You can create your own subclass of AutomationCondition, defining the evaluate() method. For example, imagine you want to avoid executing anything on a company holiday. To do this, you can first define a condition which detects if it's currently a company holiday:

import dagster as dg

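# is_company_holiday() is not provided by Dagster; it is assumed to be a helper
# you define that takes a datetime and returns a bool.
class IsCompanyHoliday(dg.AutomationCondition):
    def evaluate(self, context: dg.AutomationContext) -> dg.AutomationResult:
        if is_company_holiday(context.evaluation_time):
            # True for everything still under consideration.
            true_subset = context.candidate_subset
        else:
            # True for nothing.
            true_subset = context.get_empty_subset()
        return dg.AutomationResult(true_subset, context=context)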

In this example, we build up a subset of the evaluated asset for which this condition is True. We use EntitySubsets rather than a plain True / False to account for partitioned assets, where individual partitions may have different results.

In our case, the condition applies the same way whether or not the asset is partitioned, so we don't need any special logic to differentiate between these cases. If it's not a company holiday, we return an empty subset (meaning that this condition is not true for any subset of the asset). If it is a company holiday, we return the candidate_subset, which is the subset of the asset that we need to evaluate. This subset shrinks as partitions are filtered out by the & operator: in an expression A & B, if A returns the empty subset, then the candidate subset for B will be empty as well. This helps avoid expensive computation in cases where we know it won't impact the final output.
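
The narrowing of the candidate subset across an A & B expression can be illustrated with a toy model built on plain Python sets. This is only a sketch of the idea, not how Dagster represents subsets internally; `and_`, `always_empty`, and `expensive_check` are hypothetical names, and partitions are modeled as strings.

```python
from typing import Callable

# Toy model: a "subset" is a frozenset of partition names, and a "condition"
# maps the candidate subset to the part of it for which the condition is true.
Subset = frozenset
Condition = Callable[[frozenset], frozenset]


def and_(*conditions: Condition) -> Condition:
    """Combine conditions so each one only sees what the previous ones kept."""

    def evaluate(candidates: frozenset) -> frozenset:
        for condition in conditions:
            candidates = condition(candidates) & candidates
        return candidates

    return evaluate


seen_by_b: list[frozenset] = []


def always_empty(candidates: frozenset) -> frozenset:
    # Plays the role of A: true for nothing.
    return frozenset()


def expensive_check(candidates: frozenset) -> frozenset:
    # Plays the role of B: records what it was asked to evaluate.
    seen_by_b.append(candidates)
    return candidates


all_partitions = frozenset({"2024-01-01", "2024-01-02", "2024-01-03"})
result = and_(always_empty, expensive_check)(all_partitions)
print(result, seen_by_b)
```

Because A keeps nothing, B receives an empty candidate subset and has no partitions to do work on, which mirrors the cost saving described above.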

Once this condition is defined, you can use this condition as part of a broader expression, for example:

import dagster as dg

condition = dg.AutomationCondition.eager() & ~IsCompanyHoliday()
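
The IsCompanyHoliday example calls is_company_holiday(), which this guide does not define. A minimal sketch of such a helper might look like the following, assuming the evaluation time arrives as a datetime and that holidays are kept in a hard-coded set. `COMPANY_HOLIDAYS` and its dates are hypothetical placeholders.

```python
from datetime import date, datetime

# Hypothetical holiday calendar; replace with your organization's real dates.
COMPANY_HOLIDAYS = frozenset({date(2025, 1, 1), date(2025, 12, 25)})


def is_company_holiday(moment: datetime) -> bool:
    """Return True if `moment` falls on a date listed in COMPANY_HOLIDAYS.

    The comparison uses the date in the datetime's own timezone, so convert
    `moment` first if your holidays are defined in a different timezone.
    """
    return moment.date() in COMPANY_HOLIDAYS
```

In practice the calendar could come from a configuration file or an internal service; the set literal keeps the sketch self-contained.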