Partition backfill strategies

In this example, we'll explore three strategies for backfilling partitioned assets. When you need to materialize many partitions (for initial setup or reprocessing), you can choose Dagster's default one-run-per-partition approach, a batched approach, or a single-run approach, the latter two configured with BackfillPolicy. Each strategy has distinct trade-offs in overhead, fault isolation, and resource utilization.

| Factor          | One per partition | Batched               | Single run     |
|-----------------|-------------------|-----------------------|----------------|
| Run overhead    | High (N runs)     | Medium (N/batch runs) | Low (1 run)    |
| Fault isolation | Best              | Moderate              | None           |
| Retry cost      | 1 partition       | 1 batch               | All partitions |
| Observability   | Per partition     | Per batch             | Aggregate only |

Problem: Backfilling 100 days of historical data

Imagine you need to backfill 100 days of historical event data. Each day's data needs to be processed and stored. Without optimization, this could mean launching 100 separate runs, each with its own overhead. But processing everything in one run means a single failure requires reprocessing all 100 days.

The key question is: How should you batch your partitions to balance overhead, fault isolation, and performance?

| Solution                        | Best for |
|---------------------------------|----------|
| One run per partition (default) | Unreliable data sources, API rate limits, fine-grained retry capability, per-partition observability |
| Batched runs                    | Reducing overhead while maintaining fault isolation, short processing times, initial backfills |
| Single run                      | Spark/Snowflake/Databricks, range-based queries, minimizing Dagster Cloud credits |

Solution 1: One run per partition (default)

By default, Dagster launches one run per partition. This provides maximum observability and fault isolation: if one partition fails, the others continue independently, and each failed partition can be retried on its own. For 100 partitions, this creates 100 separate runs, each with its own startup overhead. This approach is best when your data source is unreliable (API rate limits, transient failures), you need fine-grained retry capability for individual partitions, or per-partition observability is critical.

src/project_mini/defs/partition_backfill_strategies/multi_run_backfill.py
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(partitions_def=daily_partitions)
def daily_events(context: dg.AssetExecutionContext):
    """Process events for a single day. Each partition runs separately."""
    partition_date = context.partition_key
    context.log.info(f"Processing events for {partition_date}")

    # Process data for this single partition
    events = fetch_events_for_date(partition_date)
    processed = transform_events(events)

    context.log.info(f"Processed {len(processed)} events for {partition_date}")
    return processed


def fetch_events_for_date(date: str) -> list:
    # Simulate fetching events for a specific date
    return [{"date": date, "event_id": i} for i in range(100)]


def transform_events(events: list) -> list:
    # Simulate transformation
    return [{"processed": True, **e} for e in events]

Solution 2: Batched runs

With BackfillPolicy.multi_run, Dagster groups partitions into batches. For example, with max_partitions_per_run=10, 100 partitions become 10 runs of 10 partitions each. This cuts the number of runs by 90% while maintaining moderate fault isolation: if one partition fails, only its batch of 10 needs to be retried. This approach works well when you want to reduce overhead while maintaining some fault isolation, processing time per partition is short (seconds to a few minutes), or you're doing an initial backfill of many partitions.

src/project_mini/defs/partition_backfill_strategies/batched_backfill.py
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def daily_events(context: dg.AssetExecutionContext):
    """Process events for a batch of days in each run."""
    partition_keys = context.partition_keys
    context.log.info(f"Processing {len(partition_keys)} partitions in this run")

    # Process data for all partitions in this batch
    all_events = []
    for partition_key in partition_keys:
        events = fetch_events_for_date(partition_key)
        all_events.extend(events)

    processed = transform_events(all_events)
    context.log.info(f"Processed {len(processed)} total events across {len(partition_keys)} days")
    return processed


def fetch_events_for_date(date: str) -> list:
    # Simulate fetching events for a specific date
    return [{"date": date, "event_id": i} for i in range(100)]


def transform_events(events: list) -> list:
    # Simulate transformation
    return [{"processed": True, **e} for e in events]

When using BackfillPolicy.multi_run, consider:

  • Overhead reduction: Batch size of 10 reduces runs by 90%
  • Failure blast radius: If one partition fails, the entire batch retries
  • Memory usage: More partitions per run may require more memory (see the sketch after this list)
  • Processing model: Sequential processing means larger batches take longer
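
If memory becomes the bottleneck, one option is to process and persist each partition inside the loop instead of accumulating every partition's events in a single list. A minimal sketch of that variant, reusing the helpers above; write_events is a hypothetical persistence helper, not a Dagster API:

@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def daily_events_streaming(context: dg.AssetExecutionContext) -> None:
    """Process one partition at a time to bound peak memory usage."""
    for partition_key in context.partition_keys:
        events = fetch_events_for_date(partition_key)
        processed = transform_events(events)
        write_events(partition_key, processed)  # hypothetical persistence helper
        context.log.info(f"Wrote {len(processed)} events for {partition_key}")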

Recommended starting points:

| Partition processing time | Suggested batch size |
|---------------------------|----------------------|
| Under 1 minute            | 20-50                |
| 1-5 minutes               | 10-20                |
| 5-15 minutes              | 5-10                 |
| Over 15 minutes           | 1-5 or single run    |

Adjust based on observed failure rates and infrastructure constraints. For more information on parallelization within batched runs, see Parallelization within batched runs.
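
Because backfill_policy is fixed when definitions are loaded, one way to tune batch size without a code change is to read it from an environment variable at load time. A sketch, assuming a BACKFILL_BATCH_SIZE variable of your own choosing:

import os

import dagster as dg

# Assumed environment variable; falls back to a batch size of 10 when unset.
BATCH_SIZE = int(os.getenv("BACKFILL_BATCH_SIZE", "10"))


@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=BATCH_SIZE),
)
def daily_events_tunable(context: dg.AssetExecutionContext) -> None:
    context.log.info(f"Processing {len(context.partition_keys)} partitions")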

Solution 3: Single run

With BackfillPolicy.single_run, Dagster processes all selected partitions in one run, so run overhead is paid only once. For 100 partitions, this creates just 1 run. However, a failure requires retrying all partitions together. This approach is ideal when you're using a parallel-processing engine (Spark, Snowflake, Databricks), your queries naturally operate on date ranges, or you want to minimize Dagster Cloud credit consumption.

src/project_mini/defs/partition_backfill_strategies/single_run_backfill.py
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(
    partitions_def=daily_partitions,
    backfill_policy=dg.BackfillPolicy.single_run(),
)
def daily_events(context: dg.AssetExecutionContext):
    """Process events for multiple days in a single run."""
    start_datetime, end_datetime = context.partition_time_window
    context.log.info(f"Processing events from {start_datetime} to {end_datetime}")

    # Process data for the entire partition range at once
    events = fetch_events_for_range(start_datetime, end_datetime)
    processed = transform_events(events)

    context.log.info(f"Processed {len(processed)} events in single run")
    return processed


def fetch_events_for_range(start, end) -> list:
    # Simulate fetching events for a date range (e.g., SQL WHERE clause)
    return [{"start": str(start), "end": str(end), "event_id": i} for i in range(1000)]


def transform_events(events: list) -> list:
    # Simulate transformation
    return [{"processed": True, **e} for e in events]

| Scenario                                    | Recommended strategy        |
|---------------------------------------------|-----------------------------|
| API with rate limits or transient failures  | One per partition           |
| Short processing time, reliable source      | Batched (10-50 per run)     |
| Spark/Snowflake with range queries          | Single run                  |
| Cost optimization in Dagster Cloud          | Single run or large batches |
| Initial backfill of 1000+ partitions        | Batched (50-100 per run)    |

Parallelization within batched runs

When using BackfillPolicy.multi_run, you get multiple partitions in a single run. Here are different ways to parallelize processing within that run.

| Strategy     | Best for        | Max concurrency     | Overhead | Complexity |
|--------------|-----------------|---------------------|----------|------------|
| Batch query  | SQL databases   | N/A (single query)  | Very low | Very low   |
| Thread pool  | I/O-bound tasks | 10-100 threads      | Low      | Low        |
| Process pool | CPU-bound tasks | Number of CPU cores | Medium   | Low        |

Strategy 1: Batch query (fastest for databases)

Process all partitions in a single database query:

src/project_mini/defs/partition_backfill_strategies/parallel_batch_query.py
import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c", "customer_d", "customer_e"]
)


@dg.asset(
    partitions_def=customer_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def process_customers_batch(context: dg.AssetExecutionContext) -> None:
    """Process all partitions in a single database query."""
    customer_ids = context.partition_keys

    # Single query for all customers - most efficient for databases
    query = "SELECT * FROM customers WHERE customer_id IN %s"
    results = execute_query(query, customer_ids)

    context.log.info(f"Processed {len(results)} customers in single query")


def execute_query(query: str, params: list) -> list:
    # Simulated database query
    return [{"customer_id": p, "data": "..."} for p in params]

Best for: SQL databases, REST APIs with batch endpoints
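
With a real driver, pass the IDs as a bound parameter instead of formatting them into the SQL string. For example, psycopg2 adapts a Python tuple to a parenthesized list, so the IN %s pattern above works as in this sketch (the DSN is an assumption):

import psycopg2


def fetch_customers(customer_ids: list) -> list:
    # psycopg2 renders a tuple parameter as (val1, val2, ...), so the IDs
    # are escaped by the driver rather than interpolated by hand.
    with psycopg2.connect("dbname=example") as conn:  # assumed DSN
        with conn.cursor() as cur:
            cur.execute(
                "SELECT * FROM customers WHERE customer_id IN %s",
                (tuple(customer_ids),),
            )
            return cur.fetchall()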

Strategy 2: Thread pool (I/O-bound operations)

Use threads for parallel I/O operations:

src/project_mini/defs/partition_backfill_strategies/parallel_threadpool.py
from concurrent.futures import ThreadPoolExecutor

import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c", "customer_d", "customer_e"]
)


@dg.asset(
    partitions_def=customer_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def fetch_customer_data(context: dg.AssetExecutionContext) -> None:
    """Use threads for parallel I/O operations."""
    customer_ids = context.partition_keys

    def fetch_one(customer_id):
        # Simulated API call
        return {"customer_id": customer_id, "data": f"data for {customer_id}"}

    # Process up to 5 customers concurrently
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(fetch_one, customer_ids))

    context.log.info(f"Fetched {len(results)} customers with thread pool")

Best for: HTTP requests, file I/O, database queries

Parallelism: Limited by max_workers (5 concurrent in this example)
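
If one partition's fetch should not abort the whole batch, submit futures individually and handle exceptions per partition. A sketch with concurrent.futures, assuming a module-level fetch_one like the one above:

from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_all(customer_ids: list) -> tuple[list, list]:
    results, failed = [], []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(fetch_one, cid): cid for cid in customer_ids}
        for future in as_completed(futures):
            cid = futures[future]
            try:
                results.append(future.result())
            except Exception as exc:
                # Record the failure and keep going; retry these keys separately.
                failed.append((cid, exc))
    return results, failed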

Strategy 3: Process pool (CPU-bound operations)

Use processes for parallel CPU-intensive work:

src/project_mini/defs/partition_backfill_strategies/parallel_processpool.py
from concurrent.futures import ProcessPoolExecutor

import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c", "customer_d", "customer_e"]
)


def analyze_one(customer_id: str) -> dict:
    """CPU-intensive analysis - must be defined at module level for ProcessPool."""
    # Simulated CPU-intensive work
    result = sum(i * i for i in range(100000))
    return {"customer_id": customer_id, "analysis": result}


@dg.asset(
    partitions_def=customer_partitions,
    backfill_policy=dg.BackfillPolicy.multi_run(max_partitions_per_run=10),
)
def analyze_customer_data(context: dg.AssetExecutionContext) -> None:
    """Use processes for parallel CPU-intensive work."""
    customer_ids = context.partition_keys

    # Use multiple CPU cores
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(analyze_one, customer_ids))

    context.log.info(f"Analyzed {len(results)} customers with process pool")

Best for: CPU-intensive computations, data transformations

Parallelism: Limited by CPU cores
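
The example above hard-codes max_workers=4; a common alternative is to size the pool to the machine's cores. A sketch using os.cpu_count(), which can return None in some environments:

import os
from concurrent.futures import ProcessPoolExecutor


def analyze_all(customer_ids: list) -> list:
    # Fall back to a single worker if the core count is unknown.
    workers = os.cpu_count() or 1
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(analyze_one, customer_ids))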