Fan-in Pipeline

You can find the code for this example on GitHub.

This pipeline demonstrates how to create a "fan-in" dependency, in which a solid waits for a set of other solids to complete before it continues processing. In this example, the pipeline collects a list of SolidOutputHandles, one from each aliased return_one solid, and passes that list to sum_fan_in as its List[int] input.

If the preceding solids are not all of the same type, the SolidOutputHandles can instead be passed into the fan-in solid individually, as separate inputs, rather than in a List.

repo.py
from dagster import InputDefinition, List, OutputDefinition, pipeline, repository, solid


@solid(output_defs=[OutputDefinition(int)])
def return_one(_):
    return 1


@solid(input_defs=[InputDefinition("nums", List[int])], output_defs=[OutputDefinition(int)])
def sum_fan_in(_, nums):
    return sum(nums)


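@pipeline
def fan_in_pipeline():
    # Fan out: create ten aliased instances of return_one and collect their output handles.
    fan_outs = []
    for i in range(10):
        fan_outs.append(return_one.alias("return_one_{}".format(i))())
    # Fan in: pass the whole list as the single List[int] input of sum_fan_in.
    sum_fan_in(fan_outs)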


@repository
def fan_in_pipeline_repository():
    return [fan_in_pipeline]
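The fan-in idea itself does not depend on Dagster. The following is a rough, library-free sketch in plain Python using concurrent.futures; it is an illustration under that assumption, not how Dagster actually executes pipelines. It mirrors the list-based fan-in in repo.py and also sketches the case where the upstream steps are not all of the same type, so each result is passed to the fan-in step as a separate argument. The functions count_items, make_label, and describe are hypothetical names introduced only for this sketch.

```python
# Library-free sketch of fan-in (plain Python, not Dagster): each downstream
# step runs only after every upstream result it depends on is available.
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def return_one() -> int:
    return 1


def sum_fan_in(nums: List[int]) -> int:
    # Homogeneous fan-in: all upstream outputs arrive together as one list.
    return sum(nums)


def count_items() -> int:
    # Hypothetical upstream step that produces an int.
    return 3


def make_label() -> str:
    # Hypothetical upstream step that produces a str.
    return "widgets"


def describe(count: int, label: str) -> str:
    # Heterogeneous fan-in (hypothetical): each upstream output is its own typed argument.
    return f"{count} {label}"


def run() -> Tuple[int, str]:
    with ThreadPoolExecutor() as pool:
        # Fan out: ten independent copies of return_one.
        fan_outs = [pool.submit(return_one) for _ in range(10)]
        # .result() blocks, so sum_fan_in runs only after all ten have finished.
        total = sum_fan_in([f.result() for f in fan_outs])

        # Upstream steps of different types, passed individually instead of in a list.
        count_future = pool.submit(count_items)
        label_future = pool.submit(make_label)
        summary = describe(count_future.result(), label_future.result())
    return total, summary


if __name__ == "__main__":
    print(run())  # (10, '3 widgets')
```

In this sketch, calling .result() on each future is what makes the downstream step wait; in a Dagster pipeline, that ordering comes from the dependencies declared in the pipeline definition instead.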

An example with configurable levels of fan-ins and fan-outs can be found here.

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/fan_in_pipeline
cd fan_in_pipeline