Basics of Solids

You can find the code for this tutorial on GitHub.

Parametrizing Solids with Inputs

So far, we've only seen solids whose behavior is the same every time they're run:

hello_cereal.py
import csv
import os

from dagster import solid


@solid
def hello_cereal(context):
    # Assuming the dataset is in the same directory as this file
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # Read the rows in using the standard csv library
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(
        "Found {n_cereals} cereals".format(n_cereals=len(cereals))
    )

    return cereals

In general, though, rather than relying on hardcoded values like dataset_path, we'd like to be able to parametrize our solid logic. Appropriately parametrized solids are more testable and reusable. Consider the following more generic solid:

inputs.py
@solid
def read_csv(context, csv_path):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines

Here, rather than hard-coding the value of dataset_path, we use an input, csv_path. It's easy to see why this is better. We can reuse the same solid in all the different places we might need to read in a CSV from a filepath. We can test the solid by pointing it at some known test CSV file. And we can use the output of another upstream solid to determine which file to load.
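
For example, here is a minimal sketch of a test for read_csv using Dagster's execute_solid helper; the fixture file test_cereal.csv is hypothetical, not part of this tutorial's code:

from dagster import execute_solid

from inputs import read_csv  # assuming this module is importable in your test environment


def test_read_csv():
    result = execute_solid(
        read_csv, input_values={"csv_path": "test_cereal.csv"}
    )
    assert result.success
    # output_value() returns the solid's default output
    assert len(result.output_value()) > 0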

Let's rebuild a pipeline we've seen before, but this time using our newly parameterized solid.

inputs.py
import csv
import os

from dagster import execute_pipeline, pipeline, solid


@solid
def read_csv(context, csv_path):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines


@solid
def sort_by_calories(context, cereals):
    # Cast calories to int so the sort is numeric, not lexicographic
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )
    return {
        "least_caloric": sorted_cereals[0],
        "most_caloric": sorted_cereals[-1],
    }


@pipeline
def inputs_pipeline():
    sort_by_calories(read_csv())

Specifying Config for Pipeline Execution

As you can see above, what's missing from this setup is a way to specify the csv_path input to our new read_csv solid in the absence of any upstream solids whose outputs we can rely on. Dagster provides the ability to stub inputs to solids that aren't satisfied by the pipeline topology as part of its flexible configuration facility.

We can specify config for a pipeline execution regardless of which modality we use to execute the pipeline — the Python API, the Dagit GUI, or the command line:

Specifying config in the Python API

We previously encountered the execute_pipeline() function. Pipeline run config is specified by the second argument to this function, which must be a dict.

This dict contains all of the user-provided configuration with which to execute a pipeline. As such, it can have a lot of sections, but we'll only use one of them here: per-solid configuration, which is specified under the key solids:

inputs.py
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
    }
}

The solids dict is keyed by solid name, and each solid is configured by a dict that may itself have several sections. In this case we are only interested in the inputs section, so that we can specify the value of the input csv_path.

Now you can pass this run config to execute_pipeline():

inputs.py
53
result = execute_pipeline(inputs_pipeline, run_config=run_config)
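
If you're scripting this, the returned result object can be inspected after the run. A short sketch, assuming output_for_solid (which fetches a named solid's output) is available in your Dagster version:

assert result.success
# The dict returned by sort_by_calories
extremes = result.output_for_solid("sort_by_calories")
print(extremes["least_caloric"]["name"])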

Specifying config using YAML fragments and the Dagster CLI

When executing pipelines with the Dagster CLI, we'll need to provide the run config in a file. We use YAML for the file-based representation of configuration, but the values are the same as before:

inputs_env.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"

We can pass config files in this format to the Dagster CLI tool with the -c flag.

dagster pipeline execute -f inputs.py -c inputs_env.yaml

In practice, you might have different sections of your run config in different YAML files if, for instance, some sections change more often (e.g., between test and prod) while others are more static. In this case, you can pass the -c flag multiple times on CLI invocations, and the CLI tools will assemble the YAML fragments into a single run config.
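
For instance, a hypothetical invocation that layers an environment-specific fragment (the file name prod_overrides.yaml is illustrative) on top of the base config:

dagster pipeline execute -f inputs.py -c inputs_env.yaml -c prod_overrides.yaml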

Using the Dagit config editor

Dagit provides a powerful, schema-aware, typeahead-enabled config editor to enable rapid experimentation with and debugging of parameterized pipeline executions. As always, run:

dagit -f inputs.py

Notice that no execution plan appears in the bottom pane of the Playground.

inputs_figure_one.png

Because Dagit is schema-aware, it knows that this pipeline now requires configuration in order to run without errors. In this case, since the pipeline is relatively trivial, it wouldn't be especially costly to run the pipeline and watch it fail. But when pipelines are complex and slow, it's invaluable to get this kind of feedback up front rather than have an unexpected failure deep inside a pipeline.

Recall that the execution plan, which you will ordinarily see above the log viewer in the Execute tab, is the concrete pipeline that Dagster will actually execute. Without a valid config, Dagster can't construct a parametrization of the logical pipeline, so no execution plan is available for us to preview.

Press Ctrl + Space in order to bring up the typeahead assistant.

inputs_figure_two.png

Here you can see all of the sections available in the run config. Don't worry, we'll get to them all later.

Let's enter the config we need in order to execute our pipeline.

inputs_figure_three.png

Note that as you type and edit the config, the config minimap hovering on the right side of the editor pane changes to provide context, so you always know where in the nested config schema you are while making changes.


Parametrizing Solids with Config

Solids often depend in predictable ways on features of the external world or the pipeline in which they're invoked. We obviously don't want to have to write a separate solid for each permutation of these parameters that we use in our pipelines, especially because, in more realistic cases like configuring a Spark job, we might have dozens or hundreds of parameters to configure.

But hoisting all of these parameters into the signature of the solid function as inputs isn't the right answer.

Defaults are often sufficient for configuration values, and sets of parameters are often reusable. Moreover, it's unlikely that values like this will be provided dynamically by the outputs of other solids in a pipeline.

Inputs, on the other hand, will usually be provided by the outputs of other solids in a pipeline, even though we might sometimes want to stub them using the config facility.

For all these reasons, it's bad practice to mix configuration values like these with true input values.

The solution is to define a config schema for our solid:

config.py
@solid(config_schema={"reverse": bool})
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals,
        key=lambda cereal: int(cereal["calories"]),
        reverse=context.solid_config["reverse"],
    )

    if context.solid_config["reverse"]:  # find the most caloric cereal
        context.log.info(
            "{x} caloric cereal: {first_cereal_after_sort}".format(
                x="Most", first_cereal_after_sort=sorted_cereals[0]["name"]
            )
        )
        return {
            "most_caloric": sorted_cereals[0],
            "least_caloric": sorted_cereals[-1],
        }
    else:  # find the least caloric cereal
        context.log.info(
            "{x} caloric cereal: {first_cereal_after_sort}".format(
                x="Least", first_cereal_after_sort=sorted_cereals[0]["name"]
            )
        )
        return {
            "least_caloric": sorted_cereals[0],
            "most_caloric": sorted_cereals[-1],
        }

We want to control the way our solid sorts the cereals. First, we pass the config_schema argument to the @solid decorator. This tells Dagster to give our solid a config field structured as a dictionary. Then, we declare the expected type of reverse to be a boolean. Finally, inside the body of the solid function sort_by_calories, we access the config value set by the user via the solid_config field on the familiar context object.
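
As a sketch of how this behaves in isolation, we can execute the solid with config supplied under its name; the inline cereal rows here are illustrative stand-ins for real data:

from dagster import execute_solid

result = execute_solid(
    sort_by_calories,
    input_values={
        "cereals": [
            {"name": "A", "calories": "100"},
            {"name": "B", "calories": "120"},
        ]
    },
    run_config={
        "solids": {"sort_by_calories": {"config": {"reverse": True}}}
    },
)
assert result.success
# With reverse set to True, the most caloric cereal sorts first
assert result.output_value()["most_caloric"]["name"] == "B"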

To execute this config_pipeline, we can specify the pipeline run config in the Python API:

config.py
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
        "sort_by_calories": {"config": {"reverse": True}},
    }
}

You can see that we've added a new section to the solid config. In addition to the inputs section for read_csv, we now have a config section for sort_by_calories, where we can set values defined in the config_schema argument to @solid.
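
Passing this run config to execute_pipeline then works exactly as before; a one-line sketch, assuming the pipeline is named config_pipeline as above:

result = execute_pipeline(config_pipeline, run_config=run_config)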

We can also pass the run config to the Dagster CLI tool in a YAML file, as shown below. Since we want to sort the cereals by calories in reverse order and find the most caloric cereal, we set the reverse value to True in the config section.

dagster pipeline execute -f config.py -c config_env.yaml

config_env.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
  sort_by_calories:
    config:
      reverse: True

In the console, we'll see the most caloric cereal is Mueslix Crispy Blend.

dagster - INFO - system - 0039ddb3-4b13-4d0c-8c01-4eb6fb1b90c6 - sort_by_calories.compute - Most caloric cereal: Mueslix Crispy Blend

In this case, we have defined only one key-value pair, with reverse as the key. We can define additional key-value pairs in the same way. What's more, we can define the value as a Field object to make the config optional:

config_more_details.py
from dagster import Bool, Field, solid


@solid(
    config_schema={
        "reverse": Field(
            Bool,
            default_value=False,
            is_required=False,
            description="If `True`, cereals will be sorted in reverse order. Default: `False`",
        )
    }
)

Here, the Dagster config type Bool is equivalent to the ordinary Python bool type. The default_value and is_required fields are both set to False, and a human-readable description for this config is provided.
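
Because reverse now has a default and is not required, the config section for sort_by_calories can be omitted entirely. A minimal fragment like the following would also be valid, and the cereals would be sorted in ascending order:

solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"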

Let's see how the config section looks in Dagit. As usual, run:

dagit -f config.py

config_figure_one.png

Dagit provides a fully type-aware and schema-aware config editing environment with typeahead. The human-readable description of reverse that we provided on our config field appears in the config section on the right when we click into the sort_by_calories solid definition.


Multiple and Conditional Outputs

Solids can have arbitrarily many outputs, and downstream solids can depend on any number of these.

What's more, solids can have outputs that are optional and don't have to be yielded, which lets us write pipelines where some solids conditionally execute based on the presence of an upstream output.

Suppose we're interested in splitting hot and cold cereals into separate datasets and processing them separately, based on config.

multiple_outputs.py
from dagster import Bool, Field, Output, OutputDefinition, solid


@solid(
    config_schema={
        "process_hot": Field(Bool, is_required=False, default_value=True),
        "process_cold": Field(Bool, is_required=False, default_value=True),
    },
    output_defs=[
        OutputDefinition(name="hot_cereals", is_required=False),
        OutputDefinition(name="cold_cereals", is_required=False),
    ],
)
def split_cereals(context, cereals):
    if context.solid_config["process_hot"]:
        hot_cereals = [cereal for cereal in cereals if cereal["type"] == "H"]
        yield Output(hot_cereals, "hot_cereals")
    if context.solid_config["process_cold"]:
        cold_cereals = [cereal for cereal in cereals if cereal["type"] == "C"]
        yield Output(cold_cereals, "cold_cereals")

Solids that yield multiple outputs must declare, and name, their outputs (passing output_defs to the @solid decorator). Output names must be unique and each Output yielded by a solid's compute function must have a name that corresponds to one of these declared outputs.

We'll define two downstream solids and hook them up to the multiple outputs from split_cereals.

multiple_outputs.py
@solid
def sort_hot_cereals_by_calories(context, cereals):
    # Cast calories to int so the sort is numeric, not lexicographic
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    context.log.info(
        "Least caloric hot cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )


@solid
def sort_cold_cereals_by_calories(context, cereals):
    # Cast calories to int so the sort is numeric, not lexicographic
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    context.log.info(
        "Least caloric cold cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )


@pipeline
def multiple_outputs_pipeline():
    hot_cereals, cold_cereals = split_cereals(read_csv())
    sort_hot_cereals_by_calories(hot_cereals)
    sort_cold_cereals_by_calories(cold_cereals)

As usual, we can visualize this in Dagit:

multiple_outputs.png

Notice that the logical DAG corresponding to the pipeline definition includes both dependencies: we won't know about the conditionality in the pipeline until runtime, when split_cereals might not yield both of the outputs.

multiple_outputs_zoom.png

Zooming in, Dagit shows us the details of the multiple outputs from split_cereals, hot_cereals and cold_cereals, and their downstream dependencies.

When we execute this pipeline with the following config, we'll see that the cold cereals output is omitted and that the execution step corresponding to the downstream solid is marked skipped in the execution pane:

multiple_outputs.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
  split_cereals:
    config:
      process_hot: true
      process_cold: false

conditional_outputs.png
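
Equivalently, here is a sketch of driving the same conditional execution from the Python API, mirroring multiple_outputs.yaml above:

run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
        "split_cereals": {
            "config": {"process_hot": True, "process_cold": False}
        },
    }
}
result = execute_pipeline(multiple_outputs_pipeline, run_config=run_config)
assert result.success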