Configuration

Run Config

Several dimensions of pipeline execution can be determined at execution time through configuration. We call this set of chosen values run config. The run config is passed as a dictionary in the Python API or as a YAML document when using Dagit or the CLI. The following top-level keys in the run config allow you to configure different aspects (a sketch of the overall shape follows the list):

  • execution: Determine and configure the Executor to be used to control execution of the pipeline.

  • storage: Determine and configure the SystemStorageDefinition to be used to control how data is persisted as it is handed off from solid to solid.

  • loggers: Determine and configure the LoggerDefinition to be used when logging.

  • solids: Configure the solids that belong to the pipeline. In addition to providing values for solid-specific configuration, inputs may also be configured here for any inputs whose dependencies on upstream solids' outputs have not been set in the pipeline.

  • resources: Configure the resources that belong to the pipeline and have a defined configuration schema.
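
As a rough illustration, a run config touching each of these sections might look like the following Python dictionary. This is a sketch only: the solid and resource names are hypothetical, and the available execution and storage options depend on your pipeline and Dagster version.

run_config = {
    # Select an executor (assuming the multiprocess option is available)
    "execution": {"multiprocess": {}},
    # Select a system storage (assuming filesystem storage is available)
    "storage": {"filesystem": {}},
    # Configure the built-in console logger
    "loggers": {"console": {"config": {"log_level": "DEBUG"}}},
    # Configure a hypothetical solid
    "solids": {"my_solid": {"config": {"iterations": 3}}},
    # Configure a hypothetical resource
    "resources": {"my_resource": {"config": {"region": "us-east-1"}}},
}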

Configuration Schema

The Dagster library includes a system for defining the schema that configuration values must conform to. The most common objects to specify a ConfigSchema for are SolidDefinition and ResourceDefinition.

The following example shows how config_schema can be used on a solid to control its behavior:

from dagster import Field, execute_pipeline, pipeline, solid


@solid(
    config_schema={
        # can just use the expected type as short hand
        "iterations": int,
        # otherwise use Field for optionality, defaults, and descriptions
        "word": Field(str, is_required=False, default_value="hello"),
    }
)
def config_example_solid(context):
    for _ in range(context.solid_config["iterations"]):
        context.log.info(context.solid_config["word"])


@pipeline
def config_example_pipeline():
    config_example_solid()


def run_bad_example():
    # This run will fail to start because required config has not been provided
    return execute_pipeline(config_example_pipeline, run_config={})


def run_other_bad_example():
    # This will also fail to start since iterations is the wrong type
    return execute_pipeline(
        config_example_pipeline,
        run_config={"solids": {"config_example_solid": {"config": {"iterations": "banana"}}}},
    )


def run_good_example():
    return execute_pipeline(
        config_example_pipeline,
        run_config={"solids": {"config_example_solid": {"config": {"iterations": 1}}}},
    )
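
Exercising these functions makes the contract concrete. A minimal sketch, assuming the legacy behavior in which invalid run config raises dagster.DagsterInvalidConfigError before any solid executes:

from dagster import DagsterInvalidConfigError


def demonstrate_config_validation():
    # Both bad examples are rejected at config-validation time,
    # before any solid runs.
    for bad_run in (run_bad_example, run_other_bad_example):
        try:
            bad_run()
        except DagsterInvalidConfigError as err:
            print(f"rejected: {err}")

    # The correctly configured run executes successfully.
    assert run_good_example().success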

Configured

In many cases, the option to configure an entity at runtime is more distracting than helpful, and it's preferable to supply the entity's configuration at definition time.

The configured API offers a way to do this. When invoked on a ResourceDefinition, ExecutorDefinition, SolidDefinition, CompositeSolidDefinition, LoggerDefinition, IntermediateStorageDefinition, or SystemStorageDefinition, it returns an interchangeable object with the given configuration "baked in".

from dagster import resource


@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
    """Connect to S3"""


east_unsigned_s3_session = s3_session.configured(
    {"region": "us-east-1", "use_unsigned_session": False}
)
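
Because the configured copy is interchangeable with the original definition, it can be supplied anywhere a ResourceDefinition is accepted. A minimal sketch, building on the snippet above, that binds it into a mode (the mode name and resource key are illustrative):

from dagster import ModeDefinition

# Hypothetical wiring: the pre-configured resource is bound under the
# resource key "s3", exactly as the unconfigured `s3_session` could be.
east_unsigned_mode = ModeDefinition(
    name="east_unsigned", resource_defs={"s3": east_unsigned_s3_session}
)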

In other cases, it's useful to partially fill out configuration at definition time and leave other configuration for runtime. For these cases, configured can be used as a decorator, accepting a function that translates from runtime config to config that satisfies the entity's config schema. It returns an entity with the "outer" config schema as its schema.

from dagster import configured, resource


@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
    """Connect to S3"""


@configured(s3_session, config_schema={"region": str})
def unsigned_s3_session(config):
    return {"region": config["region"], "use_unsigned_session": False}
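
With the mapping in place, runtime config for the wrapped resource only needs to satisfy the outer schema. A sketch of the relevant run config section, assuming the resource is bound under a hypothetical key "s3":

# Only "region" is supplied at runtime; "use_unsigned_session" is
# filled in by the config-mapping function above.
run_config = {"resources": {"s3": {"config": {"region": "us-west-2"}}}}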

The configured API can be used with any definition type in the same way. To configure a solid, for example, invoke configured on the solid definition:

from dagster import Field, configured, solid


@solid(
    config_schema={"iterations": int, "word": Field(str, is_required=False, default_value="hello")}
)
def example_solid(context):
    for _ in range(context.solid_config["iterations"]):
        context.log.info(context.solid_config["word"])


# This example is fully configured. With this syntax, a name must be explicitly provided.
configured_example = configured(example_solid, name="configured_example")(
    {"iterations": 6, "word": "wheaties"}
)

# This example is partially configured: `iterations` is passed through
# The decorator yields a solid named 'another_configured_example' (from the decorated function)
# with `int` as the `config_schema`.
@configured(example_solid, int)
def another_configured_example(config):
    return {"iterations": config, "word": "wheaties"}
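
Since the resulting solid's config schema is a bare int, its run config entry takes a plain integer. A sketch, assuming the solid is used in a pipeline under its own name:

# The entire "config" value is the integer mapped to `iterations`.
run_config = {"solids": {"another_configured_example": {"config": 6}}}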

When using the decorator syntax (@configured), the resulting solid definition inherits the name of the decorated function (another_configured_example in the example above). When configuring a solid completely with a config dictionary rather than with a function (as with configured_example), you must pass the name keyword argument in the call to configured. When naming solids, remember that solid definitions must have unique names within a repository or pipeline.

from dagster import Field, InputDefinition, Int, List, configured, execute_pipeline, pipeline, solid


@solid(
    config_schema={"is_sample": Field(bool, is_required=False, default_value=False),},
    input_defs=[InputDefinition("xs", List[Int])],
)
def variance(context, xs):
    n = len(xs)
    mean = sum(xs) / n
    summed = sum((mean - x) ** 2 for x in xs)
    result = summed / (n - 1) if context.solid_config["is_sample"] else summed / n
    return result


# If we want to use the same solid configured in multiple ways in the same pipeline,
# we have to specify unique names when configuring them:
sample_variance = configured(variance, name="sample_variance")({"is_sample": True})
population_variance = configured(variance, name="population_variance")({"is_sample": False})


@pipeline
def stats_pipeline():
    sample_variance()
    population_variance()


def run_pipeline():
    return execute_pipeline(
        stats_pipeline,
        {
            "solids": {
                "sample_variance": {"inputs": {"xs": [4, 8, 15, 16, 23, 42]}},
                "population_variance": {
                    "inputs": {"xs": [33, 30, 27, 29, 32, 30, 27, 28, 30, 30, 30, 31]}
                },
            }
        },
    )
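
To inspect the computed values, the execution result exposes per-solid results. A minimal sketch using the legacy result API (assuming result_for_solid and output_value behave as in this era of Dagster):

# Read each solid's output off of the pipeline execution result.
result = run_pipeline()
print(result.result_for_solid("sample_variance").output_value())
print(result.result_for_solid("population_variance").output_value())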