Partition Sets

Dagster provides the partition set abstraction for pipelines where each run deals with a subset of data.

Users define a set of logical "partitions", usually time windows, along with a scheme for building pipeline config from a partition name. With these, they can launch a single pipeline run, or a set of runs, simply by selecting a partition or a set of partitions.
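As a rough illustration of a time-window scheme, the sketch below turns a partition name such as "2020-02-28" into run config for a one-day window. It is plain Python, not a Dagster API; the function run_config_for_date_partition, the solid name process_window, and the start/end config keys are all hypothetical, chosen only for this example.

```python
from datetime import date, timedelta
from typing import Any, Dict


def run_config_for_date_partition(partition_name: str) -> Dict[str, Any]:
    # The partition name encodes a one-day time window, e.g. "2020-02-28".
    start = date.fromisoformat(partition_name)
    end = start + timedelta(days=1)
    # Hypothetical solid name and config keys; the window is half-open: [start, end).
    return {
        "solids": {
            "process_window": {
                "config": {"start": start.isoformat(), "end": end.isoformat()}
            }
        }
    }


print(run_config_for_date_partition("2020-02-28"))
```

Because the window is derived from the name alone, the same function serves any partition, including month and year boundaries.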

Partitions have two main uses:

  • Scheduling - e.g. a pipeline runs each day and processes the data that arrived during the previous day as a single partition.
  • Backfills - kick off one pipeline run per partition, all at once, e.g. to reprocess the data that arrived on each day in the history.
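The difference between the two uses can be sketched in plain Python, assuming daily partitions named by ISO date (an assumption for this example; the helper functions are hypothetical and not part of Dagster). A schedule picks the single partition for the previous day, while a backfill selects every partition in the history.

```python
from datetime import date, timedelta
from typing import List


def partition_names(start: date, end: date) -> List[str]:
    # One partition per day in [start, end), named by ISO date.
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days)]


def scheduled_partition(today: date) -> str:
    # A daily schedule processes the data that arrived during the previous day.
    return (today - timedelta(days=1)).isoformat()


def backfill_partitions(start: date, today: date) -> List[str]:
    # A backfill launches one run per partition in the history, all at once.
    return partition_names(start, today)


today = date(2020, 1, 5)
print(scheduled_partition(today))                    # 2020-01-04
print(backfill_partitions(date(2020, 1, 1), today))  # ['2020-01-01', '2020-01-02', '2020-01-03', '2020-01-04']
```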

Defining a Partition Set

To define a partition set, we use the PartitionSetDefinition class.

A partition set targets a pipeline and is defined by two functions:

  • partition_fn: Returns a list of Partition objects.
  • run_config_fn_for_partition: Given a Partition, returns, as a dict, the run configuration that parameterizes the pipeline run for that partition.
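To make the contract between the two functions concrete, here is a plain-Python sketch of how they fit together. Partition is a minimal stand-in (a value plus a name that defaults to str(value)), and run_configs_by_name is a hypothetical helper, not Dagster's implementation. The sketch also assumes partition names should be unique, since a partition is selected by name.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class Partition:
    # Stand-in for a partition: a value plus a name that defaults to str(value).
    value: Any
    name: str = ""

    def __post_init__(self):
        if not self.name:
            object.__setattr__(self, "name", str(self.value))


PartitionFn = Callable[[], List[Partition]]
RunConfigFn = Callable[[Partition], Dict[str, Any]]


def run_configs_by_name(partition_fn: PartitionFn, run_config_fn: RunConfigFn) -> Dict[str, Dict[str, Any]]:
    # Call partition_fn once, then build one run config per partition, keyed by name.
    configs: Dict[str, Dict[str, Any]] = {}
    for partition in partition_fn():
        if partition.name in configs:
            raise ValueError(f"duplicate partition name: {partition.name}")
        config = run_config_fn(partition)
        if not isinstance(config, dict):
            raise TypeError("run config must be a dict")
        configs[partition.name] = config
    return configs


configs = run_configs_by_name(
    lambda: [Partition("a"), Partition("b")],
    lambda p: {"solids": {"my_solid": {"config": {"key": p.value}}}},
)
print(configs["b"])  # {'solids': {'my_solid': {'config': {'key': 'b'}}}}
```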

Example

Suppose we have a pipeline that computes metrics for a given day of the week.

pipeline.py
from dagster import pipeline, solid


@solid(config_schema={"day_of_week": str})
def process_data_for_day(context):
    # Read the day to process from this solid's config.
    day_of_week = context.solid_config["day_of_week"]
    context.log.info(day_of_week)


@pipeline
def my_pipeline():
    process_data_for_day()

The solid process_data_for_day takes a string day_of_week as config. This value determines which day to compute metrics for. For example, to compute metrics for Monday ("M"), we would execute the pipeline with the following config:

config.yaml
solids:
  process_data_for_day:
    config:
      day_of_week: "M"
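A run_config_fn_for_partition function returns this same structure as a Python dict rather than YAML. For the "M" partition, the config above corresponds to:

```python
# The same run config as config.yaml, written as the Python dict a
# run_config_fn_for_partition would return for the "M" partition.
run_config = {
    "solids": {
        "process_data_for_day": {
            "config": {"day_of_week": "M"},
        }
    }
}

print(run_config["solids"]["process_data_for_day"]["config"]["day_of_week"])  # M
```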

We can define a PartitionSetDefinition that specifies the full set of partitions and how to build the run config for a given partition.

partition_definition.py
from dagster import Partition, PartitionSetDefinition


def get_day_partition():
    # One partition per day of the week.
    return [
        Partition("M"),
        Partition("Tu"),
        Partition("W"),
        Partition("Th"),
        Partition("F"),
        Partition("Sa"),
        Partition("Su"),
    ]


def run_config_for_day_partition(partition):
    day = partition.value
    return {"solids": {"process_data_for_day": {"config": {"day_of_week": day}}}}


day_partition_set = PartitionSetDefinition(
    name="day_partition_set",
    pipeline_name="my_pipeline",
    partition_fn=get_day_partition,
    run_config_fn_for_partition=run_config_for_day_partition,
)
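Before launching a backfill, it can help to check the config each partition will produce. The sketch below does a dry run in plain Python. Partition here is a stand-in namedtuple (an assumption, since only .value is used), and building the list from a hypothetical DAYS constant is one alternative to writing each Partition out by hand.

```python
from collections import namedtuple

# Stand-in for dagster's Partition; only .value is needed in this sketch.
Partition = namedtuple("Partition", ["value"])

DAYS = ["M", "Tu", "W", "Th", "F", "Sa", "Su"]


def get_day_partition():
    # Equivalent to the hand-written list: one Partition per day abbreviation.
    return [Partition(day) for day in DAYS]


def run_config_for_day_partition(partition):
    day = partition.value
    return {"solids": {"process_data_for_day": {"config": {"day_of_week": day}}}}


# Dry run of a backfill: the run config each partition's run would receive.
preview = {p.value: run_config_for_day_partition(p) for p in get_day_partition()}
for day, config in preview.items():
    print(day, config["solids"]["process_data_for_day"]["config"])
```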

Using Partitions in Dagit

You can view and use partitions in the Dagit playground view for a pipeline. In the top bar, select one of the available partition sets, then choose a specific partition. The config editor is then populated with the run config for the selected partition.

In the screenshot below, we select the day_partition_set and the W partition, and we can see that the correct run config for the partition has been populated in the editor.

Partitions in Dagit Playground