Testing partitioned config and jobs

This guide is applicable to both asset definitions and ops.

In this guide, we'll cover a few ways to test your partitioned config and jobs.


Before continuing, you should be familiar with:

Testing partitioned config

Invoking a PartitionedConfig object directly invokes the decorated function.

If you want to check whether the generated run config is valid for the config of a job, you can use the validate_run_config function.

from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


def test_my_partitioned_config():
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # assert that the output of the decorated function is valid configuration for the
    # partitioned_op_job job (assumed to be defined elsewhere and to use this config)
    assert validate_run_config(partitioned_op_job, run_config)

If you want to test that a PartitionedConfig creates the partitions you expect, use its get_partition_keys or get_run_config_for_partition_key methods:

from dagster import Config, OpExecutionContext, job, op


@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data": {
                "config": {
                    "start": start.strftime("%Y-%m-%d-%H:%M"),
                    "end": _end.strftime("%Y-%m-%d-%H:%M"),
                }
            }
        }
    }


class ProcessDataConfig(Config):
    start: str
    end: str


@op
def process_data(context: OpExecutionContext, config: ProcessDataConfig):
    s = config.start
    e = config.end
    context.log.info(f"processing data for {s} - {e}")


@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    process_data()


def test_my_offset_partitioned_config():
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for do_more_stuff_partitioned
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
            }
        }
    }

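The expected start and end values in the test above follow from the 15-minute offset: the partition key stays a plain date, while the time window begins 15 minutes after midnight and lasts one day. The following is a minimal sketch of that arithmetic using only the standard library. The helper name expected_daily_window is hypothetical and not part of Dagster; it is meant only as a way to derive expected values for assertions.

```python
from datetime import datetime, timedelta


def expected_daily_window(partition_key: str, minute_offset: int = 0) -> dict:
    """Hypothetical helper: rebuild the start/end strings for one daily partition."""
    # The partition key is a plain date; the offset shifts where the window starts.
    start = datetime.strptime(partition_key, "%Y-%m-%d") + timedelta(minutes=minute_offset)
    # A daily window ends exactly one day after it starts.
    end = start + timedelta(days=1)
    fmt = "%Y-%m-%d-%H:%M"
    return {"start": start.strftime(fmt), "end": end.strftime(fmt)}


window = expected_daily_window("2020-01-01", minute_offset=15)
print(window)  # {'start': '2020-01-01-00:15', 'end': '2020-01-02-00:15'}
```

A helper like this can keep expected values out of hand-written literals when a test checks several partition keys.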
Testing partitioned jobs#

To run a partitioned job in-process on a particular partition, supply a value for the partition_key argument of JobDefinition.execute_in_process. For example:

def test_partitioned_op_job():
    assert partitioned_op_job.execute_in_process(partition_key="2020-01-01").success