Building Pipelines with Dagster

We now have the basic building blocks for our cereal pipeline. To construct and execute a data pipeline using Dagster, we will need solids as our computational units containing business logic, a pipeline to connect solids, and a way to execute the pipeline.

If you haven't already, grab the dataset for this tutorial from GitHub:

curl -O https://raw.githubusercontent.com/dagster-io/dagster/master/examples/docs_snippets/docs_snippets/intro_tutorial/cereal.csv

Executing Your First Pipeline

You can find the code for this tutorial on GitHub.

Hello, Solid!

Let's write our first Dagster solid and save it as hello_cereal.py.

A solid is a unit of computation in a data pipeline. Typically, you'll define solids by annotating ordinary Python functions with the @solid decorator.

The logic in our first solid is very straightforward: it just reads in the csv from a hardcoded path and logs the number of rows it finds.

hello_cereal.py
import csv
import os

from dagster import execute_pipeline, pipeline, solid


@solid
def hello_cereal(context):
    # Assuming the dataset is in the same directory as this file
    dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv")
    with open(dataset_path, "r") as fd:
        # Read the rows in using the standard csv library
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info(
        "Found {n_cereals} cereals".format(n_cereals=len(cereals))
    )

    return cereals

In this simple case, our solid takes no inputs except for the context in which it executes (provided by the Dagster framework as the first argument to every solid), and returns a single output, the list of cereals, via an ordinary return statement. Don't worry, we'll soon encounter solids that are much more dynamic.


Hello, Pipeline!

To execute our solid, we'll embed it in an equally simple pipeline. A pipeline is a set of solids arranged into a DAG of computation. You'll typically define pipelines by annotating ordinary Python functions with the @pipeline decorator.

hello_cereal.py
@pipeline
def hello_cereal_pipeline():
    hello_cereal()

Here you'll see that we call hello_cereal(). This call doesn't actually execute the solid. Within the body of functions decorated with @pipeline, we use function calls to indicate the dependency structure of the solids making up the pipeline. Here, we indicate that the execution of hello_cereal doesn't depend on any other solids by calling it with no arguments.


Executing Our First Pipeline

Assuming you’ve saved this pipeline as hello_cereal.py, we can execute it via any of three different mechanisms:

From the command line, use the Dagster CLI

From the directory in which you've saved the pipeline file, just run:

dagster pipeline execute -f hello_cereal.py

You'll see the full stream of events emitted by Dagster appear in the console, including our call to the logging machinery, which will look like:

2019-10-10 11:46:50 - dagster - INFO - system - a91a4cc4-d218-4c2b-800c-aac50fced1a5
- Found 77 cereals
    solid             = "hello_cereal"
    solid_definition  = "hello_cereal"
    step_key          = "hello_cereal.compute"

Success!

From Python, use Dagster’s Python API

If you'd rather execute your pipelines as a script, you can do that without using the Dagster CLI at all. Just add a few lines to hello_cereal.py:

hello_cereal.py
if __name__ == "__main__":
    result = execute_pipeline(hello_cereal_pipeline)

Now you can just run:

python hello_cereal.py

The execute_pipeline() function called here is the core Python API for executing Dagster pipelines from code.

From a GUI, use the Dagit tool

To visualize your pipeline in Dagit, from the directory in which you've saved the pipeline file, just run:

dagit -f hello_cereal.py

You'll see output like

Loading repository... Serving on http://127.0.0.1:3000

You should be able to navigate to http://127.0.0.1:3000/pipeline/hello_cereal_pipeline/ in your web browser and view your pipeline. It isn't very interesting yet, because it only has one node.

hello_cereal_figure_one.png

Click on the "Playground" tab and you'll see the view below.

hello_cereal_figure_two.png

The top pane is empty here, but in more complicated pipelines, this is where you'll be able to edit pipeline configuration on the fly.

The bottom pane shows the concrete execution plan corresponding to the logical structure of the pipeline, which also only has one node, hello_cereal.compute.

Click the "Launch Execution" button to execute this plan directly from Dagit. A new window should open, and you'll see a much more structured view of the stream of Dagster events start to appear in the left-hand pane.

If you have pop-up blocking enabled, you may need to tell your browser to allow pop-ups from 127.0.0.1. Or, just navigate to the "Runs" tab to see this, and every run of your pipeline.

hello_cereal_figure_three.png

In this view, you can filter and search through the logs corresponding to your pipeline run.