Dynamic fanout
In this example, we'll explore how to implement dynamic fanout patterns in Dagster. Dynamic fanout is useful when you need to process a variable number of items in parallel, where the exact count isn't known until runtime. This is particularly valuable for data processing pipelines that need to handle varying workloads efficiently.
Problem: Variable workload processing
Imagine you have a data processing pipeline that receives multiple related records, where each record contains 10-30 processing units that require expensive computation. Without dynamic fanout, you'd have to do one of the following:
- Process everything sequentially (slow)
- Pre-define a fixed number of parallel processes (inflexible)
- Process all units in a single large operation (difficult to debug and monitor)
The challenge is creating a pipeline that can:
- Dynamically spawn sub-pipelines based on input data
- Process each record's units in parallel
- Collect and aggregate results efficiently
- Maintain proper lineage and observability
Solution 1: Sequential processing within sub-pipelines
The first approach uses dynamic fanout to create parallel sub-pipelines for each record, but processes units within each sub-pipeline sequentially. This provides the first layer of parallelization.
```python
from typing import Any

import dagster as dg


@dg.op
def sub_pipeline_process_record_option_a(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """SUB-PIPELINE: Complete processing workflow for a single data record.

    1. Extract processing units from the record (10-30 units per record)
    2. Each unit goes through processing (second layer of parallelization),
       currently sequential
    3. Results are aggregated to create the final record output
    """
    context.log.info(f"Sub-pipeline processing record: {record['id']}")

    # Step 1: Extract processing units from the record (10-30 units)
    processing_units = extract_processing_units(record)
    context.log.info(f"Extracted {len(processing_units)} units from {record['id']}")

    # Step 2: Process each unit (second layer of parallelization).
    # Currently sequential, but can be parallelized when ready.
    unit_results = []
    for i, unit in enumerate(processing_units):
        context.log.info(f"Processing unit {i + 1}/{len(processing_units)} for {record['id']}")
        result = process_single_unit(unit)
        unit_results.append(result)

    # Step 3: Aggregate results to create the final record output
    aggregated_output = aggregate_unit_results(record, unit_results)
    context.log.info(
        f"Sub-pipeline completed for {record['id']}: aggregated {len(unit_results)} unit results"
    )
    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }
```
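The op above calls three helpers that belong to the broader example. If you want to run the snippet on its own, here is a minimal, hypothetical sketch of what they could look like; the record/unit shapes and the checksum "computation" are placeholders, not the example's real logic:

```python
import hashlib
from typing import Any


def extract_processing_units(record: dict[str, Any]) -> list[dict[str, Any]]:
    # Assumes each record carries its units under a "units" key;
    # adjust to however your records are actually structured.
    return record.get("units", [])


def process_single_unit(unit: dict[str, Any]) -> dict[str, Any]:
    # Stand-in for the expensive per-unit computation.
    digest = hashlib.sha256(str(unit).encode()).hexdigest()
    return {"unit_id": unit.get("id"), "checksum": digest}


def aggregate_unit_results(
    record: dict[str, Any], unit_results: list[dict[str, Any]]
) -> dict[str, Any]:
    # Combine per-unit outputs into a single record-level summary.
    return {"record_id": record["id"], "unit_count": len(unit_results)}
```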
This approach creates a sub-pipeline for each input record:
- Dynamic trigger: Uses `DynamicOut` to create one sub-pipeline per input record (see the sketch after this list)
- Sequential unit processing: Processes 10-30 units within each sub-pipeline sequentially
- Result aggregation: Combines unit results into a final record output
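The dynamic trigger can be a plain op with a `DynamicOut` output that yields one `DynamicOutput` per record. The sketch below is illustrative: only the `trigger_sub_pipelines` name comes from the pipeline described later, and the signature and mapping keys are assumptions.

```python
from typing import Any

import dagster as dg


@dg.op(out=dg.DynamicOut())
def trigger_sub_pipelines(context: dg.OpExecutionContext, records: list[dict[str, Any]]):
    """Yield one DynamicOutput per record so each record gets its own sub-pipeline."""
    context.log.info(f"Triggering {len(records)} sub-pipelines")
    for idx, record in enumerate(records):
        # Mapping keys must be unique and contain only alphanumerics/underscores.
        yield dg.DynamicOutput(record, mapping_key=f"record_{idx}")
```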
| Processing layer | Parallelization | Units per record |
| --- | --- | --- |
| Record-level | Parallel | 1 sub-pipeline per record |
| Unit-level | Sequential | 10-30 units processed in order |
Solution 2: Parallel processing within sub-pipelines
The second approach adds a second layer of parallelization by processing units within each sub-pipeline in parallel using multiprocessing.
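A minimal sketch of this parallel variant, reusing the same helpers as above; the op name `sub_pipeline_process_record_option_b` and the pool-sizing heuristic are illustrative assumptions:

```python
import multiprocessing
from typing import Any

import dagster as dg


@dg.op
def sub_pipeline_process_record_option_b(
    context: dg.OpExecutionContext, record: dict[str, Any]
) -> dict[str, Any]:
    """Sub-pipeline that processes a record's units in parallel with a multiprocessing pool."""
    processing_units = extract_processing_units(record)
    context.log.info(f"Extracted {len(processing_units)} units from {record['id']}")

    # Size the pool to the smaller of the unit count and the available cores (at least 1).
    num_processes = max(1, min(len(processing_units), multiprocessing.cpu_count()))

    # Second layer of parallelization: fan the units out across worker processes.
    with multiprocessing.Pool(processes=num_processes) as pool:
        unit_results = pool.map(process_single_unit, processing_units)

    aggregated_output = aggregate_unit_results(record, unit_results)
    return {
        "record_id": record["id"],
        "sub_pipeline_result": aggregated_output,
        "units_processed": len(unit_results),
        "original_record": record,
    }
```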
This enhanced approach provides two layers of parallelization:
- Record-level parallelization: Multiple sub-pipelines run simultaneously
- Unit-level parallelization: Within each sub-pipeline, units are processed using a multiprocessing pool
- Automatic scaling: Number of processes adapts to available CPU cores and workload size
| Processing layer | Parallelization | Performance benefit |
| --- | --- | --- |
| Record-level | Parallel | Scales with number of records |
| Unit-level | Parallel | Scales with CPU cores |
Complete pipeline implementation
The complete pipeline uses a graph-backed asset to orchestrate the dynamic fanout pattern:
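A condensed sketch of that orchestration, reusing the ops sketched above; `load_records` and the asset wiring are illustrative assumptions rather than the example's exact code:

```python
from typing import Any

import dagster as dg


@dg.op
def load_records() -> list[dict[str, Any]]:
    # Hypothetical source of input records; replace with your real loader.
    return [{"id": f"record_{i}", "units": [{"id": j} for j in range(10)]} for i in range(3)]


@dg.op
def collect_sub_pipeline_results(
    context: dg.OpExecutionContext, results: list[dict[str, Any]]
) -> dict[str, Any]:
    """Aggregate the outputs of all sub-pipelines once they have completed."""
    context.log.info(f"Collected {len(results)} sub-pipeline results")
    return {
        "total_records": len(results),
        "total_units": sum(r["units_processed"] for r in results),
    }


@dg.graph
def dynamic_fanout_graph():
    records = load_records()
    # Dynamic trigger: one sub-pipeline per record.
    dynamic_records = trigger_sub_pipelines(records)
    # .map() runs the sub-pipeline op once per dynamic output, in parallel.
    sub_results = dynamic_records.map(sub_pipeline_process_record_option_b)
    # .collect() waits for every sub-pipeline before aggregation.
    return collect_sub_pipeline_results(sub_results.collect())


# Expose the graph as a graph-backed asset for lineage and observability.
dynamic_fanout_asset = dg.AssetsDefinition.from_graph(dynamic_fanout_graph)
```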
Key features of this implementation:
- Dynamic triggering: `trigger_sub_pipelines()` creates sub-pipelines based on input data
- Parallel execution: `.map()` processes each record in its own sub-pipeline
- Synchronization barrier: `.collect()` ensures all sub-pipelines complete before proceeding
- Result aggregation: `collect_sub_pipeline_results()` processes all results together
The graph-backed asset approach provides:
- Full observability: Each sub-pipeline execution is tracked individually
- Proper lineage: Clear dependency relationships between operations
- Fault tolerance: Failed sub-pipelines can be retried independently
- Scalability: Automatically adapts to varying input sizes