Partition Sets

Dagster provides the partition set abstraction for pipelines in which each run processes a subset of the data.

Users define a set of logical "partitions", usually time windows, along with a scheme for building pipeline run config from a partition. They can then kick off a single pipeline run, or a set of runs, simply by selecting a partition or a set of partitions.

Partitions have two main uses:

  • Scheduling - e.g. a pipeline runs each day and processes the data that arrived during the previous day as a single partition.
  • Backfills - kick off one pipeline run per partition, all at once, e.g. to reprocess the data that arrived on each day in the history.
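The two uses differ mainly in which partitions get runs. The following is a minimal plain-Python sketch, not Dagster's API: it assumes daily partitions named by ISO date, and the helpers daily_partition_names, partition_for_scheduled_run, and backfill_requests are hypothetical names chosen for illustration.

```python
from datetime import date, timedelta
from typing import Dict, List


def daily_partition_names(start: date, end: date) -> List[str]:
    """One partition name (an ISO date string) per day in [start, end)."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days)]


def partition_for_scheduled_run(run_date: date) -> str:
    """Scheduling: a daily run processes the previous day's partition."""
    return (run_date - timedelta(days=1)).isoformat()


def backfill_requests(partition_names: List[str]) -> List[Dict[str, str]]:
    """Backfill: one run request per partition, all submitted together."""
    return [{"partition": name} for name in partition_names]


# A run kicked off on 2020-05-06 handles the 2020-05-05 partition.
print(partition_for_scheduled_run(date(2020, 5, 6)))  # 2020-05-05

# Backfilling the first three days of May yields three run requests.
print(backfill_requests(daily_partition_names(date(2020, 5, 1), date(2020, 5, 4))))
```

Scheduling picks one partition relative to the current date, while a backfill fans out over many partitions at once; both reuse the same partition naming scheme.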

Defining a Partition Set

To define a partition set, we use the PartitionSetDefinition class.

A partition set targets a pipeline (identified by pipeline_name) and defines two functions:

  • partition_fn: Returns a list of Partition objects.
  • run_config_fn_for_partition: Given a Partition, returns, as a dict, the run config that parameterizes execution for that partition.
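In Python terms, the two functions have the shapes sketched below. This sketch runs without Dagster: the Partition dataclass is a hypothetical stand-in (a value plus a name that is assumed to default to str(value)), run_configs_by_name is an illustrative helper, and load_region is a made-up solid name used to show that partitions need not be time windows.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass(frozen=True)
class Partition:
    """Hypothetical stand-in for a partition: a value plus a name."""
    value: Any
    name: Optional[str] = None

    def __post_init__(self) -> None:
        if self.name is None:
            # Frozen dataclasses need object.__setattr__ to fill in a default.
            object.__setattr__(self, "name", str(self.value))


# Shapes of the two functions a partition set is built from.
PartitionFn = Callable[[], List[Partition]]
RunConfigFn = Callable[[Partition], Dict[str, Any]]


def run_configs_by_name(partition_fn: PartitionFn, run_config_fn: RunConfigFn) -> Dict[str, Dict[str, Any]]:
    """Map each partition's name to the run config built for it."""
    return {p.name: run_config_fn(p) for p in partition_fn()}


# Partitions need not be time windows: here each one is a region code.
# "load_region" is a hypothetical solid name.
configs = run_configs_by_name(
    lambda: [Partition("us"), Partition("eu")],
    lambda p: {"solids": {"load_region": {"config": {"region": p.value}}}},
)
print(configs["eu"])  # {'solids': {'load_region': {'config': {'region': 'eu'}}}}
```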

Example

For example, here's a pipeline that computes some data for a given date.

pipeline.py
from dagster import pipeline, solid


@solid(config_schema={"date": str})
def process_data_for_date(context):
    date = context.solid_config["date"]
    context.log.info(f"processing data for {date}")


@solid
def post_slack_message(context):
    context.log.info("posting slack message")


@pipeline
def my_data_pipeline():
    process_data_for_date()
    post_slack_message()

The solid process_data_for_date takes a string date as config, which determines the date to compute data for. For example, to compute data for May 5th, 2020, we would execute the pipeline with the following config:

config.yaml
solids:
  process_data_for_date:
    config:
      date: "2020-05-05"

We can then define a PartitionSetDefinition that specifies the full set of partitions and how to build the run config for each one.

partition_definition.py
from dagster import Partition, PartitionSetDefinition


def get_date_partitions():
    """Every day in the month of May, 2020"""
    return [Partition(f"2020-05-{str(day).zfill(2)}") for day in range(1, 32)]


def run_config_for_date_partition(partition):
    date = partition.value
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}


date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_data_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
)
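get_date_partitions hardcodes 31 days, which only holds for months that have 31 days. Below is a hedged sketch of a more general partition function that asks the standard library's calendar.monthrange for the month length. get_month_partitions is a hypothetical name, and a namedtuple stands in for Dagster's Partition so the functions can be exercised without Dagster installed.

```python
import calendar
from collections import namedtuple
from typing import List

# Stand-in for Dagster's Partition so this sketch runs on its own.
Partition = namedtuple("Partition", ["value"])


def get_month_partitions(year: int, month: int) -> List[Partition]:
    """Every day in the given month, e.g. 2020-05-01 through 2020-05-31."""
    _, days_in_month = calendar.monthrange(year, month)
    return [Partition(f"{year:04d}-{month:02d}-{day:02d}") for day in range(1, days_in_month + 1)]


def run_config_for_date_partition(partition):
    # Same run config function as in partition_definition.py.
    date = partition.value
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}


may = get_month_partitions(2020, 5)
print(len(may), may[0].value, may[-1].value)  # 31 2020-05-01 2020-05-31
# Same structure as config.yaml above.
print(run_config_for_date_partition(may[4]))
```

The example passes a zero-argument function as partition_fn, so with Dagster's Partition in place of the namedtuple, a wrapper such as partition_fn=lambda: get_month_partitions(2020, 5) should fit.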

Partitions in Dagit

The Partitions Tab

You can view runs by partition in the Partitions tab of a Pipeline page.

In the "Run Matrix", each column corresponds to one of the partitions in the partition set. Each row corresponds to one of the steps in the pipeline.

(Screenshot: the Run Matrix in the Partitions tab)

You can click on individual boxes to see the history of runs for that step and partition.

(Screenshot: partition step modal)
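Conceptually, the Run Matrix is a two-dimensional index over past runs. The plain-Python sketch below models it under stated assumptions, not Dagit's internals: run records are (partition, step, status) tuples in chronological order, each cell shows the status from the most recent run, and the helper names and status strings are hypothetical.

```python
from typing import Dict, List, Tuple

# Each run record: (partition, step, status), oldest first. Fields are illustrative.
RunRecord = Tuple[str, str, str]


def run_history(runs: List[RunRecord], step: str, partition: str) -> List[str]:
    """All statuses for one (step, partition) cell, oldest first."""
    return [status for p, s, status in runs if p == partition and s == step]


def build_run_matrix(runs: List[RunRecord], steps: List[str], partitions: List[str]) -> List[List[str]]:
    """Rows are steps, columns are partitions; each cell holds the latest status."""
    latest: Dict[Tuple[str, str], str] = {}
    for partition, step, status in runs:
        latest[(step, partition)] = status  # later runs overwrite earlier ones
    return [[latest.get((step, p), "-") for p in partitions] for step in steps]


runs = [
    ("2020-05-01", "process_data_for_date", "FAILURE"),
    ("2020-05-01", "process_data_for_date", "SUCCESS"),
    ("2020-05-02", "process_data_for_date", "SUCCESS"),
    ("2020-05-02", "post_slack_message", "SUCCESS"),
]
steps = ["process_data_for_date", "post_slack_message"]
partitions = ["2020-05-01", "2020-05-02", "2020-05-03"]

for step, row in zip(steps, build_run_matrix(runs, steps, partitions)):
    print(f"{step:<24}", *row)
print(run_history(runs, "process_data_for_date", "2020-05-01"))  # ['FAILURE', 'SUCCESS']
```

run_history corresponds to clicking a single box: it returns every run for that step and partition rather than only the latest.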

Launching Partitioned Runs from the Playground

You can view and use partitions in the Dagit playground view for a pipeline. In the top bar, you can select from the list of all available partition sets, then choose a specific partition. The config editor is then populated with the run config for the selected partition.

In the screenshot below, we select date_partition_set and the 2020-05-01 partition, and the editor is populated with the correct run config for that partition.

(Screenshot: partitions in the Dagit playground)
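Selecting a partition in the playground amounts to two lookups followed by one call to the run config function. A minimal sketch under assumptions: partitions are plain strings rather than Partition objects, the registry and config_for_selection are hypothetical, and the config is rendered as JSON only because JSON is in Python's standard library (the playground's editor itself shows YAML).

```python
import json
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical registry: partition set name -> (partition_fn, run_config_fn).
Registry = Dict[str, Tuple[Callable[[], List[str]], Callable[[str], Dict[str, Any]]]]


def config_for_selection(registry: Registry, set_name: str, partition_name: str) -> str:
    """Return the run config for the chosen partition, rendered as editor text."""
    partition_fn, run_config_fn = registry[set_name]  # KeyError for an unknown set
    if partition_name not in partition_fn():
        raise ValueError(f"{partition_name!r} is not a partition of {set_name!r}")
    return json.dumps(run_config_fn(partition_name), indent=2)


registry: Registry = {
    "date_partition_set": (
        lambda: [f"2020-05-{day:02d}" for day in range(1, 32)],
        lambda date: {"solids": {"process_data_for_date": {"config": {"date": date}}}},
    ),
}
print(config_for_selection(registry, "date_partition_set", "2020-05-01"))
```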