Basics of Solids

You can find the code for this tutorial on GitHub.

Parametrizing Solids with Inputs

So far, we've only seen solids whose behavior is the same every time they're run:

hello_cereal.py
@solid
def hello_cereal(context):
    # Assuming the dataset is in the same directory as this file
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # Read the rows in using the standard csv library
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(
        "Found {n_cereals} cereals".format(n_cereals=len(cereals))
    )

    return cereals

In general, though, rather than relying on hardcoded values like dataset_path, we'd like to be able to parametrize our solid logic. Appropriately parameterized solids are more testable and reusable. Consider the following more generic solid:

inputs.py
@solid
def read_csv(context, csv_path):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines

Here, rather than hard-coding the value of dataset_path, we use an input, csv_path. It's easy to see why this is better. We can reuse the same solid in all the different places we might need to read in a CSV from a filepath. We can test the solid by pointing it at some known test CSV file. And we can use the output of another upstream solid to determine which file to load.
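
For example, a unit test can exercise read_csv in isolation by stubbing its csv_path input directly. The sketch below assumes dagster's execute_solid test helper and that read_csv is importable from inputs.py; point it at any known CSV file you keep alongside your tests:

from dagster import execute_solid

from inputs import read_csv


def test_read_csv():
    # Stub the input that would otherwise come from config or an upstream solid
    result = execute_solid(read_csv, input_values={"csv_path": "cereal.csv"})
    assert result.success
    assert len(result.output_value()) > 0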

Let's rebuild a pipeline we've seen before, but this time using our newly parameterized solid.

inputs.py
import csv
import os

from dagster import execute_pipeline, pipeline, solid


@solid
def read_csv(context, csv_path):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines


@solid
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )
    return {
        "least_caloric": sorted_cereals[0],
        "most_caloric": sorted_cereals[-1],
    }


@pipeline
def inputs_pipeline():
    sort_by_calories(read_csv())

Specifying Config for Pipeline Execution

As you can see above, what's missing from this setup is a way to specify the csv_path input to our new read_csv solid in the absence of any upstream solids whose outputs we can rely on. Dagster provides the ability to stub inputs to solids that aren't satisfied by the pipeline topology as part of its flexible configuration facility.

We can specify config for a pipeline execution regardless of which modality we use to execute the pipeline — the Python API, the Dagit GUI, or the command line:

Specifying config in the Python API

We previously encountered the execute_pipeline() function. Pipeline run config is specified by the second argument to this function, which must be a dict.

This dict contains all of the user-provided configuration with which to execute a pipeline. As such, it can have a lot of sections, but we'll only use one of them here: per-solid configuration, which is specified under the key solids:

inputs.py
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
    }
}

The solids dict is keyed by solid name, and each solid is configured by a dict that may itself have several sections. In this case we are only interested in the inputs section, so that we can specify the value of the input csv_path.

Now you can pass this run config to execute_pipeline():

inputs.py
result = execute_pipeline(inputs_pipeline, run_config=run_config)
assert result.success

Specifying config using YAML fragments and the Dagster CLI

When executing pipelines with the Dagster CLI, we'll need to provide the run config in a file. We use YAML for the file-based representation of configuration, but the values are the same as before:

inputs_env.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"

We can pass config files in this format to the Dagster CLI tool with the -c flag.

dagster pipeline execute -f inputs.py -c inputs_env.yaml

In practice, you might keep different sections of your run config in different YAML files if, for instance, some sections change more often (e.g., between test and prod) while others are more static. In this case, you can pass the -c flag multiple times on a CLI invocation, and the CLI tools will assemble the YAML fragments into a single run config.
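
For example, with the more static solid config in one file and the per-environment input values in another (both file names here are hypothetical), the fragments are merged into one run config at execution time:

dagster pipeline execute -f inputs.py -c run_config_base.yaml -c run_config_inputs.yaml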

Using the Dagit config editor

Dagit provides a powerful, schema-aware, typeahead-enabled config editor to enable rapid experimentation with and debugging of parameterized pipeline executions. As always, run:

dagit -f inputs.py

Notice that no execution plan appears in the bottom pane of the Playground.

inputs_figure_one.png

Because Dagit is schema-aware, it knows that this pipeline now requires configuration in order to run without errors. In this case, since the pipeline is relatively trivial, it wouldn't be especially costly to run the pipeline and watch it fail. But when pipelines are complex and slow, it's invaluable to get this kind of feedback up front rather than have an unexpected failure deep inside a pipeline.

Recall that the execution plan, which you will ordinarily see above the log viewer in the Execute tab, is the concrete pipeline that Dagster will actually execute. Without a valid config, Dagster can't construct a parametrization of the logical pipeline, so no execution plan is available for us to preview.

Press Ctrl + Space to bring up the typeahead assistant.

inputs_figure_two.png

Here you can see all of the sections available in the run config. Don't worry, we'll get to them all later.

Let's enter the config we need in order to execute our pipeline.

inputs_figure_three.png

Note that as you type and edit the config, the config minimap hovering on the right side of the editor pane changes to provide context: you always know where you are in the nested config schema while making changes.


Parametrizing Solids with Config

Solids often depend in predictable ways on features of the external world or the pipeline in which they're invoked. For example, consider an extended version of our CSV-reading solid that implements more of the options available in the underlying Python API:

config_bad_1.py
@solid
def read_csv(context, csv_path):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [
            row
            for row in csv.DictReader(
                fd,
                delimiter=",",
                doublequote=False,
                escapechar="\\",
                quotechar='"',
                quoting=csv.QUOTE_MINIMAL,
                skipinitialspace=False,
                strict=False,
            )
        ]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))

    return lines

We obviously don't want to have to write a separate solid for each permutation of these parameters that we use in our pipelines, especially because, in more realistic cases like configuring a Spark job or even parametrizing the read_csv function from a popular package like pandas, we might have dozens or hundreds of parameters like these.

But hoisting all of these parameters into the signature of the solid function as inputs isn't the right answer either:

config_bad_2.py
@solid
def read_csv(
    context,
    csv_path,
    delimiter,
    doublequote,
    escapechar,
    quotechar,
    quoting,
    skipinitialspace,
    strict,
):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [
            row
            for row in csv.DictReader(
                fd,
                delimiter=delimiter,
                doublequote=doublequote,
                escapechar=escapechar,
                quotechar=quotechar,
                quoting=quoting,
                skipinitialspace=skipinitialspace,
                strict=strict,
            )
        ]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))

    return lines

Defaults are often sufficient for configuration values like these, and sets of parameters are often reusable. Moreover, it's unlikely that values like this will be provided dynamically by the outputs of other solids in a pipeline.

Inputs, on the other hand, will usually be provided by the outputs of other solids in a pipeline, even though we might sometimes want to stub them using the config facility.

For all these reasons, it's bad practice to mix configuration values like these with true input values.

The solution is to define a config schema for our solid:

config.py
import csv
import os

from dagster import (
    Bool,
    Field,
    Int,
    String,
    execute_pipeline,
    pipeline,
    solid,
)


@solid(
    config_schema={
        "delimiter": Field(
            String,
            default_value=",",
            is_required=False,
            description=("A one-character string used to separate fields."),
        ),
        "doublequote": Field(
            Bool,
            default_value=False,
            is_required=False,
            description=(
                "Controls how instances of quotechar appearing inside a field "
                "should themselves be quoted. When True, the character is "
                "doubled. When False, the escapechar is used as a prefix to "
                "the quotechar."
            ),
        ),
        "escapechar": Field(
            String,
            default_value="\\",
            is_required=False,
            description=(
                "On reading, the escapechar removes any special meaning from "
                "the following character."
            ),
        ),
        "quotechar": Field(
            String,
            default_value='"',
            is_required=False,
            description=(
                "A one-character string used to quote fields containing "
                "special characters, such as the delimiter or quotechar, "
                "or which contain new-line characters."
            ),
        ),
        "quoting": Field(
            Int,
            default_value=csv.QUOTE_MINIMAL,
            is_required=False,
            description=(
                "Controls when quotes should be generated by the writer and "
                "recognised by the reader. It can take on any of the "
                "csv.QUOTE_* constants"
            ),
        ),
        "skipinitialspace": Field(
            Bool,
            default_value=False,
            is_required=False,
            description=(
                "When True, whitespace immediately following the delimiter "
                "is ignored. The default is False."
            ),
        ),
        "strict": Field(
            Bool,
            default_value=False,
            is_required=False,
            description=("When True, raise exception on bad CSV input."),
        ),
    }
)
def read_csv(context, csv_path: str):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [
            row
            for row in csv.DictReader(
                fd,
                delimiter=context.solid_config["delimiter"],
                doublequote=context.solid_config["doublequote"],
                escapechar=context.solid_config["escapechar"],
                quotechar=context.solid_config["quotechar"],
                quoting=context.solid_config["quoting"],
                skipinitialspace=context.solid_config["skipinitialspace"],
                strict=context.solid_config["strict"],
            )
        ]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))

    return lines


@pipeline
def config_pipeline():
    read_csv()

First, we pass the config_schema argument to the @solid decorator. This tells Dagster to give our solid a config field structured as a dictionary, whose keys are the keys of this argument and whose value types are defined by the corresponding values (instances of Field).

Then, we define one of these fields, escapechar, to be a string, setting a default value, making it optional, and setting a human-readable description.

Finally, inside the body of the solid function, we access the config value set by the user using the solid_config field on the familiar context object. When Dagster executes our pipeline, the framework will make validated config for each solid available on this object.
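
Outside of Dagit, the same config can be supplied programmatically. Here is a minimal sketch that executes config_pipeline with both a config section and an inputs section for read_csv, mirroring the run config structure we used earlier:

run_config = {
    "solids": {
        "read_csv": {
            # Values here are validated against the solid's config_schema
            "config": {"delimiter": ","},
            # Stub the csv_path input, just as before
            "inputs": {"csv_path": {"value": "cereal.csv"}},
        }
    }
}
result = execute_pipeline(config_pipeline, run_config=run_config)
assert result.success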

Let's see how all of this looks in Dagit. As usual, run:

dagit -f config.py

config_figure_one.png

As you may by now expect, Dagit provides a fully type-aware and schema-aware config editing environment with a typeahead. The human-readable descriptions we provided on our config fields appear in the config context minimap, as well as in typeahead tooltips and in the Explore pane when clicking into the individual solid definition.

config_figure_two.png

You can see that we've added a new section to the solid config. In addition to the inputs section, which we'll still use to set the csv_path input, we now have a config section, where we can set values defined in the config_schema argument to @solid.

config_env_bad.yaml
solids:
  read_csv:
    config:
      delimiter: ";"
    inputs:
      csv_path:
        value: "cereal.csv"

Of course, this config won't give us the results we're expecting. The values in cereal.csv are comma-separated, not semicolon-separated, as they might be if this were a CSV from Europe, where commas are frequently used in place of the decimal point.

We'll see later how we can use Dagster's facilities for automatic data quality checks to guard against semantic issues like this, which won't be caught by the type system.


Multiple and Conditional Outputs

Solids can have arbitrarily many outputs, and downstream solids can depend on any number of these.

What's more, solids can have outputs that are optional and don't have to be yielded, which lets us write pipelines where some solids conditionally execute based on the presence of an upstream output.

Suppose we're interested in splitting hot and cold cereals into separate datasets and processing them separately, based on config.

multiple_outputs.py
@solid(
    config_schema={
        "process_hot": Field(Bool, is_required=False, default_value=True),
        "process_cold": Field(Bool, is_required=False, default_value=True),
    },
    output_defs=[
        OutputDefinition(name="hot_cereals", is_required=False),
        OutputDefinition(name="cold_cereals", is_required=False),
    ],
)
def split_cereals(context, cereals):
    if context.solid_config["process_hot"]:
        hot_cereals = [cereal for cereal in cereals if cereal["type"] == "H"]
        yield Output(hot_cereals, "hot_cereals")
    if context.solid_config["process_cold"]:
        cold_cereals = [cereal for cereal in cereals if cereal["type"] == "C"]
        yield Output(cold_cereals, "cold_cereals")

Solids that yield multiple outputs must declare and name their outputs by passing output_defs to the @solid decorator. Output names must be unique, and each Output yielded by a solid's compute function must have a name that corresponds to one of these declared outputs.

We'll define two downstream solids and hook them up to the multiple outputs from split_cereals.

multiple_outputs.py
@solid
def sort_hot_cereals_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric hot cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )


@solid
def sort_cold_cereals_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cold cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )


@pipeline
def multiple_outputs_pipeline():
    hot_cereals, cold_cereals = split_cereals(read_csv())
    sort_hot_cereals_by_calories(hot_cereals)
    sort_cold_cereals_by_calories(cold_cereals)

As usual, we can visualize this in Dagit:

multiple_outputs.png

Notice that the logical DAG corresponding to the pipeline definition includes both dependencies; we won't know about the conditionality in the pipeline until runtime, when split_cereals might not yield both of the outputs.

multiple_outputs_zoom.png

Zooming in, Dagit shows us the details of the multiple outputs from split_cereals and their downstream dependencies.

When we execute this pipeline with the following config, we'll see that the cold cereals output is omitted and that the execution step corresponding to the downstream solid is marked skipped in the execution pane:

multiple_outputs.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
  split_cereals:
    config:
      process_hot: true
      process_cold: false

conditional_outputs.png
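
The same run config can also be passed programmatically. A minimal sketch, assuming execute_pipeline is imported from dagster and the solids are defined as above in multiple_outputs.py:

run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
        "split_cereals": {
            # Only the hot cereals output will be yielded; the cold branch is skipped
            "config": {"process_hot": True, "process_cold": False}
        },
    }
}
result = execute_pipeline(multiple_outputs_pipeline, run_config=run_config)
assert result.success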