Asset Stores (Experimental)

Asset Stores

Asset stores are user-provided objects that specify how to store step outputs and retrieve step inputs.

class dagster.AssetStore[source]

Base class for user-provided asset store.

Extend this class to handle asset operations. Users should implement set_asset to store a data object that can be tracked by the Dagster machinery and get_asset to retrieve a data object.

abstract get_asset(context)[source]

The user-defined read method that loads data given its metadata.

Parameters

context (AssetStoreContext) – The context of the step output that produces this asset.

Returns

The data object.

Return type

Any

abstract set_asset(context, obj)[source]

The user-definied write method that stores a data object.

Parameters
  • context (AssetStoreContext) – The context of the step output that produces this asset.

  • obj (Any) – The data object to be stored.

Asset Stores as Resources

Asset stores are used as resources, which enables users to supply different asset stores for the same solid outputs in different modes.

dagster.mem_asset_store()[source]
dagster.fs_asset_store()[source]

Built-in filesystem asset store that stores and retrieves values using pickling.

It allows users to specify a base directory where all the step output will be stored in. It serializes and deserializes output values (assets) using pickling and automatically constructs the filepaths for the assets.

Example usage:

1. Specify a pipeline-level asset store using the reserved resource key "asset_store", which will set the given asset store on all solids across a pipeline.

@solid
def solid_a(context, df):
    return df

@solid
def solid_b(context, df):
    return df[:5]

@pipeline(mode_defs=[ModeDefinition(resource_defs={"asset_store": fs_asset_store})])
def pipe():
    solid_b(solid_a())

2. Specify asset store on OutputDefinition, which allows the user to set different asset stores on different step outputs.

@solid(output_defs=[OutputDefinition(asset_store_key="my_asset_store")])
def solid_a(context, df):
    return df

@solid
def solid_b(context, df):
    return df[:5]

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"my_asset_store": fs_asset_store})]
)
def pipe():
    solid_b(solid_a())
dagster.custom_path_fs_asset_store()[source]

Built-in asset store that allows users to custom output file path per output definition.

It also allows users to specify a base directory where all the step output will be stored in. It serializes and deserializes output values (assets) using pickling and stores the pickled object in the user-provided file paths.

Example usage:

@solid(
    output_defs=[
        OutputDefinition(
            asset_store_key="asset_store", asset_metadata={"path": "path/to/sample_output"}
        )
    ]
)
def sample_data(context, df):
    return df[:5]

@pipeline(
    mode_defs=[
        ModeDefinition(resource_defs={"asset_store": custom_path_fs_asset_store}),
    ],
)
def pipe():
    sample_data()