Graph-backed assets
Basic assets are computed using a single op. If generating an asset involves multiple discrete computations, you can use a graph-backed asset instead: separate each computation into an op, then assemble the ops into an op graph. This lets you launch re-executions of runs at the op boundaries without having to link each intermediate value to an asset in persistent storage.
Relevant APIs
Name | Description |
---|---|
@dg.graph_asset | Decorator for defining an asset that's computed using a graph of ops. The dependencies between the ops are specified inside the body of the decorated function. |
@dg.graph_multi_asset | Decorator for defining a set of assets that are computed using a graph of ops. The dependencies between the ops are specified inside the body of the decorated function. |
AssetsDefinition.from_graph | Constructs an asset, given a graph definition. Useful if you have a single graph that you want to use to power multiple different assets. |
Defining graph-backed assets
To define a graph-backed asset, use the @dg.graph_asset decorator. The decorated function defines the dependencies between a set of ops, which are combined to compute the asset.
In the example below, when you tell Dagster to materialize the slack_files_table asset, Dagster will invoke fetch_files_from_slack and then invoke store_files after fetch_files_from_slack has completed:
import pandas as pd
from dagster import graph_asset, op
from dagster_slack import SlackResource


@op
def fetch_files_from_slack(slack: SlackResource) -> pd.DataFrame:
    # files_list returns a response object; the file records are under "files"
    response = slack.get_client().files_list(channel="#random")
    return pd.DataFrame(
        [
            {
                "id": file.get("id"),
                "created": file.get("created"),
                "title": file.get("title"),
                "permalink": file.get("permalink"),
            }
            for file in response["files"]
        ]
    )


@op
def store_files(files):
    # create_db_connection is not shown here; it is assumed to be defined elsewhere
    return files.to_sql(name="slack_files", con=create_db_connection())


@graph_asset
def slack_files_table():
    # store_files runs only after fetch_files_from_slack has produced its output
    return store_files(fetch_files_from_slack())
Defining managed-loading dependencies for graph-backed assets
As with single-op asset definitions, Dagster infers the upstream assets from the names of the arguments to the decorated function. Dagster then delegates loading the data to an I/O manager.
The example below includes an asset named middle_asset. middle_asset depends on upstream_asset, and downstream_asset depends on middle_asset:
from dagster import asset, graph_asset, op


@asset
def upstream_asset():
    return 1


@op
def add_one(input_num):
    return input_num + 1


@op
def multiply_by_two(input_num):
    return input_num * 2


@graph_asset
def middle_asset(upstream_asset):
    return multiply_by_two(add_one(upstream_asset))


@asset
def downstream_asset(middle_asset):
    return middle_asset + 7
Graph-backed multi-assets
Using the @dg.graph_multi_asset decorator, you can create a combined definition of multiple assets that are computed using the same graph of ops and the same upstream assets.
In the example below, two_assets accepts upstream_asset and outputs two assets, first_asset and second_asset:
from dagster import AssetOut, graph_multi_asset


@graph_multi_asset(outs={"first_asset": AssetOut(), "second_asset": AssetOut()})
def two_assets(upstream_asset):
    # two_outputs is not shown here; it is assumed to be an op with two outputs
    one, two = two_outputs(upstream_asset)
    return {"first_asset": one, "second_asset": two}
Advanced: Subsetting graph-backed assets
By default, when executing a graph-backed asset, every asset produced by the graph must be materialized. This means that attempting to selectively execute a subset of assets defined in the graph-backed asset will result in an error.
If the underlying computation is sufficiently flexible to selectively output a subset of assets, a graph-backed asset can be subsetted. For example, let’s say we wanted to define a graph-backed asset with the structure depicted in the image below. In this case, we want to independently materialize foo_asset and baz_asset.
To selectively output an asset from a graph-backed asset, Dagster runs each op that the selected asset depends on. In the example, if we wanted to selectively materialize foo_asset, Dagster would run foo and bar. If we wanted to selectively materialize baz_asset, Dagster would run all three ops (foo, bar, and baz).
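The rule above amounts to walking the op dependencies upstream from the op that produces the selected asset. The plain-Python sketch below models that walk for this example. It illustrates the rule and is not Dagster's internal implementation. The names OP_DEPS, ASSET_PRODUCER, and ops_to_run are hypothetical, and the dependencies are read off the example graph defined later in this section.

```python
# Upstream ops of each op in the example graph.
OP_DEPS = {"bar": [], "foo": ["bar"], "baz": ["foo", "bar"]}

# The op whose output becomes each asset.
ASSET_PRODUCER = {"foo_asset": "foo", "baz_asset": "baz"}


def ops_to_run(asset_name):
    """Return the set of ops needed to materialize asset_name."""
    needed = set()
    stack = [ASSET_PRODUCER[asset_name]]
    while stack:
        op_name = stack.pop()
        if op_name not in needed:
            needed.add(op_name)
            # Visit everything this op depends on as well.
            stack.extend(OP_DEPS[op_name])
    return needed


print(sorted(ops_to_run("foo_asset")))  # ['bar', 'foo']
print(sorted(ops_to_run("baz_asset")))  # ['bar', 'baz', 'foo']
```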
Because the foo op yields an asset output (foo_asset) and is an upstream dependency of another asset generated from the graph (baz_asset), we need to structure foo to selectively return outputs depending on the asset subset selected for execution. We can do this by defining foo to have optional outputs that are yielded conditionally. Dagster provides a context.selected_output_names property on the op context that returns the names of the outputs necessary to generate the asset subset.
During execution, if we select just baz_asset for materialization, the below implementation of foo will return {"foo_2"} for context.selected_output_names, preventing foo_asset from being materialized.
import dagster as dg


@dg.op(out={"foo_1": dg.Out(is_required=False), "foo_2": dg.Out(is_required=False)})
def foo(context: dg.OpExecutionContext, bar_1):
    # Selectively returns outputs based on selected assets
    if "foo_1" in context.selected_output_names:
        yield dg.Output(bar_1 + 1, output_name="foo_1")
    if "foo_2" in context.selected_output_names:
        yield dg.Output(bar_1 + 2, output_name="foo_2")
Because Dagster flattens each op graph into a flat input/output mapping between ops under the hood, any op that produces an output of the graph must be structured to yield its outputs optionally, enabling the outputs to be returned independently.
In the example, foo and baz produce outputs of my_graph_assets. Consequently, their outputs need to be yielded optionally. Because foo yields multiple outputs, we must structure our code to conditionally yield its outputs like in the code snippet above.
However, because baz only yields a single output, Dagster will only run baz when its asset output baz_asset is selected. So, we don’t have to structure baz to return an optional output. Because bar does not yield any outputs that are returned from my_graph_assets, its outputs do not have to be selectively returned.
We could define the asset using the code below. Notice that can_subset must be set to True in the asset definition to signify that the graph-backed asset can be subsetted.
@dg.op(out={"bar_1": dg.Out(), "bar_2": dg.Out()})
def bar():
    return 1, 2


@dg.op
def baz(foo_2, bar_2):
    return foo_2 + bar_2


@dg.graph_multi_asset(
    outs={"foo_asset": dg.AssetOut(), "baz_asset": dg.AssetOut()}, can_subset=True
)
def my_graph_assets():
    bar_1, bar_2 = bar()
    foo_1, foo_2 = foo(bar_1)
    return {"foo_asset": foo_1, "baz_asset": baz(foo_2, bar_2)}


defs = dg.Definitions(
    assets=[my_graph_assets], jobs=[dg.define_asset_job("graph_asset")]
)
Depending on how outputs are returned from the ops within a graph-backed asset, there could be unexpected materializations. For example, the following foo implementation would unexpectedly materialize foo_asset if baz_asset was the only asset selected for execution.
@dg.op(out={"foo_1": dg.Out(), "foo_2": dg.Out()})
def foo(bar_1):
    # Always returns both outputs, regardless of which assets are selected
    return bar_1 + 1, bar_1 + 2


# Will unexpectedly materialize foo_asset
defs.get_job_def("graph_asset").execute_in_process(
    asset_selection=[dg.AssetKey("baz_asset")]
)
This is because the foo op is an upstream dependency of baz_asset, and this implementation of foo returns both the foo_1 and foo_2 outputs. The foo_1 output is returned as the foo_asset output of the graph, causing an unexpected materialization of foo_asset.