Asset Materializations #

"Asset" is Dagster's word for an entity, external to ops, that is mutated or created by an op. An asset might be a table in a database that an op appends to, an ML model in a model store that an op overwrites, or even a Slack channel that an op writes messages to.

Op outputs often correspond to assets. For example, an op might be responsible for recreating a table, and one of its outputs might be a dataframe containing the contents of that table.

Assets can also have partitions, which refer to slices of the overall asset. The simplest example would be a table that has a partition for each day. A given op execution may simply write a single day's worth of data to that table, rather than dropping the entire table and replacing it with new data.

Dagster lets you track the interactions between ops, outputs, and assets over time and view them in the Dagit Asset Catalog. Every asset has a "key", which serves as a unique identifier for that particular entity. The act of creating or updating the contents of an asset is called a "materialization", and Dagster tracks these materializations using AssetMaterialization events. These events can either be yielded by the user at runtime, or automatically created by Dagster in cases where an AssetKey has been referenced by an op output.

Relevant APIs #

Name                   Description
AssetMaterialization   Dagster event indicating that an op has materialized an asset.
AssetKey               A unique identifier for a particular external asset.

Overview #

There are two general patterns for dealing with assets when using Dagster:

  • Put the logic to write/store assets inside the body of an op.
  • Focus the op purely on business logic, and delegate the logic to write/store assets to an IOManager.

Regardless of which pattern you use, AssetMaterialization events communicate to Dagster that a materialization has occurred. You can create these events either by explicitly yielding them at runtime or, using an experimental interface, by having Dagster generate them automatically once you declare that a given op output corresponds to a given AssetKey.
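
The difference between the two patterns can be sketched without Dagster at all. The snippet below is a toy model under stated assumptions: `STORAGE`, `InMemoryManager`, and both op functions are hypothetical stand-ins, and a plain dictionary plays the role of external storage. It only illustrates where the write happens, not how Dagster invokes ops or IOManagers.

```python
# Toy illustration only: no Dagster APIs are used here.
STORAGE = {}  # hypothetical in-memory stand-in for a database or object store


def op_that_stores():
    """Pattern 1: the op body contains the storage logic."""
    rows = [1, 2, 3]
    STORAGE["pattern_1"] = rows  # the write happens inside the op
    return "pattern_1"  # the op hands back a reference to what it stored


def op_pure_logic():
    """Pattern 2: the op only computes a value."""
    return [1, 2, 3]


class InMemoryManager:
    """Hypothetical stand-in for an IOManager that owns the storage logic."""

    def handle_output(self, name, obj):
        STORAGE[name] = obj  # the write happens in the manager

    def load_input(self, name):
        return STORAGE[name]


stored_name = op_that_stores()

manager = InMemoryManager()
manager.handle_output("pattern_2", op_pure_logic())
```

In both cases the same data ends up in storage; what changes is which component performs the write, and therefore which component is the natural place to report the materialization.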

Explicit AssetMaterializations #

One way of recording materialization events is to explicitly yield AssetMaterialization events at runtime. These events should be co-located with your materialization logic, meaning if you store your object within your op body, then you should yield from within that op, and if you store your object using an IOManager, then you should yield the event from your manager.

Yielding an AssetMaterialization from an Op #

To make Dagster aware that we materialized an asset in our op, we can yield an AssetMaterialization event. This would involve changing the following op:

from dagster import op


@op
def my_simple_op():
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    return remote_storage_path

into something like this:

from dagster import op, Output, AssetMaterialization


@op
def my_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    yield AssetMaterialization(asset_key="my_dataset", description="Persisted result to storage")
    yield Output(remote_storage_path)

Note: Our materialization op must now explicitly yield an Output event instead of relying on the implicit conversion of the return value into an Output event.
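
This requirement appears to follow from ordinary Python generator semantics rather than from anything asset-specific. A minimal sketch, with plain strings standing in for Dagster events and a hypothetical storage path:

```python
def returns_value():
    # A plain function: the caller receives the return value directly.
    return "/tmp/my_dataset.csv"  # hypothetical path


def yields_then_returns():
    # The presence of `yield` turns this function into a generator function.
    yield "materialization-event"  # stand-in for an AssetMaterialization
    return "/tmp/my_dataset.csv"  # ends the generator; not one of the yielded items


def yields_everything():
    yield "materialization-event"
    yield "output-event"  # stand-in for Output(remote_storage_path)


plain_result = returns_value()
dropped = list(yields_then_returns())
complete = list(yields_everything())
```

Iterating over `yields_then_returns()` produces only the materialization stand-in, which is why the output has to be yielded as its own event once the op body contains any `yield`.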

We should now see a materialization event in the event log when we execute this op.

[Image: asset-materialization]

Yielding an AssetMaterialization from an IOManager #

To record that an IOManager has mutated or created an asset, we can yield an AssetMaterialization event from its handle_output method.

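import os

from dagster import AssetKey, AssetMaterialization, IOManager
from pandas import read_csv


class PandasCsvIOManager(IOManager):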
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        yield AssetMaterialization(
            asset_key=AssetKey(file_path), description="Persisted result to storage."
        )

Attaching Metadata to an AssetMaterialization #

There are a variety of types of metadata that can be associated with a materialization event, all through the EventMetadataEntry class. Each materialization event optionally takes a list of metadata entries that are then displayed in the event log and the Asset Catalog.

Example: Op body #

from dagster import op, AssetMaterialization, Output, EventMetadata


@op
def my_metadata_materialization_op(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    yield AssetMaterialization(
        asset_key="my_dataset",
        description="Persisted result to storage",
        metadata={
            "text_metadata": "Text-based metadata for this event",
            "path": EventMetadata.path(remote_storage_path),
            "dashboard_url": EventMetadata.url("http://mycoolsite.com/url_for_my_data"),
            "size (bytes)": calculate_bytes(df),
        },
    )
    yield Output(remote_storage_path)

Example: IOManager #

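import os

from dagster import AssetKey, AssetMaterialization, IOManager
from pandas import read_csv


class PandasCsvIOManagerWithAsset(IOManager):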
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        yield AssetMaterialization(
            asset_key=AssetKey(file_path),
            description="Persisted result to storage.",
            metadata={
                "number of rows": obj.shape[0],
                "some_column mean": obj["some_column"].mean(),
            },
        )

Check our API docs for EventMetadataEntry for more details on the types of event metadata available.

Specifying a partition for an AssetMaterialization #

If you are materializing a single slice of an asset (e.g. a single day's worth of data on a larger table), rather than mutating or creating it entirely, you can indicate this to Dagster by including the partition argument on the object.

from dagster import op, AssetMaterialization, Output


@op(config_schema={"date": str})
def my_partitioned_asset_op(context):
    partition_date = context.op_config["date"]
    df = read_df_for_date(partition_date)
    remote_storage_path = persist_to_storage(df)
    yield AssetMaterialization(asset_key="my_dataset", partition=partition_date)
    yield Output(remote_storage_path)
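
In the example above, the partition value is simply a string that names the slice. As a sketch of how daily partition strings might be produced for a date range, the hypothetical helper below (not part of Dagster) formats each day as an ISO date. The format is an assumption; any naming scheme works as long as it is applied consistently.

```python
from datetime import date, timedelta


def daily_partition_keys(start, end):
    """Return one ISO-formatted date string per day from start to end, inclusive.

    Hypothetical helper for illustration; not a Dagster API.
    """
    days = (end - start).days
    return [(start + timedelta(days=offset)).isoformat() for offset in range(days + 1)]


keys = daily_partition_keys(date(2021, 1, 30), date(2021, 2, 1))
```

Each resulting string could be supplied as the "date" config value of my_partitioned_asset_op for one run.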

Linking Op Outputs to Assets (Experimental) #

It is fairly common for an asset to correspond to an op output. In the following simplified example, our op produces a dataframe, persists it to storage, and then passes the dataframe along as an output:

from dagster import op, Output, AssetMaterialization


@op
def my_asset_op(context):
    df = read_df()
    persist_to_storage(df)
    yield AssetMaterialization(asset_key="my_dataset")
    yield Output(df)

In this case, the AssetMaterialization and the Output events both correspond to the same data, the dataframe that we have created. With this in mind, we can simplify the above code, and provide useful information to the Dagster framework, by making this link between the my_dataset asset and the output of this op explicit.

Just as there are two places in which you can yield runtime AssetMaterialization events (within an op body and within an IOManager), we provide two different interfaces for linking an op output to an asset. Regardless of which you choose, every time the op runs and yields that output, an AssetMaterialization event will automatically be created to record this information.

If you specified any metadata entries on the Output event while yielding it (see: Op Event Docs), these entries will automatically be attached to the materialization event for this asset.

Linking assets to an Output Definition (Experimental) #

For cases where you are storing your asset within the body of an op, the easiest way of linking an asset to an op output is with the asset_key parameter on the relevant output definition (Out) in your op.

This parameter can be one of two things. For simple cases, where an op will always be writing to the same asset, you can define a constant AssetKey that identifies the asset you are linking.

from dagster import op, Output, Out, AssetKey


@op(out=Out(asset_key=AssetKey("my_dataset")))
def my_constant_asset_op(context):
    df = read_df()
    persist_to_storage(df)
    yield Output(df)

For cases where the asset that you want to link to an output might change based on some context in the job (such as the job name), you can pass in a function that takes OutputContext and returns an AssetKey.

from dagster import op, OutputContext, Out, Output, AssetKey


def get_asset_key(context: OutputContext):
    job_name = context.step_context.job_name
    return AssetKey(f"my_dataset_{job_name}")


@op(out=Out(asset_key=get_asset_key))
def my_variable_asset_op(context):
    df = read_df()
    persist_to_storage(df)
    yield Output(df)
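
Accepting either a constant or a function is a common pattern. The sketch below shows one way such a parameter could be resolved; `resolve_asset_key`, `key_from_job`, and the dictionary-based context are hypothetical, and this is not a description of how Dagster implements it internally.

```python
def resolve_asset_key(spec, context):
    """Return spec itself if it is a constant, or spec(context) if it is callable."""
    if callable(spec):
        return spec(context)
    return spec


def key_from_job(context):
    # Mirrors get_asset_key above, with a plain dict instead of OutputContext.
    return f"my_dataset_{context['job_name']}"


context = {"job_name": "nightly"}  # hypothetical job name

constant_key = resolve_asset_key("my_dataset", context)
variable_key = resolve_asset_key(key_from_job, context)
```

The constant form always resolves to the same key, while the function form defers the decision until the context for a particular run is available.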

Linking assets to outputs with an IOManager (Experimental) #

If you've defined a custom IOManager to handle storing your op's outputs, the IOManager will likely be the most natural place to define which asset a particular output will be written to. To do this, you can implement the get_output_asset_key function on your IOManager.

Similar to the above interface, this function takes an OutputContext and returns an AssetKey. The following example functions nearly identically to PandasCsvIOManagerWithAsset from the runtime example above.

import os

from dagster import AssetKey, EventMetadataEntry, IOManager
from pandas import read_csv


class PandasCsvIOManagerWithOutputAsset(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        yield EventMetadataEntry.int(obj.shape[0], label="number of rows")
        yield EventMetadataEntry.float(obj["some_column"].mean(), "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return AssetKey(file_path)

When an output is linked to an asset in this way, the generated AssetMaterialization event will contain any EventMetadataEntry information yielded from the handle_output function (in addition to all of the metadata specified on the corresponding Output event).

See the IOManager docs for more information on yielding these entries from an IOManager.

Specifying partitions for an output-linked asset #

If you are already specifying a get_output_asset_key function on your IOManager, you can optionally specify a set of partitions that this manager will be updating or creating by also defining a get_output_asset_partitions function. If you do this, an AssetMaterialization will be created for each of the specified partitions. One useful pattern for passing this partition information (which will likely vary from run to run) to the manager is to specify the set of partitions in the configuration of the output. You can do this by providing per-output configuration on the IOManager.

Then, you can calculate the asset partitions that a particular output will correspond to by reading this output configuration in get_output_asset_partitions:

import os

from dagster import AssetKey, EventMetadataEntry, IOManager
from pandas import read_csv


class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)

        obj.to_csv(file_path)

        yield EventMetadataEntry.int(obj.shape[0], label="number of rows")
        yield EventMetadataEntry.float(obj["some_column"].mean(), "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return AssetKey(file_path)

    def get_output_asset_partitions(self, context):
        return set(context.config["partitions"])
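
To make the "one materialization per partition" behavior concrete, here is a toy model in plain Python. The `output_config` dictionary and the `materializations_for` helper are hypothetical, and the record shape is invented for illustration; only the `set(...)` conversion mirrors the example above.

```python
def materializations_for(asset_key, partitions):
    """Build one record per partition, in sorted order for stable output."""
    return [{"asset_key": asset_key, "partition": p} for p in sorted(partitions)]


# Hypothetical per-output configuration; note the duplicated date.
output_config = {"partitions": ["2021-01-02", "2021-01-01", "2021-01-02"]}

partitions = set(output_config["partitions"])  # duplicates collapse here
records = materializations_for("my_dataset", partitions)
```

Because the partitions are collected into a set, listing the same partition twice in the configuration still yields a single record for it in this sketch.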

Asset Lineage (Experimental) #

When an op output is linked to an AssetKey, Dagster can automatically generate lineage information that describes how this asset relates to other output-linked assets.

As a simplified example, imagine a two-op job that first scrapes some user data from an API, storing it to a table, then trains an ML model on that data, persisting it to a model store:

from dagster import op, job, AssetKey, Out


@op(out=Out(asset_key=AssetKey("my_db.users")))
def scrape_users():
    users_df = some_api_call()
    persist_to_db(users_df)
    return users_df


@op(out=Out(asset_key=AssetKey("ml_models.user_prediction")))
def get_prediction_model(users_df):
    my_ml_model = train_prediction_model(users_df)
    persist_to_model_store(my_ml_model)
    return my_ml_model


@job
def my_user_model_job():
    get_prediction_model(scrape_users())

In this case, it's certainly fair to say that this ML model, which we have assigned the key ml_models.user_prediction, depends on the table that we created, my_db.users (it uses the data in the table to train the model).

Why is that? By specifying the structure of your job, you have already defined data dependencies between these ops. Because the output of scrape_users is linked to the input of get_prediction_model, we can infer that whatever this second op outputs will be some function of its input. Furthermore, since we have linked each of these outputs to external assets, we can use this knowledge to say that the asset associated with the output of get_prediction_model depends on the asset associated with the output of scrape_users.
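
The inference described above can be sketched as a small graph lookup. This is a simplified model under stated assumptions, not Dagster's implementation: the two dictionaries are hand-written stand-ins for the job structure and the output-to-asset links, `parent_assets` is a hypothetical helper, and only direct upstream ops are considered.

```python
# Which asset each op's output is linked to (from the Out(asset_key=...) arguments).
op_asset_keys = {
    "scrape_users": "my_db.users",
    "get_prediction_model": "ml_models.user_prediction",
}

# Which ops feed each op's inputs (from the body of my_user_model_job).
op_upstream = {
    "scrape_users": [],
    "get_prediction_model": ["scrape_users"],
}


def parent_assets(op_name):
    """Return the asset keys linked to the ops directly upstream of op_name."""
    return sorted(
        op_asset_keys[upstream]
        for upstream in op_upstream[op_name]
        if upstream in op_asset_keys
    )


lineage = {op_asset_keys[name]: parent_assets(name) for name in op_asset_keys}
```

Here `lineage` maps each asset key to its parent asset keys: my_db.users has no parents, and ml_models.user_prediction has my_db.users as its only parent.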

This feature is still in its early stages, but for now, this lineage information is surfaced in the Asset Catalog page for each asset (Latest parent assets):

[Image: asset-lineage]