Executing Pipelines

We provide several ways to do one-off execution of pipelines and solids. For recurring execution, consult the schedules section.

Dagit

The Dagit playground offers a way to interactively build up the configuration for a run of a pipeline. Drop-downs are available for selecting any defined Presets or partitions.
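
To open the playground locally, you can point Dagit at a file containing your pipeline definition (here assuming the same my_pipeline.py used in the CLI example below):

dagit -f my_pipeline.py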

CLI

The Dagster CLI includes both dagster pipeline execute for direct execution and dagster pipeline launch for launching runs asynchronously.

dagster pipeline execute -f my_pipeline.py
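
For reference, here is a minimal sketch of what my_pipeline.py could contain. The predict_color pipeline and always_blue solid defined here are the names used by the Python API examples below; their bodies are illustrative.

my_pipeline.py
from dagster import pipeline, solid


# A solid that emits a single hard-coded output.
@solid
def always_blue(_):
    return "blue"


# A pipeline that invokes that single solid.
@pipeline
def predict_color():
    always_blue()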

Python APIs

We also provide Python APIs for execution that are useful when writing tests or scripts. execute_pipeline allows execution of a full pipeline and returns a PipelineExecutionResult.

test_pipeline.py
from dagster import execute_pipeline

from my_pipeline import predict_color  # the sketch above; adjust to where your pipeline lives


def test_execute_pipeline():
    result = execute_pipeline(predict_color)
    assert result.success
    assert result.output_for_solid("always_blue") == "blue"

We also provide execute_solid for executing an individual solid. Under the hood, this constructs an ephemeral pipeline containing only that solid.

test_solid.py
from dagster import execute_solid

from my_pipeline import always_blue  # the sketch above; adjust to where your solid lives


def test_execute_solid():
    result = execute_solid(always_blue)
    assert result.success
    assert result.output_value() == "blue"

By default, the Python APIs use an ephemeral DagsterInstance so that test runs are not recorded on your persistent instance. When using the Python APIs for production runs, pass instance=DagsterInstance.get() to use the default loading behavior for the instance.

execute.py
from dagster import DagsterInstance, execute_pipeline

execute_pipeline(predict_color, instance=DagsterInstance.get())

If pipeline execution will involve reconstructing the pipeline in another process, such as when using the multiprocess executor or dagstermill, the Python APIs need a reconstructable instance of the pipeline. This work is handled for you when using Dagit or the CLI. You will also need to configure the intermediate values between solids to be stored in a way that they can be accessed across processes.

execute.py
from dagster import DagsterInstance, execute_pipeline, reconstructable

from my_pipeline import predict_color  # the sketch above; adjust to where your pipeline lives


def test_multiprocess_executor():
    result = execute_pipeline(
        run_config={
            # This section controls how the run will be executed.
            # The multiprocess executor runs each step in its own subprocess.
            "execution": {"multiprocess": {}},
            # This section controls how values are passed from one solid to the next.
            # The default is in memory, so here we set it to filesystem so that
            # the separate subprocesses can access the values.
            "intermediate_storage": {"filesystem": {}},
        },
        # The default instance for this API is an ephemeral in-memory one.
        # To let the multiple processes coordinate, we use one here
        # backed by a temporary directory.
        instance=DagsterInstance.local_temp(),
        # A ReconstructablePipeline is necessary to load the pipeline in child processes.
        # reconstructable() is a utility function that captures where the
        # PipelineDefinition came from.
        pipeline=reconstructable(predict_color),
    )
    assert result.success
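
The run_config above, expressed as the YAML you would build up in the Dagit playground or supply to the CLI, would look like this:

execution:
  multiprocess: {}
intermediate_storage:
  filesystem: {}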