You can set up Dagster to automatically materialize assets when criteria are met. This enables a declarative approach to asset scheduling – instead of defining imperative workflows to materialize your assets, you just describe the conditions under which they should be materialized.
At a high level, the most common way for assets to be auto-materialized is "eagerly": immediately after upstream changes occur, a run is kicked off to incorporate those changes into a given asset. However, the precise rules that govern when runs are kicked off can be customized on an asset-by-asset basis.
To enable assets to be automatically materialized, you need to first flip a toggle in the Dagster UI.
You can set up an asset to be auto-materialized by assigning it an `AutoMaterializePolicy`. Each policy consists of a set of `AutoMaterializeRule`s, each representing an individual reason that an asset should or should not be materialized at a given point in time. If at least one rule determines that the asset should be materialized, and no rule determines that it should be skipped, a run will be launched to materialize that asset.
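The decision logic described above can be sketched as a toy model. This is not Dagster's implementation: the `Decision` enum and `should_launch_run` function are hypothetical names, and each rule's verdict is assumed to be one of "materialize", "skip", or "no opinion".

```python
from enum import Enum


class Decision(Enum):
    # Hypothetical labels for what a single rule concludes on one evaluation.
    MATERIALIZE = "materialize"
    SKIP = "skip"
    NO_OPINION = "no_opinion"


def should_launch_run(decisions: list[Decision]) -> bool:
    """Toy model: launch a run if at least one rule says "materialize"
    and no rule says "skip"."""
    wants_materialize = any(d is Decision.MATERIALIZE for d in decisions)
    has_skip = any(d is Decision.SKIP for d in decisions)
    return wants_materialize and not has_skip


print(should_launch_run([Decision.MATERIALIZE, Decision.NO_OPINION]))  # True
print(should_launch_run([Decision.MATERIALIZE, Decision.SKIP]))  # False
print(should_launch_run([Decision.NO_OPINION]))  # False
```

A single "skip" verdict acts as a veto, which is why skip rules can be layered on top of a policy to make it more conservative.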
We recommend starting with the built-in `AutoMaterializePolicy.eager()` policy and customizing it further if necessary. This policy includes all of the supported rules except `materialize_on_cron` and `skip_on_not_all_parents_updated`. The supported rules are currently:

| Rule | Description |
| --- | --- |
| `AutoMaterializeRule.materialize_on_parent_updated` | Materialize an asset partition if one of its parents has been updated more recently than it has. |
| `AutoMaterializeRule.materialize_on_missing` | Materialize an asset partition if it has never been materialized before. |
| `AutoMaterializeRule.materialize_on_cron` | Materialize an asset partition if it has not been materialized since the latest tick of a given cron schedule. |
| `AutoMaterializeRule.materialize_on_required_for_freshness` | Materialize an asset if it is required to satisfy a freshness policy of this asset or one of its downstream assets. |
| `AutoMaterializeRule.skip_on_parent_missing` | Skip materializing an asset partition if one of its parent asset partitions has never been materialized (for regular assets) or observed (for observable source assets). |
| `AutoMaterializeRule.skip_on_parent_outdated` | Skip materializing an asset partition if any of its parents has not incorporated the latest data from its ancestors. |
| `AutoMaterializeRule.skip_on_not_all_parents_updated` | Skip materializing an asset partition if any of its parents has not been updated since the asset's last materialization. |
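To make the table more concrete, here is a toy sketch of three of these rules as plain predicates. It assumes each asset partition is summarized only by the time of its latest update (`None` if it has never been materialized or observed). The function names mirror the rule names but are hypothetical stand-ins, not Dagster's API, and the real rules consider more state than this.

```python
from typing import Optional, Sequence

# Toy model (not Dagster's implementation): an asset partition is represented
# only by the time of its latest update, or None if it has never been
# materialized (or, for an observable source asset, observed).
Timestamp = Optional[float]


def materialize_on_missing(own: Timestamp) -> bool:
    # Fires if this partition has never been materialized.
    return own is None


def materialize_on_parent_updated(own: Timestamp, parents: Sequence[Timestamp]) -> bool:
    # Fires if some parent was updated more recently than this partition.
    own_time = float("-inf") if own is None else own
    return any(p is not None and p > own_time for p in parents)


def skip_on_parent_missing(parents: Sequence[Timestamp]) -> bool:
    # Fires (i.e. vetoes a run) if any parent has never been updated.
    return any(p is None for p in parents)


print(materialize_on_missing(None))  # True
print(materialize_on_parent_updated(10.0, [12.0, 5.0]))  # True: a parent is newer
print(skip_on_parent_missing([3.0, None]))  # True: a parent is missing
```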
In this example, we use `AutoMaterializePolicy.eager()` to indicate that any time `asset1` is materialized, `asset2` should be automatically materialized right after:
```python
from dagster import AutoMaterializePolicy, asset


@asset
def asset1(): ...


@asset(auto_materialize_policy=AutoMaterializePolicy.eager(), deps=[asset1])
def asset2(): ...
```
If you want to apply the same `AutoMaterializePolicy` to a set of assets, you can use the `auto_materialize_policy` argument when loading them with functions like `load_assets_from_current_module`:
```python
from dagster import (
    AutoMaterializePolicy,
    Definitions,
    asset,
    load_assets_from_current_module,
)


@asset
def asset1(): ...


@asset(deps=[asset1])
def asset2(): ...


defs = Definitions(
    assets=load_assets_from_current_module(
        auto_materialize_policy=AutoMaterializePolicy.eager(),
    )
)
```
Auto-materialize policies can be customized by adding or removing rules. These changes will be reflected in the UI for individual assets.
By default, the eager policy will materialize an asset whenever any of its parents has been updated. When an asset has many parents, this may cause more materializations than desired, because each parent update results in an additional downstream materialization. To avoid this, add the `skip_on_not_all_parents_updated` rule to the policy, which forces it to wait until all of an asset's parents have been updated before materializing it.
```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

wait_for_all_parents_policy = AutoMaterializePolicy.eager().with_rules(
    AutoMaterializeRule.skip_on_not_all_parents_updated()
)


# Upstream assets, defined here so the example is self-contained.
@asset
def upstream1(): ...


@asset
def upstream2(): ...


@asset(auto_materialize_policy=wait_for_all_parents_policy)
def asset1(upstream1, upstream2): ...
```
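The extra materializations described above can be illustrated with a toy simulation. It assumes the policy is evaluated once after each parent update and that all assets were already materialized at time 0, so no parent is ever missing. `count_downstream_runs` is a hypothetical helper, not part of Dagster.

```python
# Toy simulation (not Dagster's scheduler). Assumptions: both parents and the
# child were already materialized at time 0, and the policy is evaluated once
# after every parent update.


def count_downstream_runs(
    updates: list[tuple[str, int]], parents: set[str], wait_for_all: bool
) -> int:
    last_update = {p: 0 for p in parents}
    child_last_run = 0
    runs = 0
    for parent, t in updates:
        last_update[parent] = t
        # Parents updated since the child's last materialization.
        updated = {p for p, ts in last_update.items() if ts > child_last_run}
        if not updated:
            continue
        if wait_for_all and updated != set(parents):
            # Stand-in for skip_on_not_all_parents_updated: some parent is stale.
            continue
        runs += 1
        child_last_run = t
    return runs


updates = [("upstream1", 1), ("upstream2", 2), ("upstream1", 3), ("upstream2", 4)]
parents = {"upstream1", "upstream2"}
print(count_downstream_runs(updates, parents, wait_for_all=False))  # 4
print(count_downstream_runs(updates, parents, wait_for_all=True))  # 2
```

With the eager-style behavior, every parent update triggers a downstream run; with the wait-for-all variant, the child runs only once both parents have moved past its last materialization.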
By default, the eager policy won't materialize an asset if any of its parents are missing. In some cases, it's desirable to allow the downstream asset to be materialized even if some of its parent assets or partitions are missing. To enable this, remove the `skip_on_parent_missing` rule from the policy so that missing parents no longer block the materialization of the asset.
```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

allow_missing_parents_policy = AutoMaterializePolicy.eager().without_rules(
    AutoMaterializeRule.skip_on_parent_missing(),
)


# Upstream assets, defined here so the example is self-contained.
@asset
def upstream1(): ...


@asset
def upstream2(): ...


@asset(auto_materialize_policy=allow_missing_parents_policy)
def asset1(upstream1, upstream2): ...
```
By default, the eager policy will only materialize an asset if it's missing or one of its parents updates. This means that an unpartitioned root asset will only be auto-materialized a single time, because it has no parents that can update. In some cases, it's desirable to recompute such assets on a regular basis. To enable this, add the `materialize_on_cron` rule to the policy.
```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

materialize_on_cron_policy = AutoMaterializePolicy.eager().with_rules(
    # Try to materialize this asset if it hasn't been materialized since the last cron tick
    AutoMaterializeRule.materialize_on_cron("0 9 * * *", timezone="US/Central"),
)


@asset(auto_materialize_policy=materialize_on_cron_policy)
def root_asset(): ...
```
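The cron rule's condition ("not materialized since the latest tick") can be sketched for the daily schedule `"0 9 * * *"` used above. This toy version only handles daily-at-a-fixed-hour schedules and uses naive datetimes in a single timezone, ignoring the `timezone` argument and daylight-saving transitions. `latest_daily_tick` and `materialize_on_daily_cron` are hypothetical helpers.

```python
from datetime import datetime, timedelta
from typing import Optional


def latest_daily_tick(now: datetime, hour: int) -> datetime:
    # Most recent time <= now matching the cron string "0 <hour> * * *".
    tick = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if tick > now:
        tick -= timedelta(days=1)
    return tick


def materialize_on_daily_cron(
    last_materialized: Optional[datetime], now: datetime, hour: int = 9
) -> bool:
    # Fire if the asset was never materialized or was last materialized before the latest tick.
    tick = latest_daily_tick(now, hour)
    return last_materialized is None or last_materialized < tick


now = datetime(2023, 8, 1, 10, 30)
print(materialize_on_daily_cron(datetime(2023, 8, 1, 8, 0), now))  # True: predates today's 9:00 tick
print(materialize_on_daily_cron(datetime(2023, 8, 1, 9, 15), now))  # False: already ran after the tick
```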
Each `AutoMaterializeRule` generally applies individually to each partition of a partitioned asset. Here's a pipeline with two daily-partitioned assets that have eager auto-materialize policies. At the end of each day, a partition for that day will be added to the set of partitions for each of the assets. Dagster will notice that the new partitions exist but have no materializations, and will then auto-materialize them.
```python
from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1(): ...


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    deps=[asset1],
)
def asset2(): ...
```
If the last partition of `asset1` is re-materialized, for example manually from the UI, then the corresponding partition of `asset2` will be auto-materialized afterward.
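Because the rules apply to each partition independently, the daily example can be modeled with one timestamp per partition key. In this toy sketch (`partitions_to_materialize` is a hypothetical helper, not Dagster's implementation), a child partition is selected when it is missing or when the parent partition with the same key was updated more recently. Both assets are assumed to share the same daily partition keys.

```python
def partitions_to_materialize(
    parent: dict[str, float], child: dict[str, float]
) -> list[str]:
    # Apply a "missing" / "parent updated" check independently to each partition key.
    result = []
    for key, parent_time in sorted(parent.items()):
        child_time = child.get(key)
        if child_time is None or parent_time > child_time:
            result.append(key)
    return result


# A new daily partition appears upstream and has never been materialized downstream.
print(partitions_to_materialize({"2020-10-10": 1.0, "2020-10-11": 2.0}, {"2020-10-10": 1.5}))
# ['2020-10-11']

# The last upstream partition is re-materialized (e.g. from the UI) at time 5.0.
print(partitions_to_materialize(
    {"2020-10-10": 1.0, "2020-10-11": 5.0},
    {"2020-10-10": 1.5, "2020-10-11": 2.5},
))
# ['2020-10-11']
```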
By default, a given `AutoMaterializePolicy` will not allow more than one partition of an asset to be materialized per minute. Any partitions exceeding this threshold will be discarded, and manual intervention will be required to materialize them.
This threshold may be increased as follows:
```python
from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(
        max_materializations_per_minute=7
    ),
)
def asset1(): ...
```
For time-partitioned assets, the N most recent partitions, where N is the value of `max_materializations_per_minute`, will be selected from the set of candidates to be materialized. For other types of partitioned assets, the selection will be random.
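The selection behavior described above can be sketched as follows. `select_partitions` is a hypothetical helper; it assumes time partition keys are ISO dates that sort chronologically as strings, and it only models which candidates are kept or discarded in a single evaluation, not how Dagster tracks the per-minute window.

```python
import random
from typing import Optional


def select_partitions(
    candidates: list[str],
    max_per_minute: int,
    time_partitioned: bool,
    rng: Optional[random.Random] = None,
) -> tuple[list[str], list[str]]:
    """Return (selected, discarded) partition keys for one evaluation."""
    if time_partitioned:
        # Assumes keys like "2020-10-12" that sort chronologically as strings.
        selected = sorted(candidates, reverse=True)[:max_per_minute]
    else:
        # Non-time partitions: pick a random subset of the allowed size.
        rng = rng or random.Random()
        selected = rng.sample(candidates, min(max_per_minute, len(candidates)))
    discarded = [c for c in candidates if c not in selected]
    return selected, discarded


candidates = ["2020-10-08", "2020-10-09", "2020-10-10", "2020-10-11", "2020-10-12"]
selected, discarded = select_partitions(candidates, max_per_minute=2, time_partitioned=True)
print(selected)  # ['2020-10-12', '2020-10-11']
print(discarded)  # ['2020-10-08', '2020-10-09', '2020-10-10']
```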
Observable source assets are assets that your code doesn't materialize, but for which you provide a function that can tell when they've changed. The `AutoMaterializeRule.materialize_on_parent_updated` rule incorporates the observed data versions of these assets when determining whether it should fire for a downstream asset.
In this example, we check every minute to see whether `source_file` was modified. If it was, then the auto-materialize policy on `asset1` will cause `asset1` to be materialized.
```python
import os

from dagster import AutoMaterializePolicy, DataVersion, asset, observable_source_asset


@observable_source_asset(auto_observe_interval_minutes=1)
def source_file():
    # Use the file's modification time as its data version
    return DataVersion(str(os.path.getmtime("source_file.csv")))


@asset(
    deps=[source_file],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1(): ...
```
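The observation function above uses the file's modification time as its data version, so a downstream asset is considered out of date whenever that string changes. The following toy sketch reproduces that comparison with a temporary file. `data_version` and `has_changed` are hypothetical helpers, not Dagster APIs, and `os.utime` is used to pin the modification times so the result is deterministic.

```python
import os
import tempfile
from typing import Optional


def data_version(path: str) -> str:
    # Mirrors the observation function above: the file's mtime as a string.
    return str(os.path.getmtime(path))


def has_changed(previous: Optional[str], current: str) -> bool:
    # Toy check: a new data version means downstream assets are out of date.
    return previous != current


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "source_file.csv")
    with open(path, "w") as f:
        f.write("a,b\n")
    os.utime(path, (1_000, 1_000))  # pin atime/mtime so the example is deterministic
    v1 = data_version(path)
    v2 = data_version(path)  # no modification between the two observations
    os.utime(path, (2_000, 2_000))  # simulate the file being modified
    v3 = data_version(path)

print(has_changed(v1, v2))  # False
print(has_changed(v2, v3))  # True
```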