Asset Catalog

"Asset" is Dagster's word for an entity, external to solids, that is mutated or created by solids. An asset might be a table in a database that solids append to, an ML model in a model store that solids overwrite, or even a Slack channel that solids write messages to.

The Asset Catalog is an interface inside Dagit that centers on assets. Each entry in the catalog is an asset and includes:

  • Runs that mutated or created the asset
  • Metadata logged by developers about the asset

Developers place entries in the Asset Catalog by yielding AssetMaterialization events from inside their solids or output managers. The act of mutating or creating an asset is called a "materialization". AssetMaterialization events may include arbitrary metadata that describes the asset at the time of the materialization.
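As a rough mental model only (this is not how Dagit is implemented), a catalog can be thought of as materialization events grouped by asset key. The sketch below uses plain Python; MaterializationRecord, build_catalog, the run IDs, and the metadata values are all hypothetical names and data invented for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class MaterializationRecord:
    """Hypothetical record of one materialization (not a Dagster type)."""

    asset_key: str
    run_id: str
    metadata: dict = field(default_factory=dict)


def build_catalog(records):
    """Group records by asset key, keeping run IDs in order and the latest metadata."""
    catalog = defaultdict(lambda: {"runs": [], "latest_metadata": {}})
    for record in records:
        entry = catalog[record.asset_key]
        entry["runs"].append(record.run_id)
        # Later records overwrite earlier ones, so this ends up as the newest metadata.
        entry["latest_metadata"] = record.metadata
    return dict(catalog)


# Made-up example data: two runs touch my_dataset, one touches my_model.
records = [
    MaterializationRecord("my_dataset", "run_1", {"rows": 100}),
    MaterializationRecord("my_dataset", "run_2", {"rows": 120}),
    MaterializationRecord("my_model", "run_2", {"accuracy": 0.9}),
]
catalog = build_catalog(records)
```

Each key in catalog corresponds to one catalog entry: the runs that materialized the asset plus the metadata attached to its most recent materialization.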

There are two general patterns for dealing with assets when using Dagster:

  • Materialize assets from inside the body of a solid.
  • Focus solids on pure business logic, and delegate the materialization of assets to IOManagers.

Recording Asset Materializations in Solids

To make the Asset Catalog aware that we materialized an asset in our solid, we can yield an AssetMaterialization event. This would involve changing the following solid:

materialization_solids.py
from dagster import solid


@solid
def my_simple_solid(_):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    return remote_storage_path

into something like this:

materialization_solids.py
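from dagster import AssetMaterialization, Output, solid


@solid
def my_materialization_solid(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    # Tell the Asset Catalog that this run materialized "my_dataset"
    yield AssetMaterialization(asset_key="my_dataset", description="Persisted result to storage")
    yield Output(remote_storage_path)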

Note: Our materialization solid must now explicitly yield an Output event instead of relying on the implicit conversion of the return value into an Output event.
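One way to see why the explicit Output is needed is the plain-Python sketch below. Once a function body contains a yield, Python treats it as a generator, and a returned value is no longer part of the stream of yielded items. FakeMaterialization and FakeOutput are stand-ins invented for this illustration, not Dagster classes, and the storage path is a placeholder.

```python
from dataclasses import dataclass


@dataclass
class FakeMaterialization:
    """Stand-in for an AssetMaterialization event (not the Dagster class)."""

    asset_key: str


@dataclass
class FakeOutput:
    """Stand-in for an Output event (not the Dagster class)."""

    value: str


def body_with_return():
    # The yield below makes this whole function a generator.
    yield FakeMaterialization("my_dataset")
    # A generator's return value rides on StopIteration; iteration never sees it.
    return "placeholder/storage/path"


def body_with_output():
    yield FakeMaterialization("my_dataset")
    # Yielding the output makes it visible to whatever iterates over the events.
    yield FakeOutput("placeholder/storage/path")


events_with_return = list(body_with_return())
events_with_output = list(body_with_output())
```

Iterating body_with_return produces only the materialization, while body_with_output produces the materialization followed by the output.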

We should now see a materialization event in the event log when we execute this solid, as well as an entry in the asset catalog.

Recording Asset Materializations in Output Managers

To record that an OutputManager or IOManager has mutated or created an asset, we can yield an AssetMaterialization event from its handle_output method.

materialization_asset_stores.py
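import os

import pandas as pd
from dagster import AssetKey, AssetMaterialization, IOManager


class PandasCsvIOManager(IOManager):
    def load_input(self, context):
        # Build the CSV path from the step key and output name
        file_path = os.path.join("my_base_dir", context.step_key, context.output_name)
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.output_name)

        obj.to_csv(file_path)

        # Record the write so it appears in the event log and the Asset Catalog
        yield AssetMaterialization(
            asset_key=AssetKey(file_path), description="Persisted result to storage."
        )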

Attaching Metadata to the Asset Materialization

Several types of metadata can be attached to a materialization event, all through the EventMetadataEntry class. Each materialization event optionally takes a list of metadata entries, which are then displayed in the event log and the Asset Catalog.

Example with a solid:

materialization.py
from dagster import AssetMaterialization, EventMetadataEntry, Output, solid


@solid
def my_metadata_materialization_solid(context):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    yield AssetMaterialization(
        asset_key="my_dataset",
        description="Persisted result to storage",
        # Each entry is shown alongside the materialization in Dagit
        metadata_entries=[
            EventMetadataEntry.text("Text-based metadata for this event", label="text_metadata"),
            EventMetadataEntry.fspath(remote_storage_path),
            EventMetadataEntry.url("http://mycoolsite.com/url_for_my_data", label="dashboard_url"),
            EventMetadataEntry.float(calculate_bytes(df), "size (bytes)"),
        ],
    )
    yield Output(remote_storage_path)
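The calculate_bytes helper used above is not defined in this guide. A minimal sketch of one possible implementation follows, assuming the size of interest is the CSV serialization of the frame; that choice, and the _StubFrame class used to run the sketch without pandas, are assumptions made for illustration.

```python
def calculate_bytes(df):
    """Hypothetical helper: size in bytes of the frame's CSV serialization."""
    # pandas' DataFrame.to_csv returns the CSV text when no path is given.
    csv_text = df.to_csv(index=False)
    # Return a float because the value is passed to EventMetadataEntry.float.
    return float(len(csv_text.encode("utf-8")))


class _StubFrame:
    """Tiny stand-in exposing only to_csv, so the sketch runs without pandas."""

    def to_csv(self, index=True):
        return "a,b\n1,2\n"


size = calculate_bytes(_StubFrame())
```

With a real DataFrame, the in-memory footprint or the size of the persisted file might be a better fit, depending on what the metadata is meant to convey.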

Example with an IOManager:

materialization_asset_stores.py
import os

import pandas as pd
from dagster import AssetKey, AssetMaterialization, EventMetadataEntry, IOManager


class PandasCsvIOManagerWithMetadata(IOManager):
    def load_input(self, context):
        # Build the CSV path from the step key and output name
        file_path = os.path.join("my_base_dir", context.step_key, context.output_name)
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.output_name)

        obj.to_csv(file_path)

        # Attach summary statistics of the written DataFrame as metadata
        yield AssetMaterialization(
            asset_key=AssetKey(file_path),
            description="Persisted result to storage.",
            metadata_entries=[
                EventMetadataEntry.int(obj.shape[0], label="number of rows"),
                EventMetadataEntry.float(obj["some_column"].mean(), "some_column mean"),
            ],
        )

Check our API docs for EventMetadataEntry for more details on the types of event metadata available.