Airline Demo

You can find the code for this example on GitHub.

This example is intended to provide a fleshed-out demo of Dagster and Dagit capabilities. It defines two realistic data pipelines, corresponding to the download/ingest and analysis phases of a typical data science workflow, using real-world airline data. Although the Dagster tooling presents a unified view of both pipelines, in practice each pipeline is likely to be owned by people with more or less clearly distinguished roles. Use the airline demo to familiarize yourself with Dagster's features in a richer context than the introductory tutorial, and as a reference when building your own first production pipelines in the system. Comments and suggestions are enthusiastically encouraged.

Getting Started

To run the airline demo pipelines locally, you'll need:

  • To be running Python 3.5 or greater (the airline demo uses Python 3 type annotations)
  • AWS credentials in the ordinary boto3 credential chain.
  • A running Postgres database available at postgresql://test:test@127.0.0.1:5432/test. (A docker-compose file is provided in this repo at dagster/examples/airline_demo/).

To get up and running:

# Clone Dagster
git clone git@github.com:dagster-io/dagster.git
cd dagster/examples/airline_demo

# Install all dependencies
pip install -e .

# Start a local PostgreSQL database
docker-compose up -d

# Load the airline demo in Dagit
cd airline_demo
dagit

You can now view the airline demo in Dagit at http://127.0.0.1:3000/.

Pipelines & Config

The demo defines a single repository with two pipelines, in airline_demo/pipelines.py:

  • airline_demo_ingest_pipeline: This pipeline grabs data archives from S3 and unzips them, reads the raw data into Spark, performs some typical manipulations on the data, and then loads tables into a data warehouse.
  • airline_demo_warehouse_pipeline: This pipeline performs typical in-warehouse analysis and manipulations using SQL, and then generates and archives analysis artifacts and plots using Jupyter notebooks.

Default configuration is provided for both of these pipelines using Presets, and the corresponding YAML files can be found under the environments/ folder.
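For reference, a preset can also be declared directly in Python. The sketch below is illustrative only: the demo's actual presets are assembled from the YAML files under environments/, and this run_config is heavily abbreviated.

from dagster import PresetDefinition

# Illustrative only: an abbreviated preset pairing the "local" mode with a
# fragment of run config. The demo's real presets load the environments/ YAML.
local_fast = PresetDefinition(
    name="local_fast",
    mode="local",
    run_config={
        "resources": {
            "db_info": {
                "config": {
                    "username": "test",
                    "password": "test",
                    "hostname": "127.0.0.1",
                    "db_name": "test",
                }
            }
        }
    },
)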

The Ingest Pipeline

ingest-pipeline.png

The airline_demo_ingest_pipeline models the first stage of most project-oriented data science workflows, in which raw data is consumed from a variety of sources.

For demo purposes, we've put our source files in a publicly-readable S3 bucket. In practice, these might be files in S3 or other cloud storage systems; publicly available datasets downloaded over HTTP; or batch files in an SFTP drop.

Running the pipeline locally with test config

You can run this pipeline in Dagit by selecting the "Execute" tab, clicking the preset-load button on the top right of the editor, selecting the local_fast preset, and then selecting "Start Execution":

execution.png

Note: local pipeline execution requires a running PostgreSQL database via the docker-compose command shown above.

This preset configures the pipeline to consume cut-down versions of the original data files on S3. It can be good practice to maintain similar test sets so that you can run fast versions of your data pipelines locally or in test environments. While there are many faults that will not be caught by using small cut-down or synthetic data sets---for example, data issues that may only appear in production data or at scale---this practice will allow you to verify the integrity of your pipeline construction and to catch at least some semantic issues.

The pipeline should now execute and complete in the "Runs" tab of Dagit:

run.png
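If you'd rather kick off the same run from Python (for example, in a test), a minimal sketch looks like this, assuming the import path airline_demo.pipelines and the local_fast preset used above:

from dagster import execute_pipeline

from airline_demo.pipelines import airline_demo_ingest_pipeline  # assumed import path

# Equivalent to selecting the local_fast preset in Dagit and clicking
# "Start Execution".
result = execute_pipeline(airline_demo_ingest_pipeline, preset="local_fast")
assert result.success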

Reusable Components in Dagster

Dagster heavily emphasizes building reusable, composable pipelines, and we rely on that functionality in several places throughout this pipeline.

Let's start by looking at the pipeline definition (in airline_demo/pipelines.py); you'll notice that we use solid aliasing to reuse the s3_to_df solid for several ingest steps:

pipelines.py
load_data_to_database_from_spark.alias("load_q2_on_time_data")(
    data_frame=join_q2_data(
        april_data=s3_to_df.alias("april_on_time_s3_to_df")(),
        may_data=s3_to_df.alias("may_on_time_s3_to_df")(),
        june_data=s3_to_df.alias("june_on_time_s3_to_df")(),
        master_cord_data=s3_to_df.alias("master_cord_s3_to_df")(),
    )
)

In general, you won't want every data science user in your organization to have to roll their own implementation of common functionality like downloading and unzipping files. Instead, you'll want to abstract common functionality into reusable solids, separating task-specific parameters out into declarative config and building up a library of building blocks for new data pipelines. Here, that's been accomplished by implementing the shared functionality in an s3_to_df solid and then reusing it to ingest several datasets.

Digging deeper, you'll notice that these solids in Dagit are shown with an "Expand" button:

composite-solid-1.png

That's because these are actually composite solids: solids that contain other solids. Dagster supports arbitrarily nested, composed solid hierarchies, which can be really useful for encapsulating functionality as we've done here. Click Expand and you'll see the solids contained within s3_to_df:

composite-solid-2.png

You can also see the names and types of all the inputs/outputs of this composite solid and how they are connected to the inputs/outputs of the solids it contains.
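As a sketch of the pattern (these are illustrative solids, not the actual internals of s3_to_df), a composite solid is just a composition function decorated with @composite_solid that wires other solids together:

from dagster import composite_solid, solid


@solid
def download_archive(context, key: str) -> str:
    # Stand-in for the real S3 download step; returns a local path.
    context.log.info("Downloading {key}".format(key=key))
    return "/tmp/{key}".format(key=key)


@solid
def unzip_archive(context, archive_path: str) -> str:
    # Stand-in for the unzip step; returns the path of the extracted member.
    return archive_path.replace(".zip", ".csv")


@composite_solid
def fetch_csv_from_s3(key):
    # In Dagit this appears as a single expandable node, just like s3_to_df.
    return unzip_archive(download_archive(key))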

Strongly typed config and outputs

The config for each of our solids specifies everything it needs in order to interact with the external environment. In YAML, an entry in the config for one of our solids aliasing s3_to_df looks like this:

local_fast_ingest.yaml
  april_on_time_s3_to_df:
    inputs:
      s3_coordinate:
        bucket: dagster-airline-demo-source-data
        key: test/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_4.zip
      archive_member:
        value: On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_4.csv

Because each of these values is strongly typed, we'll get rich error information in the Dagit config editor (or when running pipelines from the command line) when a config value is incorrectly specified:

config-error.png

While this may seem like a mere convenience, in production contexts it can dramatically reduce avoidable errors. Consider boto3's S3.Client.put_object() method, which has 28 parameters, many restricted in obscure ways (for example, ServerSideEncryption must be one of 'AES256'|'aws:kms'). A strongly typed config schema catches errors of this kind before anything runs.

By setting the description on each of our config members, we also get easily navigable documentation in Dagit. Users of library solids no longer need to investigate implementations in order to understand what values are required, or what they do---enabling more confident reuse.

config-des.png
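To illustrate the pattern (the field names below are hypothetical, not the demo's actual schema), a solid can declare described and constrained config like this:

from dagster import Enum, EnumValue, Field, String, solid

# Hypothetical config schema: a documented string field plus an enum field
# analogous to boto3's ServerSideEncryption parameter.
ServerSideEncryption = Enum("ServerSideEncryption", [EnumValue("AES256"), EnumValue("aws:kms")])


@solid(
    config_schema={
        "archive_member": Field(
            String, description="Name of the CSV inside the downloaded archive to extract."
        ),
        "server_side_encryption": Field(
            ServerSideEncryption,
            description="Only the two declared values will pass config validation.",
        ),
    }
)
def configured_solid(context):
    context.log.info(context.solid_config["archive_member"])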

Using Modes for Production Data

Don't worry, we've got plenty of big(gish) data to run through this pipeline. Instead of the local_fast_ingest.yaml config fragment, use local_full_ingest.yaml---but be prepared to wait. You can use this pattern to run your Dagster pipelines against synthetic, anonymized, or subsampled datasets in test and development environments.

In practice, you'll want to run your pipelines in environments that vary widely in their available facilities. For example, when running an ingestion pipeline locally for test or development, you may want to use a local Spark cluster; but when running in production, you will probably want to target something heftier, like an ephemeral EMR cluster or your organization's on-prem Spark cluster. Or, you may want to target a locally-running Postgres as your "data warehouse" in local test/dev, but target a Redshift cluster in production and production tests. Ideally, we want this to be a matter of flipping a config switch.

Dagster decouples the instantiation of external resources like these from the business logic of your data pipelines. In Dagster, a set of resources configured for a particular environment is called a Mode.

Let's look at how we make configurable resources available to our pipelines with modes. In airline_demo/pipelines.py, you'll find that we define multiple modes within which our pipelines may run:

pipelines.py
@pipeline(
    # ordered so the local is first and therefore the default
    mode_defs=[local_mode, test_mode, prod_mode],

This is intended to mimic a typical setup where you may have pipelines running locally on developer machines, often with an (anonymized or scrubbed) subset of data and with limited compute resources; remotely in CI/CD, with access to a production or replica environment, but where speed is of the essence; and remotely in production on live data.

pipelines.py
test_mode = ModeDefinition(
    name="test",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource,
        "db_info": redshift_db_info_resource,
        "tempfile": tempfile_resource,
        "s3": s3_resource,
        "file_cache": fs_file_cache,
        "file_manager": local_file_manager,
    },
    intermediate_storage_defs=s3_plus_default_intermediate_storage_defs,
)


local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource,
        "s3": s3_resource,
        "db_info": postgres_db_info_resource,
        "tempfile": tempfile_resource,
        "file_cache": fs_file_cache,
        "file_manager": local_file_manager,
    },
    intermediate_storage_defs=s3_plus_default_intermediate_storage_defs,
)


prod_mode = ModeDefinition(
    name="prod",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher,
        "pyspark": pyspark_resource,
        "s3": s3_resource,
        "db_info": redshift_db_info_resource,
        "tempfile": tempfile_resource,
        "file_cache": s3_file_cache,
        "file_manager": s3_file_manager,
    },
    intermediate_storage_defs=s3_plus_default_intermediate_storage_defs,
)

Here we've defined a db_info resource that exposes a unified API to our solid logic, but that wraps two very different underlying assets---in one case, a Postgres database, and in the other, a Redshift cluster. Let's look more closely at how this is implemented for the Postgres case.

First, we define the db_info resource itself:

resources.py
DbInfo = namedtuple("DbInfo", "engine url jdbc_url dialect load_table host db_name")

This resource exposes a SQLAlchemy engine, the URL of the database (in two forms), metadata about the SQL dialect that the database speaks, and a utility function, load_table, which loads a Spark data frame into the target database. In practice, we would probably find that over time we wanted to add or subtract from this interface, rework its implementations, or factor it into multiple resources. Because the type definition, config definitions, and implementations are centralized---rather than spread out across the internals of many solids---this is a relatively easy task.
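Because every solid goes through this narrow interface, it's also straightforward to substitute a stand-in implementation in unit tests without touching any solid code. A minimal sketch, using an in-memory SQLite engine and a no-op load_table (the import path here is assumed):

import sqlalchemy

from airline_demo.resources import DbInfo  # assumed import path


def make_test_db_info():
    engine = sqlalchemy.create_engine("sqlite://")
    loaded = {}

    def _load_table(data_frame, table_name):
        # Record the call instead of writing the data frame over JDBC.
        loaded[table_name] = data_frame

    return DbInfo(
        engine=engine,
        url="sqlite://",
        jdbc_url=None,
        dialect="sqlite",
        load_table=_load_table,
        host="localhost",
        db_name=":memory:",
    )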

Next, we define the config required to instantiate our resource:

resources.py
    {
        "username": Field(StringSource),
        "password": Field(StringSource),
        "hostname": Field(StringSource),
        "port": Field(IntSource, is_required=False, default_value=5432),
        "db_name": Field(StringSource),
    }

Obviously, this config will differ for Redshift, as it would if we had to reach our database through a proxy server or use a different authentication scheme.

Finally, we bring it all together in the postgres_db_info_resource:

resources.py
@resource(
    {
        "username": Field(StringSource),
        "password": Field(StringSource),
        "hostname": Field(StringSource),
        "port": Field(IntSource, is_required=False, default_value=5432),
        "db_name": Field(StringSource),
    }
)
def postgres_db_info_resource(init_context):
    host = init_context.resource_config["hostname"]
    db_name = init_context.resource_config["db_name"]

    db_url_jdbc = create_postgres_db_url(
        username=init_context.resource_config["username"],
        password=init_context.resource_config["password"],
        hostname=host,
        port=init_context.resource_config["port"],
        db_name=db_name,
    )

    db_url = create_postgres_db_url(
        username=init_context.resource_config["username"],
        password=init_context.resource_config["password"],
        hostname=host,
        port=init_context.resource_config["port"],
        db_name=db_name,
        jdbc=False,
    )

    def _do_load(data_frame, table_name):
        data_frame.write.option("driver", "org.postgresql.Driver").mode("overwrite").jdbc(
            db_url_jdbc, table_name
        )

    return DbInfo(
        url=db_url,
        jdbc_url=db_url_jdbc,
        engine=create_postgres_engine(db_url),
        dialect="postgres",
        load_table=_do_load,
        host=host,
        db_name=db_name,
    )

By providing strongly typed configuration fields to the @resource decorator, we now have typeahead support in Dagit and rich error messages for the configuration of our external resources. This can be extremely valuable in the case of notoriously complex configuration formats, such as Spark's.
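Because these fields are declared with StringSource, the run config for the resource can supply literal values or pull them from environment variables. A sketch of the resources fragment (the values and the environment variable name here are illustrative):

run_config_fragment = {
    "resources": {
        "db_info": {
            "config": {
                "username": "test",
                # StringSource fields also accept an environment variable indirection:
                "password": {"env": "POSTGRES_PASSWORD"},
                "hostname": "127.0.0.1",
                "db_name": "test",
            }
        }
    }
}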

Note that Dagit is aware of all modes defined for your pipelines, and will present these to you (along with the resources they provide) in the pipeline view sidebar:

pipeline-info.png

Ingesting data to Spark data frames

Returning to our s3_to_df solid, note its type signature:

s3_to_df.png

The output has type DataFrame, a Dagster type imported from dagster_pyspark that ensures that the underlying value is an instance of the Python class pyspark.sql.DataFrame.

The transformation solids that follow all use DataFrame for their intermediate results.
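For example (an illustrative solid, not the demo's exact implementation), declaring DataFrame on inputs and outputs means Dagster will fail the step if a solid ever receives or returns something other than a pyspark.sql.DataFrame:

from dagster import InputDefinition, OutputDefinition, solid
from dagster_pyspark import DataFrame


@solid(
    input_defs=[InputDefinition("data_frame", DataFrame)],
    output_defs=[OutputDefinition(DataFrame, name="result")],
)
def subsample_data_frame(context, data_frame):
    # Keep a small sample of rows for fast local runs; both the input and the
    # output are type-checked as pyspark.sql.DataFrame.
    return data_frame.sample(withReplacement=False, fraction=0.1)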

Loading data to the warehouse

The terminal nodes of this pipeline are all aliased instances of load_data_to_database_from_spark:

solids.py
@solid(
    output_defs=[OutputDefinition(name="table_name", dagster_type=String)],
    config_schema={"table_name": String},
    required_resource_keys={"db_info", "pyspark_step_launcher"},
)
def load_data_to_database_from_spark(context, data_frame: DataFrame):
    context.resources.db_info.load_table(data_frame, context.solid_config["table_name"])

    table_name = context.solid_config["table_name"]
    yield AssetMaterialization(
        asset_key="table:{table_name}".format(table_name=table_name),
        description=(
            "Persisted table {table_name} in database configured in the db_info resource."
        ).format(table_name=table_name),
        metadata_entries=[
            EventMetadataEntry.text(label="Host", text=context.resources.db_info.host),
            EventMetadataEntry.text(label="Db", text=context.resources.db_info.db_name),
        ],
    )
    yield Output(value=table_name, output_name="table_name")

This solid abstracts the operation of loading a Spark data frame into a database---either our production Redshift cluster or our local Postgres in test.

Note how using the db_info resource simplifies this operation. There's no need to pollute the implementation of our DAGs with specifics about how to connect to outside databases, credentials, formatting details, retry or batching logic, etc. This greatly reduces the opportunities for implementations of these core external operations to drift and introduce subtle bugs, and cleanly separates infrastructure concerns from the logic of any particular data processing pipeline.

The Warehouse Pipeline

warehouse-pipeline.png

The airline_demo_warehouse_pipeline models the analytics stage of a typical data science workflow. This is a heterogeneous-by-design process in which analysts, data scientists, and ML engineers incrementally derive and formalize insights and analytic products (charts, data frames, models, and metrics) using a wide range of tools---from SQL run directly against the warehouse to Jupyter notebooks in Python or R.

The sql_solid: wrapping foreign code in a solid

How do we actually package the SQL for execution and display with Dagster and Dagit? We've built a function, sql_solid, that is designed to take a SQL query and return a solid.

Fundamentally, all this is doing is invoking execute on the db_info.engine provided by the context:

solids.py
@solid(
    name=name,
    input_defs=input_defs,
    output_defs=[
        OutputDefinition(
            materialization_strategy_output_types[materialization_strategy],
            description=output_description,
        )
    ],
    description=description,
    required_resource_keys={"db_info"},
    tags={"kind": "sql", "sql": sql_statement},
)
def _sql_solid(context, **input_defs):  # pylint: disable=unused-argument
    """Inner function defining the new solid.

    Args:
        context (SolidExecutionContext): Must expose a `db` resource with an `execute` method,
            like a SQLAlchemy engine, that can execute raw SQL against a database.

    Returns:
        str:
            The table name of the newly materialized SQL select statement.
    """
    context.log.info(
        "Executing sql statement:\n{sql_statement}".format(sql_statement=sql_statement)
    )
    context.resources.db_info.engine.execute(text(sql_statement))
    yield Output(value=table_name, output_name="result")

sql-solid.png
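A simplified sketch of the factory's overall shape may make the closure pattern clearer (the demo's real sql_solid in solids.py also handles input definitions and multiple materialization strategies):

from dagster import Output, OutputDefinition, String, solid
from sqlalchemy import text


def simple_sql_solid(name, sql_statement, table_name):
    # Capture the SQL statement and target table name in a closure, then return
    # a solid that executes the statement via the shared db_info resource.
    @solid(
        name=name,
        output_defs=[OutputDefinition(String, name="result")],
        required_resource_keys={"db_info"},
        tags={"kind": "sql", "sql": sql_statement},
    )
    def _sql_solid(context):
        context.log.info("Executing sql statement:\n{sql}".format(sql=sql_statement))
        context.resources.db_info.engine.execute(text(sql_statement))
        yield Output(value=table_name, output_name="result")

    return _sql_solid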

Also, note how straightforward the interface to sql_solid is. To define a new solid executing a relatively complex computation against the data warehouse, an analyst only needs to supply light metadata along with their SQL query:

solids.py
westbound_delays = sql_solid(
    "westbound_delays",
    """
    select
        avg(cast(cast(arrdelay as float) as integer)) as avg_arrival_delay,
        avg(cast(cast(depdelay as float) as integer)) as avg_departure_delay,
        origin,
        dest as destination,
        count(1) as num_flights,
        avg(cast(dest_latitude as float)) as dest_latitude,
        avg(cast(dest_longitude as float)) as dest_longitude,
        avg(cast(origin_latitude as float)) as origin_latitude,
        avg(cast(origin_longitude as float)) as origin_longitude
    from q2_on_time_data
    where
        cast(origin_longitude as float) > cast(dest_longitude as float) and
        originstate != 'HI' and
        deststate != 'HI' and
        originstate != 'AK' and
        deststate != 'AK'
    group by (origin,destination)
    order by num_flights desc
    limit 100;
    """,
    "table",
    table_name="westbound_delays",
)

This kind of interface can supercharge the work of analysts who are highly skilled in SQL, but for whom fully general-purpose programming in Python may be uncomfortable. Analysts need only master a very constrained interface in order to define their SQL statements' data dependencies and outputs in a way that allows them to be orchestrated and explored in heterogeneous DAGs containing other forms of computation, and must make only minimal changes to code (in some cases, no changes) in order to take advantage of centralized infrastructure work.

Dagstermill: executing and checkpointing notebooks

Notebooks are a flexible and increasingly ubiquitous way to explore datasets and build data products, from transformed or augmented datasets to charts and metrics to deployable machine learning models. But their very flexibility---the way they allow analysts to move seamlessly from interactive exploration of data to scratch code and finally to actionable results---can make them difficult to productionize.

Our approach, built on papermill, embraces the diversity of code in notebooks. Rather than requiring notebooks to be rewritten or "cleaned up" in order to consume inputs from other nodes in a pipeline or to produce outputs for downstream nodes, we can just add a few cells to provide a thin wrapper around an existing notebook.

Notebook solids can be identified by the small red ipynb tag in Dagit. As with SQL solids, they can be explored from within Dagit by drilling down on the tag:

dagstermill.png

Let's start with the definition of our notebook_solid helper:

solids.py
def notebook_solid(name, notebook_path, input_defs, output_defs, required_resource_keys):
    return define_dagstermill_solid(
        name,
        _notebook_path(notebook_path),
        input_defs,
        output_defs,
        required_resource_keys=required_resource_keys,
    )

This is just a wrapper around Dagstermill's define_dagstermill_solid, which tells Dagstermill where to look for the notebooks. We define a new solid by using this function and referencing a notebook file:

solids.py
delays_by_geography = notebook_solid(
    "delays_by_geography",
    "Delays_by_Geography.ipynb",
    input_defs=[
        InputDefinition(
            "westbound_delays",
            SqlTableName,
            description="The SQL table containing westbound delays.",
        ),
        InputDefinition(
            "eastbound_delays",
            SqlTableName,
            description="The SQL table containing eastbound delays.",
        ),
    ],
    output_defs=[
        OutputDefinition(
            dagster_type=FileHandle,
            # name='plots_pdf_path',
            description="The saved PDF plots.",
        )
    ],
    required_resource_keys={"db_info"},
)

As usual, we define the inputs and outputs of the new solid. Within the notebook itself, we only need to add a single line as the first cell:

import dagstermill

Then, in a cell with the parameters tag, we write:

context = dagstermill.get_context()

eastbound_delays = 'eastbound_delays'
westbound_delays = 'westbound_delays'

When dagstermill executes this notebook as part of a pipeline, values for these inputs will be injected from the outputs of upstream solids.

Finally, at the very end of the notebook, we yield the result back to the pipeline:

dagstermill.yield_result(LocalFileHandle(pdf_path))

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/airline_demo
cd airline_demo