
Testing#

Dagster enables you to build testable and maintainable data applications. It lets you unit-test your data applications, separate business logic from environments, and set explicit expectations on uncontrollable inputs.

In data applications, testing computations and jobs is notoriously challenging. Because of this, they often go relatively untested before hitting production. If there is testing in place, these tests are often slow, not run during common developer workflows, and have limited value because of the inability to simulate conditions in the production environment.

We believe the underlying cause is that data applications encode much of their business logic in heavy, external systems. Examples include processing systems like Spark and data warehouses such as Snowflake and Redshift. It is difficult to structure software to isolate these dependencies, and nearly impossible to run them in a lightweight manner.

This page demonstrates how Dagster addresses these challenges.


Unit tests in data applications#

Consider the following:

  • Principle: Errors that can be caught by unit tests should be caught by unit tests.
  • Corollary: Do not attempt to unit test for errors that unit tests cannot catch.

Applying unit tests without keeping these principles in mind is why the data community frequently treats them with skepticism. Unit testing is too often interpreted as simulating an external system such as Spark or a data warehouse in a granular manner. Those are very complex systems that are impossible to emulate faithfully. Do not try to do so.

Unit tests are not acceptance tests. They should not be the judge of whether a computation is correct. However, unit testing -- when properly scoped -- is still valuable in data applications. There are massive classes of errors that we can address without interacting with external services and catch earlier in the process: refactoring errors, syntax errors in interpreted languages, configuration errors, graph structure errors, and so on. Errors caught in a fast feedback loop of unit testing can be addressed orders of magnitude faster than those caught during an expensive batch computation in staging or production.

So, unit tests should be viewed primarily as productivity and code quality tools, leading to more correct calculations. Here we will demonstrate how Dagster conveniently enables unit tests.
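
As a minimal sketch of what "properly scoped" can mean, consider a transformation that would normally run inside a warehouse-backed job. The function `normalize_order` and its fields are hypothetical and exist only for illustration. The test exercises the pure logic directly, so refactoring and input-shape errors surface without touching any external system:

```python
def normalize_order(raw: dict) -> dict:
    """Pure business logic: no warehouse, Spark session, or network call."""
    return {
        "order_id": int(raw["order_id"]),
        "amount_cents": round(float(raw["amount"]) * 100),
        "currency": raw.get("currency", "USD").upper(),
    }


def test_normalize_order():
    row = normalize_order({"order_id": "42", "amount": "19.99", "currency": "eur"})
    assert row == {"order_id": 42, "amount_cents": 1999, "currency": "EUR"}


def test_normalize_order_rejects_missing_field():
    # A malformed input fails fast here rather than in a batch run.
    try:
        normalize_order({"order_id": "42"})
    except KeyError as err:
        assert err.args[0] == "amount"
    else:
        raise AssertionError("expected a KeyError for the missing 'amount' field")
```

Whether the result is correct against real production data is still a question for acceptance tests. This kind of test only guards the logic that can be checked in isolation.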

Testing a job execution#

The workhorse function for unit-testing a job is the JobDefinition.execute_in_process function. Using this function you can execute a job in process and then test execution properties using the ExecuteInProcessResult object that it returns.

def test_job():
    result = do_math_job.execute_in_process()

    # return type is ExecuteInProcessResult
    assert isinstance(result, ExecuteInProcessResult)
    assert result.success
    # inspect individual op result
    assert result.output_for_node("add_one") == 2
    assert result.output_for_node("add_two") == 3
    assert result.output_for_node("subtract") == -1

Separating business logic from environments#

As noted above, data applications often rely on and encode their business logic in code that is executed by heavy, external dependencies. This makes it easy and natural to couple your application to a single operating environment. If you do, however, any testing requires your production environment.

To make local testing possible, you may structure your software to, as much as possible, cleanly separate this business logic from your operating environment. This is one of the reasons why Dagster flows through a context object throughout its entire computation.

Attached to the context is a set of user-defined resources. Examples of resources include APIs to data warehouses, Spark clusters, s3 sessions, or some other external dependency or service. Each job contains a set of resources, and multiple jobs can be defined for a given Dagster graph for each set of resources (production, local, testing, etc).

For example, to skip external dependencies in tests, you may find yourself needing to constantly comment and uncomment like:

from dagster import op


@op
def get_data_without_resource():
    dummy_data = [1, 2, 3]
    # Do not call external apis in tests
    # return call_api()
    return dummy_data

Testing graphs#

Dagster allows you to define multiple "jobs" from the same computation graph. With resources, you can modify the op above to:

from dagster import graph, op, ConfigurableResource, OpExecutionContext


class MyApi(ConfigurableResource):
    def call(self): ...


@op
def get_data(api: MyApi):
    return api.call()


@op
def do_something(context: OpExecutionContext, data):
    output = process(data)
    return output


@graph
def download():
    do_something(get_data())


# The prod job for the download graph.
download_job = download.to_job(resource_defs={"api": MyApi()})

In this example, we define the business logic (i.e., jobs and ops) within a computation graph, independent of any particular environment. From this computation graph, we define a production job using the resources that define our production environment.

This is extremely helpful when it comes to testing. We can execute the computation graph with mocked versions of resources, since the computation graph is not tied to any particular environment. In order to mock the api resource, we use a helper method mock_resource from the ResourceDefinition class.

from dagster import ResourceDefinition


def test_local():
    # Since we have access to the computation graph independent of the set of resources, we can
    # test it locally.
    result = download.execute_in_process(
        resources={"api": ResourceDefinition.mock_resource()}
    )
    assert result.success


def run_in_prod():
    download_job.execute_in_process()

For more information, refer to the Resources documentation.


Testing Software-defined Assets#

@asset-decorated functions can be directly invoked, which will invoke the underlying op computation.

A basic asset, with no dependencies:

from dagster import asset


@asset
def basic_asset():
    return 5


# An example unit test for basic_asset.
def test_basic_asset():
    assert basic_asset() == 5

An asset with dependencies:

from dagster import asset


@asset
def asset_with_inputs(x, y):
    return x + y


# An example unit test for asset_with_inputs.
def test_asset_with_inputs():
    assert asset_with_inputs(5, 6) == 11

Testing assets with config#

If your asset has a config schema, you can pass a config value to the invocation. The following asset relies on attached config:

from dagster import asset, Config


class MyAssetConfig(Config):
    my_string: str


@asset
def asset_requires_config(config: MyAssetConfig) -> str:
    return config.my_string


def test_asset_requires_config():
    result = asset_requires_config(config=MyAssetConfig(my_string="foo"))
    ...

Testing assets with resources#

If your asset requires resources, you can specify them as arguments when invoking the asset directly.

Consider the following asset, which requires a resource bar.

from dagster import asset, ConfigurableResource


class BarResource(ConfigurableResource):
    my_string: str


@asset
def asset_requires_bar(bar: BarResource) -> str:
    return bar.my_string


def test_asset_requires_bar():
    result = asset_requires_bar(bar=BarResource(my_string="bar"))
    ...

Testing assets with complex resources#

In order to test assets which rely on complex resources, such as those that build separate clients, a common pattern is to use tools such as the mock library to fake your resource and associated client. See the section on Testing ops with complex resources for an example.

Testing multiple assets together#

You may want to test multiple assets together, to more closely mirror actual materialization. This can be done using the materialize_to_memory method, which loads the materialized results of assets into memory:

from dagster import asset, materialize_to_memory


@asset
def data_source():
    return get_data_from_source()


@asset
def structured_data(data_source):
    return extract_structured_data(data_source)


# An example unit test using materialize_to_memory
def test_data_assets():
    result = materialize_to_memory([data_source, structured_data])
    assert result.success
    # Materialized objects can be accessed in terms of the underlying op
    materialized_data = result.output_for_node("structured_data")
    ...

Mock resources can be provided directly using materialize_to_memory:

from dagster import asset, materialize_to_memory, ConfigurableResource
from unittest import mock


class MyServiceResource(ConfigurableResource): ...


@asset
def asset_requires_service(service: MyServiceResource): ...


@asset
def other_asset_requires_service(service: MyServiceResource): ...


def test_assets_require_service():
    # Mock objects can be provided directly.
    result = materialize_to_memory(
        [asset_requires_service, other_asset_requires_service],
        resources={"service": mock.MagicMock()},
    )
    assert result.success
    ...

Testing asset checks#

Functions decorated with @asset_check can be directly invoked. For example:

from dagster import AssetCheckResult, asset, asset_check


# Placeholder target asset so the check below has something to attach to.
@asset
def orders(): ...


@asset_check(asset=orders)
def orders_id_has_no_nulls():
    return AssetCheckResult(passed=True)


def test_orders_check():
    assert orders_id_has_no_nulls().passed

Testing ops#

While using the @op decorator on a function does change its signature, invoking the op closely mirrors invoking the underlying decorated function.

Consider the following op.

from dagster import op


@op
def my_op_to_test():
    return 5

Since it has no arguments, we can invoke it directly.

def test_op_with_invocation():
    assert my_op_to_test() == 5

Consider the following op with inputs.

@op
def my_op_with_inputs(x, y):
    return x + y

We can directly provide values for these inputs to the invocation.

def test_inputs_op_with_invocation():
    assert my_op_with_inputs(5, 6) == 11

Testing ops with config#

If your op has a config schema, you can pass a config value to the invocation. The following op relies on attached config:

from dagster import Config


class MyOpConfig(Config):
    my_int: int


@op
def op_requires_config(config: MyOpConfig):
    return config.my_int * 2

That config can be passed to the invocation.

def test_op_with_config():
    assert op_requires_config(MyOpConfig(my_int=5)) == 10

Testing ops with resources#

If your op requires resources, you can specify them as arguments when invoking the op.

Consider the following op, which requires a resource foo.

from dagster import ConfigurableResource


class FooResource(ConfigurableResource):
    my_string: str


@op
def op_requires_foo(foo: FooResource):
    return f"found {foo.my_string}"

We can directly instantiate and pass an instance of the resource to the invocation.

def test_op_with_resource():
    assert op_requires_foo(FooResource(my_string="bar")) == "found bar"

Note that when directly invoking ops, I/O managers specified on inputs and outputs are not used.

Testing ops with complex resources#

If your ops rely on more complex resources, such as those that build separate clients, a common pattern is to use tools such as the mock library to fake your resource and associated client.

from dagster import ConfigurableResource, op
from unittest import mock

class MyClient:
    ...

    def query(self, body: str): ...

class MyClientResource(ConfigurableResource):
    username: str
    password: str

    def get_client(self):
        return MyClient(self.username, self.password)

@op
def my_op(client: MyClientResource):
    return client.get_client().query("SELECT * FROM my_table")

def test_my_op():
    class FakeClient:
        def query(self, body: str):
            assert body == "SELECT * FROM my_table"
            return "my_result"

    mocked_client_resource = mock.Mock()
    mocked_client_resource.get_client.return_value = FakeClient()

    assert my_op(mocked_client_resource) == "my_result"
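
A variation on the same idea lets the mock record the call instead of using a hand-written fake client. In the sketch below, the plain function `run_query` is a hypothetical stand-in for the op body, so the snippet needs only the standard library. The mock setup would be the same when invoking the op itself:

```python
from unittest import mock


def run_query(client_resource):
    # Stand-in for the op body: build a client, then issue one query.
    return client_resource.get_client().query("SELECT * FROM my_table")


def test_run_query_records_call():
    mocked_client_resource = mock.MagicMock()
    # get_client() returns a mock client whose query() returns a canned value.
    mocked_client = mocked_client_resource.get_client.return_value
    mocked_client.query.return_value = "my_result"

    assert run_query(mocked_client_resource) == "my_result"

    # The mock remembers how it was called, so the test can verify the query text.
    mocked_client.query.assert_called_once_with("SELECT * FROM my_table")
```

Asserting on the recorded call keeps the expectation in the test body rather than inside a fake class.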

Testing ops that rely on context#

Finally, if your op relies on context, you can build and pass a context. The following op uses the context logger:

from dagster import OpExecutionContext, op


@op
def context_op(context: OpExecutionContext):
    context.log.info(f"My run ID is {context.run_id}")

We can build a context and pass it to the invocation using the build_op_context utility:

from dagster import build_op_context


def test_op_with_context():
    context = build_op_context()
    context_op(context)

Testing jobs#

Testing job execution with config#

Sometimes, you may want to test with different configuration. You can execute a job with a specified run config via the run_config argument:

from dagster import RunConfig


def test_job_with_config():
    result = do_math_job.execute_in_process(
        run_config=RunConfig(
            ops={
                "add_one": AddOneConfig(num=2),
                "add_two": AddTwoConfig(num=3),
            }
        )
    )

    assert result.success

    assert result.output_for_node("add_one") == 3
    assert result.output_for_node("add_two") == 5
    assert result.output_for_node("subtract") == -2

Testing jobs with top-level inputs#

You can wire inputs from the top-level of a job to the constituent ops. Consider the following op and graph:

from dagster import graph, op


@op
def op_with_input(x):
    return do_something(x)


@graph
def wires_input(x):
    op_with_input(x)

Turn the graph into a job by calling GraphDefinition.to_job, and provide a value to the input x using the input_values argument:

the_job = wires_input.to_job(input_values={"x": 5})

You can also provide input values using JobDefinition.execute_in_process or GraphDefinition.execute_in_process:

graph_result = wires_input.execute_in_process(input_values={"x": 5})

job_result = the_job.execute_in_process(
    input_values={"x": 6}
)  # Overrides existing input value

Testing the event stream#

The event stream is the most generic way that an op communicates what happened during its computation. Ops communicate events for starting, input/output type checking, and user-provided events such as expectations, materializations, and outputs.

from dagster import AssetMaterialization, DagsterEventType, ExpectationResult


def test_event_stream():
    job_result = emit_events_job.execute_in_process()

    assert job_result.success

    # when one op has multiple outputs, you need to specify output name
    assert job_result.output_for_node("emit_events_op", output_name="a_num") == 2

    events_for_step = job_result.events_for_node("emit_events_op")
    assert [se.event_type for se in events_for_step] == [
        DagsterEventType.STEP_START,
        DagsterEventType.STEP_EXPECTATION_RESULT,
        DagsterEventType.ASSET_MATERIALIZATION,
        DagsterEventType.STEP_OUTPUT,
        DagsterEventType.HANDLED_OUTPUT,
        DagsterEventType.STEP_SUCCESS,
    ]

    # ops communicate what they did via the event stream, viewable in tools (e.g. the Dagster UI)
    (
        _start,
        expectation_event,
        materialization_event,
        _num_output_event,
        _num_handled_output_operation,
        _success,
    ) = events_for_step

    # Event payloads are nested under event_specific_data, so unpacking them is verbose.
    expectation_result = expectation_event.event_specific_data.expectation_result
    assert isinstance(expectation_result, ExpectationResult)
    assert expectation_result.success
    assert expectation_result.label == "positive"

    materialization = materialization_event.event_specific_data.materialization
    assert isinstance(materialization, AssetMaterialization)
    assert materialization.label == "persisted_string"

APIs in this guide#

Name | Description
JobDefinition.execute_in_process | A method to execute a job synchronously, typically for testing job execution or running standalone scripts.
build_op_context | A method to construct an OpExecutionContext, typically to provide to the invocation of an op for testing.
build_asset_context | A method to construct an AssetExecutionContext, typically to provide to the invocation of an asset for testing.
materialize_to_memory | Ephemerally materializes a provided list of assets for testing.