Software-Defined Assets

An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. A software-defined asset is a Dagster object that couples an asset to the function and upstream assets that are used to produce its contents.

@dagster.asset(compute_fn=None, *, name=None, key_prefix=None, ins=None, deps=None, metadata=None, description=None, config_schema=None, required_resource_keys=None, resource_defs=None, io_manager_def=None, io_manager_key=None, compute_kind=None, dagster_type=None, partitions_def=None, op_tags=None, group_name=None, output_required=True, freshness_policy=None, auto_materialize_policy=None, backfill_policy=None, retry_policy=None, code_version=None, key=None, non_argument_deps=None, check_specs=None)

Create a definition for how to compute an asset.

A software-defined asset is the combination of:
  1. An asset key, e.g. the name of a table.

  2. A function, which can be run to compute the contents of the asset.

  3. A set of upstream assets that are provided as inputs to the function when computing the asset.

Unlike an op, whose dependencies are determined by the graph it lives inside, an asset knows about the upstream assets it depends on. The upstream assets are inferred from the arguments to the decorated function. The name of the argument designates the name of the upstream asset.

An asset has an op inside it to represent the function that computes it. The name of the op will be the segments of the asset key, separated by double-underscores.
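
To make the op-naming rule concrete, here is a minimal sketch in plain Python. The helper name op_name_for_asset_key and the example key are hypothetical; this is not a Dagster API and only mirrors the rule stated above.

```python
from typing import Sequence


def op_name_for_asset_key(segments: Sequence[str]) -> str:
    """Join the segments of an asset key with double underscores.

    Hypothetical helper that mirrors the naming rule described above;
    it is not part of Dagster.
    """
    return "__".join(segments)


# An asset keyed ["analytics", "daily_orders"] would get this op name.
print(op_name_for_asset_key(["analytics", "daily_orders"]))
```

A single-segment key yields the segment itself, since there is nothing to join.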

Parameters:
  • name (Optional[str]) – The name of the asset. If not provided, defaults to the name of the decorated function. The asset’s name must be a valid name in Dagster (i.e., it contains only letters, numbers, and _) and may not contain Python reserved keywords.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, the asset’s key is the concatenation of the key_prefix and the asset’s name, which defaults to the name of the decorated function. Each item in key_prefix must be a valid name in Dagster (i.e., it contains only letters, numbers, and _) and may not contain Python reserved keywords.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to information about the input.

  • deps (Optional[Sequence[Union[AssetDep, AssetsDefinition, SourceAsset, AssetKey, str]]]) – The assets that are upstream dependencies, but do not correspond to a parameter of the decorated function. If the AssetsDefinition for a multi_asset is provided, dependencies on all assets created by the multi_asset will be created.

  • config_schema (Optional[ConfigSchema]) – The configuration schema for the asset’s underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.

  • metadata (Optional[Dict[str, Any]]) – A dict of metadata entries for the asset.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the op.

  • io_manager_key (Optional[str]) – The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: “io_manager”). Only one of io_manager_key and io_manager_def can be provided.

  • io_manager_def (Optional[object]) – (Experimental; this parameter may break in future versions, even between dot releases.) The IOManager used for storing the output of the op as an asset, and for loading it in downstream ops. Only one of io_manager_def and io_manager_key can be provided.

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in the Dagster UI as a badge on the asset.

  • dagster_type (Optional[DagsterType]) – Allows specifying type validation functions that will be executed on the output of the decorated function after it runs.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the asset.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON-encoded and must meet the criterion that json.loads(json.dumps(value)) == value.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

  • resource_defs (Optional[Mapping[str, object]]) – (Experimental; this parameter may break in future versions, even between dot releases.) A mapping of resource keys to resources. These resources will be initialized during execution, and can be accessed from the context within the body of the function.

  • output_required (bool) – Whether the decorated function will always materialize an asset. Defaults to True. If False, the function can return None, which will not be materialized to storage and will halt execution of downstream assets.

  • freshness_policy (FreshnessPolicy) – A constraint telling Dagster how often this asset is intended to be updated with respect to its root data.

  • auto_materialize_policy (AutoMaterializePolicy) – (Experimental; this parameter may break in future versions, even between dot releases.) Configure Dagster to automatically materialize this asset according to its FreshnessPolicy and when upstream dependencies change.

  • backfill_policy (BackfillPolicy) – (Experimental; this parameter may break in future versions, even between dot releases.) Configure Dagster to backfill this asset according to its BackfillPolicy.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for the op that computes the asset.

  • code_version (Optional[str]) – (Experimental) Version of the code that generates this asset. In general, versions should be set only for code that deterministically produces the same output when given the same inputs.

  • check_specs (Optional[Sequence[AssetCheckSpec]]) – (Experimental) Specs for asset checks that execute in the decorated function after materializing the asset.

  • non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]) – (Deprecated; this parameter will be removed in version 2.0.0. Use deps instead.) Set of asset keys that are upstream dependencies, but do not pass an input to the asset.

  • key (Optional[CoercibleToAssetKey]) – The key for this asset. If provided, cannot specify key_prefix or name.
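
The round-trip requirement on non-string op_tags values can be checked ahead of time. The following is a minimal sketch using only the standard library; the helper name is_round_trippable is hypothetical, and Dagster's own validation may differ in detail.

```python
import json
from typing import Any


def is_round_trippable(value: Any) -> bool:
    """Return True if json.loads(json.dumps(value)) == value.

    Hypothetical pre-check for non-string tag values; values that
    cannot be serialized at all are treated as failing the check.
    """
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        return False


print(is_round_trippable({"team": "data", "retries": 3}))  # dict of JSON types
print(is_round_trippable((1, 2)))  # tuple comes back as a list
print(is_round_trippable({1: "x"}))  # int key comes back as a str key
```

Tuples and dicts with non-string keys fail because JSON has no matching types, so they are changed by the round trip.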

Examples

from dagster import asset

@asset
def my_asset(my_upstream_asset: int) -> int:
    return my_upstream_asset + 1

class dagster.MaterializeResult(*, asset_key=None, metadata=None, check_results=None, data_version=None)

An object representing a successful materialization of an asset. These can be returned from @asset and @multi_asset decorated functions to pass metadata or to specify which assets were materialized.

asset_key

Optional in @asset, required in @multi_asset to discern which asset this refers to.

Type:

Optional[AssetKey]

metadata

Metadata to record with the corresponding AssetMaterialization event.

Type:

Optional[MetadataUserInput]

check_results

Check results to record with the corresponding AssetMaterialization event.

Type:

Optional[Sequence[AssetCheckResult]]

data_version

The data version of the asset that was observed.

Type:

Optional[DataVersion]

class dagster.AssetSpec(key, *, deps=None, description=None, metadata=None, skippable=False, group_name=None, code_version=None, freshness_policy=None, auto_materialize_policy=None)

Specifies the core attributes of an asset. This object is attached to the decorated function that defines how the asset is materialized.

key

The unique identifier for this asset.

Type:

AssetKey

deps

The asset keys for the upstream assets that materializing this asset depends on.

Type:

Optional[AbstractSet[AssetKey]]

description

Human-readable description of this asset.

Type:

Optional[str]

metadata

A dict of static metadata for this asset. For example, users can provide information about the database table this asset corresponds to.

Type:

Optional[Dict[str, Any]]

skippable

Whether this asset can be omitted during materialization, causing downstream dependencies to skip.

Type:

bool

group_name

A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

Type:

Optional[str]

code_version

The version of the code for this specific asset, overriding the code version of the materialization function.

Type:

Optional[str]

freshness_policy

A policy which indicates how up to date this asset is intended to be.

Type:

Optional[FreshnessPolicy]

auto_materialize_policy

AutoMaterializePolicy to apply to the specified asset.

Type:

Optional[AutoMaterializePolicy]

backfill_policy

BackfillPolicy to apply to the specified asset.

Type:

Optional[BackfillPolicy]

class dagster.AssetDep(asset, *, partition_mapping=None)

Specifies a dependency on an upstream asset.

asset

The upstream asset to depend on.

Type:

Union[AssetKey, str, AssetSpec, AssetsDefinition, SourceAsset]

partition_mapping

Defines what partitions to depend on in the upstream asset. If not provided and the upstream asset is partitioned, defaults to the default partition mapping for the partitions definition, which typically maps partition keys to the same partition keys in upstream assets.

Type:

Optional[PartitionMapping]
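
As an illustration of what a non-default partition mapping does, the sketch below models start and end offsets for daily partitions in plain Python. It assumes that offsets shift the upstream window relative to the downstream partition (so -1/-1 selects the previous day only); that reading is not spelled out in this section. The helper upstream_daily_partitions is hypothetical and not a Dagster API.

```python
from datetime import date, timedelta
from typing import List


def upstream_daily_partitions(
    downstream_key: str, start_offset: int = 0, end_offset: int = 0
) -> List[str]:
    """List upstream daily partition keys for one downstream key.

    Assumption: offsets shift the start and end of the upstream window,
    in whole partitions, relative to the downstream partition.
    """
    day = date.fromisoformat(downstream_key)
    current = day + timedelta(days=start_offset)
    last = day + timedelta(days=end_offset)
    keys = []
    while current <= last:
        keys.append(current.isoformat())
        current += timedelta(days=1)
    return keys


# Default (0, 0): same partition. (-1, -1): the previous day only.
print(upstream_daily_partitions("2022-07-04"))
print(upstream_daily_partitions("2022-07-04", start_offset=-1, end_offset=-1))
```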

Examples

from dagster import AssetDep, AssetSpec, TimeWindowPartitionMapping

upstream_asset = AssetSpec("upstream_asset")
downstream_asset = AssetSpec(
    "downstream_asset",
    deps=[
        AssetDep(
            upstream_asset,
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    ]
)

class dagster.AssetIn(key=None, metadata=None, key_prefix=None, input_manager_key=None, partition_mapping=None, dagster_type=<class 'dagster._core.definitions.utils.NoValueSentinel'>)

Defines an asset dependency.

key_prefix

If provided, the asset’s key is the concatenation of the key_prefix and the input name. Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str]]]

key

The asset’s key. Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str], AssetKey]]

metadata

A dict of the metadata for the input. For example, if you only need a subset of columns from an upstream table, you could include that in metadata and the IO manager that loads the upstream table could use the metadata to determine which columns to load.

Type:

Optional[Dict[str, Any]]

partition_mapping

Defines what partitions to depend on in the upstream asset. If not provided, defaults to the default partition mapping for the partitions definition, which typically maps partition keys to the same partition keys in upstream assets.

Type:

Optional[PartitionMapping]

dagster_type

Allows specifying type validation functions that will be executed on the input of the decorated function before it runs.

Type:

DagsterType
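
The interaction between the key and key_prefix attributes of AssetIn can be sketched as follows. This is a plain-Python model under stated assumptions: resolve_input_key is a hypothetical helper, keys are represented as lists of string segments, and falling back to the bare input name when neither argument is given follows the argument-name rule described for @asset.

```python
from typing import List, Optional, Sequence, Union


def resolve_input_key(
    input_name: str,
    key: Optional[Union[str, Sequence[str]]] = None,
    key_prefix: Optional[Union[str, Sequence[str]]] = None,
) -> List[str]:
    """Resolve the upstream asset key for one input (hypothetical model)."""
    if key is not None and key_prefix is not None:
        raise ValueError("Provide only one of 'key' and 'key_prefix'.")
    if key is not None:
        # An explicit key wins and ignores the input name.
        return [key] if isinstance(key, str) else list(key)
    if key_prefix is not None:
        # The key is the prefix followed by the input name.
        prefix = [key_prefix] if isinstance(key_prefix, str) else list(key_prefix)
        return prefix + [input_name]
    return [input_name]


print(resolve_input_key("orders", key_prefix=["raw", "shop"]))
print(resolve_input_key("orders", key=["warehouse", "orders_v2"]))
```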

class dagster.SourceAsset(key, metadata=None, io_manager_key=None, io_manager_def=None, description=None, partitions_def=None, group_name=None, resource_defs=None, observe_fn=None, *, auto_observe_interval_minutes=None, _required_resource_keys=None)

A SourceAsset represents an asset that will be loaded by (but not updated by) Dagster.

key

The key of the asset.

Type:

Union[AssetKey, Sequence[str], str]

metadata

Metadata associated with the asset.

Type:

Mapping[str, MetadataValue]

io_manager_key

The key for the IOManager that will be used to load the contents of the asset when it’s used as an input to other assets inside a job.

Type:

Optional[str]

io_manager_def

(Experimental) The definition of the IOManager that will be used to load the contents of the asset when it’s used as an input to other assets inside a job.

Type:

Optional[IOManagerDefinition]

resource_defs

(Experimental) resource definitions that may be required by the dagster.IOManagerDefinition provided in the io_manager_def argument.

Type:

Optional[Mapping[str, ResourceDefinition]]

description

The description of the asset.

Type:

Optional[str]

partitions_def

Defines the set of partition keys that compose the asset.

Type:

Optional[PartitionsDefinition]

observe_fn

Type:

Optional[SourceAssetObserveFunction]

property is_observable

Whether the asset is observable.

Type:

bool

property op

The OpDefinition associated with the observation function of an observable source asset.

Throws an error if the asset is not observable.

Type:

OpDefinition

@dagster.observable_source_asset(observe_fn=None, *, key=None, name=None, key_prefix=None, metadata=None, io_manager_key=None, io_manager_def=None, description=None, group_name=None, required_resource_keys=None, resource_defs=None, partitions_def=None, auto_observe_interval_minutes=None)

Experimental: This API may break in future versions, even between dot releases.

Create a SourceAsset with an associated observation function.

The observation function of a source asset is wrapped inside of an op and can be executed as part of a job. Each execution generates an AssetObservation event associated with the source asset. The source asset observation function should return a DataVersion, a DataVersionsByPartition, or an ObserveResult.
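
One common way to derive a data version for an external asset is to hash the observed content, so that the version changes exactly when the content does. The sketch below shows only that hashing step in plain Python; content_data_version is a hypothetical helper, and in a real observation function the resulting string would be wrapped in a DataVersion before being returned.

```python
import hashlib


def content_data_version(content: bytes) -> str:
    """Derive a version string from raw content (hypothetical helper).

    Identical content always yields the same string, and different
    content yields a different one with overwhelming probability.
    """
    return hashlib.sha256(content).hexdigest()


first = content_data_version(b"id,amount\n1,10\n")
second = content_data_version(b"id,amount\n1,10\n2,25\n")
print(first == second)  # the content changed, so the versions differ
```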

Parameters:
  • name (Optional[str]) – The name of the source asset. If not provided, defaults to the name of the decorated function. The asset’s name must be a valid name in Dagster (i.e., it contains only letters, numbers, and _) and may not contain Python reserved keywords.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, the source asset’s key is the concatenation of the key_prefix and the asset’s name, which defaults to the name of the decorated function. Each item in key_prefix must be a valid name in Dagster (i.e., it contains only letters, numbers, and _) and may not contain Python reserved keywords.

  • metadata (Mapping[str, RawMetadataValue]) – Metadata associated with the asset.

  • io_manager_key (Optional[str]) – The key for the IOManager that will be used to load the contents of the source asset when it’s used as an input to other assets inside a job.

  • io_manager_def (Optional[IOManagerDefinition]) – (Experimental) The definition of the IOManager that will be used to load the contents of the source asset when it’s used as an input to other assets inside a job.

  • description (Optional[str]) – The description of the asset.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

  • required_resource_keys (Optional[Set[str]]) – Set of resource keys required by the observe op.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – (Experimental) resource definitions that may be required by the dagster.IOManagerDefinition provided in the io_manager_def argument.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the asset.

  • auto_observe_interval_minutes (Optional[float]) – While the asset daemon is turned on, a run of the observation function for this asset will be launched at this interval.

  • observe_fn (Optional[SourceAssetObserveFunction]) – The observation function for the source asset.

class dagster.ObserveResult(*, asset_key=None, metadata=None, check_results=None, data_version=None)

Experimental: This API may break in future versions, even between dot releases.

An object representing a successful observation of an asset. These can be returned from an @observable_source_asset decorated function to pass metadata.

asset_key

The asset key. Optional to include.

Type:

Optional[AssetKey]

metadata

Metadata to record with the corresponding AssetObservation event.

Type:

Optional[MetadataUserInput]

check_results

Check results to record with the corresponding AssetObservation event.

Type:

Optional[Sequence[AssetCheckResult]]

data_version

The data version of the asset that was observed.

Type:

Optional[DataVersion]

dagster.define_asset_job(name, selection=None, config=None, description=None, tags=None, metadata=None, partitions_def=None, executor_def=None, hooks=None, op_retry_policy=None)

Creates a definition of a job which will either materialize a selection of assets or observe a selection of source assets. This will only be resolved to a JobDefinition once placed in a code location.

Parameters:
  • name (str) – The name for the job.

  • selection (Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]) –

    The assets that will be materialized or observed when the job is run.

    The selected assets must all be included in the assets that are passed to the assets argument of the Definitions object that this job is included on.

    The string “my_asset*” selects my_asset and all downstream assets within the code location. A list of strings represents the union of all assets selected by strings within the list.

    The selection will be resolved to a set of assets when the location is loaded. If the selection resolves to all source assets, the created job will perform source asset observations. If the selection resolves to all regular assets, the created job will materialize assets. If the selection resolves to a mixed set of source assets and regular assets, an error will be thrown.

  • config

    Describes how the Job is parameterized at runtime.

    If no value is provided, then the schema for the job’s run config is a standard format based on its ops and resources.

    If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config for the job whenever the job is executed. The values provided will be viewable and editable in the Dagster UI, so be careful with secrets.

    If a ConfigMapping object is provided, then the schema for the job’s run config is determined by the config mapping, which should return configuration in the standard format to configure the job.

  • tags (Optional[Mapping[str, Any]]) – Arbitrary information that will be attached to the execution of the Job. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • metadata (Optional[Mapping[str, RawMetadataValue]]) – Arbitrary metadata about the job. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.

  • description (Optional[str]) – A description for the Job.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partitions for this job. All AssetDefinitions selected for this job must have a matching PartitionsDefinition. If no PartitionsDefinition is provided, the PartitionsDefinition will be inferred from the selected AssetDefinitions.

  • executor_def (Optional[ExecutorDefinition]) – How this Job will be executed. Defaults to multi_or_in_process_executor, which can be switched between multi-process and in-process modes of execution. The default mode of execution is multi-process.

  • op_retry_policy (Optional[RetryPolicy]) – The default retry policy for all ops that compute assets in this job. Only used if retry policy is not defined on the asset definition.

Returns:

The job, which can be placed inside a code location.

Return type:

UnresolvedAssetJobDefinition
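
The resolution rule for the selection parameter (all source assets, all regular assets, or a mix) can be summarized in a short plain-Python sketch. The function job_mode and the is_source mapping are hypothetical stand-ins, and rejecting an empty selection is an assumption of this sketch rather than something stated above.

```python
from typing import Iterable, Mapping


def job_mode(selected: Iterable[str], is_source: Mapping[str, bool]) -> str:
    """Decide what a job does from the kinds of assets it selects.

    Hypothetical model: 'observe' for only source assets, 'materialize'
    for only regular assets, and an error for a mixed selection.
    """
    kinds = {is_source[key] for key in selected}
    if not kinds:
        # Assumption of this sketch: an empty selection is rejected.
        raise ValueError("The selection resolved to no assets.")
    if kinds == {True}:
        return "observe"
    if kinds == {False}:
        return "materialize"
    raise ValueError("Mixed selections of source and regular assets are not supported.")


catalog = {"raw_events": True, "clean_events": False, "report": False}
print(job_mode(["clean_events", "report"], catalog))
print(job_mode(["raw_events"], catalog))
```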

Examples

from dagster import AssetSelection, Definitions, asset, define_asset_job, observable_source_asset

# A job that targets all assets in the code location:
@asset
def asset1():
    ...

defs = Definitions(
    assets=[asset1],
    jobs=[define_asset_job("all_assets")],
)

# A job that targets a single asset
@asset
def asset1():
    ...

defs = Definitions(
    assets=[asset1],
    jobs=[define_asset_job("asset1_job", selection=[asset1])],
)

# A job that targets all the assets in a group:
defs = Definitions(
    assets=assets,
    jobs=[define_asset_job("marketing_job", selection=AssetSelection.groups("marketing"))],
)

# A job that observes a source asset:
@observable_source_asset
def source_asset():
    ...

defs = Definitions(
    assets=assets,
    jobs=[define_asset_job("observation_job", selection=[source_asset])],
)

# Resources are supplied to the assets, not the job:
@asset(required_resource_keys={"slack_client"})
def asset1():
    ...

defs = Definitions(
    assets=[asset1],
    jobs=[define_asset_job("all_assets")],
    resources={"slack_client": prod_slack_client},
)

class dagster.AssetSelection

An AssetSelection defines a query over a set of assets and asset checks, normally all that are defined in a code location.

You can use the “|”, “&”, and “-” operators to create unions, intersections, and differences of selections, respectively.

AssetSelections are typically used with define_asset_job().

By default, selecting assets will also select all of the asset checks that target those assets.
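
The three operators behave like ordinary set algebra over asset keys. The sketch below models that with Python sets; the catalog and the helper functions groups, keys, and all_assets are hypothetical stand-ins for the AssetSelection methods, and asset checks are left out of the model.

```python
# Hypothetical catalog mapping asset key -> group name.
ASSET_GROUPS = {
    "leads": "marketing",
    "campaigns": "marketing",
    "promotion": "sales",
    "invoices": "finance",
}


def groups(*names: str) -> set:
    """Keys of all assets whose group is one of the given names."""
    return {key for key, group in ASSET_GROUPS.items() if group in names}


def keys(*asset_keys: str) -> set:
    """The given keys, restricted to assets that exist in the catalog."""
    return set(asset_keys) & set(ASSET_GROUPS)


def all_assets() -> set:
    return set(ASSET_GROUPS)


union = groups("marketing") | keys("promotion")
intersection = groups("marketing") & keys("leads", "invoices")
difference = all_assets() - groups("marketing")
print(sorted(union), sorted(intersection), sorted(difference))
```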

Examples

# Select all assets in group "marketing":
AssetSelection.groups("marketing")

# Select all assets in group "marketing", as well as the asset with key "promotion":
AssetSelection.groups("marketing") | AssetSelection.keys("promotion")

# Select all assets in group "marketing" that are downstream of asset "leads":
AssetSelection.groups("marketing") & AssetSelection.keys("leads").downstream()

# Select a list of assets:
AssetSelection.assets(*my_assets_list)

# Select all assets except for those in group "marketing"
AssetSelection.all() - AssetSelection.groups("marketing")

# Select all assets which are materialized by the same op as "projections":
AssetSelection.keys("projections").required_multi_asset_neighbors()

# Select all assets in group "marketing" and exclude their asset checks:
AssetSelection.groups("marketing") - AssetSelection.all_asset_checks()

# Select all asset checks that target a list of assets:
AssetSelection.checks_for_assets(*my_assets_list)

# Select a specific asset check:
AssetSelection.checks(my_asset_check)

static all(include_sources=False)

Returns a selection that includes all assets and asset checks.

Parameters:

include_sources (bool) – (Experimental; this parameter may break in future versions, even between dot releases.) If True, then include all source assets.

static all_asset_checks()

Returns a selection that includes all asset checks.

static assets(*assets_defs)

Returns a selection that includes all of the provided assets and asset checks that target them.

static checks(*asset_checks)

Returns a selection that includes all of the provided asset checks.

static checks_for_assets(*assets_defs)

Returns a selection with the asset checks that target the provided assets.

downstream(depth=None, include_self=True)

Returns a selection that includes all assets that are downstream of any of the assets in this selection, selecting the assets in this selection by default. Includes the asset checks targeting the returned assets. Iterates through each asset in this selection and returns the union of all downstream assets.

Parameters:
  • depth (Optional[int]) – If provided, then only include assets to the given depth. A depth of 2 means all assets that are children or grandchildren of the assets in this selection.

  • include_self (bool) – If True, then include the assets in this selection in the result. If the include_self flag is False, return each downstream asset that is not part of the original selection. By default, set to True.

static groups(*group_strs, include_sources=False)

Returns a selection that includes materializable assets that belong to any of the provided groups and all the asset checks that target them.

Parameters:

include_sources (bool) – If True, then include source assets matching the group in the selection.

static key_prefixes(*key_prefixes, include_sources=False)

Returns a selection that includes assets that match any of the provided key prefixes and all the asset checks that target them.

Parameters:

include_sources (bool) – If True, then include source assets matching the key prefix(es) in the selection.
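
Prefix matching works on whole key segments rather than on characters. A minimal plain-Python sketch of that rule follows; matches_any_prefix is a hypothetical helper, and asset keys are represented as lists of string segments.

```python
from typing import Sequence, Union

Prefix = Union[str, Sequence[str]]


def matches_any_prefix(key: Sequence[str], *prefixes: Prefix) -> bool:
    """Return True if the key starts with any of the given prefixes.

    A string prefix is treated as a single segment; comparison is done
    segment by segment, never on partial strings.
    """
    for prefix in prefixes:
        segments = [prefix] if isinstance(prefix, str) else list(prefix)
        if list(key[: len(segments)]) == segments:
            return True
    return False


print(matches_any_prefix(["a", "b", "c"], "a", "b"))  # first segment is "a"
print(matches_any_prefix(["abc"], "a", "b"))  # "abc" is not the segment "a"
```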

Examples

# match any asset key where the first segment is equal to "a" or "b"
# e.g. AssetKey(["a", "b", "c"]) would match, but AssetKey(["abc"]) would not.
AssetSelection.key_prefixes("a", "b")

# match any asset key where the first two segments are ["a", "b"] or ["a", "c"]
AssetSelection.key_prefixes(["a", "b"], ["a", "c"])

static keys(*asset_keys)

Returns a selection that includes assets with any of the provided keys and all asset checks that target them.

Examples

AssetSelection.keys(AssetKey(["a"]))

AssetSelection.keys("a")

AssetSelection.keys(AssetKey(["a"]), AssetKey(["b"]))

AssetSelection.keys("a", "b")

asset_key_list = [AssetKey(["a"]), AssetKey(["b"])]
AssetSelection.keys(*asset_key_list)

required_multi_asset_neighbors()

Given an asset selection in which some assets are output from a multi-asset compute op which cannot be subset, returns a new asset selection that contains all of the assets required to execute the original asset selection. Includes the asset checks targeting the returned assets.

roots()

Given an asset selection, returns a new asset selection that contains all of the root assets within the original asset selection. Includes the asset checks targeting the returned assets.

A root asset is an asset that has no upstream dependencies within the asset selection. The root asset can have upstream dependencies outside of the asset selection.

Because mixed selections of source and materializable assets are currently not supported, keys corresponding to SourceAssets will not be included as roots. To select source assets, use the upstream_source_assets method.

sinks()

Given an asset selection, returns a new asset selection that contains all of the sink assets within the original asset selection. Includes the asset checks targeting the returned assets.

A sink asset is an asset that has no downstream dependencies within the asset selection. The sink asset can have downstream dependencies outside of the asset selection.
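
The definitions of roots and sinks can be made concrete with a small plain-Python model. The functions roots and sinks below are hypothetical re-implementations over a dict that maps each asset to its set of upstream assets; source assets and asset checks are not modeled.

```python
from typing import Dict, Set


def roots(selection: Set[str], upstream: Dict[str, Set[str]]) -> Set[str]:
    """Assets in the selection with no upstream dependency inside it."""
    return {a for a in selection if not (upstream.get(a, set()) & selection)}


def sinks(selection: Set[str], upstream: Dict[str, Set[str]]) -> Set[str]:
    """Assets in the selection with no downstream dependency inside it."""
    has_downstream = {
        parent
        for child in selection
        for parent in upstream.get(child, set())
        if parent in selection
    }
    return selection - has_downstream


# Hypothetical lineage: raw -> clean -> report -> dashboard
lineage = {"raw": set(), "clean": {"raw"}, "report": {"clean"}, "dashboard": {"report"}}
selected = {"clean", "report"}
print(roots(selected, lineage), sinks(selected, lineage))
```

In this example "clean" is a root even though it depends on "raw", and "report" is a sink even though "dashboard" depends on it, because both of those neighbors lie outside the selection.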

sources()

Deprecated: This API will be removed in version 2.0. Use AssetSelection.roots instead.

Given an asset selection, returns a new asset selection that contains all of the root assets within the original asset selection. Includes the asset checks targeting the returned assets.

A root asset is a materializable asset that has no upstream dependencies within the asset selection. The root asset can have upstream dependencies outside of the asset selection.

Because mixed selections of source and materializable assets are currently not supported, keys corresponding to SourceAssets will not be included as roots. To select source assets, use the upstream_source_assets method.

upstream(depth=None, include_self=True)

Returns a selection that includes all materializable assets that are upstream of any of the assets in this selection, selecting the assets in this selection by default. Includes the asset checks targeting the returned assets. Iterates through each asset in this selection and returns the union of all upstream assets.

Because mixed selections of source and materializable assets are currently not supported, keys corresponding to SourceAssets will not be included as upstream of regular assets.

Parameters:
  • depth (Optional[int]) – If provided, then only include assets to the given depth. A depth of 2 means all assets that are parents or grandparents of the assets in this selection.

  • include_self (bool) – If True, then include the assets in this selection in the result. If the include_self flag is False, return each upstream asset that is not part of the original selection. By default, set to True.
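
The depth and include_self parameters of upstream() and downstream() can be modeled as a breadth-first traversal with a level limit. The sketch below is a plain-Python approximation; traverse is a hypothetical helper, the edge maps are illustrative, and source assets and asset checks are not modeled.

```python
from typing import Dict, Optional, Set


def traverse(
    edges: Dict[str, Set[str]],
    selection: Set[str],
    depth: Optional[int] = None,
    include_self: bool = True,
) -> Set[str]:
    """Collect assets reachable from the selection along the given edges.

    Pass child edges to model downstream() and parent edges to model
    upstream(). A depth of None means no limit.
    """
    visited = set(selection)
    frontier = set(selection)
    level = 0
    while frontier and (depth is None or level < depth):
        # Step one level further, skipping assets already collected.
        frontier = {n for node in frontier for n in edges.get(node, set())} - visited
        visited |= frontier
        level += 1
    return visited if include_self else visited - selection


# Hypothetical chain a -> b -> c -> d, expressed in both directions.
children = {"a": {"b"}, "b": {"c"}, "c": {"d"}}
parents = {"d": {"c"}, "c": {"b"}, "b": {"a"}}
print(sorted(traverse(children, {"a"}, depth=2)))
print(sorted(traverse(parents, {"d"}, depth=1, include_self=False)))
```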

upstream_source_assets()

Given an asset selection, returns a new asset selection that contains all of the source assets that are parents of assets in the original selection. Includes the asset checks targeting the returned assets.

without_checks()

Removes all asset checks in the selection.

class dagster.FreshnessPolicy(*, maximum_lag_minutes, cron_schedule=None, cron_schedule_timezone=None)

Experimental: This API may break in future versions, even between dot releases.

A FreshnessPolicy specifies how up-to-date you want a given asset to be.

Attaching a FreshnessPolicy to an asset definition encodes an expectation on the upstream data that you expect to be incorporated into the current state of that asset at certain points in time. How this is calculated differs depending on if the asset is unpartitioned or time-partitioned (other partitioning schemes are not supported).

For time-partitioned assets, the current data time for the asset is simple to calculate. The upstream data that is incorporated into the asset is exactly the set of materialized partitions for that asset. Thus, the current data time for the asset is simply the time up to which all partitions have been materialized.

For unpartitioned assets, the current data time is based on the upstream materialization records that were read to generate the current state of the asset. More specifically, imagine you have two assets, where B depends on A. If B has a FreshnessPolicy defined, this means that at time T, the most recent materialization of B should have come after a materialization of A which was no more than maximum_lag_minutes ago. This calculation is recursive: any given asset is expected to incorporate up-to-date data from all of its upstream assets.

It is assumed that all asset definitions with no upstream asset definitions consume from some always-updating source. That is, if you materialize that asset at time T, it will incorporate all data up to time T.

If cron_schedule is not defined, the given asset will be expected to incorporate upstream data from no more than maximum_lag_minutes ago at all points in time. For example, “The events table should always have data from at most 1 hour ago”.

If cron_schedule is defined, the given asset will be expected to incorporate upstream data from no more than maximum_lag_minutes ago at each cron schedule tick. For example, “By 9AM, the signups table should contain all of yesterday’s data”.

The freshness status of assets with policies defined will be visible in the UI. If you are using an asset reconciliation sensor, this sensor will kick off runs to help keep your assets up to date with respect to their FreshnessPolicy.
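
The difference between a policy with and without cron_schedule can be sketched in plain Python. The function meets_freshness is a hypothetical model, not Dagster's implementation: cron ticks are passed in as explicit datetimes instead of being computed from a cron string, and treating the policy as satisfied before the first tick is an assumption of this sketch.

```python
from datetime import datetime, timedelta
from typing import Optional, Sequence


def meets_freshness(
    data_time: datetime,
    now: datetime,
    maximum_lag_minutes: float,
    cron_ticks: Optional[Sequence[datetime]] = None,
) -> bool:
    """Check whether an asset's data time satisfies a lag constraint.

    Without ticks the constraint is evaluated at 'now'; with ticks it is
    evaluated at the most recent tick that has already passed.
    """
    lag = timedelta(minutes=maximum_lag_minutes)
    if cron_ticks is None:
        return data_time >= now - lag
    past_ticks = [tick for tick in cron_ticks if tick <= now]
    if not past_ticks:
        return True  # Assumption: nothing to satisfy before the first tick.
    return data_time >= max(past_ticks) - lag


now = datetime(2023, 1, 2, 12, 0)
data_time = datetime(2023, 1, 2, 0, 30)  # data is current up to 00:30
tick = datetime(2023, 1, 2, 9, 0)  # hypothetical 9 AM tick

# A 9-hour lag checked continuously fails at noon, but the same lag
# checked only at the 9 AM tick is satisfied.
print(meets_freshness(data_time, now, 540))
print(meets_freshness(data_time, now, 540, cron_ticks=[tick]))
```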

Parameters:
  • maximum_lag_minutes (float) – An upper bound for how old the data contained within this asset may be.

  • cron_schedule (Optional[str]) – A cron schedule string (e.g. "0 1 * * *") specifying a series of times by which the maximum_lag_minutes constraint must be satisfied. If no cron schedule is provided, then this constraint must be satisfied at all times.

  • cron_schedule_timezone (Optional[str]) – Timezone in which the cron schedule should be evaluated. If not specified, defaults to UTC. Supported strings for timezones are the ones provided by the IANA time zone database (https://www.iana.org/time-zones), e.g. “America/Los_Angeles”.

# At any point in time, this asset must incorporate all upstream data from at least 30 minutes ago.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=30))
def fresh_asset():
    ...

# At each tick of the cron schedule (01:00 every day here), this asset must incorporate all upstream data from at least 30 minutes before the tick.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=30, cron_schedule="0 1 * * *"))
def cron_up_to_date_asset():
    ...

class dagster.AutoMaterializePolicy(rules, max_materializations_per_minute=1)

Experimental: This API may break in future versions, even between dot releases.

An AutoMaterializePolicy specifies how Dagster should attempt to keep an asset up-to-date.

Each policy consists of a set of AutoMaterializeRules, which are used to determine whether an asset or a partition of an asset should or should not be auto-materialized.

The most common policy is AutoMaterializePolicy.eager(), which consists of the following rules:

  • AutoMaterializeRule.materialize_on_missing()

    Materialize an asset or a partition if it has never been materialized.

  • AutoMaterializeRule.materialize_on_parent_updated()

    Materialize an asset or a partition if one of its parents has been updated more recently than it has.

  • AutoMaterializeRule.materialize_on_required_for_freshness()

    Materialize an asset or a partition if it is required to satisfy a freshness policy.

  • AutoMaterializeRule.skip_on_parent_outdated()

    Skip materializing an asset or partition if any of its parents have ancestors that have been materialized more recently.

  • AutoMaterializeRule.skip_on_parent_missing()

    Skip materializing an asset or a partition if any parent has never been materialized or observed.

Policies can be customized by adding or removing rules. For example, if you’d like to allow an asset to be materialized even if some of its parent partitions are missing:

from dagster import AutoMaterializePolicy, AutoMaterializeRule

my_policy = AutoMaterializePolicy.eager().without_rules(
    AutoMaterializeRule.skip_on_parent_missing(),
)

If you’d like an asset to wait for all of its parents to be updated before materializing:

from dagster import AutoMaterializePolicy, AutoMaterializeRule

my_policy = AutoMaterializePolicy.eager().with_rules(
    AutoMaterializeRule.skip_on_not_all_parents_updated(),
)

Lastly, the max_materializations_per_minute parameter, which is set to 1 by default, rate-limits the number of auto-materializations that can occur for a particular asset within a short time interval. This mainly matters for partitioned assets. Its purpose is to provide a safeguard against “surprise backfills”, where user error causes auto-materialization to be accidentally triggered for large numbers of partitions at once.

Warning:

Constructing an AutoMaterializePolicy directly is not recommended as the API is subject to change. AutoMaterializePolicy.eager() and AutoMaterializePolicy.lazy() are the recommended API.

static eager(max_materializations_per_minute=1)[source]

Constructs an eager AutoMaterializePolicy.

Parameters:

max_materializations_per_minute (Optional[int]) – The maximum number of auto-materializations for this asset that may be initiated per minute. If this limit is exceeded, the partitions which would have been materialized will be discarded, and will require manual materialization in order to be updated. Defaults to 1.

static lazy(max_materializations_per_minute=1)[source]

Constructs a lazy AutoMaterializePolicy.

Parameters:

max_materializations_per_minute (Optional[int]) – The maximum number of auto-materializations for this asset that may be initiated per minute. If this limit is exceeded, the partitions which would have been materialized will be discarded, and will require manual materialization in order to be updated. Defaults to 1.

with_rules(*rules_to_add)[source]

Constructs a copy of this policy with the specified rules added. If an instance of a provided rule with the same type exists on this policy, it will be replaced.

without_rules(*rules_to_remove)[source]

Constructs a copy of this policy with the specified rules removed. Raises an error if any of the arguments are not rules in this policy.
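The copy-and-replace behavior of with_rules and without_rules can be sketched with a toy model. ToyRule and ToyPolicy are hypothetical stand-ins, not Dagster classes; the field kind plays the role of the rule's type, and the use of ValueError is an assumption, since the text above only says that an error is raised.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class ToyRule:
    """Hypothetical stand-in for a rule; `kind` plays the role of the rule's type."""

    kind: str
    option: bool = False


@dataclass(frozen=True)
class ToyPolicy:
    """Hypothetical stand-in for a policy holding an immutable set of rules."""

    rules: FrozenSet[ToyRule]

    def with_rules(self, *rules_to_add: ToyRule) -> "ToyPolicy":
        # Existing rules of the same kind are replaced by the new instances.
        new_kinds = {rule.kind for rule in rules_to_add}
        kept = {rule for rule in self.rules if rule.kind not in new_kinds}
        return ToyPolicy(frozenset(kept | set(rules_to_add)))

    def without_rules(self, *rules_to_remove: ToyRule) -> "ToyPolicy":
        # Removing a rule that is not part of the policy is an error.
        missing = set(rules_to_remove) - self.rules
        if missing:
            raise ValueError(f"Rules not in this policy: {sorted(r.kind for r in missing)}")
        return ToyPolicy(self.rules - set(rules_to_remove))


policy = ToyPolicy(
    frozenset({ToyRule("skip_on_backfill_in_progress"), ToyRule("skip_on_parent_missing")})
)
replaced = policy.with_rules(ToyRule("skip_on_backfill_in_progress", option=True))
removed = policy.without_rules(ToyRule("skip_on_parent_missing"))
```

Both methods return a new object and leave the original policy untouched, which mirrors the "constructs a copy" wording above.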

class dagster.AutoMaterializeRule[source]

An AutoMaterializeRule defines a bit of logic which helps determine if a materialization should be kicked off for a given asset partition.

Each rule can have one of two decision types, MATERIALIZE (indicating that an asset partition should be materialized) or SKIP (indicating that the asset partition should not be materialized).

Materialize rules are evaluated first, and skip rules operate over the set of candidates that are produced by the materialize rules. Other than that, there is no ordering between rules.
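The two-phase ordering can be sketched as follows. This is a simplified model, not Dagster's evaluator: rules are represented as plain functions over sets of partition keys, and taking the union of all materialize rules as the candidate set is an assumption.

```python
from typing import Callable, Iterable, Set

# A rule maps a set of partition keys to the subset it selects.
Rule = Callable[[Set[str]], Set[str]]


def evaluate_rules(
    all_partitions: Set[str],
    materialize_rules: Iterable[Rule],
    skip_rules: Iterable[Rule],
) -> Set[str]:
    # Phase 1: materialize rules propose candidates.
    candidates: Set[str] = set()
    for rule in materialize_rules:
        candidates |= rule(all_partitions)
    # Phase 2: skip rules only ever see the candidates, never the full set.
    skipped: Set[str] = set()
    for rule in skip_rules:
        skipped |= rule(candidates)
    return candidates - skipped


never_materialized = {"2023-01-02", "2023-01-03"}
parent_missing = {"2023-01-01", "2023-01-03"}

result = evaluate_rules(
    all_partitions={"2023-01-01", "2023-01-02", "2023-01-03"},
    materialize_rules=[lambda partitions: partitions & never_materialized],
    skip_rules=[lambda candidates: candidates & parent_missing],
)
print(result)  # {'2023-01-02'}
```

Partition 2023-01-01 is never considered by the skip rule because no materialize rule proposed it.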

static materialize_on_cron(cron_schedule, timezone='UTC', all_partitions=False)[source]

Materialize an asset partition if it has not been materialized since the latest cron schedule tick. For assets with a time component to their partitions_def, this rule will request all partitions that have been missed since the previous tick.

Parameters:
  • cron_schedule (str) – A cron schedule string (e.g. “0 * * * *”) indicating the ticks for which this rule should fire.

  • timezone (str) – The timezone in which this cron schedule should be evaluated. Defaults to “UTC”.

  • all_partitions (bool) – If True, this rule fires for all partitions of this asset on each cron tick. If False, this rule fires only for the last partition of this asset. Defaults to False.

static materialize_on_missing()[source]

Materialize an asset partition if it has never been materialized before. This rule will not fire for non-root assets unless that asset’s parents have been updated.

static materialize_on_parent_updated(updated_parent_filter=None)[source]

Materialize an asset partition if one of its parents has been updated more recently than it has.

Note: For time-partitioned or dynamic-partitioned assets downstream of an unpartitioned asset, this rule will only fire for the most recent partition of the downstream.

Parameters:

updated_parent_filter (Optional[AutoMaterializeAssetPartitionsFilter]) – Filter to apply to updated parents. If a parent was updated but does not pass the filter criteria, then it won’t count as updated for the sake of this rule.

static materialize_on_required_for_freshness()[source]

Materialize an asset partition if it is required to satisfy a freshness policy of this asset or one of its downstream assets.

Note: This rule has no effect on partitioned assets.

static skip_on_backfill_in_progress(all_partitions=False)[source]

Skip an asset’s partitions if targeted by an in-progress backfill.

Parameters:

all_partitions (bool) – If True, skips all partitions of the asset being backfilled, regardless of whether the specific partition is targeted by a backfill. If False, skips only partitions targeted by a backfill. Defaults to False.

static skip_on_not_all_parents_updated(require_update_for_all_parent_partitions=False)[source]

Skip materializing an asset partition if any of its parents have not been updated since the asset’s last materialization.

Parameters:

require_update_for_all_parent_partitions (Optional[bool]) – Applies only to an unpartitioned asset or an asset partition that depends on more than one partition in any upstream asset. If true, requires all upstream partitions in each upstream asset to be materialized since the downstream asset’s last materialization in order to update it. If false, requires at least one upstream partition in each upstream asset to be materialized since the downstream asset’s last materialization in order to update it. Defaults to false.
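The difference between the two settings can be sketched with a standalone function. It is an illustrative model only: should_skip and the nested mapping of parent asset name to per-partition update times are assumptions, not part of the Dagster API.

```python
from datetime import datetime
from typing import Mapping


def should_skip(
    downstream_last_materialized: datetime,
    parent_partition_updates: Mapping[str, Mapping[str, datetime]],
    require_update_for_all_parent_partitions: bool = False,
) -> bool:
    """Skip unless every parent asset counts as updated since the last materialization."""
    combine = all if require_update_for_all_parent_partitions else any
    for partition_updates in parent_partition_updates.values():
        parent_updated = combine(
            updated_at > downstream_last_materialized
            for updated_at in partition_updates.values()
        )
        if not parent_updated:
            return True
    return False


last_run = datetime(2023, 1, 10)
parents = {
    # Only one of the two upstream "orders" partitions was updated after last_run.
    "orders": {"2023-01-09": datetime(2023, 1, 11), "2023-01-10": datetime(2023, 1, 8)},
    "users": {"all": datetime(2023, 1, 12)},
}

skip_default = should_skip(last_run, parents)
skip_strict = should_skip(last_run, parents, require_update_for_all_parent_partitions=True)
```

With the default setting the asset is not skipped, because each parent has at least one updated partition; with the strict setting it is skipped, because one orders partition is still stale.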

static skip_on_parent_missing()[source]

Skip materializing an asset partition if one of its parent asset partitions has never been materialized (for regular assets) or observed (for observable source assets).

static skip_on_parent_outdated()[source]

Skip materializing an asset partition if any of its parents has not incorporated the latest data from its ancestors.

static skip_on_required_but_nonexistent_parents()[source]

Skip an asset partition if it depends on parent partitions that do not exist.

For example, imagine a downstream asset is time-partitioned, starting in 2022, but has a time-partitioned parent which starts in 2023. This rule will skip attempting to materialize downstream partitions from before 2023, since the parent partitions do not exist.
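The 2022/2023 scenario can be worked through with a short sketch. It assumes monthly partitions and an identity partition mapping (each downstream partition depends on the parent partition with the same key); the helper monthly_keys is hypothetical.

```python
from datetime import date
from typing import List


def monthly_keys(start: date, end: date) -> List[str]:
    """Monthly partition keys from `start` (inclusive) to `end` (exclusive)."""
    keys = []
    year, month = start.year, start.month
    while (year, month) < (end.year, end.month):
        keys.append(f"{year:04d}-{month:02d}")
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return keys


# The downstream asset starts in late 2022; its parent only starts in 2023.
downstream_keys = monthly_keys(date(2022, 11, 1), date(2023, 3, 1))
parent_keys = set(monthly_keys(date(2023, 1, 1), date(2023, 3, 1)))

# Downstream partitions whose required parent partition does not exist are skipped.
skipped = [key for key in downstream_keys if key not in parent_keys]
eligible = [key for key in downstream_keys if key in parent_keys]
print(skipped)   # ['2022-11', '2022-12']
print(eligible)  # ['2023-01', '2023-02']
```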

dagster.load_assets_from_modules(modules, group_name=None, key_prefix=None, *, freshness_policy=None, auto_materialize_policy=None, backfill_policy=None, source_key_prefix=None)[source]

Constructs a list of assets and source assets from the given modules.

Parameters:
  • modules (Iterable[ModuleType]) – The Python modules to look for assets inside.

  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

  • freshness_policy (Optional[FreshnessPolicy]) – FreshnessPolicy to apply to all the loaded assets.

  • auto_materialize_policy (Optional[AutoMaterializePolicy]) – AutoMaterializePolicy to apply to all the loaded assets.

  • backfill_policy (Optional[BackfillPolicy]) – BackfillPolicy to apply to all the loaded assets.

  • source_key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets and source assets defined in the given modules.

Return type:

Sequence[Union[AssetsDefinition, SourceAsset]]
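The effect of key_prefix on a loaded asset's key can be sketched as below. This is a minimal model that represents an asset key as a list of string segments; prepend_prefix is a hypothetical helper, not a Dagster function.

```python
from typing import List, Optional, Sequence, Union


def prepend_prefix(
    key_path: Sequence[str],
    key_prefix: Optional[Union[str, Sequence[str]]] = None,
) -> List[str]:
    """Return a new key path with the prefix segments placed in front."""
    if key_prefix is None:
        prefix: List[str] = []
    elif isinstance(key_prefix, str):
        # A single string is treated as a one-segment prefix.
        prefix = [key_prefix]
    else:
        prefix = list(key_prefix)
    return [*prefix, *key_path]


original = ["orders"]
single = prepend_prefix(original, "warehouse")
nested = prepend_prefix(["raw", "orders"], ["warehouse", "staging"])
print(single)  # ['warehouse', 'orders']
print(nested)  # ['warehouse', 'staging', 'raw', 'orders']
```

The input key is left unchanged, which corresponds to the note above that the returned assets are copies of the loaded objects.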

dagster.load_assets_from_current_module(group_name=None, key_prefix=None, *, freshness_policy=None, auto_materialize_policy=None, backfill_policy=None, source_key_prefix=None)[source]

Constructs a list of assets, source assets, and cacheable assets from the module where this function is called.

Parameters:
  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

  • freshness_policy (Optional[FreshnessPolicy]) – FreshnessPolicy to apply to all the loaded assets.

  • auto_materialize_policy (Optional[AutoMaterializePolicy]) – AutoMaterializePolicy to apply to all the loaded assets.

  • backfill_policy (Optional[BackfillPolicy]) – BackfillPolicy to apply to all the loaded assets.

  • source_key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets, source assets, and cacheable assets defined in the module.

Return type:

Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]

dagster.load_assets_from_package_module(package_module, group_name=None, key_prefix=None, *, freshness_policy=None, auto_materialize_policy=None, backfill_policy=None, source_key_prefix=None)[source]

Constructs a list of assets and source assets that includes all asset definitions, source assets, and cacheable assets in all sub-modules of the given package module.

A package module is the result of importing a package.

Parameters:
  • package_module (ModuleType) – The package module to look for assets inside.

  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

  • freshness_policy (Optional[FreshnessPolicy]) – FreshnessPolicy to apply to all the loaded assets.

  • auto_materialize_policy (Optional[AutoMaterializePolicy]) – AutoMaterializePolicy to apply to all the loaded assets.

  • backfill_policy (Optional[BackfillPolicy]) – BackfillPolicy to apply to all the loaded assets.

  • source_key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets, source assets, and cacheable assets defined in the module.

Return type:

Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]

dagster.load_assets_from_package_name(package_name, group_name=None, key_prefix=None, *, freshness_policy=None, auto_materialize_policy=None, backfill_policy=None, source_key_prefix=None)[source]

Constructs a list of assets, source assets, and cacheable assets that includes all asset definitions and source assets in all sub-modules of the given package.

Parameters:
  • package_name (str) – The name of a Python package to look for assets inside.

  • group_name (Optional[str]) – Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended.

  • freshness_policy (Optional[FreshnessPolicy]) – FreshnessPolicy to apply to all the loaded assets.

  • auto_materialize_policy (Optional[AutoMaterializePolicy]) – AutoMaterializePolicy to apply to all the loaded assets.

  • backfill_policy (Optional[BackfillPolicy]) – BackfillPolicy to apply to all the loaded assets.

  • source_key_prefix (Optional[Union[str, Sequence[str]]]) – Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended.

Returns:

A list containing assets, source assets, and cacheable assets defined in the package.

Return type:

Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]

class dagster.AssetsDefinition(*, keys_by_input_name, keys_by_output_name, node_def, partitions_def=None, partition_mappings=None, asset_deps=None, selected_asset_keys=None, can_subset=False, resource_defs=None, group_names_by_key=None, metadata_by_key=None, freshness_policies_by_key=None, auto_materialize_policies_by_key=None, backfill_policy=None, descriptions_by_key=None, check_specs_by_output_name=None, selected_asset_check_keys=None, is_subset=False)[source]

Defines a set of assets that are produced by the same op or graph.

AssetsDefinitions are typically not instantiated directly, but rather produced using the @asset or @multi_asset decorators.

property asset_deps

Maps assets that are produced by this definition to assets that they depend on. The dependencies can be either “internal”, meaning that they refer to other assets that are produced by this definition, or “external”, meaning that they refer to assets that aren’t produced by this definition.

property can_subset

If True, indicates that this AssetsDefinition may materialize any subset of its asset keys in a given computation (as opposed to being required to materialize all asset keys).

Type:

bool

property check_specs

Returns the asset check specs defined on this AssetsDefinition, i.e. the checks that can be executed while materializing the assets.

Return type:

Iterable[AssetCheckSpec]

property dependency_keys

The asset keys which are upstream of any asset included in this AssetsDefinition.

Type:

Iterable[AssetKey]

property descriptions_by_key

Returns a mapping from the asset keys in this AssetsDefinition to the descriptions assigned to them. If there is no assigned description for a given AssetKey, it will not be present in this dictionary.

Type:

Mapping[AssetKey, str]

static from_graph(graph_def, *, keys_by_input_name=None, keys_by_output_name=None, key_prefix=None, internal_asset_deps=None, partitions_def=None, partition_mappings=None, resource_defs=None, group_name=None, group_names_by_output_name=None, descriptions_by_output_name=None, metadata_by_output_name=None, freshness_policies_by_output_name=None, auto_materialize_policies_by_output_name=None, backfill_policy=None, can_subset=False, check_specs=None)[source]

Constructs an AssetsDefinition from a GraphDefinition.

Parameters:
  • graph_def (GraphDefinition) – The GraphDefinition that is an asset.

  • keys_by_input_name (Optional[Mapping[str, AssetKey]]) – A mapping of the input names of the decorated graph to their corresponding asset keys. If not provided, the input asset keys will be created from the graph input names.

  • keys_by_output_name (Optional[Mapping[str, AssetKey]]) – A mapping of the output names of the decorated graph to their corresponding asset keys. If not provided, the output asset keys will be created from the graph output names.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, key_prefix will be prepended to each key in keys_by_output_name. Each item in key_prefix must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by the graph depend on all assets that are consumed by that graph. If this default is not correct, you can pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this set must be either used as input to the asset or produced within the graph.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • partition_mappings (Optional[Mapping[str, PartitionMapping]]) – Defines how to map partition keys for this asset to partition keys of upstream assets. Each key in the dictionary corresponds to one of the input assets, and each value is a PartitionMapping. If no entry is provided for a particular asset dependency, the partition mapping defaults to the default partition mapping for the partitions definition, which typically maps partition keys to the same partition keys in upstream assets.

  • resource_defs (Optional[Mapping[str, ResourceDefinition]]) – (Experimental: this parameter may break in future versions, even between dot releases.) A mapping of resource keys to resource definitions. These resources will be initialized during execution, and can be accessed from the body of ops in the graph during execution.

  • group_name (Optional[str]) – A group name for the constructed asset. Assets without a group name are assigned to a group called “default”.

  • group_names_by_output_name (Optional[Mapping[str, Optional[str]]]) – Defines a group name to be associated with some or all of the output assets for this node. Keys are names of the outputs, and values are the group name. Cannot be used with the group_name argument.

  • descriptions_by_output_name (Optional[Mapping[str, Optional[str]]]) – Defines a description to be associated with each of the output assets for this graph.

  • metadata_by_output_name (Optional[Mapping[str, Optional[MetadataUserInput]]]) – Defines metadata to be associated with each of the output assets for this node. Keys are names of the outputs, and values are dictionaries of metadata to be associated with the related asset.

  • freshness_policies_by_output_name (Optional[Mapping[str, Optional[FreshnessPolicy]]]) – Defines a FreshnessPolicy to be associated with some or all of the output assets for this node. Keys are the names of the outputs, and values are the FreshnessPolicies to be attached to the associated asset.

  • auto_materialize_policies_by_output_name (Optional[Mapping[str, Optional[AutoMaterializePolicy]]]) – Defines an AutoMaterializePolicy to be associated with some or all of the output assets for this node. Keys are the names of the outputs, and values are the AutoMaterializePolicies to be attached to the associated asset.

  • backfill_policy (Optional[BackfillPolicy]) – Defines this asset’s BackfillPolicy
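The default dependency assumption and the internal_asset_deps override described in the parameter list above can be sketched as follows. This is a simplified model using plain strings in place of AssetKey objects; resolve_asset_deps is a hypothetical helper, and raising ValueError for an unknown key is an assumption.

```python
from typing import Dict, Mapping, Optional, Set


def resolve_asset_deps(
    input_keys: Set[str],
    keys_by_output_name: Mapping[str, str],
    internal_asset_deps: Optional[Mapping[str, Set[str]]] = None,
) -> Dict[str, Set[str]]:
    """Map each produced asset key to the set of asset keys it depends on."""
    internal_asset_deps = internal_asset_deps or {}
    # A declared dependency must be an input or an asset produced by the same node.
    allowed = set(input_keys) | set(keys_by_output_name.values())
    deps: Dict[str, Set[str]] = {}
    for output_name, asset_key in keys_by_output_name.items():
        if output_name in internal_asset_deps:
            declared = set(internal_asset_deps[output_name])
            unknown = declared - allowed
            if unknown:
                raise ValueError(f"Unknown dependency keys: {sorted(unknown)}")
            deps[asset_key] = declared
        else:
            # Default: every output depends on every input.
            deps[asset_key] = set(input_keys)
    return deps


inputs = {"raw_orders", "raw_users"}
outputs = {"orders_out": "orders", "summary_out": "order_summary"}

default_deps = resolve_asset_deps(inputs, outputs)
custom_deps = resolve_asset_deps(
    inputs,
    outputs,
    internal_asset_deps={"orders_out": {"raw_orders"}, "summary_out": {"orders"}},
)
```

In the customized case, order_summary depends on orders, an asset produced by the same node, which is the "internal" kind of dependency described for the asset_deps property.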

static from_op(op_def, *, keys_by_input_name=None, keys_by_output_name=None, key_prefix=None, internal_asset_deps=None, partitions_def=None, partition_mappings=None, group_name=None, group_names_by_output_name=None, descriptions_by_output_name=None, metadata_by_output_name=None, freshness_policies_by_output_name=None, auto_materialize_policies_by_output_name=None, backfill_policy=None, can_subset=False)[source]

Constructs an AssetsDefinition from an OpDefinition.

Parameters:
  • op_def (OpDefinition) – The OpDefinition that is an asset.

  • keys_by_input_name (Optional[Mapping[str, AssetKey]]) – A mapping of the input names of the decorated op to their corresponding asset keys. If not provided, the input asset keys will be created from the op input names.

  • keys_by_output_name (Optional[Mapping[str, AssetKey]]) – A mapping of the output names of the decorated op to their corresponding asset keys. If not provided, the output asset keys will be created from the op output names.

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, key_prefix will be prepended to each key in keys_by_output_name. Each item in key_prefix must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by the op depend on all assets that are consumed by that op. If this default is not correct, you can pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this set must be either used as input to the asset or produced within the op.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • partition_mappings (Optional[Mapping[str, PartitionMapping]]) – Defines how to map partition keys for this asset to partition keys of upstream assets. Each key in the dictionary corresponds to one of the input assets, and each value is a PartitionMapping. If no entry is provided for a particular asset dependency, the partition mapping defaults to the default partition mapping for the partitions definition, which typically maps partition keys to the same partition keys in upstream assets.

  • group_name (Optional[str]) – A group name for the constructed asset. Assets without a group name are assigned to a group called “default”.

  • group_names_by_output_name (Optional[Mapping[str, Optional[str]]]) – Defines a group name to be associated with some or all of the output assets for this node. Keys are names of the outputs, and values are the group name. Cannot be used with the group_name argument.

  • descriptions_by_output_name (Optional[Mapping[str, Optional[str]]]) – Defines a description to be associated with each of the output assets for this op.

  • metadata_by_output_name (Optional[Mapping[str, Optional[MetadataUserInput]]]) – Defines metadata to be associated with each of the output assets for this node. Keys are names of the outputs, and values are dictionaries of metadata to be associated with the related asset.

  • freshness_policies_by_output_name (Optional[Mapping[str, Optional[FreshnessPolicy]]]) – Defines a FreshnessPolicy to be associated with some or all of the output assets for this node. Keys are the names of the outputs, and values are the FreshnessPolicies to be attached to the associated asset.

  • auto_materialize_policies_by_output_name (Optional[Mapping[str, Optional[AutoMaterializePolicy]]]) – Defines an AutoMaterializePolicy to be associated with some or all of the output assets for this node. Keys are the names of the outputs, and values are the AutoMaterializePolicies to be attached to the associated asset.

  • backfill_policy (Optional[BackfillPolicy]) – Defines this asset’s BackfillPolicy

get_partition_mapping(in_asset_key)[source]

Returns the partition mapping between keys in this AssetsDefinition and a given input asset key (if any).

property group_names_by_key

Returns a mapping from the asset keys in this AssetsDefinition to the group names assigned to them. If there is no assigned group name for a given AssetKey, it will not be present in this dictionary.

Type:

Mapping[AssetKey, str]

property key

The asset key associated with this AssetsDefinition. If this AssetsDefinition has more than one asset key, this will produce an error.

Type:

AssetKey

property keys

The asset keys associated with this AssetsDefinition.

Type:

AbstractSet[AssetKey]

property node_def

Returns the OpDefinition or GraphDefinition that is used to materialize the assets in this AssetsDefinition.

Type:

NodeDefinition

property op

Returns the OpDefinition that is used to materialize the assets in this AssetsDefinition.

Type:

OpDefinition

property partitions_def

The PartitionsDefinition for this AssetsDefinition (if any).

Type:

Optional[PartitionsDefinition]

property required_resource_keys

The set of keys for resources that must be provided to this AssetsDefinition.

Type:

Set[str]

property resource_defs

A mapping from resource name to ResourceDefinition for the resources bound to this AssetsDefinition.

Type:

Mapping[str, ResourceDefinition]

to_source_asset(key=None)[source]

Returns a representation of this asset as a SourceAsset.

If this is a multi-asset, the “key” argument allows selecting which asset to return a SourceAsset representation of.

Parameters:

key (Optional[Union[str, Sequence[str], AssetKey]]) – If this is a multi-asset, select which asset to return a SourceAsset representation of. If not a multi-asset, this can be left as None.

Returns:

SourceAsset

to_source_assets()[source]

Returns a SourceAsset for each asset in this definition.

Each produced SourceAsset will have the same key, metadata, io_manager_key, etc. as the corresponding asset.

@dagster.multi_asset(*, outs=None, name=None, ins=None, deps=None, description=None, config_schema=None, required_resource_keys=None, compute_kind=None, internal_asset_deps=None, partitions_def=None, backfill_policy=None, op_tags=None, can_subset=False, resource_defs=None, group_name=None, retry_policy=None, code_version=None, specs=None, check_specs=None, non_argument_deps=None)[source]

Create a combined definition of multiple assets that are computed using the same op and same upstream assets.

Each argument to the decorated function references an upstream asset that this asset depends on. The name of the argument designates the name of the upstream asset.

You can set I/O manager keys, auto-materialize policies, freshness policies, group names, etc. on an individual asset within the multi-asset by attaching them to the AssetOut corresponding to that asset in the outs parameter.

Parameters:
  • name (Optional[str]) – The name of the op.

  • outs (Optional[Dict[str, AssetOut]]) – The AssetOuts representing the assets materialized by this function. AssetOuts detail the output, IO management, and core asset properties. This argument is required except when AssetSpecs are used.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to information about the input.

  • deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, AssetKey, str]]]) – The assets that are upstream dependencies, but do not correspond to a parameter of the decorated function. If the AssetsDefinition for a multi_asset is provided, dependencies on all assets created by the multi_asset will be created.

  • config_schema (Optional[ConfigSchema]) – The configuration schema for the asset’s underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by the underlying op.

  • compute_kind (Optional[str]) – A string to represent the kind of computation that produces the asset, e.g. “dbt” or “spark”. It will be displayed in the Dagster UI as a badge on the asset.

  • internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]) – By default, it is assumed that all assets produced by a multi_asset depend on all assets that are consumed by that multi_asset. If this default is not correct, you can pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this set must be either used as input to the asset or produced within the op.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • backfill_policy (Optional[BackfillPolicy]) – The backfill policy for the op that computes the asset.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • can_subset (bool) – Whether this asset’s computation can emit a subset of the asset keys based on the context.selected_assets argument. Defaults to False.

  • resource_defs (Optional[Mapping[str, object]]) – (Experimental: this parameter may break in future versions, even between dot releases.) A mapping of resource keys to resources. These resources will be initialized during execution, and can be accessed from the context within the body of the function.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. This group name will be applied to all assets produced by this multi_asset.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for the op that computes the asset.

  • code_version (Optional[str]) – (Experimental) Version of the code encapsulated by the multi-asset. If set, this is used as a default code version for all defined assets.

  • specs (Optional[Sequence[AssetSpec]]) – (Experimental) The specifications for the assets materialized by this function.

  • check_specs (Optional[Sequence[AssetCheckSpec]]) – (Experimental) Specs for asset checks that execute in the decorated function after materializing the assets.

  • non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]) – deprecated (This parameter will be removed in version 2.0.0. use deps instead.) Deprecated, use deps instead. Set of asset keys that are upstream dependencies, but do not pass an input to the multi_asset.
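The round-trip criterion stated for op_tags values, json.loads(json.dumps(value)) == value, can be checked ahead of time with the standard library. The helper name is_json_round_trippable is an assumption made for this sketch; it is not part of Dagster.

```python
import json
from typing import Any


def is_json_round_trippable(value: Any) -> bool:
    """True if `value` survives JSON encoding and decoding unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as sets,
        # and ValueError for circular references.
        return False


print(is_json_round_trippable({"team": "data", "retries": 3}))  # True
print(is_json_round_trippable([1, 2]))                          # True
print(is_json_round_trippable((1, 2)))                          # False: decoded as a list
print(is_json_round_trippable({1: "a"}))                        # False: keys become strings
print(is_json_round_trippable({1, 2}))                          # False: sets are not serializable
```

Tuples and dictionaries with non-string keys are the usual surprises: they encode without error but decode to a different value.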

Examples

from dagster import AssetOut, multi_asset

# Use IO managers to handle I/O:
@multi_asset(
    outs={
        "my_string_asset": AssetOut(),
        "my_int_asset": AssetOut(),
    }
)
def my_function(upstream_asset: int):
    result = upstream_asset + 1
    return str(result), result

# Handle I/O on your own:
# (load, write, and do_some_transformation are placeholders for your own helpers.)
@multi_asset(
    outs={
        "asset1": AssetOut(),
        "asset2": AssetOut(),
    },
    deps=["asset0"],
)
def my_function():
    asset0_value = load(path="asset0")
    asset1_result, asset2_result = do_some_transformation(asset0_value)
    write(asset1_result, path="asset1")
    write(asset2_result, path="asset2")
    return None, None

@dagster.graph_asset(compose_fn=None, *, name=None, description=None, ins=None, config=None, key_prefix=None, group_name=None, partitions_def=None, metadata=None, freshness_policy=None, auto_materialize_policy=None, backfill_policy=None, resource_defs=None, check_specs=None, key=None)[source]

Creates a software-defined asset that’s computed using a graph of ops.

This decorator is meant to decorate a function that composes a set of ops or graphs to define the dependencies between them.

Parameters:
  • name (Optional[str]) – The name of the asset. If not provided, defaults to the name of the decorated function. The asset’s name must be a valid name in Dagster (ie only contains letters, numbers, and underscores) and may not contain Python reserved keywords.

  • description (Optional[str]) – A human-readable description of the asset.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to information about the input.

  • config (Optional[Union[ConfigMapping, Mapping[str, Any]]]) –

    Describes how the graph underlying the asset is configured at runtime.

    If a ConfigMapping object is provided, then the graph takes on the config schema of this object. The mapping will be applied at runtime to generate the config for the graph’s constituent nodes.

    If a dictionary is provided, then it will be used as the default run config for the graph. This means it must conform to the config schema of the underlying nodes. Note that the values provided will be viewable and editable in the Dagster UI, so be careful with secrets.

    If no value is provided, then the config schema for the graph is the default (derived from the underlying nodes).

  • key_prefix (Optional[Union[str, Sequence[str]]]) – If provided, the asset’s key is the concatenation of the key_prefix and the asset’s name, which defaults to the name of the decorated function. Each item in key_prefix must be a valid name in Dagster (i.e., it contains only letters, numbers, and underscores) and may not contain Python reserved keywords.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the asset.

  • metadata (Optional[MetadataUserInput]) – Dictionary of metadata to be associated with the asset.

  • freshness_policy (Optional[FreshnessPolicy]) – A constraint telling Dagster how often this asset is intended to be updated with respect to its root data.

  • auto_materialize_policy (Optional[AutoMaterializePolicy]) – The AutoMaterializePolicy to use for this asset.

  • backfill_policy (Optional[BackfillPolicy]) – The BackfillPolicy to use for this asset.

  • key (Optional[CoercibleToAssetKey]) – The key for this asset. If provided, cannot specify key_prefix or name.

Examples

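import pandas as pd
from dagster import graph_asset, op

@op
def fetch_files_from_slack(context) -> pd.DataFrame:
    ...

@op
def store_files(files) -> None:
    # create_db_connection is not defined here; supply your own connection helper.
    files.to_sql(name="slack_files", con=create_db_connection())

@graph_asset
def slack_files_table():
    return store_files(fetch_files_from_slack())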
@dagster.graph_multi_asset(*, outs, name=None, ins=None, partitions_def=None, backfill_policy=None, group_name=None, can_subset=False, resource_defs=None, check_specs=None, config=None)[source]

Create a combined definition of multiple assets that are computed using the same graph of ops, and the same upstream assets.

Each argument to the decorated function references an upstream asset that this asset depends on. The name of the argument designates the name of the upstream asset.

Parameters:
  • name (Optional[str]) – The name of the graph.

  • outs (Optional[Dict[str, AssetOut]]) – The AssetOuts representing the produced assets.

  • ins (Optional[Mapping[str, AssetIn]]) – A dictionary that maps input names to information about the input.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the assets.

  • backfill_policy (Optional[BackfillPolicy]) – The backfill policy for the asset.

  • group_name (Optional[str]) – A string name used to organize multiple assets into groups. This group name will be applied to all assets produced by this multi_asset.

  • can_subset (bool) – Whether this asset’s computation can emit a subset of the asset keys based on the context.selected_assets argument. Defaults to False.

  • config (Optional[Union[ConfigMapping, Mapping[str, Any]]]) –

    Describes how the graph underlying the asset is configured at runtime.

    If a ConfigMapping object is provided, then the graph takes on the config schema of this object. The mapping will be applied at runtime to generate the config for the graph’s constituent nodes.

    If a dictionary is provided, then it will be used as the default run config for the graph. This means it must conform to the config schema of the underlying nodes. Note that the values provided will be viewable and editable in the Dagster UI, so be careful with secrets.

    If no value is provided, then the config schema for the graph is the default (derived from the underlying nodes).

class dagster.AssetOut(key_prefix=None, key=None, dagster_type=<class 'dagster._core.definitions.utils.NoValueSentinel'>, description=None, is_required=True, io_manager_key=None, metadata=None, group_name=None, code_version=None, freshness_policy=None, auto_materialize_policy=None, backfill_policy=None)[source]

Defines one of the assets produced by a @multi_asset.

key_prefix

If provided, the asset’s key is the concatenation of the key_prefix and the asset’s name. When using @multi_asset, the asset name defaults to the key of the “outs” dictionary. Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str]]]

key

The asset’s key. Only one of the “key_prefix” and “key” arguments should be provided.

Type:

Optional[Union[str, Sequence[str], AssetKey]]

dagster_type

The type of this output. Should only be set if the correct type cannot be inferred directly from the type signature of the decorated function.

Type:

Optional[Union[Type, DagsterType]]

description

Human-readable description of the output.

Type:

Optional[str]

is_required

Whether the presence of this field is required. (default: True)

Type:

bool

io_manager_key

The resource key of the IO manager used for this output. (default: “io_manager”).

Type:

Optional[str]

metadata

A dict of the metadata for the output. For example, users can provide a file path if the data object will be stored in a filesystem, or provide information of a database table when it is going to load the data into the table.

Type:

Optional[Dict[str, Any]]

group_name

A string name used to organize multiple assets into groups. If not provided, the name “default” is used.

Type:

Optional[str]

code_version

The version of the code that generates this asset.

Type:

Optional[str]

freshness_policy

A policy which indicates how up to date this asset is intended to be.

Type:

Optional[FreshnessPolicy]

auto_materialize_policy

AutoMaterializePolicy to apply to the specified asset.

Type:

Optional[AutoMaterializePolicy]

backfill_policy

BackfillPolicy to apply to the specified asset.

Type:

Optional[BackfillPolicy]

class dagster.AssetValueLoader(assets_defs_by_key, source_assets_by_key, instance=None)[source]

Caches resource definitions that are used to load asset values across multiple load invocations.

Should not be instantiated directly. Instead, use get_asset_value_loader().

load_asset_value(asset_key, *, python_type=None, partition_key=None, metadata=None, resource_config=None)[source]

Loads the contents of an asset as a Python object.

Invokes load_input on the IOManager associated with the asset.

Parameters:
  • asset_key (Union[AssetKey, Sequence[str], str]) – The key of the asset to load.

  • python_type (Optional[Type]) – The Python type to load the asset as. This is what will be returned inside load_input by context.dagster_type.typing_type.

  • partition_key (Optional[str]) – The partition of the asset to load.

  • metadata (Optional[Dict[str, Any]]) – Input metadata to pass to the IOManager (is equivalent to setting the metadata argument in In or AssetIn).

  • resource_config (Optional[Any]) – A dictionary of resource configurations to be passed to the IOManager.

Returns:

The contents of an asset as a Python object.