Pipelines

The second fundamental abstraction in Dagster is a pipeline. A pipeline is a set of solids whose data dependencies on one another form a directed acyclic graph (DAG).

Paired with appropriate configuration, the pipeline can be compiled by the Dagster engine into an execution plan that is executable on various compute substrates.
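To make the DAG idea concrete, here is a minimal sketch in plain Python using only the standard library (Python 3.9+). It is not Dagster's engine or its execution plan format. It only illustrates why acyclicity matters: when no cycle exists, there is always an order in which every solid runs after the solids it depends on. The `upstream` mapping and both helper functions are hypothetical names chosen for this illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical mini-graph: each key is a solid name, each value is the set of
# solids whose outputs it consumes. These names are illustrative only.
upstream = {
    "return_one": set(),
    "add_one": {"return_one"},
}


def execution_order(upstream_by_solid):
    """Return solid names in an order that runs every upstream solid first."""
    return list(TopologicalSorter(upstream_by_solid).static_order())


def is_acyclic(upstream_by_solid):
    """Return True if the dependency graph contains no cycles."""
    try:
        execution_order(upstream_by_solid)
    except CycleError:
        return False
    return True


order = execution_order(upstream)
print(order)
```

A graph such as `{"a": {"b"}, "b": {"a"}}` has no valid order, which is why a pipeline's dependencies must form a DAG.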

Defining a pipeline

There are two ways to define a pipeline:

  1. Using the @pipeline decorator (preferred)
  2. Constructing a PipelineDefinition object

Method 1: Using the decorator

To use the @pipeline decorator, we wrap a function that composes a set of solids.

Within the body of the decorated function, function calls indicate the dependency structure of the solids that make up the pipeline: passing the result of one solid call as an argument to another declares that the second solid depends on the first.

solids.py
from dagster import DependencyDefinition, InputDefinition, PipelineDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def one_plus_one_pipeline():
    # Passing return_one's output to add_one makes add_one depend on return_one.
    add_one(return_one())

Method 2: Constructing the PipelineDefinition object

To construct a PipelineDefinition object, pass the constructor a pipeline name, a list of solid definitions, and a dictionary defining the dependency structure.

The dependency structure declares the dependencies of each solid’s inputs on the outputs of other solids in the pipeline. Keys of the top-level dict are either the string names of solids in the pipeline or, in the case of aliased solids, SolidInvocation objects. Values of the top-level dict are themselves dicts, which map the input names of the solid or aliased solid to DependencyDefinition objects.

solids.py
one_plus_one_pipeline_def = PipelineDefinition(
    name="one_plus_one_pipeline",
    solid_defs=[return_one, add_one],
    # add_one's "number" input is fed by the output of return_one.
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)
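The shape of the dependency dictionary can be easier to follow when evaluated by hand. The sketch below is a plain-Python illustration, not how Dagster executes a pipeline: ordinary functions stand in for solids, upstream solid names (plain strings) stand in for DependencyDefinition objects, and the `run` helper is a hypothetical name. Contexts, type checks, aliases, and multiple outputs are all left out.

```python
from graphlib import TopologicalSorter


def return_one():
    return 1


def add_one(number):
    return number + 1


# Plain functions stand in for solids; the dict mirrors the shape described
# above, with upstream solid names in place of DependencyDefinition objects.
solids = {"return_one": return_one, "add_one": add_one}
dependencies = {"add_one": {"number": "return_one"}}


def run(solids, dependencies):
    """Run every function once, feeding each input from its upstream output."""
    # Map each solid name to the set of solid names it depends on.
    upstream = {
        name: set(dependencies.get(name, {}).values()) for name in solids
    }
    outputs = {}
    # Visit solids so that every upstream output exists before it is needed.
    for name in TopologicalSorter(upstream).static_order():
        inputs = {
            input_name: outputs[upstream_name]
            for input_name, upstream_name in dependencies.get(name, {}).items()
        }
        outputs[name] = solids[name](**inputs)
    return outputs


results = run(solids, dependencies)
print(results)
```

The top-level key names the downstream solid, and the inner dict says which upstream output feeds each of its inputs, which is the same reading that applies to the `dependencies` argument of PipelineDefinition.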