Pipeline Unit Testing

You can find the code for this example on Github.

This demonstrates how to execute a pipeline in a manner suitable for unit testing. Here we focus on the mechanics and what APIs to use, rather than how to design a pipeline such that resources are abstracted away.

The workhorse function for unit-testing a pipeline is the execute_pipeline function. Using this function, one can execute a pipeline in process and then test properties of the execution using the PipelineExecutionResult object that it returns.

This function can also be used to execute a subset of a pipeline, by passing the names of the solids to run as solid_selection.

Finally, we demonstrate how one can test against the event stream, which is the most generic way that a solid communicates what happened during its computation. Solids communicate events for starting, for input and output type checking, and for user-provided events such as expectations, materializations, and outputs.

Pipelines:

repo.py
@solid
def add_one(_, num: int) -> int:
    # Increment the incoming value by one; the context argument is unused.
    incremented = num + 1
    return incremented


@solid
def add_two(_, num: int) -> int:
    # Increment the incoming value by two; the context argument is unused.
    incremented = num + 2
    return incremented


@solid
def subtract(_, left: int, right: int) -> int:
    # Difference of the two inputs, computed as left minus right.
    difference = left - right
    return difference


@pipeline
def do_math():
    # The two adders have no upstream solids, so their ``num`` inputs must be
    # supplied through run config. Their outputs become subtract's operands.
    left_operand = add_one()
    right_operand = add_two()
    subtract(left_operand, right_operand)


@solid(
    input_defs=[InputDefinition(name="input_num", dagster_type=int)],
    # A solid with more than one output must declare them through
    # OutputDefinitions; type annotations cannot express multiple outputs.
    output_defs=[
        OutputDefinition(name="a_num", dagster_type=int),
        OutputDefinition(name="a_string", dagster_type=str),
    ],
)
def emit_events_solid(_, input_num):
    # Emits, in this exact order: an expectation result, an asset
    # materialization, then one Output per declared output. The tests
    # assert on this ordering, so do not reorder the yields.
    num_value = input_num + 1
    str_value = "foo"

    yield ExpectationResult(
        label="positive",
        success=num_value > 0,
        description="A num must be positive",
    )
    yield AssetMaterialization(
        asset_key="persisted_string",
        description="Let us pretend we persisted the string somewhere",
    )
    yield Output(output_name="a_num", value=num_value)
    yield Output(output_name="a_string", value=str_value)


@pipeline
def emit_events_pipeline():
    # Single-solid pipeline. The solid's ``input_num`` has no upstream
    # solid, so it is supplied through run config at execution time.
    emit_events_solid()

Tests:

repo.py
def test_full_execution():
    # Feed the adders' unconnected ``num`` inputs via run config, execute the
    # whole pipeline in process, then check each solid's output.
    run_config = {
        "solids": {
            "add_one": {"inputs": {"num": 2}},
            "add_two": {"inputs": {"num": 3}},
        }
    }
    result = execute_pipeline(do_math, run_config)

    # execute_pipeline hands back a PipelineExecutionResult
    assert isinstance(result, PipelineExecutionResult)

    assert result.success

    expected_outputs = {"add_one": 3, "add_two": 5, "subtract": -2}
    for solid_name, expected_value in expected_outputs.items():
        assert result.output_for_solid(solid_name) == expected_value


def test_subset_execution():
    # Same inputs as the full run, but only the two adder solids are selected.
    selected_solids = ["add_one", "add_two"]
    result = execute_pipeline(
        do_math,
        {"solids": {"add_one": {"inputs": {"num": 2}}, "add_two": {"inputs": {"num": 3}}}},
        solid_selection=selected_solids,
    )

    assert result.success
    assert result.output_for_solid("add_one") == 3
    assert result.output_for_solid("add_two") == 5

    # solid_result_list yields one SolidExecutionResult per executed solid;
    # the set of executed names must match the selection exactly.
    executed_names = {entry.solid.name for entry in result.solid_result_list}
    assert executed_names == set(selected_solids)


def test_event_stream():
    run_config = {"solids": {"emit_events_solid": {"inputs": {"input_num": 1}}}}
    pipeline_result = execute_pipeline(emit_events_pipeline, run_config)
    assert pipeline_result.success

    solid_result = pipeline_result.result_for_solid("emit_events_solid")

    assert isinstance(solid_result, SolidExecutionResult)

    # A solid with several outputs requires naming the output to read.
    assert solid_result.output_value(output_name="a_num") == 2
    assert solid_result.output_value(output_name="a_string") == "foo"

    expected_event_types = [
        DagsterEventType.STEP_START,
        DagsterEventType.STEP_INPUT,
        DagsterEventType.STEP_EXPECTATION_RESULT,
        DagsterEventType.STEP_MATERIALIZATION,
        DagsterEventType.STEP_OUTPUT,
        DagsterEventType.OBJECT_STORE_OPERATION,
        DagsterEventType.STEP_OUTPUT,
        DagsterEventType.OBJECT_STORE_OPERATION,
        DagsterEventType.STEP_SUCCESS,
    ]
    step_events = solid_result.step_events
    assert [event.event_type for event in step_events] == expected_event_types

    # The event stream is how a solid reports what it did; tools such as
    # dagit render it. Only the expectation and materialization are inspected.
    (
        _step_start,
        _step_input,
        expectation_event,
        materialization_event,
        _a_num_output,
        _a_num_store_op,
        _a_string_output,
        _a_string_store_op,
        _step_success,
    ) = step_events

    # Each event's payload sits under event_specific_data (admittedly verbose).
    expectation_result = expectation_event.event_specific_data.expectation_result
    assert isinstance(expectation_result, ExpectationResult)
    assert expectation_result.success
    assert expectation_result.label == "positive"

    materialization = materialization_event.event_specific_data.materialization
    assert isinstance(materialization, AssetMaterialization)
    assert materialization.label == "persisted_string"

Open in a playground

Open in Gitpod

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/unittesting
cd unittesting