DagsterDocs

Partitions#

Dagster provides the Partition Set abstraction for pipelines where each run deals with a subset of data.

Relevant APIs#

NameDescription
PartitionClass defining a logical slice of pipeline configuration.
PartitionSetDefinitionClass defining a set of Partition objects.
validate_run_configA function used to verify that a run config dictionary is valid against a pipeline's config schema.

Overview#

Partition sets let you define a set of logical "partitions", usually time windows, along with a scheme for building pipeline run config for a partition. Having defined a partition set, you can kick off a pipeline run or set of pipeline runs by simply selecting partitions in the set.

Partitions have two main uses:

  • Partitioned Schedules: You can construct a schedule that targets a single partition for each run it launches. For example, a pipeline might run each day and process the data that arrived during the previous day.
  • Backfills: You can launch a set of pipeline runs all at once, each run targeting one of the partitions in the set. For example, after making a code change, you might want to re-run your pipeline on every date that it has run on in the past.

Defining a Partition Set#

Here's a pipeline that computes some data for a given date.

@solid(config_schema={"date": str})
def process_data_for_date(context):
    date = context.solid_config["date"]
    context.log.info(f"processing data for {date}")


@solid
def post_slack_message(context):
    context.log.info("posting slack message")


@pipeline
def my_data_pipeline():
    process_data_for_date()
    post_slack_message()

The solid process_data_for_date takes, as config, a string date. This piece of config will define which date to compute data for. For example, if we wanted to compute for May 5th, 2020, we would execute the pipeline with the following config:

solids:
  process_data_for_date:
    config:
      date: "2020-05-05"

You can define a PartitionSetDefinition that defines the full set of partitions and how to define the run config for a given partition.

def get_date_partitions():
    """Every day in the month of May, 2020"""
    return [Partition(f"2020-05-{str(day).zfill(2)}") for day in range(1, 32)]


def run_config_for_date_partition(partition):
    date = partition.value
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}


date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_data_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
)

To add the partition set to the repository and view it in Dagit, you must first include it in the repository definition.

@repository
def my_repository():
    return [
        my_data_pipeline,
        date_partition_set,
    ]

Creating Schedules from Partition Sets#

To create a schedule from a partition set, you must first define a partition selector. A partition selector is a function which takes a ScheduleEvaluationContext and a PartitionSetDefinition and returns a Partition object or a list of Partition objects. Every time the schedule fires, a pipeline run will be submitted for each partition returned by this partition selector.

Once you have created your partition selector, you can manually create a schedule using the method PartitionSetDefinition.create_schedule_definition.

In the following example, we first define a static partition set with partitions representing days of the week. We then create a partition selector, which maps the schedule execution time to the appropriate partition. Finally, we create a schedule that kicks off a pipeline run for the selected partition.

weekday_partition_set = PartitionSetDefinition(
    name="weekday_partition_set",
    pipeline_name="my_data_pipeline",
    partition_fn=lambda: [
        Partition("Monday"),
        Partition("Tuesday"),
        Partition("Wednesday"),
        Partition("Thursday"),
        Partition("Friday"),
        Partition("Saturday"),
        Partition("Sunday"),
    ],
    run_config_fn_for_partition=_weekday_run_config_for_partition,
)


def weekday_partition_selector(
    ctx: ScheduleEvaluationContext, partition_set: PartitionSetDefinition
) -> Union[Partition, List[Partition]]:
    """Maps a schedule execution time to the corresponding partition or list of partitions that
    should be executed at that time"""
    partitions = partition_set.get_partitions(ctx.scheduled_execution_time)
    weekday = ctx.scheduled_execution_time.weekday() if ctx.scheduled_execution_time else 0
    return partitions[weekday]


my_schedule = weekday_partition_set.create_schedule_definition(
    "my_schedule",
    "5 0 * * *",
    partition_selector=weekday_partition_selector,
    execution_timezone="US/Eastern",
)


@repository
def my_repository_with_partitioned_schedule():
    return [
        my_data_pipeline,
        weekday_partition_set,
        my_schedule,
    ]

For partition sets consisting of datetime objects, the create_offset_partition_selector is a useful factory function for selecting partitions derived from the execution time. For example, you might want to fire your schedule every night and select the previous day's partition. The identity_partition_selector is also a useful partition selector, where the selected partition's value is the schedule execution time.

Dagster provides a number of decorators (@hourly_schedule, @daily_schedule, @weekly_schedule, @monthly_schedule) that produce partitioned schedules. These schedule decorators construct both the partition set that defines the configuration space of a specific pipeline as well as the schedule that it runs against. See our Schedules overview for more information.

Testing a Partition Set#

You can test a partition set by initializing the set of partitions, and then evaluating the run config produced by your partition set against your pipeline using the validate_run_config function.

from dagster import validate_run_config


def test_my_partition_set():
    for partition in date_partition_set.partition_fn():
        run_config = date_partition_set.run_config_for_partition(partition)
        assert validate_run_config(my_data_pipeline, run_config)

Partitions in Dagit#

The Partitions Tab#

In Dagit, you can view runs by partition in the Partitions tab of a Pipeline page.

In the "Run Matrix", each column corresponds to one of the partitions in the partition set. Each row corresponds to one of the steps in the pipeline.

Partitions Tab

You can click on individual boxes to see the history of runs for that step and partition.

Partition Step Modal

Launching Partitioned Runs from the Playground#

You can view and use partitions in the Dagit playground view for a pipeline. In the top bar, you can select from the list of all available partition sets, then choose a specific partition. Within the config editor, the config for the selected partition will be populated.

In the screenshot below, we select the date_partition_set and the 2020-05-01 partition, and we can see that the correct run config for the partition has been populated in the editor.

Partitions in Dagit Playground