Making Your Pipelines Testable and Maintainable

You can find the code for this tutorial on GitHub.

Data applications are notoriously difficult to test and are therefore typically untested or under-tested. Moreover, pipeline authors generally do not control their input data, so even a pipeline covered by sophisticated tests can still break when upstream data changes unexpectedly.

Making data applications testable and verifiable is a core focus of Dagster. We believe ensuring data quality is critical for managing the complexity of data systems. This section covers how to build maintainable and testable data applications with Dagster.

Testing solids and pipelines

Let's go back to our first solid and pipeline in hello_cereal.py and ensure they're working as expected by writing some tests. We'll use execute_pipeline() to test our pipeline, as well as execute_solid() to test our solid in isolation.

These functions synchronously execute a pipeline or solid and return result objects (SolidExecutionResult and PipelineExecutionResult) whose methods let us investigate, in detail, the success or failure of execution, the outputs produced by solids, and (as we'll see later) other events associated with execution.

hello_cereal_with_tests.py
from dagster import execute_pipeline, execute_solid

# hello_cereal and hello_cereal_pipeline are defined earlier in this file


def test_hello_cereal_solid():
    # Execute the solid in isolation, outside of any pipeline
    res = execute_solid(hello_cereal)
    assert res.success
    # The cereal dataset contains 77 rows
    assert len(res.output_value()) == 77


def test_hello_cereal_pipeline():
    # Execute the whole pipeline, then inspect a single solid's output
    res = execute_pipeline(hello_cereal_pipeline)
    assert res.success
    assert len(res.result_for_solid("hello_cereal").output_value()) == 77

Now you can use pytest, or your test runner of choice, to run unit tests as you develop your data applications.

pytest hello_cereal_with_tests.py

Note: by convention, pytest tests are typically kept in separate files prefixed with test_. We've put them in the same file just to simplify the tutorial code.

In production, of course, we'll often execute pipelines in parallel or streaming contexts that don't admit this kind of synchronous API; these APIs are intended to enable local tests like this.

Dagster is written to make testing easy in a domain where it has historically been very difficult. Throughout the rest of this tutorial, we'll explore how to write unit tests for each piece of the framework as we learn about it. You can learn more about testing in Dagster by reading the Testing Example.
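
As a preview, solids that take inputs can also be tested in isolation by passing values directly, without wiring up upstream solids or config. The sketch below assumes the read_csv solid defined later in this section and a cereal.csv file next to the test; execute_solid's input_values argument supplies the input.

def test_read_csv_solid():
    # Supply the input value directly, instead of via config or an
    # upstream solid; assumes cereal.csv sits alongside this file.
    res = execute_solid(read_csv, input_values={"csv_path": "cereal.csv"})
    assert res.success
    # csv.DictReader yields one dict per row
    assert all(isinstance(row, dict) for row in res.output_value())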


Type-checking Inputs with Python Type Annotations

Note: this section requires Python 3.

If you zoom in on the Definition tab in Dagit and click on one of our pipeline solids, you'll see that its inputs and outputs are annotated with types.

inputs_figure_four.png

By default, every untyped value in Dagster is assigned the catch-all type Any. This means that type errors in the config won't be surfaced until the pipeline is executed.

For example, when we execute our pipeline with this config, it'll fail at runtime:

inputs_env_bad.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: 2343

When we enter this mistyped config in Dagit and execute our pipeline, we'll see that an error appears in the structured log viewer:

inputs_figure_five.png

Click on "View Full Message" or on the red dot on the execution step that failed and a detailed stack trace will pop up.

inputs_figure_six.png

It would be better if we could catch this error earlier, when we specify the config. So let's make the inputs typed.

You can apply types to inputs and outputs using Python 3's type annotation syntax. In this case, we just want to type the input as the built-in str.

inputs_typed.py
import csv
import os

from dagster import solid


@solid
def read_csv(context, csv_path: str):
    # Resolve csv_path relative to this file's directory
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines

By typing the input, we can catch this class of error before execution and reduce the surface area we need to test.

inputs_figure_seven.png
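
This pre-execution check is also testable. The sketch below assumes the typed read_csv solid above is wired into a pipeline named inputs_pipeline (the name is an assumption for illustration); passing the mistyped config should raise DagsterInvalidConfigError before any solid runs.

import pytest

from dagster import DagsterInvalidConfigError, execute_pipeline


def test_mistyped_config_is_rejected():
    # inputs_pipeline is assumed to be the pipeline containing read_csv
    run_config = {
        "solids": {"read_csv": {"inputs": {"csv_path": {"value": 2343}}}}
    }
    # Config validation happens before execution, so no solid ever runs
    with pytest.raises(DagsterInvalidConfigError):
        execute_pipeline(inputs_pipeline, run_config=run_config)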