Making Your Pipelines Testable and Maintainable
Data applications are notoriously difficult to test and are therefore typically untested or under-tested. Moreover, pipeline authors generally do not control their input data, so even a pipeline covered by sophisticated tests can still break when upstream data changes.
Creating testable and verifiable data applications is one of the focuses of Dagster. We believe ensuring data quality is critical for managing the complexity of data systems. This section will talk about how you can build maintainable and testable data applications with Dagster.
Testing solids and pipelines
Let's go back to our first solid and pipeline in hello_cereal.py and ensure they're working as expected by writing some tests. We'll use execute_pipeline() to test our pipeline, as well as execute_solid() to test our solid in isolation.

These functions synchronously execute a pipeline or solid and return results objects (SolidExecutionResult and PipelineExecutionResult) whose methods let us investigate, in detail, the success or failure of execution, the outputs produced by solids, and (as we'll see later) other events associated with execution.
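The execute-then-inspect pattern these functions enable can be sketched in plain Python. This is an illustrative stand-in only, not Dagster's real result classes:

```python
# Illustrative sketch only: a toy stand-in for Dagster's result objects,
# showing the synchronous execute-then-inspect testing pattern.
class ToyExecutionResult:
    def __init__(self, success, output):
        self.success = success
        self._output = output

    def output_value(self):
        return self._output


def toy_execute(fn):
    """Run fn synchronously and wrap the outcome in a result object."""
    try:
        return ToyExecutionResult(True, fn())
    except Exception:
        return ToyExecutionResult(False, None)


# A test can then assert on success and on the produced output:
res = toy_execute(lambda: ["line"] * 77)
assert res.success
assert len(res.output_value()) == 77
```

Because execution is synchronous and the result is an ordinary object, tests stay simple: run, then assert.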
```python
from dagster import execute_pipeline, execute_solid

# hello_cereal and hello_cereal_pipeline are defined earlier in the same
# file, as in hello_cereal.py.


def test_hello_cereal_solid():
    res = execute_solid(hello_cereal)
    assert res.success
    assert len(res.output_value()) == 77


def test_hello_cereal_pipeline():
    res = execute_pipeline(hello_cereal_pipeline)
    assert res.success
    assert len(res.result_for_solid("hello_cereal").output_value()) == 77
```
Now you can use pytest, or your test runner of choice, to run unit tests as you develop your data applications.
pytest hello_cereal_with_tests.py
Note: by convention, pytest tests are typically kept in separate files prefixed with test_. We've put them in the same file just to simplify the tutorial code.
In production, of course, we'll often execute pipelines in a parallel, streaming way that doesn't admit this kind of synchronous API; these functions are intended to enable local tests like this.
Dagster is written to make testing easy in a domain where it has historically been very difficult. Throughout the rest of this tutorial, we'll explore the writing of unit tests for each piece of the framework as we learn about it. You can learn more about Testing in Dagster by reading Testing Example.
Type-checking Inputs with Python Type Annotations
Note: this section requires Python 3.
If you zoom in on the Definition tab in Dagit and click on one of our pipeline solids, you'll see that its inputs and outputs are annotated with types.
By default, every untyped value in Dagster is assigned the catch-all type Any. This means that any errors in the config won't be surfaced until the pipeline is executed.
For example, when we execute our pipeline with this config, it'll fail at runtime:
```yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: 2343
```
When we enter this mistyped config in Dagit and execute our pipeline, we'll see that an error appears in the structured log viewer:
Click on "View Full Message" or on the red dot on the execution step that failed and a detailed stack trace will pop up.
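For reference, the intended config supplies the path as a string. The filename cereal.csv here is an assumption based on earlier sections of this tutorial; substitute your own file:

```yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
```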
It would be better if we could catch this error earlier, when we specify the config. So let's make the inputs typed.

A user can apply types to inputs and outputs using Python 3's type annotation syntax. In this case, we just want to type the input as the built-in str.
```python
import csv
import os

from dagster import solid


@solid
def read_csv(context, csv_path: str):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]
    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines
```
By using typed inputs, we can catch this error prior to execution and reduce the surface area we need to test.
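To see why annotations make this possible, here is a minimal sketch, illustrative only and not Dagster's actual internals, of how a framework can read a function's annotations at definition time and reject a mistyped config value before anything runs:

```python
import typing

# A stand-in for the solid's compute function; only the annotation matters.
def read_csv_stub(context, csv_path: str):
    pass

# Read the declared input types from the function's annotations.
hints = typing.get_type_hints(read_csv_stub)

def config_error(value):
    """Return an error message if value doesn't match the declared type."""
    expected = hints["csv_path"]
    if not isinstance(value, expected):
        return "csv_path: expected {}, got {}".format(
            expected.__name__, type(value).__name__
        )
    return None

# The bad config value from above is rejected before any execution:
assert config_error(2343) == "csv_path: expected str, got int"
# A string path passes the check:
assert config_error("cereal.csv") is None
```

This is the general mechanism that lets typed inputs turn a runtime failure into a config-time error.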