
Auto-Materializing Assets (Experimental)#

You can set up Dagster to automatically materialize assets when criteria are met. This enables a declarative approach to asset scheduling – instead of defining imperative workflows to materialize your assets, you just describe the conditions under which they should be materialized.

At a high level, the most common way for assets to be auto-materialized is "eagerly": immediately after upstream changes occur, a run is kicked off to incorporate those changes into a given asset. However, the precise rules that govern when runs are kicked off can be customized on an asset-by-asset basis.

Turning on auto-materializing#

To enable assets to be automatically materialized, you need to first flip a toggle in the Dagster UI.

  • If you're using an open source Dagster deployment, you can get to this toggle by clicking "Deployment" in the top navigation pane and then clicking on the "Daemons" tab.
  • If you're using Dagster Cloud, you can get to this toggle by clicking "Deployment" in the top navigation pane, then clicking on the "Agents" tab, then looking under "Cloud service statuses".

Auto-materialize policies#

You can set up an asset to be auto-materialized by assigning it an AutoMaterializePolicy. Each policy consists of a set of AutoMaterializeRules, each representing an individual reason that an asset should or should not be materialized at a given point in time. If at least one rule determines that the asset should be materialized, and no rule determines that it should be skipped, a run will be launched to materialize that asset.
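The decision described above can be pictured with a small sketch that does not depend on Dagster. The `RuleResult` type and the `should_materialize` function are hypothetical names used only for illustration; they are not part of the Dagster API, and the real evaluation involves more detail (for example, it is performed per partition).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RuleResult:
    """Hypothetical record of one rule's verdict; not a Dagster class."""

    rule_name: str
    kind: str  # either "materialize" or "skip"
    fired: bool


def should_materialize(results: list[RuleResult]) -> bool:
    """Return True if at least one materialize rule fired and no skip rule fired."""
    wants_materialize = any(r.fired for r in results if r.kind == "materialize")
    wants_skip = any(r.fired for r in results if r.kind == "skip")
    return wants_materialize and not wants_skip


# Illustrative verdicts for a single asset at one point in time.
results = [
    RuleResult("materialize_on_parent_updated", "materialize", fired=True),
    RuleResult("materialize_on_missing", "materialize", fired=False),
    RuleResult("skip_on_parent_missing", "skip", fired=False),
]
print(should_materialize(results))
```

In this sketch a single fired skip rule is enough to block the run, no matter how many materialize rules fired.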

We recommend starting with the built-in AutoMaterializePolicy.eager and customizing from there if necessary. This policy consists of all of the supported rules except materialize_on_cron and skip_on_not_all_parents_updated. The currently supported rules are:

Name | Description
AutoMaterializeRule.materialize_on_parent_updated | Materialize an asset partition if one of its parents has been updated more recently than it has.
AutoMaterializeRule.materialize_on_missing | Materialize an asset partition if it has never been materialized before.
AutoMaterializeRule.materialize_on_cron | Materialize an asset partition if it has not been materialized since the latest tick of a given cron schedule.
AutoMaterializeRule.materialize_on_required_for_freshness | Materialize an asset if it is required to satisfy a FreshnessPolicy of this asset or one of its downstream assets.
AutoMaterializeRule.skip_on_parent_missing | Skip materializing an asset partition if one of its parent asset partitions has never been materialized (for regular assets) or observed (for observable source assets).
AutoMaterializeRule.skip_on_parent_outdated | Skip materializing an asset partition if any of its parents has not incorporated the latest data from its ancestors.
AutoMaterializeRule.skip_on_not_all_parents_updated | Skip materializing an asset partition if any of its parents have not been updated since the asset's last materialization.

In this example, we use AutoMaterializePolicy.eager to indicate that, any time that asset1 is materialized, asset2 should be automatically materialized right after:

from dagster import AutoMaterializePolicy, asset


@asset
def asset1(): ...


@asset(auto_materialize_policy=AutoMaterializePolicy.eager(), deps=[asset1])
def asset2(): ...

This example assumes that asset1 will be materialized in some other way, e.g. manually, via a sensor, or via a schedule.

Adding an auto-materialize policy to multiple assets at once#

If you want to apply the same AutoMaterializePolicy to a set of assets, you can use the auto_materialize_policy argument when loading them with functions like load_assets_from_current_module and load_assets_from_package_module.

from dagster import (
    AutoMaterializePolicy,
    Definitions,
    asset,
    load_assets_from_current_module,
)


@asset
def asset1(): ...


@asset(deps=[asset1])
def asset2(): ...


defs = Definitions(
    assets=load_assets_from_current_module(
        auto_materialize_policy=AutoMaterializePolicy.eager(),
    )
)

Run tags#

Runs triggered by auto-materialize policies are tagged with dagster/auto_materialize: true. Additional tags can be configured in dagster.yaml (OSS) or deployment settings (Cloud).

Customizing auto-materialize policies#

Auto-materialize policies can be customized by adding or removing rules. These changes will be reflected in the UI for individual assets.

Auto-materialize only once all parents have been updated#

By default, the eager policy will materialize an asset whenever any of its parents have been updated. In cases where an asset has many parents, this may cause more materializations than desired, as each parent update will result in an additional downstream materialization. To avoid this, the skip_on_not_all_parents_updated rule can be applied to a given policy to force it to wait until all of an asset's parents have been updated before materializing it.

from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

wait_for_all_parents_policy = AutoMaterializePolicy.eager().with_rules(
    AutoMaterializeRule.skip_on_not_all_parents_updated()
)


@asset(auto_materialize_policy=wait_for_all_parents_policy)
def asset1(upstream1, upstream2): ...
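To make the difference between the two behaviors concrete, here is a rough sketch that compares plain timestamps rather than Dagster's internal records. The function names and the `datetime` inputs are illustrative assumptions, not Dagster APIs.

```python
from datetime import datetime


def any_parent_updated(last_materialized: datetime, parent_updates: dict[str, datetime]) -> bool:
    """Eager-style condition: at least one parent is newer than the asset."""
    return any(ts > last_materialized for ts in parent_updates.values())


def all_parents_updated(last_materialized: datetime, parent_updates: dict[str, datetime]) -> bool:
    """Simplified condition under which skip_on_not_all_parents_updated would not skip."""
    return all(ts > last_materialized for ts in parent_updates.values())


asset_ts = datetime(2024, 1, 1, 12, 0)
parents = {
    "upstream1": datetime(2024, 1, 1, 13, 0),  # updated after the asset
    "upstream2": datetime(2024, 1, 1, 11, 0),  # not updated since the asset ran
}
print(any_parent_updated(asset_ts, parents))
print(all_parents_updated(asset_ts, parents))
```

With these inputs, the eager-style condition holds as soon as upstream1 changes, while the stricter condition waits until upstream2 has also been updated.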

Auto-materialize even if some parents are missing#

By default, the eager policy won't materialize an asset if any of its parents are missing. In some cases, it's desirable to allow the downstream asset to be materialized, even if some of its parent assets/partitions are missing. To enable this, the skip_on_parent_missing rule can be removed from a given policy to prevent this from blocking the materialization of an asset.

from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

allow_missing_parents_policy = AutoMaterializePolicy.eager().without_rules(
    AutoMaterializeRule.skip_on_parent_missing(),
)


@asset(auto_materialize_policy=allow_missing_parents_policy)
def asset1(upstream1, upstream2): ...

Auto-materialize root assets on a regular cadence#

By default, the eager policy will only materialize an asset if it's missing or one of its parents updates. This means that an unpartitioned root asset will only get auto-materialized a single time, as it has no parents that can update. In some cases, it's desirable to recompute these assets on a regular basis. To enable this, the materialize_on_cron rule can be added to a given policy.

from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

materialize_on_cron_policy = AutoMaterializePolicy.eager().with_rules(
    # try to materialize this asset if it hasn't been materialized since the last cron tick
    AutoMaterializeRule.materialize_on_cron("0 9 * * *", timezone="US/Central"),
)


@asset(auto_materialize_policy=materialize_on_cron_policy)
def root_asset(): ...
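The check behind this rule can be approximated as follows for a daily 9:00 schedule like the one above. This is a simplified sketch: it hard-codes a daily tick instead of parsing a cron string, it ignores time zones, and `latest_daily_tick` and `needs_cron_materialization` are hypothetical helper names rather than Dagster functions.

```python
from datetime import datetime, time, timedelta
from typing import Optional


def latest_daily_tick(now: datetime, tick_time: time) -> datetime:
    """Return the most recent daily tick at or before `now`."""
    todays_tick = datetime.combine(now.date(), tick_time)
    return todays_tick if now >= todays_tick else todays_tick - timedelta(days=1)


def needs_cron_materialization(
    last_materialized: Optional[datetime], now: datetime, tick_time: time
) -> bool:
    """True if the asset has not been materialized since the latest tick."""
    if last_materialized is None:
        return True
    return last_materialized < latest_daily_tick(now, tick_time)


nine_am = time(9, 0)
now = datetime(2024, 1, 2, 10, 0)
# Last run was yesterday at 09:30, before today's 09:00 tick.
print(needs_cron_materialization(datetime(2024, 1, 1, 9, 30), now, nine_am))
```

Once the asset has been materialized after the latest tick, the condition stays false until the next tick passes.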

Auto-materialization and partitions#

Each AutoMaterializeRule generally applies individually to each partition of a partitioned asset. Here's a pipeline with two daily-partitioned assets that have eager auto-materialize policies. At the end of each day, a partition for that day will be added to the set of partitions for each of the assets. Dagster will notice that the new partitions exist, but have no materializations, and then auto-materialize them.

from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1(): ...


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    deps=[asset1],
)
def asset2(): ...

If the last partition of asset1 is re-materialized, e.g. manually from the UI, then the corresponding partition of asset2 will be auto-materialized after.

By default, a given AutoMaterializePolicy will not allow more than one partition of an asset to be materialized per minute. Any partitions exceeding this threshold will be discarded. Manual intervention will be required to materialize the discarded partitions.

This threshold may be increased as follows:

from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(
        max_materializations_per_minute=7
    ),
)
def asset1(): ...

For time-partitioned assets, the N most recent partitions (where N is the configured max_materializations_per_minute) will be selected from the set of candidates to be materialized. For other types of partitioned assets, the selection will be random.
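As an illustration of the selection behavior for time-partitioned assets, the sketch below keeps the N most recent candidates and sets the rest aside. It assumes that N corresponds to `max_materializations_per_minute` and that partition keys are ISO dates, as with `DailyPartitionsDefinition`; `select_partitions` is a hypothetical helper, not part of Dagster.

```python
def select_partitions(
    candidates: list[str], max_per_minute: int
) -> tuple[list[str], list[str]]:
    """Split ISO-date partition keys into (selected, discarded).

    ISO dates sort chronologically as strings, so a reverse sort puts the
    most recent partitions first.
    """
    ordered = sorted(candidates, reverse=True)
    return ordered[:max_per_minute], ordered[max_per_minute:]


candidates = ["2020-10-10", "2020-10-12", "2020-10-11", "2020-10-13"]
selected, discarded = select_partitions(candidates, max_per_minute=2)
print(selected)   # the two most recent partition keys
print(discarded)  # older keys that would need manual materialization
```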

Auto-materialize policies and data versions#

Observable source assets are assets that your code doesn't materialize, but for which you provide a function that can tell when they've changed. The AutoMaterializeRule.materialize_on_parent_updated rule takes the observed data versions of these assets into account when determining whether it should fire for a downstream asset.

In this example, we check every minute to see whether source_file was modified. If it was, then the AutoMaterializePolicy on asset1 will cause it to be materialized.

import os

from dagster import AutoMaterializePolicy, DataVersion, asset, observable_source_asset


@observable_source_asset(auto_observe_interval_minutes=1)
def source_file():
    return DataVersion(str(os.path.getmtime("source_file.csv")))


@asset(
    deps=[source_file],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1(): ...
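Conceptually, each observation yields a version string, and a difference between successive versions is what signals an update to downstream assets. The sketch below mimics this with a temporary file and plain Python. `observe_version` and `has_changed` are hypothetical helpers; in a real deployment the comparison is done by Dagster, not by user code.

```python
import os
import tempfile
from typing import Optional


def observe_version(path: str) -> str:
    """Use the file's modification time as its version, as in the example above."""
    return str(os.path.getmtime(path))


def has_changed(previous: Optional[str], current: str) -> bool:
    """In this sketch, a first observation or a differing version counts as a change."""
    return previous is None or previous != current


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "source_file.csv")
    with open(path, "w") as f:
        f.write("a,b\n1,2\n")
    first = observe_version(path)

    # Simulate a later modification by setting an explicit, newer mtime.
    os.utime(path, (os.path.getatime(path), os.path.getmtime(path) + 60))
    second = observe_version(path)

    changed = has_changed(first, second)
    unchanged = has_changed(second, observe_version(path))

print(changed, unchanged)
```

Using the modification time as the version is cheap, but it reports a change whenever the file is rewritten, even if its contents are identical.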