Run Configuration (Legacy)

This guide covers using legacy APIs for the Dagster config system. For docs on the new Pythonic config system introduced in Dagster 1.3, see the updated configuration guide. To migrate your code, refer to the migrating to Pythonic resources and config guide.

Run configuration allows providing parameters to jobs at the time they're executed.

It's often useful to provide user-chosen values to Dagster jobs or software-defined assets at runtime. For example, you might want to choose what dataset an op runs against, or provide a connection URL for a database resource. Dagster exposes this functionality through a configuration API.

Various Dagster entities (ops, assets, resources) can be individually configured. When launching a job that executes ops, materializes assets, or instantiates resources, you can provide run configuration for each of those entities. Within the function that defines an entity, you can access the passed-in configuration from the context. Typically, the provided run configuration values correspond to a configuration schema attached to the op/asset/resource definition. Dagster validates the run configuration against that schema and proceeds only if validation is successful.

A common use of configuration is for a schedule or sensor to provide configuration to the job run it is launching. For example, a daily schedule might provide the day it's running on to one of the ops as a config value, and that op might use that config value to decide what day's data to read.
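
As a minimal sketch of this pattern (the daily_job and process_date names, the date config key, and the cron expression are all hypothetical), a schedule can build the run config it returns from its evaluation context:

from dagster import OpExecutionContext, ScheduleEvaluationContext, job, op, schedule


@op(config_schema={"date": str})
def process_date(context: OpExecutionContext):
    context.log.info(f"Processing data for {context.op_config['date']}")


@job
def daily_job():
    process_date()


@schedule(job=daily_job, cron_schedule="0 0 * * *")
def daily_schedule(context: ScheduleEvaluationContext):
    # Return run config that passes the scheduled date to the op
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"ops": {"process_date": {"config": {"date": date}}}}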


Relevant APIs

ConfigSchema: See details with code examples in the API documentation.

Defining and accessing configuration for an op, asset, or resource

Configurable parameters accepted by an op, asset, or resource are specified by providing a config_schema to the corresponding decorator. The structure of a config_schema is flexible and fully documented in the API Reference. Most of the time, however, you will want to provide a Python dictionary whose keys are the names of parameters and whose values are the types of those parameters.

During execution, the specified parameters are accessible within the body of the op/asset/resource under context.op_config (for ops and assets; with an AssetExecutionContext this is reached through context.op_execution_context.op_config, as in the example below) or context.resource_config (for resources). It might seem confusing that asset config is accessed under context.op_config instead of context.asset_config. However, assets are wrappers around ops, so accessing asset config is simply accessing the config of the underlying op.

Below we define a simple op and asset with identical config_schemas defining a single configurable parameter, person_name, as well as a resource with a configurable url parameter:

from dagster import (
    AssetExecutionContext,
    InitResourceContext,
    OpExecutionContext,
    asset,
    op,
    resource,
)


@op(config_schema={"person_name": str})
def op_using_config(context: OpExecutionContext):
    return f'hello {context.op_config["person_name"]}'


@asset(config_schema={"person_name": str})
def asset_using_config(context: AssetExecutionContext):
    # Note how asset config is accessed with context.op_execution_context.op_config
    return f'hello {context.op_execution_context.op_config["person_name"]}'


@resource(config_schema={"url": str})
def resource_using_config(context: InitResourceContext):
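    # MyDatabaseConnection stands in for your own connection class;
    # it is not part of Dagster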
    return MyDatabaseConnection(context.resource_config["url"])

It is technically possible to access context.op_config inside ops (not assets) without defining a config_schema. However, this is not recommended.

You can also build config into jobs.
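
For example, the config argument to the job decorator lets a job carry default run configuration. A minimal sketch, reusing op_using_config from above (the job name here is hypothetical):

from dagster import job


@job(config={"ops": {"op_using_config": {"config": {"person_name": "Alice"}}}})
def job_with_default_config():
    # The dictionary passed to config= becomes this job's default run config
    op_using_config()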


Specifying runtime configuration

To execute op_using_config or materialize asset_using_config, you'll need to provide values for the parameters specified in config_schema. How you provide these values depends on the interface you're using:

Python

From the Python API, you can use the run_config argument for JobDefinition.execute_in_process or materialize. This takes a dictionary where configuration values for ops and assets are specified under ops.<op_or_asset_name>.config, and values for resources are specified under resources.<resource_name>.config:

from dagster import job, materialize


@job
def example_job():
    op_using_config()


job_result = example_job.execute_in_process(
    run_config={"ops": {"op_using_config": {"config": {"person_name": "Alice"}}}}
)

asset_result = materialize(
    [asset_using_config],
    run_config={
        "ops": {"asset_using_config": {"config": {"person_name": "Alice"}}}
    },
)

Dagster UI

From the UI's Launchpad, you supply config as YAML using the config editor. Refer to the config schema guide for details on using runtime config in the Dagster UI.
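
For example, launching example_job with the same configuration as above means pasting the following YAML into the Launchpad editor:

ops:
  op_using_config:
    config:
      person_name: Alice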

Command line

Refer to the config schema guide for details on using config with Dagster's CLI.
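
As a sketch (the file names my_jobs.py and run_config.yaml are hypothetical), executing a job with config from the command line looks something like:

dagster job execute -f my_jobs.py -j example_job -c run_config.yaml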


Validation

Dagster validates any provided run config against the corresponding config schema and aborts execution with a DagsterInvalidConfigError if validation fails. For example, both of the following will fail, because nonexistent_config_value is not part of the config schema:

@job
def example_job():
    op_using_config()


op_result = example_job.execute_in_process(
    run_config={
        "ops": {"op_using_config": {"config": {"nonexistent_config_value": 1}}}
    }
)

asset_result = materialize(
    [asset_using_config],
    run_config={
        "ops": {"asset_using_config": {"config": {"nonexistent_config_value": 1}}}
    },
)
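
If you'd rather handle invalid config programmatically than let the launch fail, you can catch the error. A minimal sketch, assuming execute_in_process raises DagsterInvalidConfigError before the run starts:

from dagster import DagsterInvalidConfigError

try:
    example_job.execute_in_process(
        run_config={"ops": {"op_using_config": {"config": {"bad_key": 1}}}}
    )
except DagsterInvalidConfigError as e:
    print(f"Config validation failed: {e}")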

Examples

Passing secrets as configuration

A common use case for configuration is passing secrets to connect to external services. Resources, which can be used to model connections to external services, accept secrets as configuration values. The StringSource config type used below accepts either a plain string or an {"env": ...} dictionary that reads the value from an environment variable at launch time:

from dagster import (
    InitResourceContext,
    OpExecutionContext,
    StringSource,
    job,
    op,
    resource,
)


@resource(config_schema={"username": StringSource, "password": StringSource})
def database_client(context: InitResourceContext):
    username = context.resource_config["username"]
    password = context.resource_config["password"]
    ...


@op(required_resource_keys={"database"})
def get_one(context: OpExecutionContext):
    context.resources.database.execute_query("SELECT 1")


@job(
    resource_defs={
        "database": database_client.configured(
            {
                "username": {"env": "SYSTEM_USER"},
                "password": {"env": "SYSTEM_PASSWORD"},
            }
        )
    }
)
def get_one_from_db():
    get_one()

By providing secrets through environment variables, your secrets won't be visible in your code or the UI's Launchpad. Refer to the Using environment variables and secrets in Dagster code guide for more info and examples.

The Configured API page features an example of providing per-environment configuration.
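
As a minimal, hypothetical sketch of that pattern (the variable names and environment variable names are assumptions), you might build per-environment variants of the resource above with .configured:

database_client_local = database_client.configured(
    {"username": "local_user", "password": "local_password"}
)
database_client_prod = database_client.configured(
    {"username": {"env": "PROD_USER"}, "password": {"env": "PROD_PASSWORD"}}
)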

Passing configuration to multiple ops in a job

If you want multiple ops to share values, you can use make_values_resource to pass the values via a resource and reference that resource from any op that needs it.

make_values_resource defaults to the Any config type, meaning Dagster will accept any config value provided for the resource:

import os

from dagster import OpExecutionContext, job, make_values_resource, op


@op(required_resource_keys={"file_dir"})
def add_file(context: OpExecutionContext):
    filename = f"{context.resources.file_dir}/new_file.txt"
    open(filename, "x", encoding="utf8").close()

    context.log.info(f"Created file: {filename}")


@op(required_resource_keys={"file_dir"})
def total_num_files(context: OpExecutionContext):
    files_in_dir = os.listdir(context.resources.file_dir)
    context.log.info(f"Total number of files: {len(files_in_dir)}")


@job(resource_defs={"file_dir": make_values_resource()})
def file_dir_job():
    add_file()
    total_num_files()

You can then specify your config under the file_dir resource:

result = file_dir_job.execute_in_process(
    run_config={"resources": {"file_dir": {"config": "/my_files/"}}}
)

Alternatively, if you want to provide different config values for each op within a job, you can specify the schema of the values like so:

@op(required_resource_keys={"file_dirs"})
def write_file(context: OpExecutionContext):
    filename = f"{context.resources.file_dirs['write_file_dir']}/new_file.txt"
    open(filename, "x", encoding="utf8").close()

    context.log.info(f"Created file: {filename}")


@op(required_resource_keys={"file_dirs"})
def total_num_files(context: OpExecutionContext):
    files_in_dir = os.listdir(context.resources.file_dirs["count_file_dir"])
    context.log.info(f"Total number of files: {len(files_in_dir)}")


@job(
    resource_defs={
        "file_dirs": make_values_resource(write_file_dir=str, count_file_dir=str)
    }
)
def file_dirs_job():
    write_file()
    total_num_files()

You can specify run config like so:

result = file_dirs_job.execute_in_process(
    run_config={
        "resources": {
            "file_dirs": {
                "config": {
                    "write_file_dir": "/write_files/",
                    "count_file_dir": "/count_files/",
                }
            }
        }
    }
)

Or pass these values via a YAML file:

resources:
  file_dirs:
    config:
      write_file_dir: /write_files/
      count_file_dir: /count_files/
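
This YAML mirrors the run_config dictionary above. As a minimal sketch (assuming the file is saved as run_config.yaml and PyYAML is installed), you could load it yourself and pass it to execute_in_process:

import yaml

with open("run_config.yaml", encoding="utf8") as f:
    run_config = yaml.safe_load(f)

result = file_dirs_job.execute_in_process(run_config=run_config)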

See it in action

For more examples of jobs, check out our Hacker News example.