Basics of Solids
Parametrizing Solids with Inputs¶
So far, we've only seen solids whose behavior is the same every time they're run:
@solid
def hello_cereal(context):
    # Assuming the dataset is in the same directory as this file
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # Read the rows in using the standard csv library
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(
        "Found {n_cereals} cereals".format(n_cereals=len(cereals))
    )
    return cereals
In general, though, rather than relying on hardcoded values like dataset_path, we'd like to be able to parametrize our solid logic. Appropriately parameterized solids are more testable and reusable. Consider the following more generic solid:
@solid
def read_csv(context, csv_path):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines
Here, rather than hard-coding the value of dataset_path, we use an input, csv_path. It's easy to see why this is better. We can reuse the same solid in all the different places we might need to read in a CSV from a filepath. We can test the solid by pointing it at some known test CSV file. And we can use the output of another upstream solid to determine which file to load.
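For example, this is roughly what a test for read_csv might look like. This is a minimal sketch: it assumes the execute_solid test utility is available in this version of Dagster and that a cereal.csv file sits next to the test module.

from dagster import execute_solid

def test_read_csv():
    # Stub the csv_path input through run config, just as we would for a
    # pipeline run (sketch; assumes cereal.csv is next to this file).
    res = execute_solid(
        read_csv,
        run_config={
            "solids": {
                "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
            }
        },
    )
    assert res.success
    assert len(res.output_value()) > 0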
Let's rebuild a pipeline we've seen before, but this time using our newly parameterized solid.
import csv
import os

from dagster import execute_pipeline, pipeline, solid


@solid
def read_csv(context, csv_path):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines


@solid
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )
    return {
        "least_caloric": sorted_cereals[0],
        "most_caloric": sorted_cereals[-1],
    }


@pipeline
def inputs_pipeline():
    sort_by_calories(read_csv())
Specifying Config for Pipeline Execution¶
As you can see above, what's missing from this setup is a way to specify the csv_path input to our new read_csv solid in the absence of any upstream solids whose outputs we can rely on. Dagster provides the ability to stub inputs to solids that aren't satisfied by the pipeline topology as part of its flexible configuration facility.
We can specify config for a pipeline execution regardless of which modality we use to execute the pipeline — the Python API, the Dagit GUI, or the command line:
Specifying config in the Python API
We previously encountered the execute_pipeline() function. Pipeline run config is specified by the second argument to this function, which must be a dict. This dict contains all of the user-provided configuration with which to execute a pipeline. As such, it can have a lot of sections, but we'll only use one of them here: per-solid configuration, which is specified under the key solids:
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
    }
}
The solids dict is keyed by solid name, and each solid is configured by a dict that may itself have several sections. In this case we are only interested in the inputs section, so that we can specify the value of the input csv_path.
Now you can pass this run config to execute_pipeline():
result = execute_pipeline(inputs_pipeline, run_config=run_config)
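The returned result object lets us confirm that the run succeeded and inspect the outputs it produced. A minimal sketch of doing so, assuming result.success and output_for_solid are available on the result object in this version of Dagster:

assert result.success
# The dict returned by sort_by_calories, keyed by "least_caloric" and
# "most_caloric" (sketch; assumes output_for_solid is available).
calorie_summary = result.output_for_solid("sort_by_calories")
print(calorie_summary["least_caloric"]["name"])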
Specifying config using YAML fragments and the Dagster CLI
When executing pipelines with the Dagster CLI, we'll need to provide the run config in a file. We use YAML for the file-based representation of configuration, but the values are the same as before:
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
We can pass config files in this format to the Dagster CLI tool with the -c flag.
dagster pipeline execute -f inputs.py -c inputs_env.yaml
In practice, you might have different sections of your run config in different YAML files—if, for instance, some sections change more often (e.g. in test and prod) while others are more static. In this case, you can set multiple instances of the -c flag on CLI invocations, and the CLI tools will assemble the YAML fragments into a single run config.
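For example, an invocation that layers a shared base config with an environment-specific override might look like the following (the file names here are hypothetical):

dagster pipeline execute -f inputs.py -c base_env.yaml -c test_env.yaml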
Using the Dagit config editor
Dagit provides a powerful, schema-aware, typeahead-enabled config editor to enable rapid experimentation with and debugging of parameterized pipeline executions. As always, run:
dagit -f inputs.py
Notice that the launch execution button is disabled and the solids are red in the bottom right corner of the Playground.
Because Dagit is schema-aware, it knows that this pipeline now requires configuration in order to run without errors. In this case, since the pipeline is relatively trivial, it wouldn't be especially costly to run the pipeline and watch it fail. But when pipelines are complex and slow, it's invaluable to get this kind of feedback up front rather than have an unexpected failure deep inside a pipeline.
Recall that the execution plan, which you will ordinarily see above the log viewer in the Execute tab, is the concrete pipeline that Dagster will actually execute. Without a valid config, Dagster can't construct a parametrization of the logical pipeline—so no execution plan is available for us to preview.
Press Ctrl + Space in order to bring up the typeahead assistant.
Here you can see all of the sections available in the run config. Don't worry, we'll get to them all later.
Let's enter the config we need in order to execute our pipeline.
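For reference, this is the same config we passed to the CLI above:

solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"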
Note that as you type and edit the config, the config minimap hovering on the right side of the editor pane changes to provide context—you always know where in the nested config schema you are while making changes.
Parametrizing Solids with Config¶
Solids often depend in predictable ways on features of the external world or the pipeline in which they're invoked. We obviously don't want to have to write a separate solid for each permutation of these parameters that we use in our pipelines— especially because, in more realistic cases like configuring a Spark job, we might have dozens or hundreds of parameters to configure.
But hoisting all of these parameters into the signature of the solid function as inputs isn't the right answer.
Defaults are often sufficient for configuration values, and sets of parameters are often reusable. Moreover, it's unlikely that values like this will be provided dynamically by the outputs of other solids in a pipeline.
Inputs, on the other hand, will usually be provided by the outputs of other solids in a pipeline, even though we might sometimes want to stub them using the config facility.
For all these reasons, it's bad practice to mix configuration values like these with true input values.
The solution is to define a config schema for our solid:
@solid(config_schema={"reverse": bool})
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals,
        key=lambda cereal: int(cereal["calories"]),
        reverse=context.solid_config["reverse"],
    )
    if context.solid_config["reverse"]:  # find the most caloric cereal
        context.log.info(
            "{x} caloric cereal: {first_cereal_after_sort}".format(
                x="Most", first_cereal_after_sort=sorted_cereals[0]["name"]
            )
        )
        return {
            "most_caloric": sorted_cereals[0],
            "least_caloric": sorted_cereals[-1],
        }
    else:  # find the least caloric cereal
        context.log.info(
            "{x} caloric cereal: {first_cereal_after_sort}".format(
                x="Least", first_cereal_after_sort=sorted_cereals[0]["name"]
            )
        )
        return {
            "least_caloric": sorted_cereals[0],
            "most_caloric": sorted_cereals[-1],
        }
We want to control the way our solid sorts the cereals. First, we pass the config_schema argument to the @solid decorator. This tells Dagster to give our solid a config field structured as a dictionary. Then, we declare the expected type of reverse in the schema, setting it to be a boolean. Finally, inside the body of the solid function sort_by_calories, we access the config value set by the user using the solid_config field on the familiar context object.
To execute this config_pipeline, we can specify the pipeline run config in the Python API:
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
        "sort_by_calories": {"config": {"reverse": True}},
    }
}
You can see that we've added a new section to the solid config. In addition to the inputs section for read_csv, we now have a config section for sort_by_calories, where we can set values defined in the config_schema argument to @solid.
We can also pass the run config in a YAML file to the Dagster CLI tool, as below. Since we want to sort the cereals by calories in reverse order and find the most caloric cereal, we set the reverse value to True in the config section.
dagster pipeline execute -f config.py -c config_env.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
  sort_by_calories:
    config:
      reverse: True
In the console, we'll see the most caloric cereal is Mueslix Crispy Blend.
dagster - INFO - system - 0039ddb3-4b13-4d0c-8c01-4eb6fb1b90c6 - sort_by_calories.compute - Most caloric cereal: Mueslix Crispy Blend
In this case, we have only defined one key-value pair, with reverse as the key. We can define additional key-value pairs in the same way. What's more, we can define the value as a Field object to make the config optional:
@solid(
    config_schema={
        "reverse": Field(
            Bool,
            default_value=False,
            is_required=False,
            description="If `True`, cereals will be sorted in reverse order. Default: `False`",
        )
    }
)
Here, we can see that the Dagster config type Bool is equivalent to the ordinary bool type. The default_value and is_required arguments are both set to False, and a human-readable description for this config is provided.
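Because reverse now has a default, the config section for sort_by_calories can be omitted entirely, and the cereals will simply be sorted in ascending order. A minimal sketch of such a run via the Python API:

# Sketch: with the Field default in place, no "sort_by_calories" config
# section is required; reverse falls back to False.
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
    }
}
result = execute_pipeline(config_pipeline, run_config=run_config)
assert result.success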
Let's see how the config section looks in Dagit. As usual, run:
dagit -f config.py
Dagit provides a fully type-aware and schema-aware config editing environment with a typeahead.
The human-readable description of reverse we provided on our config field appears in the config section on the right when we click into the sort_by_calories solid definition.
Multiple and Conditional Outputs¶
Solids can have arbitrarily many outputs, and downstream solids can depend on any number of these.
What's more, solids can have outputs that are optional and don't have to be yielded, which lets us write pipelines where some solids conditionally execute based on the presence of an upstream output.
Suppose we're interested in splitting hot and cold cereals into separate datasets and processing them separately, based on config.
@solid(
    config_schema={
        "process_hot": Field(Bool, is_required=False, default_value=True),
        "process_cold": Field(Bool, is_required=False, default_value=True),
    },
    output_defs=[
        OutputDefinition(name="hot_cereals", is_required=False),
        OutputDefinition(name="cold_cereals", is_required=False),
    ],
)
def split_cereals(context, cereals):
    if context.solid_config["process_hot"]:
        hot_cereals = [cereal for cereal in cereals if cereal["type"] == "H"]
        yield Output(hot_cereals, "hot_cereals")
    if context.solid_config["process_cold"]:
        cold_cereals = [cereal for cereal in cereals if cereal["type"] == "C"]
        yield Output(cold_cereals, "cold_cereals")
Solids that yield multiple outputs must declare, and name, their outputs (passing output_defs to the @solid decorator). Output names must be unique, and each Output yielded by a solid's compute function must have a name that corresponds to one of these declared outputs.
We'll define two downstream solids and hook them up to the multiple outputs from split_cereals.
@solid
def sort_hot_cereals_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric hot cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )


@solid
def sort_cold_cereals_by_calories(context, cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cold cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )


@pipeline
def multiple_outputs_pipeline():
    hot_cereals, cold_cereals = split_cereals(read_csv())
    sort_hot_cereals_by_calories(hot_cereals)
    sort_cold_cereals_by_calories(cold_cereals)
As usual, we can visualize this in Dagit:
Notice that the logical DAG corresponding to the pipeline definition includes both dependencies -- we won't know about the conditionality in the pipeline until runtime, when split_cereals might not yield both of the outputs.
Zooming in, Dagit shows us the details of the multiple outputs from split_cereals and their downstream dependencies, hot_cereals and cold_cereals.
When we execute this pipeline with the following config, we'll see that the cold cereals output is omitted and that the execution step corresponding to the downstream solid is marked skipped in the execution pane:
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
  split_cereals:
    config:
      process_hot: true
      process_cold: false
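Equivalently, here is a minimal sketch of the same run via the Python API. Because process_cold is false, split_cereals never yields the cold_cereals output, and the downstream sort_cold_cereals_by_calories step is skipped:

# Sketch: mirror the YAML above as a run_config dict.
run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}},
        "split_cereals": {
            "config": {"process_hot": True, "process_cold": False}
        },
    }
}
result = execute_pipeline(multiple_outputs_pipeline, run_config=run_config)
assert result.success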