IO Managers

Dagster solids have inputs and outputs. When a solid produces an output, what happens to it? When a solid needs an input value, how is it loaded? IOManagers and RootInputManagers answer these questions: they control how outputs are stored and how inputs are loaded.

These APIs make it easy to separate code that's responsible for logical data transformation from code that's responsible for reading and writing the results. Solids can focus on business logic, while IO managers handle I/O. This separation makes it easier to test the business logic and run it in different environments.

IO managers

IOManagers are user-provided objects that are responsible for storing the output of a solid and loading it as input to downstream solids. For example, an IOManager might store and load objects from files on a filesystem. Each solid output can have its own IOManager, or multiple solid outputs can share an IOManager. The IOManager that's used for handling a particular solid output is automatically used for loading it in downstream solids.


The default IOManager, mem_io_manager, stores outputs in memory, but this only works when execution happens in a single process. Dagster also provides out-of-the-box IOManagers that pickle objects and save them to durable storage: fs_io_manager (local filesystem), s3_pickle_io_manager (S3), adls2_pickle_io_manager (Azure Data Lake Storage Gen2), and gcs_pickle_io_manager (Google Cloud Storage).

IOManagers are resources, which means users can supply different IOManagers for the same solid outputs in different situations. For example, you might use an in-memory IOManager for unit-testing a pipeline and an S3IOManager in production.
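
For instance, here is a minimal sketch of a pipeline with a "test" mode that keeps outputs in memory and a "prod" mode that pickles them to S3. The solid and mode names are illustrative, and the prod mode assumes the dagster-aws package, whose s3_pickle_io_manager is paired with the s3_resource it requires and pointed at a bucket via resource config at launch time.

from dagster import ModeDefinition, execute_pipeline, mem_io_manager, pipeline, solid
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource


@solid
def produce_value(_):
    return 1


@solid
def double_value(_, value):
    return value * 2


@pipeline(
    mode_defs=[
        # Unit tests: outputs stay in memory and never touch external storage.
        ModeDefinition(name="test", resource_defs={"io_manager": mem_io_manager}),
        # Production: outputs are pickled to S3; the bucket is supplied via resource config.
        ModeDefinition(
            name="prod",
            resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
        ),
    ]
)
def modal_pipeline():
    double_value(produce_value())


# In a test, select the in-memory mode:
execute_pipeline(modal_pipeline, mode="test")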

Root input managers (experimental)

RootInputManagers are user-provided objects that are responsible for loading solid inputs that are not connected to an upstream solid's output.

Root input managers are currently an experimental feature.

Setting a pipeline-wide IO manager

By default, all the inputs and outputs in a pipeline use the same IOManager. This IOManager is determined by the ResourceDefinition provided for the "io_manager" resource key. "io_manager" is a resource key that Dagster reserves specifically for this purpose.

Here’s how to specify that all solid outputs are stored using the fs_io_manager, which pickles outputs and stores them on the local filesystem. It stores files in a directory with the run ID in the path, so that outputs from prior runs will never be overwritten.

default_io_manager.py
from dagster import ModeDefinition, fs_io_manager, pipeline, solid


@solid
def solid1(_):
    return 1


@solid
def solid2(_, a):
    return a + 1


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def my_pipeline():
    solid2(solid1())
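
fs_io_manager also accepts a base_dir config field if you want the pickled files written somewhere other than the default location. Here is a sketch of supplying that resource config when launching the pipeline above; the path is illustrative.

from dagster import execute_pipeline

execute_pipeline(
    my_pipeline,
    run_config={"resources": {"io_manager": {"config": {"base_dir": "/tmp/dagster_outputs"}}}},
)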

Selecting an IO manager per output

Not all the outputs in a pipeline should necessarily be stored the same way. Maybe some of the outputs should live on the filesystem so they can be inspected, and others can be transiently stored in memory.

To select the IOManager for a particular output, you can set an io_manager_key on the OutputDefinition, and then refer to that io_manager_key when setting IO managers in your ModeDefinition. In this example, the output of solid1 will go to fs_io_manager and the output of solid2 will go to mem_io_manager.

io_manager_per_output.py
from dagster import ModeDefinition, OutputDefinition, fs_io_manager, mem_io_manager, pipeline, solid


@solid(output_defs=[OutputDefinition(io_manager_key="fs")])
def solid1(_):
    return 1


@solid(output_defs=[OutputDefinition(io_manager_key="mem")])
def solid2(_, a):
    return a + 1


@pipeline(mode_defs=[ModeDefinition(resource_defs={"fs": fs_io_manager, "mem": mem_io_manager})])
def my_pipeline():
    solid2(solid1())

Providing a custom IO manager

If you have specific requirements for where and how your outputs should be stored and retrieved, you can define your own IOManager. For example, if your solids produce Pandas DataFrames that populate tables in a data warehouse, you might write the following:

custom_io_manager.py
from dagster import IOManager, ModeDefinition, io_manager, pipeline


class DataframeTableIOManager(IOManager):
    def handle_output(self, context, obj):
        # name is the name given to the OutputDefinition that we're storing for
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        # upstream_output.name is the name given to the OutputDefinition that we're loading for
        table_name = context.upstream_output.name
        return read_dataframe_from_table(name=table_name)


@io_manager
def df_table_io_manager(_):
    return DataframeTableIOManager()


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": df_table_io_manager})])
def my_pipeline():
    solid2(solid1())

The io_manager decorator behaves nearly identically to the resource decorator. It yields an IOManagerDefinition, which is a subclass of ResourceDefinition that will produce an IOManager.

The provided context argument for handle_output is an OutputContext. The provided context argument for load_input is an InputContext. The linked API documentation lists all the fields that are available on these objects.
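
To make those fields concrete, here is a sketch of a filesystem-backed IOManager that builds a path from the run ID, step key, and output name on the OutputContext, and reads the same path back through the InputContext's upstream_output. The class name, resource name, and base_dir field are illustrative.

import os
import pickle

from dagster import IOManager, io_manager


class RunScopedPickleIOManager(IOManager):
    """Pickles each output under <base_dir>/<run_id>/<step_key>/<output name>."""

    def __init__(self, base_dir):
        self._base_dir = base_dir

    def _path_for(self, context):
        # OutputContext carries the run ID, step key, and output name.
        return os.path.join(self._base_dir, context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        path = self._path_for(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context):
        # InputContext exposes the upstream OutputContext that produced the value.
        with open(self._path_for(context.upstream_output), "rb") as f:
            return pickle.load(f)


# Like @resource, @io_manager accepts a config_schema; the decorated function receives
# an init context whose resource_config holds the resolved config values.
@io_manager(config_schema={"base_dir": str})
def run_scoped_pickle_io_manager(init_context):
    return RunScopedPickleIOManager(base_dir=init_context.resource_config["base_dir"])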

Providing per-output config to an IO manager

When launching a run, you might want to parameterize how particular outputs are stored.

For example, if your pipeline produces DataFrames to populate tables in a data warehouse, you might want to specify the table that each output goes to at run launch time.

To accomplish this, you can define an output_config_schema on the IO manager definition. The IOManager methods can access this config when storing or loading data, via the OutputContext.

output_config.py
from dagster import IOManager, ModeDefinition, execute_pipeline, io_manager, pipeline


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.config["table"]
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        table_name = context.upstream_output.config["table"]
        return read_dataframe_from_table(name=table_name)


@io_manager(output_config_schema={"table": str})
def my_io_manager(_):
    return MyIOManager()

Then, when executing a pipeline, you can pass in this per-output config.

output_config.py
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": my_io_manager})])
def my_pipeline():
    solid2(solid1())


execute_pipeline(
    my_pipeline,
    run_config={
        "solids": {
            "solid1": {"outputs": {"result": {"table": "table1"}}},
            "solid2": {"outputs": {"result": {"table": "table2"}}},
        }
    },
)

Providing per-output metadata to an IO manager (experimental)

You might want to provide static metadata that controls how particular outputs are stored. You don't plan to change the metadata at runtime, so it makes more sense to attach it to a definition rather than expose it as a configuration option.

For example, if your pipeline produces DataFrames to populate tables in a data warehouse, you might want to specify that each output always goes to a particular table. To accomplish this, you can define metadata on each OutputDefinition:

metadata.py
from dagster import IOManager, OutputDefinition, io_manager, solid


@solid(output_defs=[OutputDefinition(metadata={"schema": "some_schema", "table": "some_table"})])
def solid1(_):
    """Return a Pandas DataFrame"""


@solid(output_defs=[OutputDefinition(metadata={"schema": "other_schema", "table": "other_table"})])
def solid2(_, _input_dataframe):
    """Return a Pandas DataFrame"""

The IOManager can then access this metadata when storing or retrieving data, via the OutputContext.

In this case, the table names are encoded in the pipeline definition. If, instead, you want to be able to set them at run time, the previous section describes how.

metadata.py
class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.metadata["table"]
        schema = context.metadata["schema"]
        write_dataframe_to_table(name=table_name, schema=schema, dataframe=obj)

    def load_input(self, context):
        table_name = context.upstream_output.metadata["table"]
        schema = context.upstream_output.metadata["schema"]
        return read_dataframe_from_table(name=table_name, schema=schema)


@io_manager
def my_io_manager(_):
    return MyIOManager()

Providing an input manager for a root input (experimental)

When you have a solid at the beginning of a pipeline that operates on data from an external source, you might wish to separate that I/O from your solid's business logic, in the same way you would with an IO manager if the solid were loading from an upstream output.

To accomplish this, you can define a RootInputManager.

root_input_manager.py
from dagster import InputDefinition, ModeDefinition, pipeline, root_input_manager, solid


@solid(input_defs=[InputDefinition("dataframe", root_manager_key="my_root_manager")])
def my_solid(_, dataframe):
    """Do some stuff"""


@root_input_manager
def table1_loader(_):
    return read_dataframe_from_table(name="table1")


@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_root_manager": table1_loader})])
def my_pipeline():
    my_solid()

Setting the root_manager_key on an InputDefinition controls how that input is loaded in pipelines where it has no upstream output.

The root_input_manager decorator behaves nearly identically to the resource decorator. It yields a RootInputManagerDefinition, which is a ResourceDefinition that will produce a RootInputManager.
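
Because a root input manager is a resource definition under the hood, you can also swap implementations per mode, just as you would for any other resource. Here is a minimal sketch that reuses my_solid and table1_loader from the example above and assumes the input is a pandas DataFrame; the stub loader and mode names are illustrative.

import pandas as pd

from dagster import ModeDefinition, pipeline, root_input_manager


@root_input_manager
def stub_table1_loader(_):
    # In tests, hand the solid a small in-memory frame instead of reading the warehouse.
    return pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})


@pipeline(
    mode_defs=[
        ModeDefinition(name="prod", resource_defs={"my_root_manager": table1_loader}),
        ModeDefinition(name="test", resource_defs={"my_root_manager": stub_table1_loader}),
    ]
)
def my_multi_mode_pipeline():
    my_solid()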

Providing per-input config to a root input manager (experimental)

When launching a run, you might want to parameterize how particular root inputs are loaded.

To accomplish this, you can define an input_config_schema on the input manager definition. The load function can access this config via the InputContext.

config_input_manager.py
from dagster import ModeDefinition, execute_pipeline, pipeline, root_input_manager


@root_input_manager(input_config_schema={"table_name": str})
def table_loader(context):
    return read_dataframe_from_table(name=context.config["table_name"])

Then, when executing a pipeline, you can pass in this per-input config.

config_input_manager.py
@pipeline(mode_defs=[ModeDefinition(resource_defs={"my_root_manager": table_loader})])
def my_pipeline():
    my_solid()


execute_pipeline(
    my_pipeline,
    run_config={"solids": {"my_solid": {"inputs": {"dataframe": {"table_name": "table1"}}}}},
)

Using a root input manager with subselection (experimental)

You might want to execute a subset of solids in your pipeline and control how the inputs of those solids are loaded. Root input managers also help in these situations, because the inputs at the beginning of the subset become the new "roots".

For example, you might have solid1 that normally produces a table that solid2 consumes. To debug solid2, you might want to run it on a different table than the one normally produced by solid1.

To accomplish this, you can set the root_manager_key on solid2's InputDefinition to point to an input manager with the desired loading behavior. As before, setting the root_manager_key on an InputDefinition controls how that input is loaded when it has no upstream output.

subselection.py
from dagster import (
    IOManager,
    InputDefinition,
    ModeDefinition,
    OutputDefinition,
    execute_pipeline,
    io_manager,
    pipeline,
    root_input_manager,
    solid,
)


@root_input_manager(input_config_schema={"table_name": str})
def my_root_input_manager(context):
    return read_dataframe_from_table(name=context.config["table_name"])


class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context):
        return read_dataframe_from_table(name=context.upstream_output.name)


@io_manager
def my_io_manager(_):
    return MyIOManager()


@solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager")])
def solid1(_):
    """Do stuff"""


@solid(input_defs=[InputDefinition("dataframe", root_manager_key="my_root_input_manager")])
def solid2(_, dataframe):
    """Do stuff"""


@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "my_io_manager": my_io_manager,
                "my_root_input_manager": my_root_input_manager,
            }
        )
    ]
)
def my_pipeline():
    solid2(solid1())

When running the full pipeline, solid2's input will be loaded using the IO manager on the output of solid1. When running the pipeline subset, solid2's input has no upstream output, so the input manager corresponding to its root_manager_key is used.

subselection.py
execute_pipeline(
    my_pipeline,
    solid_selection=["solid2"],
    run_config={"solids": {"solid2": {"inputs": {"dataframe": {"table_name": "tableX"}}}}},
)