Re-execution in Dagster

This guide walks through how to re-execute Dagster pipelines and where to find the resulting runs in Dagit.

Motivation

If solids fail or upstream data changes during a pipeline execution, the pipeline may need to be re-run starting from a particular point. Dagster calls this process re-execution.

Imagine a machine learning pipeline with three solids: training the model, testing the model, and building analyses on the results. The first is the most time- and resource-intensive. If the pipeline fails at the solid that tests the model, then after fixing the root cause it would take much longer to launch a new run of the whole pipeline than to start again from the second step.

With Dagster, the re-execution of parts of the pipeline is grouped with the original run to make it easy to trace. The original pipeline execution metadata is not overwritten, making re-execution a non-destructive operation.
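To make the "grouped with the original run" idea concrete, here is a minimal sketch of an append-only run store. It is not Dagster's implementation: RunRecord, RunStore, and the root_run_id field are hypothetical names chosen for illustration. The point is only that a re-execution adds a new record linked to its parent and leaves the earlier records untouched.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class RunRecord:
    """Hypothetical immutable record of one pipeline run (not a Dagster class)."""

    run_id: str
    parent_run_id: Optional[str] = None  # the run this one re-executes, if any
    root_run_id: Optional[str] = None  # the first run in the group, if any


class RunStore:
    """Toy append-only store: re-execution adds a record, never edits old ones."""

    def __init__(self) -> None:
        self._runs: Dict[str, RunRecord] = {}

    def new_run(self, parent_run_id: Optional[str] = None) -> RunRecord:
        root = None
        if parent_run_id is not None:
            parent = self._runs[parent_run_id]
            # A re-execution of a re-execution still points at the original root.
            root = parent.root_run_id or parent.run_id
        record = RunRecord(uuid.uuid4().hex, parent_run_id, root)
        self._runs[record.run_id] = record
        return record

    def group(self, run_id: str) -> List[str]:
        """Return ids of all runs sharing a root with `run_id`, in creation order."""
        run = self._runs[run_id]
        root = run.root_run_id or run.run_id
        return [
            r.run_id
            for r in self._runs.values()
            if r.run_id == root or r.root_run_id == root
        ]


store = RunStore()
original = store.new_run()
retry = store.new_run(parent_run_id=original.run_id)
second_retry = store.new_run(parent_run_id=retry.run_id)
```

Asking for the group of any of the three runs returns all three ids, and the original record is unchanged after both retries.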

Example Walk-through

Consider the following pipeline, which has three solids, one of which fails half of the time.

from random import random

from dagster import ModeDefinition, fs_io_manager, pipeline, solid


@solid()
def unreliable_start(_):
    return 1


@solid()
def unreliable(_, num):
    failure_rate = 0.5
    if random() < failure_rate:
        raise Exception("blah")


@solid()
def unreliable_end(_, num):
    return


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})])
def unreliable_pipeline():
    unreliable_end(unreliable(unreliable_start()))

Although the pipeline is very simple, inputs and outputs are passed between its solids. With an IO manager configured, re-execution can reuse the inputs and outputs stored during the initial run.
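The following toy store sketches why persisted outputs matter for re-execution. It is an illustration only, not how fs_io_manager is implemented: the ToyFilesystemStore class, its path layout, and the run id "run_1" are all assumptions made for this example.

```python
import pickle
import tempfile
from pathlib import Path


class ToyFilesystemStore:
    """Illustrative only: persists each step output at <base>/<run_id>/<step>."""

    def __init__(self, base_dir: str) -> None:
        self.base = Path(base_dir)

    def save(self, run_id: str, step: str, value) -> None:
        path = self.base / run_id / step
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(value))

    def load(self, run_id: str, step: str):
        return pickle.loads((self.base / run_id / step).read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    store = ToyFilesystemStore(tmp)

    # Initial run: the first step succeeds and its output is persisted,
    # then a later step fails.
    store.save("run_1", "unreliable_start", 1)

    # Re-execution: instead of recomputing "unreliable_start", the retried
    # step reads its input from the output stored by the parent run.
    num_for_retry = store.load("run_1", "unreliable_start")
```

Without a persistent store, the output of the first step would be gone once the initial run ended, and a partial re-execution would have nothing to feed the retried step.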

After launching the execution in Dagit, you can find the re-execution option at the top right.

Re-execution in Dagit

The re-execution drop-down offers several types of re-execution. Whichever one you choose, the re-executed run is linked to the original run.

  • All Steps: Re-execute the pipeline run from scratch. This option is most useful when testing pipeline runs end to end and you'd like the runs to be associated with one another.
  • Selected Step(s): Re-execute only the selected step(s), regardless of their status. This is useful when your pipeline is large and you know exactly which steps to execute. Select steps by clicking their boxes in the Gantt chart view.
  • From Selected: Re-execute the pipeline downstream from the selected steps, regardless of step status. This is useful when you are developing a single step and want to make sure downstream steps work as expected.
  • From Failure: Re-execute the pipeline run, skipping steps that completed successfully. This option is only enabled when the pipeline run has failed. It is useful when you have fixed the failed step and want to run it and all of its downstream dependencies again. Dagster works out the dependencies for you.

In the above example, re-executing from failure would make sense, as the failed solid has a 50% chance of succeeding on the next run.
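The four options above can be modeled as a function from an option to a set of steps. This is a sketch under stated assumptions, not Dagit's logic: the option names, the steps_to_run helper, and the choice to include the selected steps themselves under "from_selected" are all hypothetical.

```python
from typing import Dict, List, Set

# Toy linear dependency graph: step -> steps that consume its output.
DOWNSTREAM: Dict[str, List[str]] = {
    "unreliable_start": ["unreliable"],
    "unreliable": ["unreliable_end"],
    "unreliable_end": [],
}


def descendants(step: str) -> Set[str]:
    """All steps reachable downstream of `step`, excluding `step` itself."""
    found: Set[str] = set()
    stack = list(DOWNSTREAM[step])
    while stack:
        current = stack.pop()
        if current not in found:
            found.add(current)
            stack.extend(DOWNSTREAM[current])
    return found


def steps_to_run(option: str, selected: Set[str], succeeded: Set[str]) -> Set[str]:
    """Map a re-execution option to the set of steps it would run (a sketch)."""
    all_steps = set(DOWNSTREAM)
    if option == "all":
        return all_steps
    if option == "selected":
        return set(selected)
    if option == "from_selected":
        # Assumption: the selected steps run too, followed by their descendants.
        result = set(selected)
        for step in selected:
            result |= descendants(step)
        return result
    if option == "from_failure":
        # Skip everything that already completed successfully.
        return all_steps - succeeded
    raise ValueError(f"unknown option: {option}")


# Suppose "unreliable" failed, so "unreliable_end" never ran.
succeeded = {"unreliable_start"}
from_failure = steps_to_run("from_failure", set(), succeeded)
from_selected = steps_to_run("from_selected", {"unreliable"}, succeeded)
only_selected = steps_to_run("selected", {"unreliable_end"}, succeeded)
```

In this toy graph, re-executing from failure and re-executing from the selected "unreliable" step happen to yield the same set, because the failed step is also the selected one.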

[Image: Dagit Re-execution]

If the entire pipeline succeeded but the underlying code changed, running specific steps to test the differences would be more relevant.

Selecting Steps

Within Dagit, you can select one or more steps by clicking them. Alternatively, type the names of the steps to re-run into the text box, adding a + for every subsequent step to be selected. This is demoed in the video below:

Re-execution using Python APIs

Re-execution can be triggered via the API as well.

Name                  Description
reexecute_pipeline    Reexecute an existing pipeline run.

Consider the pipeline unreliable_pipeline, which has a solid named unreliable.

from dagster import DagsterInstance, execute_pipeline, reexecute_pipeline
from reexecution.unreliable_pipeline import unreliable_pipeline


def reexecution():
    instance = DagsterInstance.ephemeral()

    # Initial execution
    pipeline_result_full = execute_pipeline(unreliable_pipeline, instance=instance)

    if not pipeline_result_full.success:
        # Re-execution: Entire pipeline
        reexecution_result_full = reexecute_pipeline(
            unreliable_pipeline,
            parent_run_id=pipeline_result_full.run_id,
            instance=instance,
        )

Using Dagster's API, you can programmatically trigger both an execution and a re-execution. If an initial pipeline run fails, you may want to trigger a full re-execution. Similarly, you can trigger a re-execution of selected steps or from a particular point.

# Re-execution: Starting with the "unreliable" solid and all its descendants
reexecution_result_specific_selection = reexecute_pipeline(
    unreliable_pipeline,
    parent_run_id=pipeline_result_full.run_id,
    instance=instance,
    solid_selection=["unreliable+*"],
)

The solid_selection argument takes a list of selection strings; the syntax is documented further in the API docs.
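To give a feel for how a clause such as "unreliable+*" might expand into a set of solids, here is a toy resolver. It is a sketch, not Dagster's parser, and it assumes a grammar in which a leading * or run of + walks upstream, a trailing run of + and/or * walks downstream, each + means one level, and * means unlimited depth. The resolve, walk, and invert helpers are hypothetical; check the API docs for the authoritative syntax.

```python
import re
from typing import Dict, List, Optional, Set

# Toy graph for the pipeline above: solid -> direct downstream solids.
DOWNSTREAM: Dict[str, List[str]] = {
    "unreliable_start": ["unreliable"],
    "unreliable": ["unreliable_end"],
    "unreliable_end": [],
}

# Assumed grammar: [* or +...] name [+... and/or *]
CLAUSE = re.compile(r"^(\*|\+*)(\w+)(\+*\*?)$")


def invert(graph: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Build the upstream view of a downstream adjacency map."""
    upstream: Dict[str, List[str]] = {name: [] for name in graph}
    for parent, children in graph.items():
        for child in children:
            upstream[child].append(parent)
    return upstream


def walk(graph: Dict[str, List[str]], start: str, depth: Optional[int]) -> Set[str]:
    """Collect nodes within `depth` hops of `start` (None means unlimited)."""
    seen: Set[str] = set()
    frontier = [start]
    hops = 0
    while frontier and (depth is None or hops < depth):
        next_frontier = []
        for node in frontier:
            for neighbor in graph[node]:
                if neighbor not in seen:
                    seen.add(neighbor)
                    next_frontier.append(neighbor)
        frontier = next_frontier
        hops += 1
    return seen


def depth_of(part: str) -> Optional[int]:
    """'*' anywhere means unlimited depth; otherwise count the '+' characters."""
    return None if "*" in part else len(part)


def resolve(clause: str) -> Set[str]:
    """Expand one selection clause into the set of solids it names."""
    match = CLAUSE.match(clause)
    if match is None or match.group(2) not in DOWNSTREAM:
        raise ValueError(f"cannot resolve clause: {clause!r}")
    up, name, down = match.groups()
    selected = {name}
    selected |= walk(invert(DOWNSTREAM), name, depth_of(up))
    selected |= walk(DOWNSTREAM, name, depth_of(down))
    return selected


from_unreliable = resolve("unreliable+*")
one_level_down = resolve("unreliable_start+")
all_ancestors = resolve("*unreliable_end")
```

Under these assumptions, "unreliable+*" expands to the unreliable solid plus everything downstream of it, which matches the comment in the snippet above.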