Config Mapping

You can find the code for this example on GitHub.

This example demonstrates how to use config mapping in Dagster to simplify complicated solid config schemas. Imagine you are launching many invocations of a Spark job with similar cluster configuration. You might need to write something like:

solids:
  solid_a:
    config:
      driver_cores: 2
      driver_memory: "4g"
      num_executors: 4
      executor_cores: 4
      executor_memory: "8g"
      name: "job_a"
      args: ["--record-src", "foo"]

  solid_b:
    config:
      driver_cores: 2
      driver_memory: "4g"
      num_executors: 4
      executor_cores: 4
      executor_memory: "8g"
      name: "job_b"
      args: ["--record-src", "bar"]
  ...

As you can see, most of the configuration is identical for solid_a and solid_b; only name and args differ. Repeating it gets tedious if you have hundreds of copies.
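
To make the split between shared and per-job fields concrete, here is a minimal plain-Python sketch that rebuilds the YAML above from one shared dictionary. It does not use any Dagster API; `SHARED_SPARK_CONFIG` and `build_solid_config` are hypothetical names introduced only for illustration.

```python
# Fields that are identical for every solid in the YAML above.
# (Hypothetical helper names; this is not part of Dagster.)
SHARED_SPARK_CONFIG = {
    "driver_cores": 2,
    "driver_memory": "4g",
    "num_executors": 4,
    "executor_cores": 4,
    "executor_memory": "8g",
}


def build_solid_config(name, args):
    """Return the full config for one solid: shared fields plus the per-job ones."""
    return {"config": {**SHARED_SPARK_CONFIG, "name": name, "args": list(args)}}


run_config = {
    "solids": {
        "solid_a": build_solid_config("job_a", ["--record-src", "foo"]),
        "solid_b": build_solid_config("job_b", ["--record-src", "bar"]),
    }
}

# Collect the keys whose values differ between the two solids.
config_a = run_config["solids"]["solid_a"]["config"]
config_b = run_config["solids"]["solid_b"]["config"]
differing_keys = sorted(k for k in config_a if config_a[k] != config_b[k])
print(differing_keys)
```

Only the per-job fields vary, which is the part a simplified config needs to expose.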

With config mapping, you can create a @composite_solid that wraps your complicated solid(s), pins the shared config, and exposes only name to users of the composite solid:

repo.py
from dagster import Field, Shape, composite_solid, pipeline, repository, seven, solid


@solid(
    config_schema={
        "cluster_cfg": Shape(
            {
                "num_mappers": Field(int),
                "num_reducers": Field(int),
                "master_heap_size_mb": Field(int),
                "worker_heap_size_mb": Field(int),
            }
        ),
        "name": Field(str),
    }
)
def hello(context):
    context.log.info(seven.json.dumps(context.solid_config["cluster_cfg"]))
    return "Hello, %s!" % context.solid_config["name"]


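def config_mapping_fn(cfg):
    # cfg is the composite solid's own config; the returned dict holds the
    # config for each inner solid, keyed by solid name.
    return {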
        "hello": {
            "config": {
                "cluster_cfg": {
                    "num_mappers": 100,
                    "num_reducers": 20,
                    "master_heap_size_mb": 1024,
                    "worker_heap_size_mb": 8192,
                },
                "name": cfg["name"],
            }
        }
    }


@composite_solid(
    config_fn=config_mapping_fn,
    config_schema={"name": Field(str, is_required=False, default_value="Sam")},
)
def hello_external():
    return hello()


@pipeline
def example_pipeline():
    hello_external()


@repository
def config_mapping():
    return [example_pipeline]

In this example, the hello solid has a complicated cluster config. With hello_external, we pre-configure the cluster config and expose only a simplified config, whose name value we pass through to the inner hello solid.
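
As a quick sanity check, the mapping function can be exercised on its own in plain Python, without running a pipeline. The sketch below repeats `config_mapping_fn` from repo.py so that it is self-contained. The `outer_run_config` layout (a `solids` section keyed by `hello_external`) is an assumption about how a user would supply the simplified config, mirroring the YAML at the top of this page.

```python
import json


# Copy of config_mapping_fn from repo.py, repeated so this snippet runs on its own.
def config_mapping_fn(cfg):
    return {
        "hello": {
            "config": {
                "cluster_cfg": {
                    "num_mappers": 100,
                    "num_reducers": 20,
                    "master_heap_size_mb": 1024,
                    "worker_heap_size_mb": 8192,
                },
                "name": cfg["name"],
            }
        }
    }


# Assumed shape of the simplified run config a user of hello_external writes.
outer_run_config = {"solids": {"hello_external": {"config": {"name": "job_a"}}}}

# The composite solid's config is what gets handed to the mapping function.
outer_cfg = outer_run_config["solids"]["hello_external"]["config"]
inner_cfg = config_mapping_fn(outer_cfg)

# Show the full config that the inner hello solid would receive.
print(json.dumps(inner_cfg, indent=2))
```

The user-facing config contains a single key, while the inner solid still receives the full cluster_cfg block. This sketch calls the function directly, so it does not exercise the default_value="Sam" declared in the composite solid's config_schema.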

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/config_mapping
cd config_mapping