Ask AI

Graph-Backed Assets#

Basic software-defined assets are computed using a single op. If generating an asset involves multiple discrete computations, you can use graph-backed assets by separating each computation into an op and assembling them into an op graph to combine your computations. This allows you to launch re-executions of runs at the op boundaries, but doesn't require you to link each intermediate value to an asset in persistent storage.


Relevant APIs#

NameDescription
@graph_assetDecorator for defining an asset that's computed using a graph of ops. The dependencies between the ops are specified inside the body of the decorated function.
@graph_multi_assetDecorator for defining a set of assets that are computed using a graph of ops. The dependencies between the ops are specified inside the body of the decorated function.
AssetsDefinition.from_graphConstructs an asset, given a graph definition. Useful if you have a single graph that you want to use to power multiple different assets.

Defining graph-backed assets#

To define a graph-backed asset, use the @graph_asset decorator. The decorated function defines the dependencies between a set of ops, which are combined to compute the asset.

In this case, when you tell Dagster to materialize the slack_files_table asset, Dagster will invoke fetch_files_from_slack and then invoke store_files after fetch_files_from_slack has completed.

import pandas as pd
from dagster import graph_asset, op
from dagster_slack import SlackResource


@op
def fetch_files_from_slack(slack: SlackResource) -> pd.DataFrame:
    files = slack.get_client().files_list(channel="#random")
    return pd.DataFrame(
        [
            {
                "id": file.get("id"),
                "created": file.get("created"),
                "title": file.get("title"),
                "permalink": file.get("permalink"),
            }
            for file in files
        ]
    )


@op
def store_files(files):
    return files.to_sql(name="slack_files", con=create_db_connection())


@graph_asset
def slack_files_table():
    return store_files(fetch_files_from_slack())

Defining managed-loading dependencies for graph-backed assets#

Similar to software-defined assets, Dagster infers the upstream assets from the names of the arguments to the decorated function. Dagster will then delegate loading the data to an I/O manager.

The example below includes an asset named middle_asset. middle_asset depends on upstream_asset, and downstream_asset depends on middle_asset:

from dagster import asset, graph_asset, op


@asset
def upstream_asset():
    return 1


@op
def add_one(input_num):
    return input_num + 1


@op
def multiply_by_two(input_num):
    return input_num * 2


@graph_asset
def middle_asset(upstream_asset):
    return multiply_by_two(add_one(upstream_asset))


@asset
def downstream_asset(middle_asset):
    return middle_asset + 7

Graph-backed multi-assets#

Using the @graph_multi_asset, you can create a combined definition of multiple assets that are computed using the same graph of ops and same upstream assets.

In the below example, two_assets accepts upstream_asset and outputs two assets, first_asset and second_asset:

from dagster import AssetOut, graph_multi_asset


@graph_multi_asset(outs={"first_asset": AssetOut(), "second_asset": AssetOut()})
def two_assets(upstream_asset):
    one, two = two_outputs(upstream_asset)
    return {"first_asset": one, "second_asset": two}

Advanced: Subsetting graph-backed assets#

By default, when executing a graph-backed asset, every asset produced by the graph must be materialized. This means that attempting to selectively execute a subset of assets defined in the graph-backed asset will result in an error.

If the underlying computation is sufficiently flexible to selectively output a subset of assets, a graph-backed asset can be subsetted. For example, let’s say we wanted to define a graph-backed asset with the structure depicted in the image below. In this case, we want to independently materialize foo_asset and baz_asset.

Graph-backed asset

In order to selectively output an asset from a graph-backed asset, Dagster will run each op that is a dependency of the outputted asset. In the example, if we wanted to selectively materialize foo_asset, Dagster would run foo and bar. If we wanted to selectively materialize baz_asset, Dagster would run all three ops (foo, bar, and baz).

Because the foo op yields an asset output (foo_asset) and is an upstream dependency of another asset generated from the graph (baz_asset), we need to structure foo to selectively return outputs depending on the asset subset selected for execution. We can do this by defining foo to have optional outputs that are yielded conditionally. Dagster provides a context.selected_output_names object on the op context that will return the outputs necessary to generate the asset subset.

During execution, if we select just baz_asset for materialization, the below implementation of foo will return {"foo_2"} for context.selected_output_names, preventing foo_asset from being materialized.

@op(out={"foo_1": Out(is_required=False), "foo_2": Out(is_required=False)})
def foo(context: OpExecutionContext, bar_1):
    # Selectively returns outputs based on selected assets
    if "foo_1" in context.selected_output_names:
        yield Output(bar_1 + 1, output_name="foo_1")
    if "foo_2" in context.selected_output_names:
        yield Output(bar_1 + 2, output_name="foo_2")

Because Dagster flattens each op graph into a flat input/output mapping between ops under the hood, any op that produces an output of the graph must be structured to yield its outputs optionally, enabling the outputs to be returned independently.

In the example, foo and baz produce outputs of my_graph. Subsequently, their outputs need to be yielded optionally. Because foo yields multiple outputs, we must structure our code to conditionally yield its outputs like in the code snippet above.

However, because baz only yields a singular output, Dagster will only run baz when its asset output baz_asset is selected. So, we don’t have to structure baz to return an optional output. Because bar does not yield any outputs that are returned from my_graph, its outputs do not have to be selectively returned.

We could define the asset using the code below. Notice that can_subset must be set to True in the asset definition to signify that the graph-backed asset can be subsetted.

@op(out={"bar_1": Out(), "bar_2": Out()})
def bar():
    return 1, 2


@op
def baz(foo_2, bar_2):
    return foo_2 + bar_2


@graph_multi_asset(
    outs={"foo_asset": AssetOut(), "baz_asset": AssetOut()}, can_subset=True
)
def my_graph_assets():
    bar_1, bar_2 = bar()
    foo_1, foo_2 = foo(bar_1)
    return {"foo_asset": foo_1, "baz_asset": baz(foo_2, bar_2)}


defs = Definitions(assets=[my_graph_assets], jobs=[define_asset_job("graph_asset")])

Depending on how outputs are returned from the ops within a graph-backed asset, there could be unexpected materializations. For example, the following foo implementation would unexpectedly materialize foo_asset if baz_asset was the only asset selected for execution.

@op(out={"foo_1": Out(), "foo_2": Out()})
def foo():
    return 1, 2


# Will unexpectedly materialize foo_asset
defs.get_job_def("my_graph_assets").execute_in_process(
    asset_selection=[AssetKey("baz_asset")]
)

This is because the foo op is an upstream dependency of baz_asset, and this implementation of foo returns both the foo_1 and foo_2 outputs. The foo_1 output is returned as the foo_asset output of the graph, causing an unexpected materialization of foo_asset.