Pipeline Unit Testing

You can find the code for this example on Github.

This demonstrates how to execute a pipeline in a manner suitable for unit testing. Here we focus on the mechanics and what APIs to use, rather than how to design a pipeline such that resources are abstracted away.

The workhouse function for unit-testing a pipeline is the execute_pipeline function. Using this function one can execute a pipeline in process and then test properties of the execution using the PipelineExecutionResult object that it returns.

This function can also be used to execute a subset of a pipeline.

Finally we demonstrate how one can test against the event stream, which is the most generic way that a solid communicates what happened during its computation. Solids communicate events for starting, input and output type checking, and for user-provided events such as expectations, materializations, and outputs.


def add_one(_, num: int) -> int:
    return num + 1

def add_two(_, num: int) -> int:
    return num + 2

def subtract(_, left: int, right: int) -> int:
    return left - right

def do_math():
    subtract(add_one(), add_two())

    input_defs=[InputDefinition(name="input_num", dagster_type=int)],
    # with multiple outputs, you must specify your outputs via
    # OutputDefinitions, rather than type annotations
        OutputDefinition(name="a_num", dagster_type=int),
        OutputDefinition(name="a_string", dagster_type=str),
def emit_events_solid(_, input_num):
    a_num = input_num + 1
    a_string = "foo"
    yield ExpectationResult(
        success=a_num > 0, label="positive", description="A num must be positive"
    yield AssetMaterialization(
        asset_key="persisted_string", description="Let us pretend we persisted the string somewhere"
    yield Output(value=a_num, output_name="a_num")
    yield Output(value=a_string, output_name="a_string")

def emit_events_pipeline():


def test_full_execution():
    result = execute_pipeline(
        do_math, {"solids": {"add_one": {"inputs": {"num": 2}}, "add_two": {"inputs": {"num": 3}}}}

    # return type is PipelineExecutionResult
    assert isinstance(result, PipelineExecutionResult)

    assert result.success

    assert result.output_for_solid("add_one") == 3
    assert result.output_for_solid("add_two") == 5
    assert result.output_for_solid("subtract") == -2

def test_subset_execution():
    result = execute_pipeline(
        {"solids": {"add_one": {"inputs": {"num": 2}}, "add_two": {"inputs": {"num": 3}}}},
        solid_selection=["add_one", "add_two"],

    assert result.success
    assert result.output_for_solid("add_one") == 3
    assert result.output_for_solid("add_two") == 5

    # solid_result_list returns List[SolidExecutionResult]
    # this checks to see that only two were executed
    assert {solid_result.solid.name for solid_result in result.solid_result_list} == {

def test_event_stream():
    pipeline_result = execute_pipeline(
        emit_events_pipeline, {"solids": {"emit_events_solid": {"inputs": {"input_num": 1}}}}
    assert pipeline_result.success

    solid_result = pipeline_result.result_for_solid("emit_events_solid")

    assert isinstance(solid_result, SolidExecutionResult)

    # when one has multiple outputs, you need to specify output name
    assert solid_result.output_value(output_name="a_num") == 2
    assert solid_result.output_value(output_name="a_string") == "foo"

    assert [se.event_type for se in solid_result.step_events] == [

    # solids communicate what they did via the event stream, viewable in tools (e.g. dagit)
    ) = solid_result.step_events

    # apologies for verboseness here! we can do better.
    expectation_result = expectation_event.event_specific_data.expectation_result
    assert isinstance(expectation_result, ExpectationResult)
    assert expectation_result.success
    assert expectation_result.label == "positive"

    materialization = materialization_event.event_specific_data.materialization
    assert isinstance(materialization, AssetMaterialization)
    assert materialization.label == "persisted_string"

Open in a playground

Open in Gitpod


curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/unittesting
cd unittesting