Basics of Pipelines
Our pipelines wouldn't be very interesting if they were limited to solids acting in isolation from each other. Pipelines are useful because they let us connect solids into arbitrary DAGs of computation.
Let's Get Serial
We'll add a second solid to the pipeline we worked with in the first section of the tutorial.
This new solid will consume the output of the first solid, which read the cereal dataset in from disk, and in turn will sort the list of cereals by their calorie content per serving.
import csv
import os

from dagster import execute_pipeline, pipeline, solid


@solid
def load_cereals(context):
    csv_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(csv_path, "r") as fd:
        cereals = [row for row in csv.DictReader(fd)]
    context.log.info(
        "Found {n_cereals} cereals".format(n_cereals=len(cereals))
    )
    return cereals


@solid
def sort_by_calories(context, cereals):
    # Cast to int: csv.DictReader yields strings, which would sort lexicographically
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: int(cereal["calories"]))
    )
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )


@pipeline
def serial_pipeline():
    sort_by_calories(load_cereals())
You'll see that we've modified our existing load_cereals solid to return an output: in this case, the list of dicts into which csv.DictReader reads the cereals dataset. We've defined our new solid, sort_by_calories, to take a user-defined input, cereals, in addition to the system-provided context object.

We can use inputs and outputs to connect solids to each other. Here we tell Dagster that although load_cereals doesn't depend on the output of any other solid, sort_by_calories does: it depends on the output of load_cereals.
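One detail worth keeping in mind when passing this data between solids: csv.DictReader yields every field as a string, so sorting on a calorie field directly compares text, not numbers. A stdlib-only sketch, using made-up sample rows, shows the difference:

```python
import csv
import io

# A tiny in-memory stand-in for cereal.csv (invented rows for illustration)
sample = "name,calories\nBran Flakes,90\nGranola,120\nPuffed Rice,50\n"
cereals = list(csv.DictReader(io.StringIO(sample)))

# DictReader yields strings, so "120" sorts before "50" lexicographically
by_text = sorted(cereals, key=lambda c: c["calories"])
# Casting to int gives the numeric order you actually want
by_number = sorted(cereals, key=lambda c: int(c["calories"]))

print([c["name"] for c in by_text])    # → ['Granola', 'Puffed Rice', 'Bran Flakes']
print([c["name"] for c in by_number])  # → ['Puffed Rice', 'Bran Flakes', 'Granola']
```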
Let's visualize this pipeline in Dagit:
dagit -f serial_pipeline.py
Navigate to http://127.0.0.1:3000/pipeline/serial_pipeline/ or choose "serial_pipeline" from the left sidebar:
A More Complex DAG
Solids don't need to be wired together serially. The output of one solid can be consumed by any number of other solids, and the outputs of several different solids can be consumed by a single solid.
import csv
import os

from dagster import execute_pipeline, pipeline, solid


@solid
def load_cereals(_):
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        cereals = [row for row in csv.DictReader(fd)]
    return cereals


@solid
def sort_by_calories(_, cereals):
    # Cast to int so we sort numerically, not lexicographically
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: int(cereal["calories"]))
    )
    least_caloric = sorted_cereals[0]["name"]
    most_caloric = sorted_cereals[-1]["name"]
    return (least_caloric, most_caloric)


@solid
def sort_by_protein(_, cereals):
    sorted_cereals = list(
        sorted(cereals, key=lambda cereal: int(cereal["protein"]))
    )
    least_protein = sorted_cereals[0]["name"]
    most_protein = sorted_cereals[-1]["name"]
    return (least_protein, most_protein)


@solid
def display_results(context, calorie_results, protein_results):
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=calorie_results[0]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=calorie_results[-1]
        )
    )
    context.log.info(
        "Least protein-rich cereal: {least_protein}".format(
            least_protein=protein_results[0]
        )
    )
    context.log.info(
        "Most protein-rich cereal: {most_protein}".format(
            most_protein=protein_results[-1]
        )
    )


@pipeline
def complex_pipeline():
    cereals = load_cereals()
    display_results(
        calorie_results=sort_by_calories(cereals),
        protein_results=sort_by_protein(cereals),
    )
First we introduce the intermediate variable cereals into our pipeline definition to represent the output of the load_cereals solid. Then we make both sort_by_calories and sort_by_protein consume this output. Their outputs are in turn both consumed by display_results.
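Stripped of the Dagster machinery, the data flow in complex_pipeline is ordinary function composition: one value fans out to two functions, whose results fan in to a third. A plain-Python sketch of the same shape, with invented stand-in data:

```python
# Hypothetical stand-ins for the solids, written as plain functions
def load():
    return [
        {"name": "A", "calories": 50, "protein": 4},
        {"name": "B", "calories": 120, "protein": 1},
    ]

def extremes(rows, field):
    # Return the names of the rows with the smallest and largest value of `field`
    ordered = sorted(rows, key=lambda r: r[field])
    return (ordered[0]["name"], ordered[-1]["name"])

rows = load()                                 # one output...
calorie_results = extremes(rows, "calories")  # ...consumed here
protein_results = extremes(rows, "protein")   # ...and here
print(calorie_results, protein_results)       # → ('A', 'B') ('B', 'A')
```

In the Dagster version, the same wiring is expressed declaratively inside the @pipeline function body rather than executed eagerly.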
Let's visualize this pipeline in Dagit:
dagit -f complex_pipeline.py
When you execute this example from Dagit, you'll see that load_cereals executes first, followed by sort_by_calories and sort_by_protein (in either order), and that display_results executes last, only after sort_by_calories and sort_by_protein have both executed.

In more sophisticated execution environments, sort_by_calories and sort_by_protein could execute not just in either order but at the same time, since they don't depend on each other's outputs. Both, however, would still have to execute after load_cereals (because they depend on its output) and before display_results (because display_results depends on both of their outputs).
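These ordering constraints are exactly a topological sort of the dependency graph. Python's stdlib graphlib module (3.9+) makes this concrete; the graph below is hand-written to mirror the solids, not extracted from Dagster:

```python
from graphlib import TopologicalSorter

# Map each solid to the set of solids whose outputs it consumes
deps = {
    "sort_by_calories": {"load_cereals"},
    "sort_by_protein": {"load_cereals"},
    "display_results": {"sort_by_calories", "sort_by_protein"},
}

order = list(TopologicalSorter(deps).static_order())
print(order)  # load_cereals comes first, display_results last
```

Any schedule that respects this order (including running the two middle solids in parallel) is a valid execution of the pipeline.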
We'll write a simple test for this pipeline showing how we can assert that all four of its solids executed successfully.
def test_complex_pipeline():
    res = execute_pipeline(complex_pipeline)
    assert res.success
    assert len(res.solid_result_list) == 4
    for solid_res in res.solid_result_list:
        assert solid_res.success