Conditional Execution

This example demonstrates how to conditionally execute part of a pipeline based on what happens inside a solid.

In this example, we choose a branch of the pipeline to execute at random. In branching_solid, we randomly generate a number. If it's zero, we execute branch_1_solid. Otherwise, we execute branch_2_solid.

Dagster's mechanism for conditional execution is non-required outputs. For any solid output definition, we can set the is_required argument to False. If any of the inputs to a solid come from non-required outputs, and any of those non-required outputs are not yielded by the upstream solid, then the solid won't run.

In this example, branching_solid only yields an Output for branch_1 if it picks zero. If one is chosen, no output for branch_1 will be yielded, so branch_1_solid will be missing an input, and it won't run.
        OutputDefinition(int, "branch_1", is_required=False),
        OutputDefinition(int, "branch_2", is_required=False),
def branching_solid(_):
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
        yield Output(2, "branch_2")

@solid(input_defs=[InputDefinition("_input", int)])
def branch_1_solid(_, _input):

@solid(input_defs=[InputDefinition("_input", int)])
def branch_2_solid(_, _input):

def my_pipeline():
    branch_1, branch_2 = branching_solid()

Open in a playground

Open in Gitpod


curl | tar -xz --strip=2 dagster-master/examples/conditional_execution
cd conditional_execution