Conditional Execution
This example demonstrates how to conditionally execute part of a pipeline based on what happens inside a solid.
In this example, we choose a branch of the pipeline to execute at random. In branching_solid, we randomly generate a number. If it's zero, we execute branch_1_solid. Otherwise, we execute branch_2_solid.
Dagster's mechanism for conditional execution is non-required outputs. For any solid output definition, we can set the is_required argument to False. If any of the inputs to a solid come from non-required outputs, and any of those non-required outputs are not yielded by the upstream solid, then the solid won't run.
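The skip rule described above can be modeled in a few lines of plain Python. This is a toy sketch for intuition only, not Dagster's implementation: should_run, plan, and the dependencies mapping are hypothetical names introduced here, and the model assumes each input is identified by an (upstream solid, output name) pair.

```python
from typing import Dict, List, Set, Tuple

# Toy model of the skip rule; this is NOT how Dagster is implemented.
# An input source is an (upstream_solid_name, output_name) pair.
InputSource = Tuple[str, str]


def should_run(input_sources: List[InputSource], yielded: Set[InputSource]) -> bool:
    """A solid runs only if every output feeding its inputs was actually yielded."""
    return all(source in yielded for source in input_sources)


def plan(
    dependencies: Dict[str, List[InputSource]], yielded: Set[InputSource]
) -> Dict[str, bool]:
    """Return, for each downstream solid, whether it would run."""
    return {name: should_run(sources, yielded) for name, sources in dependencies.items()}


# Hypothetical wiring: each downstream solid consumes one non-required output.
dependencies = {
    "branch_1_solid": [("branching_solid", "branch_1")],
    "branch_2_solid": [("branching_solid", "branch_2")],
}

# Suppose branching_solid yielded only its "branch_1" output.
decisions = plan(dependencies, {("branching_solid", "branch_1")})
print(decisions)  # {'branch_1_solid': True, 'branch_2_solid': False}
```

In this model, a solid with several inputs is skipped as soon as one of its upstream outputs is missing, which matches the "any of those non-required outputs are not yielded" wording.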
In this example, branching_solid only yields an Output for branch_1 if it picks zero. If it picks one instead, no output for branch_1 will be yielded, so branch_1_solid will be missing an input, and it won't run.
    import random

    from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


    @solid(
        output_defs=[
            OutputDefinition(int, "branch_1", is_required=False),
            OutputDefinition(int, "branch_2", is_required=False),
        ]
    )
    def branching_solid(_):
        # Pick 0 or 1 at random and yield only the matching output.
        num = random.randint(0, 1)
        if num == 0:
            yield Output(1, "branch_1")
        else:
            yield Output(2, "branch_2")


    @solid(input_defs=[InputDefinition("_input", int)])
    def branch_1_solid(_, _input):
        pass


    @solid(input_defs=[InputDefinition("_input", int)])
    def branch_2_solid(_, _input):
        pass


    @pipeline
    def my_pipeline():
        # Only the branch whose output was yielded will execute.
        branch_1, branch_2 = branching_solid()
        branch_1_solid(branch_1)
        branch_2_solid(branch_2)
Download
curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/conditional_execution
cd conditional_execution