Run configuration allows providing parameters to jobs at the time they're executed.
It's often useful to provide user-chosen values to Dagster jobs or asset definitions at runtime. For example, you might want to choose what dataset an op runs against, or provide a connection URL for a database resource. Dagster exposes this functionality through a configuration API.
Various Dagster entities (ops, assets, resources) can be individually configured. When launching a job that executes (ops), materializes (assets), or instantiates (resources) a configurable entity, you can provide run configuration for each entity. Within the function that defines the entity, you can access the passed-in configuration off of the context. Typically, the provided run configuration values correspond to a configuration schema attached to the op/asset/resource definition. Dagster validates the run configuration against the schema and proceeds only if validation is successful.
A common use of configuration is for a schedule or sensor to provide configuration to the job run it is launching. For example, a daily schedule might provide the day it's running on to one of the ops as a config value, and that op might use that config value to decide what day's data to read.
See details with code examples in the API documentation.
Defining and accessing configuration for an op, asset, or resource#
Configurable parameters accepted by an op, asset, or resource are specified by providing a config_schema to the corresponding decorator. The structure of a config_schema is flexible and fully documented in the API Reference. However, most of the time you will want to provide a Python dictionary, with keys the names of parameters and values the types of those parameters.
During execution, the specified parameters are accessible within the body of the op/asset/resource under context.op_config (for ops/assets) or context.resource_config (for resources). It might seem confusing that asset config is accessed under context.op_config instead of context.asset_config. However, assets are wrappers for ops, so when we access asset config we are literally just accessing config for the underlying op.
Below we define a simple op and asset with identical config_schemas defining a single configurable parameter, person_name, as well as a resource with a configurable url parameter:
@op(config_schema={"person_name":str})defop_using_config(context: OpExecutionContext):returnf'hello {context.op_config["person_name"]}'@asset(config_schema={"person_name":str})defasset_using_config(context: AssetExecutionContext):# Note how asset config is accessed with context.op_execution_context.op_configreturnf'hello {context.op_execution_context.op_config["person_name"]}'@resource(config_schema={"url":str})defresource_using_config(context: InitResourceContext):return MyDatabaseConnection(context.resource_config["url"])
It is technically possible to access context.op_config inside ops (not assets) without defining a config_schema. However, this is not recommended.
If you want to execute op_using_config or materialize asset_using_config, we'll need to provide values for the parameters specified in config_schema. How we provide these values depends on the interface we are using:
From the Python API, we can use the run_config argument for JobDefinition.execute_in_process or materialize. This takes a dictionary where configuration values for ops/assets are specified under ops.<op_or_asset_name>.config (for resources under resources.<resource_name>.config):
From the UI's Launchpad, we supply config as YAML using the config editor. Refer to the config schema guide for using runtime config in the Dagster UI.
Dagster validates any provided run config against the corresponding config schemas. It will abort execution with a DagsterInvalidConfigError if validation fails. For example, both of the following will fail, because there is no nonexistent_config_value in the config schema:
A common use case for configuration is passing secrets to connect to external services. Resources, which can be used to model connections to external services, accept secrets as configuration values. These secrets can be read from your environment variables:
If you want multiple ops to share values, You can use make_values_resource to pass the values via a resource and reference that resource from any op that needs it.
It defaults to Any type, meaning Dagster will accept any config value provided for the resource:
@op(required_resource_keys={"file_dir"})defadd_file(context: OpExecutionContext):
filename =f"{context.resources.file_dir}/new_file.txt"open(filename,"x", encoding="utf8").close()
context.log.info(f"Created file: {filename}")@op(required_resource_keys={"file_dir"})deftotal_num_files(context: OpExecutionContext):
files_in_dir = os.listdir(context.resources.file_dir)
context.log.info(f"Total number of files: {len(files_in_dir)}")@job(resource_defs={"file_dir": make_values_resource()})deffile_dir_job():
add_file()
total_num_files()
You can then specify your config under the file_dir resource:
result = file_dir_job.execute_in_process(
run_config={"resources":{"file_dir":{"config":"/my_files/"}}})
Alternatively, if you want to provide different config values for each op within a job, you can also specify the schema of the values like:
result = file_dirs_job.execute_in_process(
run_config={"resources":{"file_dirs":{"config":{"write_file_dir":"/write_files/","count_file_dir":"/count_files/",}}}})