
Dynamic fanout

In this example, we'll explore how to implement dynamic fanout patterns in Dagster. Dynamic fanout is useful when you need to process a variable number of items in parallel, where the exact count isn't known until runtime. This is particularly valuable for data processing pipelines that need to handle varying workloads efficiently.

Problem: Variable workload processing

Imagine you have a data processing pipeline that receives multiple related records, where each record contains 10-30 processing units that require expensive computation. Without dynamic fanout, you'd have to do one of the following:

  • Process everything sequentially (slow)
  • Pre-define a fixed number of parallel processes (inflexible)
  • Process all units in a single large operation (difficult to debug and monitor)

The challenge is creating a pipeline that can:

  • Dynamically spawn sub-pipelines based on input data
  • Process each record's units in parallel
  • Collect and aggregate results efficiently
  • Maintain proper lineage and observability

Solution 1: Sequential processing within sub-pipelines

The first approach uses dynamic fanout to create parallel sub-pipelines for each record, but processes units within each sub-pipeline sequentially. This provides the first layer of parallelization.

src/project_mini/defs/dynamic_fanout/dynamic_fanout.py
from typing import Any

import dagster as dg


@dg.op
def sub_pipeline_process_record_option_a(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """SUB-PIPELINE: Complete processing workflow for a single data record.

    1. "Extract processing units from the record (10-30 units per record)"
    2. "Each unit goes through processing (Second layer of parallelization)"
       [Currently sequential as specified]
    3. "Results are aggregated to create final record output"
    """
    context.log.info(f"Sub-pipeline processing record: {record['id']}")

    # Step 1: Extract processing units from record (10-30 units)
    processing_units = extract_processing_units(record)
    context.log.info(f"Extracted {len(processing_units)} units from {record['id']}")

    # Step 2: Process each unit (Second layer of parallelization)
    # Currently sequential as specified, but can be parallelized when ready
    unit_results = []

    # Sequential processing (current implementation)
    for i, unit in enumerate(processing_units):
        context.log.info(f"Processing unit {i + 1}/{len(processing_units)} for {record['id']}")
        result = process_single_unit(unit)
        unit_results.append(result)

    # Step 3: Aggregate results to create final record output
    aggregated_output = aggregate_unit_results(record, unit_results)

    context.log.info(
        f"Sub-pipeline completed for {record['id']}: aggregated {len(unit_results)} unit results"
    )

    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }

This approach creates a sub-pipeline for each input record:

  1. Dynamic trigger: Uses DynamicOut to create one sub-pipeline per input record
  2. Sequential unit processing: Processes 10-30 units within each sub-pipeline sequentially
  3. Result aggregation: Combines unit results into a final record output

Processing layer | Parallelization | Units per record
Record-level | Parallel | 1 sub-pipeline per record
Unit-level | Sequential | 10-30 units processed in order

Solution 2: Parallel processing within sub-pipelines

The second approach adds a second layer of parallelization by processing units within each sub-pipeline in parallel using multiprocessing.

src/project_mini/defs/dynamic_fanout/dynamic_fanout.py
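A minimal sketch of what this op could look like, assuming the helper functions from Solution 1 are reused; the op name (sub_pipeline_process_record_option_b) and the pool-sizing strategy are illustrative assumptions rather than the project's actual code:

# Minimal sketch (not the project's actual code): the helper functions are reused
# from Solution 1, and the op name and pool sizing are illustrative assumptions.
from multiprocessing import Pool, cpu_count
from typing import Any

import dagster as dg


@dg.op
def sub_pipeline_process_record_option_b(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """SUB-PIPELINE: Same workflow as option A, but units are processed in parallel."""
    context.log.info(f"Sub-pipeline processing record: {record['id']}")

    # Step 1: Extract processing units from record (10-30 units)
    processing_units = extract_processing_units(record)

    # Step 2: Process units in parallel (second layer of parallelization).
    # The pool size adapts to the available CPU cores and the workload size.
    num_processes = max(1, min(cpu_count(), len(processing_units)))
    context.log.info(
        f"Processing {len(processing_units)} units for {record['id']} with {num_processes} processes"
    )
    with Pool(processes=num_processes) as pool:
        unit_results = pool.map(process_single_unit, processing_units)

    # Step 3: Aggregate results to create final record output
    aggregated_output = aggregate_unit_results(record, unit_results)

    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }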

This enhanced approach provides two layers of parallelization:

  1. Record-level parallelization: Multiple sub-pipelines run simultaneously
  2. Unit-level parallelization: Within each sub-pipeline, units are processed using a multiprocessing pool
  3. Automatic scaling: Number of processes adapts to available CPU cores and workload size

Processing layer | Parallelization | Performance benefit
Record-level | Parallel | Scales with number of records
Unit-level | Parallel | Scales with CPU cores

Complete pipeline implementation

The complete pipeline uses a graph-backed asset to orchestrate the dynamic fanout pattern:

src/project_mini/defs/dynamic_fanout/dynamic_fanout.py
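A minimal sketch of the orchestration, using the op names referenced below; the asset name (processed_records) and the load_input_records() helper are hypothetical stand-ins for however the real pipeline loads its input:

# Minimal sketch of the graph-backed asset wiring; the asset name
# (processed_records) and load_input_records() are hypothetical, while the two
# op names and the .map()/.collect() pattern come from this guide.
from typing import Any

import dagster as dg


@dg.op(out=dg.DynamicOut())
def trigger_sub_pipelines(context: dg.OpExecutionContext):
    """Emit one DynamicOutput per input record, spawning one sub-pipeline each."""
    records = load_input_records()  # hypothetical helper returning a list of record dicts
    for idx, record in enumerate(records):
        # Mapping keys must be unique and contain only letters, numbers, and underscores
        yield dg.DynamicOutput(record, mapping_key=f"record_{idx}")


@dg.op
def collect_sub_pipeline_results(
    context: dg.OpExecutionContext, results: list[dict[str, Any]]
) -> dict[str, Any]:
    """Aggregate the outputs of all sub-pipelines once they have all completed."""
    context.log.info(f"Collected results from {len(results)} sub-pipelines")
    return {
        "records_processed": len(results),
        "total_units_processed": sum(r["units_processed"] for r in results),
        "results": results,
    }


@dg.graph_asset
def processed_records():
    """Graph-backed asset that orchestrates the dynamic fanout pattern."""
    records = trigger_sub_pipelines()
    # .map() fans out: each record runs through its own sub-pipeline in parallel
    sub_results = records.map(sub_pipeline_process_record_option_a)
    # .collect() is the synchronization barrier: all sub-pipelines must finish first
    return collect_sub_pipeline_results(sub_results.collect())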

Key features of this implementation:

  1. Dynamic triggering: trigger_sub_pipelines() creates sub-pipelines based on input data
  2. Parallel execution: .map() processes each record in its own sub-pipeline
  3. Synchronization barrier: .collect() ensures all sub-pipelines complete before proceeding
  4. Result aggregation: collect_sub_pipeline_results() processes all results together

The graph-backed asset approach provides:

  • Full observability: Each sub-pipeline execution is tracked individually
  • Proper lineage: Clear dependency relationships between operations
  • Fault tolerance: Failed sub-pipelines can be retried independently
  • Scalability: Automatically adapts to varying input sizes
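
As a usage sketch, reusing the hypothetical processed_records asset name from above, the graph-backed asset can be registered in a Definitions object and materialized like any other asset:

# Minimal sketch: register the graph-backed asset and materialize it locally.
# `processed_records` is the hypothetical asset name from the sketch above.
import dagster as dg

defs = dg.Definitions(assets=[processed_records])

if __name__ == "__main__":
    # Handy for local testing; in a deployment the asset would typically be
    # materialized from the Dagster UI, a schedule, or a sensor.
    result = dg.materialize([processed_records])
    assert result.success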