Asset Stores (Experimental)

Dagster solids have inputs and outputs. When a solid produces an output, where and how is it stored? AssetStores let the developer decide.

AssetStores are user-provided objects that know how to store solid outputs and retrieve solid inputs. Each solid output can have its own AssetStore, or multiple solid outputs can share an AssetStore. The AssetStore that’s used for storing a particular solid output is automatically used for retrieving it in subsequent solids.

The default AssetStore, mem_asset_store, stores outputs in memory, but this only works with the single-process executor. Dagster also provides out-of-the-box AssetStores that pickle objects and save them to a local filesystem, Amazon S3, Azure ADLS, or GCS.

AssetStores are resources, which means users can supply different AssetStores for the same solid outputs in different situations. For example, you might use an in-memory AssetStore for unit-testing a pipeline and an S3 AssetStore in production.
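
For example, you might define a "test" mode that keeps outputs in memory and a "prod" mode that persists them. Here is a minimal sketch, assuming mem_asset_store is importable from dagster alongside fs_asset_store, and using fs_asset_store for the persistent mode to keep the example self-contained (the mode names and the emit_one solid are illustrative):

from dagster import (
    ModeDefinition,
    execute_pipeline,
    fs_asset_store,
    mem_asset_store,
    pipeline,
    solid,
)


@solid
def emit_one(_):
    return 1


@pipeline(
    mode_defs=[
        ModeDefinition(name="test", resource_defs={"asset_store": mem_asset_store}),
        ModeDefinition(name="prod", resource_defs={"asset_store": fs_asset_store}),
    ]
)
def modal_pipeline():
    emit_one()


# Unit tests keep outputs in memory; a production run would pass mode="prod".
execute_pipeline(modal_pipeline, mode="test")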

Setting a pipeline-wide asset store

By default, all the outputs in a pipeline use the same AssetStore. This AssetStore is determined by the ResourceDefinition provided for the "asset_store" resource key. "asset_store" is a resource key that Dagster reserves specifically for this purpose.

Here’s how to specify that all solid outputs are stored using the fs_asset_store, which pickles outputs and stores them on the local filesystem. It stores files in a directory with the run ID in the path, so that outputs from prior runs will never be overwritten.

default_asset_store.py
from dagster import ModeDefinition, fs_asset_store, pipeline, solid


@solid
def solid1(_):
    return 1


@solid
def solid2(_, a):
    return a + 1


@pipeline(mode_defs=[ModeDefinition(resource_defs={"asset_store": fs_asset_store})])
def my_pipeline():
    solid2(solid1())
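
With this mode in place, executing the pipeline routes both outputs through fs_asset_store. A quick check, using Dagster's standard execute_pipeline entry point:

from dagster import execute_pipeline

result = execute_pipeline(my_pipeline)
assert result.success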

Providing a custom asset store

If you have specific requirements for where and how your outputs should be stored and retrieved, you can define your own AssetStore. For example, if your solids output Pandas DataFrames to populate tables in a data warehouse, you might write the following:

custom_asset_store.py
from dagster import AssetStore, ModeDefinition, pipeline, resource

# write_dataframe_to_table and read_dataframe_from_table stand in for your
# warehouse client's write and read functions; solid1 and solid2 are the
# solids defined in default_asset_store.py above.


class MyAssetStore(AssetStore):
    def set_asset(self, context, obj):
        # output_name is the name given to the OutputDefinition that we're storing for
        table_name = context.output_name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def get_asset(self, context):
        # output_name is the name given to the OutputDefinition that we're retrieving for
        table_name = context.output_name
        return read_dataframe_from_table(name=table_name)


@resource
def my_asset_store(_):
    return MyAssetStore()


@pipeline(mode_defs=[ModeDefinition(resource_defs={"asset_store": my_asset_store})])
def my_pipeline():
    solid2(solid1())

The provided context argument is an AssetStoreContext. The API documentation for AssetStoreContext lists all the fields that are available to set_asset and get_asset implementations.
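
As a concrete illustration, here is a minimal sketch of a filesystem-backed AssetStore that keys each asset off context.output_name, the only context field the examples above rely on. The LocalPickleAssetStore name and the base_dir default are illustrative, not part of Dagster's API:

import os
import pickle

from dagster import AssetStore


class LocalPickleAssetStore(AssetStore):
    def __init__(self, base_dir="/tmp/dagster_assets"):
        self.base_dir = base_dir

    def _path(self, context):
        # One file per output, named after its OutputDefinition
        return os.path.join(self.base_dir, context.output_name)

    def set_asset(self, context, obj):
        os.makedirs(self.base_dir, exist_ok=True)
        with open(self._path(context), "wb") as f:
            pickle.dump(obj, f)

    def get_asset(self, context):
        with open(self._path(context), "rb") as f:
            return pickle.load(f)

Note that keying off output_name alone means two outputs with the same name would overwrite each other; the built-in fs_asset_store avoids this by including the run ID in the path.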

Selecting an AssetStore per output

Not all the outputs in a pipeline need to be stored the same way. Some outputs might be Pandas DataFrames that belong in warehouse tables, while others are arbitrary Python objects that should be pickled on the filesystem.

To select the AssetStore for a particular output, you can set an asset_store_key on the OutputDefinition, and then refer to that asset_store_key when setting asset stores in your ModeDefinition. In this example, the output of solid1 will go to my_asset_store and the output of solid2 will go to fs_asset_store.

asset_store_per_output.py
from dagster import ModeDefinition, OutputDefinition, fs_asset_store, pipeline, solid


@solid(output_defs=[OutputDefinition(asset_store_key="db_asset_store")])
def solid1(_):
    """Return a Pandas DataFrame"""


@solid
def solid2(_, _input_dataframe):
    """Return some object"""


@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "asset_store": fs_asset_store,
                "db_asset_store": my_asset_store,  # defined in code snippet above
            }
        )
    ]
)
def my_pipeline():
    solid2(solid1())

Providing per-output metadata to the asset store

You might want to provide metadata that controls how particular outputs are stored.

For example, if your pipeline produces DataFrames to populate tables in a data warehouse, you might want to specify the table that each output goes to. To accomplish this, you can define asset_metadata on each OutputDefinition:

asset_metadata.py
from dagster import OutputDefinition, solid


@solid(
    output_defs=[OutputDefinition(asset_metadata={"schema": "some_schema", "table": "some_table"})]
)
def solid1(_):
    """Return a Pandas DataFrame"""


@solid(
    output_defs=[
        OutputDefinition(asset_metadata={"schema": "other_schema", "table": "other_table"})
    ]
)
def solid2(_, _input_dataframe):
    """Return a Pandas DataFrame"""

The AssetStore can then access this metadata when storing or retrieving data, via the AssetStoreContext.

asset_metadata.py
from dagster import AssetStore, resource


class MyAssetStore(AssetStore):
    def set_asset(self, context, obj):
        table_name = context.asset_metadata["table"]
        schema = context.asset_metadata["schema"]
        write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj)

    def get_asset(self, context):
        table_name = context.asset_metadata["table"]
        schema = context.asset_metadata["schema"]
        return read_dataframe_from_table(name=table_name, schema=schema)


@resource
def my_asset_store(_):
    return MyAssetStore()
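
Wiring this store into the pipeline works exactly as in custom_asset_store.py above: map it to the "asset_store" resource key in the ModeDefinition.

@pipeline(mode_defs=[ModeDefinition(resource_defs={"asset_store": my_asset_store})])
def my_pipeline():
    solid2(solid1())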