Pipelines
The second fundamental abstraction in Dagster is the pipeline. A pipeline is a set of solids whose data dependencies on one another form a directed acyclic graph (DAG).
Paired with appropriate configuration, the pipeline can be compiled by the Dagster engine into an execution plan that can run on various compute substrates.
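To make the DAG idea concrete, here is a minimal, stdlib-only sketch that is not Dagster code. It assumes a pipeline can be modelled as a mapping from each solid name to the set of solids whose outputs it consumes, and uses Python's `graphlib.TopologicalSorter` (Python 3.9+) to derive one valid run order. The function `build_execution_order` and the example graphs are hypothetical; Dagster's real execution plan also carries per-step inputs, outputs, and configuration.

```python
from graphlib import CycleError, TopologicalSorter


def build_execution_order(upstream):
    """Return one valid run order for a toy pipeline.

    `upstream` maps each solid name to the set of solid names whose outputs
    it consumes. This is a hypothetical model, not Dagster's engine.
    """
    sorter = TopologicalSorter(upstream)
    try:
        # static_order() yields every node only after all of its predecessors.
        return list(sorter.static_order())
    except CycleError as err:
        # A cycle means the graph is not a DAG, so no run order exists.
        raise ValueError(f"pipeline is not acyclic: {err.args[1]}") from err


# Toy version of a one-plus-one pipeline: add_one depends on return_one.
order = build_execution_order({"return_one": set(), "add_one": {"return_one"}})
print(order)  # ['return_one', 'add_one']
```

The acyclic requirement is what makes such an ordering possible: a graph with a cycle has no order in which every solid runs after the solids it depends on, so the sketch rejects it.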
Defining a pipeline
There are two ways to define a pipeline:
- Using the @pipeline decorator (preferred)
- Constructing a PipelineDefinition object
Method 1: Using the decorator
To use the @pipeline decorator, we wrap a function that composes a set of solids.
Within the body of the decorated function, we call the solids as if they were ordinary functions; passing the output of one solid as an input to another declares the dependency between them.
```python
from dagster import DependencyDefinition, InputDefinition, PipelineDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def one_plus_one_pipeline():
    # Passing return_one's output into add_one declares the dependency.
    add_one(return_one())
```
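One way to picture how plain function calls can encode a dependency graph is the stdlib-only sketch below. It is a hypothetical toy, not how Dagster implements `@solid` or `@pipeline`: the names `toy_solid`, `toy_pipeline`, and `OutputHandle` are invented for illustration. When a toy solid is called inside a toy pipeline body, it returns a placeholder handle instead of a value and records which upstream handle was passed to which of its named inputs.

```python
import inspect

# Toy registry filled in while a toy pipeline body runs:
# solid name -> {input name: upstream solid name}.
_recorded_deps = {}


class OutputHandle:
    """Placeholder standing in for a solid's output during composition."""

    def __init__(self, solid_name):
        self.solid_name = solid_name


def toy_solid(fn):
    """Hypothetical decorator: calling the result records, not computes."""
    # Skip the first parameter, which plays the role of `context`.
    input_names = list(inspect.signature(fn).parameters)[1:]

    def invoke(*args):
        # Pair positional arguments with input names; keep upstream handles.
        _recorded_deps[fn.__name__] = {
            name: arg.solid_name
            for name, arg in zip(input_names, args)
            if isinstance(arg, OutputHandle)
        }
        return OutputHandle(fn.__name__)

    return invoke


def toy_pipeline(body):
    """Hypothetical decorator: run the body once and return the captured deps."""
    _recorded_deps.clear()
    body()
    return dict(_recorded_deps)


@toy_solid
def return_one(context):
    return 1


@toy_solid
def add_one(context, number):
    return number + 1


@toy_pipeline
def one_plus_one():
    add_one(return_one())


print(one_plus_one)  # {'return_one': {}, 'add_one': {'number': 'return_one'}}
```

In this toy, the solid bodies never run during composition; only the structure is captured, and the result has the same nested shape as the `dependencies` dictionary that a PipelineDefinition takes.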
Method 2: Constructing the PipelineDefinition object
To construct a PipelineDefinition object, pass the constructor a pipeline name, a list of solid definitions, and a dictionary defining the dependency structure.
The dependency structure declares the dependencies of each solid's inputs on the outputs of other solids in the pipeline. Keys of the top-level dict are either the string names of solids in the pipeline or, in the case of aliased solids, SolidInvocations. Values of the top-level dict are themselves dicts, which map the input names of that solid or aliased solid to DependencyDefinitions.
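```python
# Reuses the return_one and add_one solids defined in Method 1.
one_plus_one_pipeline_def = PipelineDefinition(
    name="one_plus_one_pipeline",
    solid_defs=[return_one, add_one],
    # add_one's "number" input is fed by the output of return_one.
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)
```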
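To show how a dictionary of this shape is enough to drive execution, here is a stdlib-only toy interpreter; it is a sketch under stated assumptions, not Dagster's engine. `Dep` is a hypothetical stand-in for DependencyDefinition (its default output name "result" is an assumption chosen to mirror Dagster's default single output), each toy solid is a plain function taking keyword inputs, and `run_toy_pipeline` is an invented helper that orders solids topologically and passes each upstream result into the named input.

```python
from collections import namedtuple
from graphlib import TopologicalSorter

# Hypothetical stand-in for dagster.DependencyDefinition: names the upstream
# solid (and, for completeness, its output) that feeds one input.
Dep = namedtuple("Dep", ["solid", "output"], defaults=["result"])


def run_toy_pipeline(solid_fns, dependencies):
    """Execute plain functions according to a Dagster-shaped dependency dict.

    `solid_fns` maps solid name -> callable taking keyword inputs.
    `dependencies` maps solid name -> {input name: Dep(upstream solid)}.
    Each toy solid has a single output, so `Dep.output` is not consulted.
    """
    # Translate the nested dict into the predecessor mapping graphlib expects.
    upstream = {name: set() for name in solid_fns}
    for name, inputs in dependencies.items():
        upstream[name] |= {dep.solid for dep in inputs.values()}

    results = {}
    for name in TopologicalSorter(upstream).static_order():
        # Look up each input's upstream result, which is already computed.
        kwargs = {
            input_name: results[dep.solid]
            for input_name, dep in dependencies.get(name, {}).items()
        }
        results[name] = solid_fns[name](**kwargs)
    return results


outputs = run_toy_pipeline(
    {"return_one": lambda: 1, "add_one": lambda number: number + 1},
    {"add_one": {"number": Dep("return_one")}},
)
print(outputs["add_one"])  # 2
```

Because the dependency dict names inputs explicitly, a solid with several inputs can draw each one from a different upstream solid; the toy handles that case the same way, one keyword argument per input.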