IO Managers

IO Managers

IO managers are user-provided objects that store solid outputs and load them as inputs to downstream solids.

@dagster.io_manager(config_schema=None, description=None, output_config_schema=None, input_config_schema=None, required_resource_keys=None, version=None)[source]

Define an IO manager.

IOManagers are used to store solid outputs and load them as inputs to downstream solids.

The decorated function should accept an InitResourceContext and return an IOManager.

Parameters
  • config_schema (Optional[ConfigSchema]) – The schema for the resource config. Configuration data available in init_context.resource_config.

  • description (Optional[str]) – A human-readable description of the resource.

  • output_config_schema (Optional[ConfigSchema]) – The schema for per-output config.

  • input_config_schema (Optional[ConfigSchema]) – The schema for per-input config.

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the object manager.

  • version (Optional[str]) – (Experimental) The version of a resource function. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs.

Examples:

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        write_csv("some/path")

    def load_input(self, context):
        return read_csv("some/path")

@io_manager
def my_io_manager(init_context):
    return MyIOManager()

@solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager_key")])
def my_solid(_):
    return do_stuff()

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"my_io_manager_key": my_io_manager})]
)
def my_pipeline():
    my_solid()

execute_pipeline(my_pipeline)
class dagster.IOManager[source]

Base class for user-provided IO managers.

IOManagers are used to store solid outputs and load them as inputs to downstream solids.

Extend this class to handle how objects are loaded and stored. Users should implement handle_output to store an object and load_input to retrieve an object.

abstract handle_output(context, obj)[source]

User-defined method that stores an output of a solid.

Parameters
  • context (OutputContext) – The context of the step output that produces this object.

  • obj (Any) – The object, returned by the solid, to be stored.

abstract load_input(context)[source]

User-defined method that loads an input to a solid.

Parameters

context (InputContext) – The input context, which describes the input that’s being loaded and the upstream output that’s being loaded from.

Returns

The data object.

Return type

Any

class dagster.IOManagerDefinition(resource_fn=None, config_schema=None, description=None, required_resource_keys=None, version=None, input_config_schema=None, output_config_schema=None)[source]

Definition of an IO manager resource.

IOManagers are used to store solid outputs and load them as inputs to downstream solids.

An IOManagerDefinition is a ResourceDefinition whose resource_fn returns an IOManager.

The easiest way to create an IOManagerDefnition is with the @io_manager decorator.

property input_config_schema

The schema for per-input configuration for inputs that are managed by this input manager

property output_config_schema

The schema for per-output configuration for outputs that are managed by this manager

Input and Output Contexts

class dagster.InputContext[source]

The context object available to the load_input method of RootInputManager.

name

The name of the input that we’re loading.

Type

Optional[str]

pipeline_name

The name of the pipeline.

Type

str

solid_def

The definition of the solid that’s loading the input.

Type

Optional[SolidDefinition]

config

The config attached to the input that we’re loading.

Type

Optional[Any]

metadata

A dict of metadata that is assigned to the InputDefinition that we’re loading for.

Type

Optional[Dict[str, Any]]

upstream_output

Info about the output that produced the object we’re loading.

Type

Optional[OutputContext]

dagster_type

The type of this input.

Type

Optional[DagsterType]

log

The log manager to use for this input.

Type

Optional[DagsterLogManager]

resource_config

The config associated with the resource that initializes the RootInputManager.

Type

Optional[Dict[str, Any]]

resources

The resources required by the resource that initializes the input manager. If using the @root_input_manager() decorator, these resources correspond to those requested with the required_resource_keys parameter.

Type

ScopedResources

class dagster.OutputContext[source]

The context object that is available to the handle_output method of an IOManager.

step_key

The step_key for the compute step that produced the output.

Type

str

name

The name of the output that produced the output.

Type

str

pipeline_name

The name of the pipeline definition.

Type

str

run_id

The id of the run that produced the output.

Type

Optional[str]

metadata

A dict of the metadata that is assigned to the OutputDefinition that produced the output.

Type

Optional[Dict[str, Any]]

mapping_key

The key that identifies a unique mapped output. None for regular outputs.

Type

Optional[str]

config

The configuration for the output.

Type

Optional[Any]

solid_def

The definition of the solid that produced the output.

Type

Optional[SolidDefinition]

dagster_type

The type of this output.

Type

Optional[DagsterType]

log

The log manager to use for this output.

Type

Optional[DagsterLogManager]

version

(Experimental) The version of the output.

Type

Optional[str]

resources

The resources required by the output manager, specified by the required_resource_keys parameter.

Type

Optional[ScopedResources]

get_run_scoped_output_identifier() → List[str][source]

Utility method to get a collection of identifiers that as a whole represent a unique step output.

The unique identifier collection consists of

  • run_id: the id of the run which generates the output.

    Note: This method also handles the re-execution memoization logic. If the step that generates the output is skipped in the re-execution, the run_id will be the id of its parent run.

  • step_key: the key for a compute step.

  • name: the name of the output. (default: ‘result’).

Returns

A list of identifiers, i.e. run id, step key, and output name

Return type

List[str, ..]

Built-in IO Managers

dagster.mem_io_manager IOManagerDefinition[source]

Built-in IO manager that stores and retrieves values in memory.

dagster.fs_io_manager IOManagerDefinition[source]

Built-in filesystem IO manager that stores and retrieves values using pickling.

It allows users to specify a base directory where all the step outputs will be stored. It serializes and deserializes output values using pickling and automatically constructs the filepaths for the assets.

Example usage:

1. Specify a pipeline-level IO manager using the reserved resource key "io_manager", which will set the given IO manager on all solids across a pipeline.

@solid
def solid_a(context, df):
    return df

@solid
def solid_b(context, df):
    return df[:5]

@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def pipe():
    solid_b(solid_a())

2. Specify IO manager on OutputDefinition, which allows the user to set different IO managers on different step outputs.

@solid(output_defs=[OutputDefinition(io_manager_key="my_io_manager")])
def solid_a(context, df):
    return df

@solid
def solid_b(context, df):
    return df[:5]

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"my_io_manager": fs_io_manager})]
)
def pipe():
    solid_b(solid_a())
dagster.custom_path_fs_io_manager IOManagerDefinition[source]

Built-in IO manager that allows users to custom output file path per output definition.

It requires users to specify a base directory where all the step output will be stored in. It serializes and deserializes output values (assets) using pickling and stores the pickled object in the user-provided file paths.

Example usage:

@solid(
    output_defs=[
        OutputDefinition(
            io_manager_key="io_manager", metadata={"path": "path/to/sample_output"}
        )
    ]
)
def sample_data(context, df):
    return df[:5]

my_custom_path_fs_io_manager = custom_path_fs_io_manager.configured(
    {"base_dir": "path/to/basedir"}
)

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"io_manager": my_custom_path_fs_io_manager})],
)
def pipe():
    sample_data()

Root Input Managers (Experimental)

Root input managers are user-provided objects that specify how to load inputs that aren’t connected to upstream outputs.

@dagster.root_input_manager(config_schema=None, description=None, input_config_schema=None, required_resource_keys=None, version=None)[source]

Define a root input manager.

Root input managers load solid inputs that aren’t connected to upstream outputs.

The decorated function should accept a InputContext and resource config, and return a loaded object that will be passed into one of the inputs of a solid.

The decorator produces an RootInputManagerDefinition.

Parameters
  • config_schema (Optional[ConfigSchema]) – The schema for the resource-level config.

  • description (Optional[str]) – A human-readable description of the resource.

  • input_config_schema (Optional[ConfigSchema]) – A schema for the input-level config. Each input that uses this input manager can be configured separately using this config.

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the input manager.

  • version (Optional[str]) – (Experimental) the version of the input manager definition.

Examples:

@root_input_manager
def csv_loader(_):
    return read_csv("some/path")

@solid(input_defs=[InputDefinition("input1", root_manager_key="csv_loader_key")])
def my_solid(_, input1):
    do_stuff(input1)

@pipeline(mode_defs=[ModeDefinition(resource_defs={"csv_loader_key": csv_loader})])
def my_pipeline():
    my_solid()

@root_input_manager(config_schema={"base_dir": str})
def csv_loader(context):
    return read_csv(context.resource_config["base_dir"] + "/some/path")

@root_input_manager(input_config_schema={"path": str})
def csv_loader(context):
    return read_csv(context.config["path"])
class dagster.RootInputManager[source]

RootInputManagers are used to load inputs to solids at the root of a pipeline.

The easiest way to define an RootInputManager is with the @root_input_manager decorator.

abstract load_input(context)[source]

The user-defined read method that loads data given its metadata.

Parameters

context (InputContext) – The context of the step output that produces this asset.

Returns

The data object.

Return type

Any

class dagster.RootInputManagerDefinition(resource_fn=None, config_schema=None, description=None, input_config_schema=None, required_resource_keys=None, version=None)[source]

Definition of a root input manager resource.

Root input managers load solid inputs that aren’t connected to upstream outputs.

An RootInputManagerDefinition is a ResourceDefinition whose resource_fn returns an RootInputManager.

The easiest way to create an RootInputManagerDefinition is with the @root_input_manager decorator.

property input_config_schema

The schema for per-input configuration for inputs that are managed by this input manager