Conditional Execution

This example demonstrates how to conditionally execute part of a pipeline based on what happens inside a solid.

In this example, we choose at random which branch of the pipeline to execute. branching_solid randomly generates either zero or one. If it's zero, we execute branch_1_solid. Otherwise, we execute branch_2_solid.

Dagster's mechanism for conditional execution is non-required outputs. For any solid output definition, we can set the is_required argument to False. If a solid has an input that comes from a non-required output, and the upstream solid does not yield that output, then the solid won't run.

In this example, branching_solid only yields an Output for branch_1 if it picks zero. If it picks one, it yields no output for branch_1, so branch_1_solid is missing an input and won't run. The same holds in reverse for branch_2 and branch_2_solid.
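
The skip rule itself can be sketched in plain Python, without Dagster. This is only an illustration under stated assumptions: the functions branching and run, the (name, value) tuples, and the downstream mapping are hypothetical names invented for this sketch and are not Dagster APIs, and Dagster's real execution engine does considerably more.

```python
import random

# Hypothetical stand-ins for illustration only; none of these names are Dagster APIs.


def branching(rng):
    """Yield exactly one of two optional outputs as a (name, value) pair."""
    if rng.randint(0, 1) == 0:
        yield ("branch_1", 1)
    else:
        yield ("branch_2", 2)


def run(rng):
    """Run the toy graph and return the names of the downstream steps that ran."""
    # Keep only the outputs that were actually yielded upstream.
    yielded = dict(branching(rng))

    # Each downstream step lists the upstream outputs it depends on.
    downstream = {
        "branch_1_solid": ["branch_1"],
        "branch_2_solid": ["branch_2"],
    }

    # A step runs only if every output it depends on was yielded.
    return [
        name
        for name, needed in downstream.items()
        if all(output in yielded for output in needed)
    ]
```

Calling run(random.Random()) returns a one-element list naming whichever downstream step had its input yielded; the other step is skipped because its only input never arrived.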

repo.py
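import random

from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


# Both outputs are optional, so downstream solids can be skipped.
@solid(
    output_defs=[
        OutputDefinition(int, "branch_1", is_required=False),
        OutputDefinition(int, "branch_2", is_required=False),
    ]
)
def branching_solid(_):
    # Pick 0 or 1 at random and yield only the matching output.
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")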


@solid(input_defs=[InputDefinition("_input", int)])
def branch_1_solid(_, _input):
    pass


@solid(input_defs=[InputDefinition("_input", int)])
def branch_2_solid(_, _input):
    pass


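@pipeline
def my_pipeline():
    # Wire each optional output to its own downstream solid.
    # Only the solid whose input was actually yielded will run.
    branch_1, branch_2 = branching_solid()
    branch_1_solid(branch_1)
    branch_2_solid(branch_2)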

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/conditional_execution
cd conditional_execution