Configured API
In many cases, the option to configure an entity at runtime is more distracting than helpful, and it's preferable to supply the entity's configuration at definition time.
The `configured` API offers a way to do this. When invoked on a definition such as a `ResourceDefinition`, `ExecutorDefinition`, `SolidDefinition`, `CompositeSolidDefinition`, or `LoggerDefinition`, it returns a new definition of the same kind, interchangeable with the original, with the given configuration "baked in".
from dagster import resource
@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
    """Connect to S3"""
# Fully configured: both keys are fixed, so no config is needed at runtime.
east_unsigned_s3_session = s3_session.configured(
    {"region": "us-east-1", "use_unsigned_session": True}
)
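To make the "baked in" idea concrete, here is a plain-Python toy model of what fully configuring a definition means. It is not Dagster's implementation: `ToyDefinition`, `validate`, and the exact-key-match schema rule are hypothetical simplifications. Calling `configured` validates the config once, at definition time, and returns a new definition whose own schema is empty, so it accepts no runtime config and always uses the stored dictionary.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

# Hypothetical toy schema: every key is required and maps to a Python type.
Schema = Dict[str, type]


def validate(config: Dict[str, Any], schema: Schema) -> None:
    """Raise if `config` does not match `schema` exactly (toy checker, not Dagster's)."""
    if set(config) != set(schema):
        raise ValueError(f"expected keys {sorted(schema)}, got {sorted(config)}")
    for key, expected_type in schema.items():
        if not isinstance(config[key], expected_type):
            raise TypeError(f"{key!r} must be of type {expected_type.__name__}")


@dataclass(frozen=True)
class ToyDefinition:
    name: str
    config_schema: Schema
    fn: Callable[[Dict[str, Any]], Any]

    def run(self, runtime_config: Dict[str, Any]) -> Any:
        # Runtime config is checked against whatever schema this definition exposes.
        validate(runtime_config, self.config_schema)
        return self.fn(runtime_config)

    def configured(self, config: Dict[str, Any], name: Optional[str] = None) -> "ToyDefinition":
        # Validate once, at definition time, so mistakes surface early.
        validate(config, self.config_schema)
        original_fn = self.fn
        # The new definition exposes an empty schema and always uses the baked-in config.
        return ToyDefinition(name or self.name, {}, lambda _runtime: original_fn(config))


s3_session = ToyDefinition(
    "s3_session",
    {"region": str, "use_unsigned_session": bool},
    lambda c: f"session(region={c['region']}, unsigned={c['use_unsigned_session']})",
)
east_unsigned_s3_session = s3_session.configured(
    {"region": "us-east-1", "use_unsigned_session": True}
)
print(east_unsigned_s3_session.run({}))  # session(region=us-east-1, unsigned=True)
```

Because the check happens inside `configured`, a misspelled or missing key fails when the module is imported rather than when a run starts, which is the practical payoff of supplying configuration at definition time.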
In other cases, it's useful to fill out part of the configuration at definition time and leave the rest for runtime. For these cases, `configured` can be used as a decorator. The decorated function receives the runtime ("outer") config, which is validated against the `config_schema` passed to `configured`, and must return config that satisfies the original entity's config schema. The result is a new entity whose config schema is the outer one.
from dagster import configured, resource
@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
    """Connect to S3"""
# Partially configured: only `region` is supplied at runtime.
@configured(s3_session, config_schema={"region": str})
def unsigned_s3_session(config):
    return {"region": config["region"], "use_unsigned_session": True}
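The decorator form behaves like function composition with two validation steps. The sketch below models it in plain Python; `configure_with_mapping` and the exact-key `validate` check are hypothetical stand-ins, not Dagster APIs. Runtime config is checked against the outer schema, translated by the mapping function, and the translated result is checked against the inner schema before the wrapped function runs.

```python
from typing import Any, Callable, Dict

# Hypothetical toy schema: every key is required and maps to a Python type.
Schema = Dict[str, type]


def validate(config: Dict[str, Any], schema: Schema) -> None:
    # Toy checker: exact key set and values of the declared types.
    # `or` short-circuits, so the type check only runs when the keys match.
    if set(config) != set(schema) or not all(
        isinstance(config[k], t) for k, t in schema.items()
    ):
        raise ValueError(f"config {config!r} does not match schema {schema!r}")


def configure_with_mapping(
    inner_fn: Callable[[Dict[str, Any]], Any],
    inner_schema: Schema,
    outer_schema: Schema,
    config_fn: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Callable[[Dict[str, Any]], Any]:
    """Return a callable that takes the "outer" runtime config (hypothetical helper)."""

    def wrapped(runtime_config: Dict[str, Any]) -> Any:
        validate(runtime_config, outer_schema)  # callers only see the outer schema
        inner_config = config_fn(runtime_config)  # translate outer config -> inner config
        validate(inner_config, inner_schema)  # the result must satisfy the inner schema
        return inner_fn(inner_config)

    return wrapped


def s3_session(config: Dict[str, Any]) -> tuple:
    # Stand-in for the resource body: just report what it was configured with.
    return (config["region"], config["use_unsigned_session"])


unsigned_s3_session = configure_with_mapping(
    s3_session,
    inner_schema={"region": str, "use_unsigned_session": bool},
    outer_schema={"region": str},
    config_fn=lambda c: {"region": c["region"], "use_unsigned_session": True},
)
print(unsigned_s3_session({"region": "eu-west-1"}))  # ('eu-west-1', True)
```

Note that in this model the inner key `use_unsigned_session` is no longer accepted at runtime: once a mapping function owns a key, callers cannot override it.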
The `configured` API works the same way for each of the definition types listed above. To configure a solid, for example, invoke `configured` on the solid definition:
from dagster import Field, configured, solid
@solid(
    config_schema={"iterations": int, "word": Field(str, is_required=False, default_value="hello")}
)
def example_solid(context):
    for _ in range(context.solid_config["iterations"]):
        context.log.info(context.solid_config["word"])
# This example is fully configured. With this syntax, a name must be explicitly provided.
configured_example = configured(example_solid, name="configured_example")(
    {"iterations": 6, "word": "wheaties"}
)
# This example is partially configured: the runtime config is passed through as `iterations`.
# The decorator yields a solid named "another_configured_example" (from the decorated
# function) whose `config_schema` is `int`.
@configured(example_solid, int)
def another_configured_example(config):
    return {"iterations": config, "word": "wheaties"}
When using the decorator syntax (`@configured`), the resulting solid definition inherits the name of the decorated function (like `another_configured_example` in the example above). When configuring a solid completely with a config dictionary rather than with a function (as with `configured_example`), you must pass the `name` keyword argument to `configured`.
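The naming rule can be summarized as a small decision function. This is a plain-Python toy, not a Dagster API: `resolve_configured_name` is hypothetical, and letting an explicit `name` take precedence in the decorator form is an assumption of the toy rather than something stated above. The decorator form falls back to the function's `__name__`, while a fully configured solid without an explicit `name` is rejected.

```python
from typing import Any, Callable, Dict, Optional, Union

# Either a complete config dictionary or a config-mapping function.
ConfigOrFn = Union[Dict[str, Any], Callable[..., Dict[str, Any]]]


def resolve_configured_name(config_or_fn: ConfigOrFn, name: Optional[str] = None) -> str:
    """Toy model of the naming rule for configured solids (hypothetical helper)."""
    if name is not None:
        return name  # toy assumption: an explicit name always wins
    if callable(config_or_fn):
        return config_or_fn.__name__  # decorator form: inherit the function's name
    # A bare dict carries no name of its own, so one must be supplied.
    raise ValueError("a fully configured solid needs an explicit `name`")


def another_configured_example(config: int) -> Dict[str, Any]:
    return {"iterations": config, "word": "wheaties"}


print(resolve_configured_name(another_configured_example))  # another_configured_example
print(resolve_configured_name({"iterations": 6}, name="configured_example"))  # configured_example
```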
When naming solids, remember that solid definitions must have unique names within a repository or
pipeline.
from dagster import Field, InputDefinition, Int, List, configured, execute_pipeline, pipeline, solid
@solid(
    config_schema={"is_sample": Field(bool, is_required=False, default_value=False)},
    input_defs=[InputDefinition("xs", List[Int])],
)
def variance(context, xs):
    n = len(xs)
    mean = sum(xs) / n
    summed = sum((mean - x) ** 2 for x in xs)
    # Divide by n - 1 for a sample variance, or by n for a population variance.
    return summed / (n - 1) if context.solid_config["is_sample"] else summed / n
# If we want to use the same solid configured in multiple ways in the same pipeline,
# we have to specify unique names when configuring them:
sample_variance = configured(variance, name="sample_variance")({"is_sample": True})
population_variance = configured(variance, name="population_variance")({"is_sample": False})
@pipeline
def stats_pipeline():
    sample_variance()
    population_variance()
def run_pipeline():
    result = execute_pipeline(
        stats_pipeline,
        {
            "solids": {
                "sample_variance": {"inputs": {"xs": [4, 8, 15, 16, 23, 42]}},
                "population_variance": {
                    "inputs": {"xs": [33, 30, 27, 29, 32, 30, 27, 28, 30, 30, 30, 31]}
                },
            }
        },
    )
    return result
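The only thing `is_sample` changes between the two configured solids is the divisor applied to the sum of squared deviations: n - 1 for a sample (Bessel's correction) and n for a full population. The standalone helper below is hypothetical and does not use Dagster; it mirrors the solid's sum-of-squares arithmetic on the first input list from `run_pipeline` and cross-checks it against the standard library's `statistics.variance` (sample) and `statistics.pvariance` (population).

```python
import math
import statistics
from typing import List


def variance(xs: List[float], is_sample: bool = False) -> float:
    # Same sum-of-squares arithmetic as the `variance` solid, minus the Dagster wrapper:
    # divide by n - 1 for a sample, by n for a whole population.
    n = len(xs)
    mean = sum(xs) / n
    summed = sum((mean - x) ** 2 for x in xs)
    return summed / (n - 1) if is_sample else summed / n


xs = [4, 8, 15, 16, 23, 42]
print(variance(xs, is_sample=True))  # 182.0
# Cross-check against the standard library's sample and population estimators.
print(math.isclose(variance(xs, is_sample=True), statistics.variance(xs)))  # True
print(math.isclose(variance(xs), statistics.pvariance(xs)))  # True
```

For this list the mean is 18 and the squared deviations sum to 910, so the sample variance is 910 / 5 = 182 and the population variance is 910 / 6.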