You can set up Dagster to automatically materialize assets when criteria are met. This enables a declarative approach to asset scheduling – instead of defining imperative workflows to materialize your assets, you just describe the conditions under which they should be materialized.
At a high level, the most common way for assets to be auto-materialized is "eagerly": immediately after upstream changes occur, a run is kicked off to incorporate those changes into a given asset. However, the precise rules that govern when runs are kicked off can be customized on an asset-by-asset basis.
To enable assets to be automatically materialized, you need to first flip a toggle in the Dagster UI.
You can set up an asset to be auto-materialized by assigning it an AutoMaterializePolicy. Each policy consists of a set of AutoMaterializeRules, each representing an individual reason that an asset should or should not be materialized at a given point in time. If at least one rule determines that the asset should be materialized, and no rule determines that it should be skipped, a run is launched to materialize that asset.
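The decision rule described above can be modeled in a few lines of plain Python. This is a minimal sketch, not Dagster's implementation: RuleResult and should_materialize are hypothetical names, and the model assumes each rule yields one of three outcomes for a single evaluation.

```python
from enum import Enum
from typing import Iterable


class RuleResult(Enum):
    """Hypothetical outcome of evaluating one rule for one asset."""

    MATERIALIZE = "materialize"
    SKIP = "skip"
    NO_OPINION = "no_opinion"


def should_materialize(results: Iterable[RuleResult]) -> bool:
    """Launch a run only if some rule requests it and no rule vetoes it."""
    results = list(results)
    requested = any(r is RuleResult.MATERIALIZE for r in results)
    vetoed = any(r is RuleResult.SKIP for r in results)
    return requested and not vetoed


print(should_materialize([RuleResult.MATERIALIZE, RuleResult.NO_OPINION]))  # True
print(should_materialize([RuleResult.MATERIALIZE, RuleResult.SKIP]))  # False
```

A single skip result is enough to block the run, and an evaluation with no materialize result never launches one.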
We recommend starting with the built-in AutoMaterializePolicy.eager policy and customizing it further if necessary. This policy includes all of the supported rules except skip_on_not_all_parents_updated. The currently supported rules are:
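| Name | Description |
|---|---|
| AutoMaterializeRule.materialize_on_parent_updated | Materialize an asset partition if one of its parents has been updated more recently than it has. |
| AutoMaterializeRule.materialize_on_missing | Materialize an asset partition if it has never been materialized before. |
| AutoMaterializeRule.materialize_on_required_for_freshness | Materialize an asset partition if it is required to satisfy a freshness policy of this asset or one of its descendants. |
| AutoMaterializeRule.skip_on_parent_missing | Skip materializing an asset partition if one of its parent asset partitions has never been materialized (for regular assets) or observed (for observable source assets). |
| AutoMaterializeRule.skip_on_parent_outdated | Skip materializing an asset partition if any of its parents has not incorporated the latest data from its ancestors. |
| AutoMaterializeRule.skip_on_not_all_parents_updated | Skip materializing an asset partition if any of its parents has not been updated since the asset's last materialization. |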
In this example, we use AutoMaterializePolicy.eager to indicate that any time asset1 is materialized, asset2 should be automatically materialized right after:
```python
from dagster import AutoMaterializePolicy, asset


@asset
def asset1(): ...


@asset(auto_materialize_policy=AutoMaterializePolicy.eager(), deps=[asset1])
def asset2(): ...
```
If you want to apply the same AutoMaterializePolicy to a set of assets, you can use the auto_materialize_policy argument when loading them with functions like load_assets_from_current_module:
```python
from dagster import (
    AutoMaterializePolicy,
    Definitions,
    asset,
    load_assets_from_current_module,
)


@asset
def asset1(): ...


@asset(deps=[asset1])
def asset2(): ...


defs = Definitions(
    assets=load_assets_from_current_module(
        auto_materialize_policy=AutoMaterializePolicy.eager(),
    )
)
```
Auto-materialize policies can be customized by adding or removing rules. These changes will be reflected in the UI for individual assets.
By default, the eager policy materializes an asset whenever any of its parents has been updated. When an asset has many parents, this may cause more materializations than desired, as each parent update results in an additional downstream materialization. To avoid this, add the skip_on_not_all_parents_updated rule to a policy so that it waits until all of an asset's parents have been updated before materializing it.
```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

wait_for_all_parents_policy = AutoMaterializePolicy.eager().with_rules(
    AutoMaterializeRule.skip_on_not_all_parents_updated()
)


# Upstream assets, defined so the example is self-contained
@asset
def upstream1(): ...


@asset
def upstream2(): ...


@asset(auto_materialize_policy=wait_for_all_parents_policy)
def asset1(upstream1, upstream2): ...
```
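To see why this rule reduces run counts, here is a simplified plain-Python model that compares last-update timestamps. It is an illustration under assumptions, not Dagster's evaluation logic: eager_decision and its helpers are hypothetical, plain timestamps stand in for Dagster's own update tracking, and partitions and the other eager rules are ignored.

```python
from typing import Dict


def any_parent_updated(asset_ts: float, parent_ts: Dict[str, float]) -> bool:
    """Toy materialize_on_parent_updated: some parent is newer than the asset."""
    return any(ts > asset_ts for ts in parent_ts.values())


def all_parents_updated(asset_ts: float, parent_ts: Dict[str, float]) -> bool:
    """Toy skip_on_not_all_parents_updated check: every parent is newer."""
    return all(ts > asset_ts for ts in parent_ts.values())


def eager_decision(asset_ts: float, parent_ts: Dict[str, float], wait_for_all: bool) -> bool:
    """Hypothetical decision for one asset given its last materialization time."""
    if not any_parent_updated(asset_ts, parent_ts):
        return False
    if wait_for_all and not all_parents_updated(asset_ts, parent_ts):
        return False  # the skip rule vetoes the run
    return True


# asset1 last ran at t=100; only upstream1 has been updated since then.
parents = {"upstream1": 150.0, "upstream2": 90.0}
print(eager_decision(100.0, parents, wait_for_all=False))  # True
print(eager_decision(100.0, parents, wait_for_all=True))  # False
```

Once upstream2 is also updated after t=100, both variants return True, so the waiting policy launches one run instead of one per parent update.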
By default, the eager policy won't materialize an asset if any of its parents are missing. In some cases, it's desirable to allow the downstream asset to be materialized even if some of its parent assets or partitions are missing. To enable this, remove the skip_on_parent_missing rule from the policy so that missing parents no longer block the materialization.
```python
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

allow_missing_parents_policy = AutoMaterializePolicy.eager().without_rules(
    AutoMaterializeRule.skip_on_parent_missing(),
)


# Upstream assets, defined so the example is self-contained
@asset
def upstream1(): ...


@asset
def upstream2(): ...


@asset(auto_materialize_policy=allow_missing_parents_policy)
def asset1(upstream1, upstream2): ...
```
AutoMaterializeRules generally apply to each partition of a partitioned asset individually. Here's a pipeline with two daily-partitioned assets that have eager auto-materialize policies. At the end of each day, a partition for that day is added to each asset's set of partitions. Dagster notices that the new partitions exist but have no materializations, and auto-materializes them.
```python
from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1(): ...


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    deps=[asset1],
)
def asset2(): ...
```
If the last partition of asset1 is re-materialized, e.g. manually from the UI, then the corresponding partition of asset2 will be auto-materialized after it.
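The per-partition behavior can be illustrated with a small plain-Python sketch that lists daily partition keys and picks out those with no materialization. It assumes, as with a default DailyPartitionsDefinition, that a day's partition only exists once that day has ended and that keys use the YYYY-MM-DD format; daily_partition_keys and missing_partitions are hypothetical helpers, not Dagster APIs.

```python
from datetime import date, timedelta
from typing import List, Set


def daily_partition_keys(start: date, today: date) -> List[str]:
    """One key per completed day, from start up to but excluding today."""
    keys = []
    day = start
    while day < today:
        keys.append(day.isoformat())  # e.g. "2020-10-10"
        day += timedelta(days=1)
    return keys


def missing_partitions(start: date, today: date, materialized: Set[str]) -> List[str]:
    """Partitions that exist but have never been materialized."""
    return [k for k in daily_partition_keys(start, today) if k not in materialized]


# On 2020-10-13, the partition for 2020-10-12 has just appeared.
print(missing_partitions(date(2020, 10, 10), date(2020, 10, 13), {"2020-10-10", "2020-10-11"}))
# ['2020-10-12']
```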
By default, a given AutoMaterializePolicy will not allow more than one partition of an asset to be materialized per minute. Any partitions exceeding this threshold will be discarded. Manual intervention will be required to materialize the discarded partitions.
This threshold may be increased as follows:
```python
from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(
        max_materializations_per_minute=7
    ),
)
def asset1(): ...
```
For time-partitioned assets, the N most recent partitions (where N is the max_materializations_per_minute limit) are selected from the set of candidates to be materialized. For other types of partitioned assets, the selection is random.
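The selection rule above can be sketched as follows. This is a toy model rather than Dagster's code: select_partitions is a hypothetical helper, limit plays the role of max_materializations_per_minute, and it assumes time-partition keys sort chronologically as strings (true for YYYY-MM-DD keys).

```python
import random
from typing import List, Optional, Tuple


def select_partitions(
    candidates: List[str],
    limit: int,
    time_partitioned: bool,
    rng: Optional[random.Random] = None,
) -> Tuple[List[str], List[str]]:
    """Split candidates into (selected, discarded) under a per-minute limit."""
    if limit <= 0:
        return [], list(candidates)
    if time_partitioned:
        # YYYY-MM-DD keys sort chronologically, so the tail is the most recent.
        selected = sorted(candidates)[-limit:]
    else:
        rng = rng or random.Random()
        selected = rng.sample(candidates, min(limit, len(candidates)))
    discarded = [c for c in candidates if c not in selected]
    return selected, discarded


print(select_partitions(["2020-10-11", "2020-10-10", "2020-10-12"], 1, True))
# (['2020-10-12'], ['2020-10-11', '2020-10-10'])
```

The discarded list corresponds to the partitions that would need manual intervention.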
Observable source assets are assets that your code doesn't materialize, but for which you provide a function that can tell when they've changed. The AutoMaterializeRule.materialize_on_parent_updated rule incorporates the observed data versions of these assets when determining whether it should fire for a downstream asset.
In this example, we check every minute whether source_file was modified. If it was, the auto-materialize policy on asset1 causes asset1 to be materialized.
```python
import os

from dagster import AutoMaterializePolicy, DataVersion, asset, observable_source_asset


@observable_source_asset(auto_observe_interval_minutes=1)
def source_file():
    return DataVersion(str(os.path.getmtime("source_file.csv")))


@asset(
    deps=[source_file],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1(): ...
```
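The observation function above uses the file's modification time as its data version, so a downstream update is only warranted when that version changes. Here is a minimal sketch of that comparison, assuming a new observation counts as a parent update whenever its data version differs from the previous one; observe_mtime_version and has_changed are hypothetical names, not part of Dagster.

```python
import os
import tempfile
from typing import Optional


def observe_mtime_version(path: str) -> str:
    """Same idea as source_file: the modification time is the data version."""
    return str(os.path.getmtime(path))


def has_changed(previous: Optional[str], current: str) -> bool:
    """Treat the first observation, or any differing version, as an update."""
    return previous is None or previous != current


fd, path = tempfile.mkstemp(suffix=".csv")
os.close(fd)
os.utime(path, (1_000, 1_000))  # pin the mtime so the example is deterministic
v1 = observe_mtime_version(path)
print(has_changed(v1, observe_mtime_version(path)))  # False: file untouched
os.utime(path, (2_000, 2_000))  # simulate a modification
print(has_changed(v1, observe_mtime_version(path)))  # True
os.remove(path)
```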
Lazy auto-materialize policies do not currently have any effect on partitioned assets.
Freshness policies express how fresh an asset needs to be relative to data at the root of the graph. This means that, for an asset to meet its freshness policy, both it and upstream assets need to be materialized in time.
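As a rough model of a FreshnessPolicy with maximum_lag_minutes, an asset can be treated as fresh when the root data it reflects is no older than that lag. In the sketch below, data_time (a hypothetical name) is the point in time as of which the asset incorporates all of its root data; Dagster computes this internally, and is_fresh is an illustration, not its API.

```python
from datetime import datetime, timedelta


def is_fresh(now: datetime, data_time: datetime, maximum_lag_minutes: float) -> bool:
    """Fresh if the asset reflects root data from at most maximum_lag_minutes ago."""
    return now - data_time <= timedelta(minutes=maximum_lag_minutes)


data_time = datetime(2023, 1, 1, 0, 0)
print(is_fresh(datetime(2023, 1, 1, 12, 0), data_time, 24 * 60))  # True
print(is_fresh(datetime(2023, 1, 2, 1, 0), data_time, 24 * 60))  # False
```

In this model, data_time only advances when the asset is rebuilt from upstream data that is itself recent, which is why both the asset and its upstream assets must be materialized in time.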
Instead of auto-materializing downstream assets immediately after new upstream data arrives, you can use AutoMaterializePolicy.lazy to materialize assets only when needed to meet an asset's FreshnessPolicy. This allows you to avoid unnecessary materializations.
In this example, even if asset1 is materialized every hour, asset2 will only be materialized roughly once per day:
```python
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset


@asset
def asset1(): ...


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60),
    deps=[asset1],
)
def asset2(): ...
```
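The difference in run counts can be shown with a toy hour-by-hour simulation. The assumptions are simplifications: asset1 is materialized at the top of every hour, the eager policy runs asset2 after every update, and the lazy policy runs asset2 only once its data has reached the 24-hour lag. Dagster's actual scheduler plans ahead and may run earlier, so treat simulate (a hypothetical function) as an illustration only.

```python
def simulate(hours: int, lag_hours: int, lazy: bool) -> int:
    """Count asset2 runs when asset1 is materialized at the top of every hour."""
    runs = 0
    data_time = 0  # asset2 starts out reflecting asset1's hour-0 data
    for now in range(1, hours + 1):
        # asset1 has just been materialized with data as of `now`.
        reached_lag = now - data_time >= lag_hours
        if not lazy or reached_lag:
            data_time = now  # asset2 picks up asset1's latest data
            runs += 1
    return runs


print(simulate(48, 24, lazy=False))  # 48
print(simulate(48, 24, lazy=True))  # 2
```

Over two days, the eager policy would run asset2 48 times while the lazy policy runs it twice.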
Setting a lazy auto-materialize policy on an asset allows it to be auto-materialized to help downstream assets meet their freshness policies. In this example, both asset2 and asset3 will be auto-materialized up to once per day to help asset3 meet its freshness policy. Conversely, if asset2 did not have an auto-materialize policy, then asset3 would never become fresh unless asset2 were materialized in some other way.
```python
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset


@asset
def asset1(): ...


@asset(auto_materialize_policy=AutoMaterializePolicy.lazy(), deps=[asset1])
def asset2(): ...


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60),
    deps=[asset2],
)
def asset3(): ...
```
If multiple assets with freshness policies depend on the same upstream asset, Dagster will try to materialize the upstream asset at times that minimize its number of runs while still meeting the downstream freshness policies.
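One way to picture this consolidation is the classic interval point-cover problem: if each downstream freshness policy yields a window in which an upstream run would help it, the fewest runs that hit every window can be found greedily by always running at the earliest uncovered deadline. This is a swapped-in textbook technique for illustration, not Dagster's scheduling algorithm, and plan_upstream_runs and the window values are hypothetical.

```python
from typing import List, Tuple


def plan_upstream_runs(windows: List[Tuple[float, float]]) -> List[float]:
    """Fewest run times such that each (earliest, deadline) window contains one."""
    runs: List[float] = []
    for earliest, deadline in sorted(windows, key=lambda w: w[1]):
        if runs and runs[-1] >= earliest:
            continue  # the latest planned run already lands inside this window
        runs.append(deadline)  # run as late as allowed to cover later windows too
    return runs


# Three downstream assets, in hours from now; two of them can share one run.
print(plan_upstream_runs([(0, 10), (5, 12), (11, 20)]))  # [10, 20]
```

Running as late as each deadline allows is what lets a single upstream run serve several downstream assets at once.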