Ask AI

Multi-Assets#

A multi-asset represents a set of asset definitions that are all updated by the same op or graph.

Relevant APIs#

NameDescription
@multi_assetA decorator used to define multi-assets.

Overview#

When working with asset definitions, it's sometimes inconvenient or impossible to map each persisted asset to a unique op or graph. A multi-asset is a way to define a single op or graph that will produce the contents of multiple data assets at once.

Multi-assets may be useful in the following scenarios:

  • A single call to an API results in multiple tables being updated (e.g. Airbyte, Fivetran, dbt).
  • The same in-memory object is used to compute multiple assets.

Defining multi-assets#

The function responsible for computing the contents of any asset is an op. Multi-assets are responsible for updating multiple assets, so the underlying op will have multiple outputs -- one for each associated asset.

A basic multi-asset#

The easiest way to create a multi-asset is with the @multi_asset decorator. This decorator functions similarly to the @asset decorator, but requires an outs parameter specifying each output asset of the function.

from dagster import AssetOut, multi_asset, AssetExecutionContext


@multi_asset(
    outs={
        "my_string_asset": AssetOut(),
        "my_int_asset": AssetOut(),
    }
)
def my_function():
    return "abc", 123

By default, the names of the outputs will be used to form the asset keys of the multi-asset. The decorated function will be used to create the op for these assets and must emit an output for each of them. In this case, we can emit multiple outputs by returning a tuple of values, one for each asset.

Conditional materialization#

In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the is_required parameter along with yield syntax to implement this behavior.

If the is_required parameter is set to False on an output, and your function does not yield an Output object for that output, then no asset materialization event will be created, the I/O manager will not be invoked, downstream assets will not be materialized, and asset sensors monitoring the asset will not trigger.

import random

from dagster import AssetOut, Output, asset, multi_asset


@multi_asset(
    outs={"asset1": AssetOut(is_required=False), "asset2": AssetOut(is_required=False)}
)
def assets_1_and_2():
    if random.randint(1, 10) < 5:
        yield Output([1, 2, 3, 4], output_name="asset1")

    if random.randint(1, 10) < 5:
        yield Output([6, 7, 8, 9], output_name="asset2")


@asset
def downstream1(asset1):
    # will not run when assets_1_and_2 doesn't materialize the asset1
    return asset1 + [5]


@asset
def downstream2(asset2):
    # will not run when assets_1_and_2 doesn't materialize the asset2
    return asset2 + [10]

Subsetting multi-assets#

By default, it is assumed that the computation inside of a multi-asset will always produce the contents all of the associated assets. This means that attempting to execute a set of assets that produces some, but not all, of the assets defined by a given multi-asset will result in an error.

Sometimes, the underlying computation is sufficiently flexible to allow for computing an arbitrary subset of the assets associated with it. In these cases, set the is_required attribute of the outputs to False, and set the can_subset parameter of the decorator to True.

Inside the body of the function, we can use context.selected_output_names or context.selected_asset_keys to find out which computations should be run.

from dagster import AssetOut, Output, multi_asset


@multi_asset(
    outs={
        "a": AssetOut(is_required=False),
        "b": AssetOut(is_required=False),
    },
    can_subset=True,
)
def split_actions(context: AssetExecutionContext):
    if "a" in context.op_execution_context.selected_output_names:
        yield Output(value=123, output_name="a")
    if "b" in context.op_execution_context.selected_output_names:
        yield Output(value=456, output_name="b")

Because our outputs are now optional, we can no longer rely on returning a tuple of values, as we don't know in advance which values will be computed. Instead, we explicitly yield each output that we're expected to create.

Dependencies inside multi-assets#

When a multi-asset is created, it is assumed that each output asset depends on all of the input assets. This may not always be the case, however.

In these situations, you may optionally provide a mapping from each output asset to the set of AssetKeys that it depends on. This information is used to display lineage information in the Dagster UI and for parsing selections over your asset graph.

from dagster import AssetKey, AssetOut, Output, multi_asset


@multi_asset(
    outs={"c": AssetOut(), "d": AssetOut()},
    internal_asset_deps={
        "c": {AssetKey("a")},
        "d": {AssetKey("b")},
    },
)
def my_complex_assets(a, b):
    # c only depends on a
    yield Output(value=a + 1, output_name="c")
    # d only depends on b
    yield Output(value=b + 1, output_name="d")

Multi-asset code versions#

Multi-assets may assign different code versions for each of their outputs:

@multi_asset(
    specs=[AssetSpec(key="a", code_version="1"), AssetSpec(key="b", code_version="2")]
)
def multi_asset_with_versions():
    with open("data/a.json", "w") as f:
        json.dump(100, f)
        yield MaterializeResult("a")
    with open("data/b.json", "w") as f:
        json.dump(200, f)
        yield MaterializeResult("b")

Just as with regular assets, these versions are attached to the AssetMaterialization objects for each of the constituent assets and represented in the UI.