Pipelines

Pipeline definitions

@dagster.pipeline(name=None, description=None, mode_defs=None, preset_defs=None, tags=None, hook_defs=None)

Create a pipeline with the specified parameters from the decorated composition function.

This decorator lets you build up the pipeline's dependency graph by writing a function that invokes solids and passes their outputs as inputs to other solids.

Parameters
  • name (Optional[str]) – The name of the pipeline. Must be unique within any RepositoryDefinition containing the pipeline.

  • description (Optional[str]) – A human-readable description of the pipeline.

  • mode_defs (Optional[List[ModeDefinition]]) – The set of modes in which this pipeline can operate. Modes are used to attach resources, custom loggers, custom system storage options, and custom executors to a pipeline. Modes can be used, e.g., to vary available resource and logging implementations between local test and production runs.

  • preset_defs (Optional[List[PresetDefinition]]) – A set of preset collections of configuration options that may be used to execute a pipeline. A preset consists of a run config dict, an optional subset of solids to execute, and a mode selection. Presets can be used to ship common combinations of options to pipeline end users in Python code, and can be selected by tools like Dagit.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution run of the pipeline. Values that are not strings will be JSON-encoded and must satisfy json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • hook_defs (Optional[Set[HookDefinition]]) – A set of hook definitions applied to the pipeline. When a hook is applied to a pipeline, it will be attached to all solid instances within the pipeline.
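
The JSON round-trip rule for non-string tag values can be checked in plain Python before a value is attached to a pipeline. The sketch below uses a hypothetical helper, is_valid_tag_value, which is not part of Dagster; it only applies the criterion stated above.

```python
import json


def is_valid_tag_value(value):
    """Hypothetical helper: apply the tag rule described above.

    Strings are stored as-is; any other value must survive a JSON
    round trip unchanged.
    """
    if isinstance(value, str):
        return True
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for objects it cannot serialize (e.g. sets).
        return False


tags = {
    "team": "data-eng",       # string: stored as-is
    "priority": 3,            # int survives the round trip
    "owners": ["ana", "bo"],  # list survives the round trip
    "window": (1, 2),         # tuple comes back as a list, so it fails
    "by_id": {1: "a"},        # int key comes back as the string "1", so it fails
}

for key, value in tags.items():
    print(key, is_valid_tag_value(value))
```

Tuples and dicts with non-string keys are the most common values that look JSON-friendly but do not round-trip exactly.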

Example

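from dagster import Output, OutputDefinition, lambda_solid, pipeline, solid


@solid(output_defs=[OutputDefinition(int, "two"), OutputDefinition(int, "four")])
def emit_two_four(_):
    # A multi-output solid yields one Output per OutputDefinition, matched by name.
    yield Output(2, "two")
    yield Output(4, "four")


@lambda_solid
def add_one(num: int) -> int:
    return num + 1


@lambda_solid
def mult_two(num: int) -> int:
    return num * 2


@pipeline
def math_pipeline():
    # The outputs of a multi-output solid unpack in the order of its output_defs.
    two, four = emit_two_four()
    add_one(two)
    mult_two(four)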
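
The data flow in this example can be traced in plain Python to see how named outputs are routed to downstream solids. This is a model only: Output here is a hypothetical namedtuple standing in for dagster.Output, the functions are undecorated copies of the solids above, and run_math_pipeline is an illustrative name, not a Dagster API.

```python
from collections import namedtuple

# Hypothetical stand-in for dagster.Output: a value tagged with an output name.
Output = namedtuple("Output", ["value", "output_name"])


def emit_two_four():
    yield Output(2, "two")
    yield Output(4, "four")


def add_one(num):
    return num + 1


def mult_two(num):
    return num * 2


def run_math_pipeline():
    # Collect each yielded Output under its name, as the output_defs declare.
    outputs = {out.output_name: out.value for out in emit_two_four()}
    # Route each named output to the solid that consumes it.
    return {
        "add_one": add_one(outputs["two"]),
        "mult_two": mult_two(outputs["four"]),
    }


print(run_math_pipeline())
```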

class dagster.PipelineDefinition(solid_defs, name=None, description=None, dependencies=None, mode_defs=None, preset_defs=None, tags=None, hook_defs=None, _parent_pipeline_def=None)

Defines a Dagster pipeline.

A pipeline is made up of

  • Solids, each of which is a single functional unit of data computation.

  • Dependencies, which determine how the values produced by solids as their outputs flow from one solid to another. This tells Dagster how to arrange solids, and potentially multiple aliased instances of solids, into a directed, acyclic graph (DAG) of compute.

  • Modes, which can be used to attach resources, custom loggers, custom system storage options, and custom executors to a pipeline, and to switch between them.

  • Presets, which can be used to ship common combinations of pipeline config options in Python code, and to switch between them.

Parameters
  • solid_defs (List[SolidDefinition]) – The set of solids used in this pipeline.

  • name (Optional[str]) – The name of the pipeline. Must be unique within any RepositoryDefinition containing the pipeline.

  • description (Optional[str]) – A human-readable description of the pipeline.

  • dependencies (Optional[Dict[Union[str, SolidInvocation], Dict[str, DependencyDefinition]]]) – A structure that declares the dependencies of each solid’s inputs on the outputs of other solids in the pipeline. Keys of the top level dict are either the string names of solids in the pipeline or, in the case of aliased solids, SolidInvocations. Values of the top level dict are themselves dicts, which map input names belonging to the solid or aliased solid to DependencyDefinitions.

  • mode_defs (Optional[List[ModeDefinition]]) – The set of modes in which this pipeline can operate. Modes are used to attach resources, custom loggers, custom system storage options, and custom executors to a pipeline. Modes can be used, e.g., to vary available resource and logging implementations between local test and production runs.

  • preset_defs (Optional[List[PresetDefinition]]) – A set of preset collections of configuration options that may be used to execute a pipeline. A preset consists of a run config dict, an optional subset of solids to execute, and a mode selection. Presets can be used to ship common combinations of options to pipeline end users in Python code, and can be selected by tools like Dagit.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution run of the pipeline. Values that are not strings will be JSON-encoded and must satisfy json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • hook_defs (Optional[Set[HookDefinition]]) – A set of hook definitions applied to the pipeline. When a hook is applied to a pipeline, it will be attached to all solid instances within the pipeline.

  • _parent_pipeline_def (INTERNAL ONLY) – Used for tracking pipelines created using solid subsets.
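
Because the dependencies must form a directed acyclic graph, the nested dependencies dict determines a valid execution order. The sketch below models this with the standard-library graphlib module (Python 3.9+). Plain (solid, output) tuples stand in for DependencyDefinition, and execution_order is a hypothetical name; this is not Dagster's own scheduler.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(solid_names, dependencies):
    """Return solid names so that every solid follows the solids it depends on.

    dependencies mirrors the PipelineDefinition shape, with tuples in place
    of DependencyDefinition:
    {solid_name: {input_name: (upstream_solid, upstream_output)}}
    """
    sorter = TopologicalSorter()
    for name in solid_names:
        # Register every solid, including those with no dependencies.
        sorter.add(name)
    for solid, inputs in dependencies.items():
        for upstream_solid, _output in inputs.values():
            # The upstream solid is a predecessor of the dependent solid.
            sorter.add(solid, upstream_solid)
    # Iterating static_order() raises CycleError if the graph has a cycle.
    return list(sorter.static_order())


deps = {"apply_op": {"num": ("return_one", "result")}}
print(execution_order(["return_one", "apply_op"], deps))
```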

Examples

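from dagster import (
    DependencyDefinition,
    InputDefinition,
    Int,
    ModeDefinition,
    PipelineDefinition,
    PresetDefinition,
    lambda_solid,
    resource,
    solid,
)


@lambda_solid
def return_one():
    return 1


@solid(input_defs=[InputDefinition('num')], required_resource_keys={'op'})
def apply_op(context, num):
    # Delegate to whichever 'op' resource the active mode provides.
    return context.resources.op(num)


@resource(config_schema=Int)
def adder_resource(init_context):
    # Build a function that adds the configured integer to its argument.
    return lambda x: x + init_context.resource_config


add_mode = ModeDefinition(
    name='add_mode',
    resource_defs={'op': adder_resource},
    description='Mode that adds things',
)


add_three_preset = PresetDefinition(
    name='add_three_preset',
    # Supplies 3 as adder_resource's config when this preset is selected.
    run_config={'resources': {'op': {'config': 3}}},
    mode='add_mode',
)


pipeline_def = PipelineDefinition(
    name='basic',
    solid_defs=[return_one, apply_op],
    # apply_op's 'num' input is fed by return_one's default 'result' output.
    dependencies={'apply_op': {'num': DependencyDefinition('return_one')}},
    mode_defs=[add_mode],
    preset_defs=[add_three_preset],
)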
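
To make the wiring concrete, the sketch below traces by hand what the example computes under add_three_preset, in plain Python with no Dagster imports. make_adder and run_basic are hypothetical names, and the dictionary lookup only mimics how the preset's run_config reaches the 'op' resource; it is not how Dagster executes a run.

```python
# Mirrors add_three_preset's run_config from the example above.
run_config = {"resources": {"op": {"config": 3}}}


def make_adder(resource_config):
    # Plays the role of adder_resource: builds the 'op' callable from config.
    return lambda x: x + resource_config


def run_basic(run_config):
    # Build the 'op' resource from its config block, as add_mode would.
    op = make_adder(run_config["resources"]["op"]["config"])
    num = 1         # return_one's output, passed along the 'num' dependency
    return op(num)  # apply_op's output


print(run_basic(run_config))
```

Swapping in a different config value (or a mode with a different 'op' resource) changes apply_op's result without touching the solids themselves, which is the point of separating modes and presets from the pipeline's structure.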

Dependencies and aliases

class dagster.DependencyDefinition(solid, output='result', description=None)

Represents an edge in the DAG of solid instances forming a pipeline.

This object is used at the leaves of the nested dictionary that describes a pipeline’s complete dependency structure. The keys of that dictionary identify the dependent solid and its dependent input, so this object only needs to carry information about the dependee: the upstream solid and output that the value comes from.

Concretely, if the input named ‘input’ of solid_b depends on the output named ‘result’ of solid_a, this structure will look as follows:

dependency_structure = {
    'solid_b': {
        'input': DependencyDefinition('solid_a', 'result')
    }
}

In general, users should avoid constructing this class directly and avoid the PipelineDefinition API that requires instances of it. Instead, use the @pipeline API:

@pipeline
def my_pipeline():
    solid_b(solid_a())

Parameters
  • solid (str) – The name of the solid that is depended on, that is, from which the value passed between the two solids originates.

  • output (Optional[str]) – The name of the output that is depended on. (default: “result”)

  • description (Optional[str]) – Human-readable description of this dependency.
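
The nested structure can be flattened into explicit edges, which makes the default output name visible. In this sketch Dep is a hypothetical namedtuple that mimics DependencyDefinition's fields and its 'result' default, and to_edges is an illustrative helper; neither is part of Dagster.

```python
from collections import namedtuple

# Hypothetical stand-in with the same fields and defaults as DependencyDefinition.
Dep = namedtuple("Dep", ["solid", "output", "description"], defaults=["result", None])


def to_edges(dependency_structure):
    """Flatten {solid: {input: Dep}} into a list of edges.

    Each edge is ((upstream_solid, output), (dependent_solid, input)).
    """
    edges = []
    for solid, inputs in dependency_structure.items():
        for input_name, dep in inputs.items():
            edges.append(((dep.solid, dep.output), (solid, input_name)))
    return edges


# Omitting the output name falls back to "result".
dependency_structure = {"solid_b": {"input": Dep("solid_a")}}
print(to_edges(dependency_structure))
```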

class dagster.MultiDependencyDefinition(dependencies)

Represents a fan-in edge in the DAG of solid instances forming a pipeline.

This object is used only when an input of type List[T] is assembled by fanning in multiple upstream outputs of type T.

This object is used at the leaves of the nested dictionary that describes a pipeline’s complete dependency structure. The keys of that dictionary identify the dependent solid and its dependent input, so this object only needs to carry information about the dependees: the upstream solids and outputs whose values are fanned in.

Concretely, if the input named ‘input’ of solid_c depends on the outputs named ‘result’ of solid_a and solid_b, this structure will look as follows:

dependency_structure = {
    'solid_c': {
        'input': MultiDependencyDefinition(
            [
                DependencyDefinition('solid_a', 'result'),
                DependencyDefinition('solid_b', 'result')
            ]
        )
    }
}

In general, users should avoid constructing this class directly and avoid the PipelineDefinition API that requires instances of it. Instead, use the @pipeline API, passing a list of outputs to fan them in to a single input:

@pipeline
def my_pipeline():
    solid_c([solid_a(), solid_b()])

Parameters
  • dependencies (List[DependencyDefinition]) – The upstream dependencies to fan in to this input. Each DependencyDefinition identifies one upstream solid and one of its outputs.
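
Fan-in can be modeled as gathering several upstream outputs, each of type T, into one value of type List[T]. The function fan_in below is a hypothetical plain-Python illustration, not Dagster's implementation; it assumes the assembled list follows the order in which the dependencies are declared.

```python
def fan_in(upstream_values, dependencies):
    """Assemble a list input from several (solid, output) dependencies.

    upstream_values maps (solid, output) -> an already computed value.
    dependencies is a list of (solid, output) pairs, standing in for the
    DependencyDefinitions inside a MultiDependencyDefinition.
    """
    return [upstream_values[(solid, output)] for solid, output in dependencies]


upstream_values = {("solid_a", "result"): 1, ("solid_b", "result"): 2}
deps = [("solid_a", "result"), ("solid_b", "result")]
print(fan_in(upstream_values, deps))
```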

class dagster.SolidInvocation(name, alias=None, tags=None, hook_defs=None)

Identifies an instance of a solid in a pipeline dependency structure.

Parameters
  • name (str) – Name of the solid of which this is an instance.

  • alias (Optional[str]) – Name specific to this instance of the solid. Necessary when there are multiple instances of the same solid.

  • tags (Optional[Dict[str, Any]]) – Optional tag values that extend or override those set on the solid definition.

  • hook_defs (Optional[Set[HookDefinition]]) – A set of hook definitions applied to the solid instance.

Examples

pipeline_def = PipelineDefinition(
    solid_defs=[solid_1, solid_2],
    dependencies={
        # Declares the plain 'solid_1' instance that other_name depends on.
        'solid_1': {},
        SolidInvocation('solid_1', alias='other_name'): {
            'input_name': DependencyDefinition('solid_1'),
        },
        'solid_2': {
            'input_name': DependencyDefinition('other_name'),
        },
    },
)

In general, users should avoid constructing this class directly and avoid the PipelineDefinition API that requires instances of it. Instead, use the @pipeline API:

@pipeline
def my_pipeline():
    other_name = solid_1.alias('other_name')
    # Invoke solid_1 for its output, then pass that output to the aliased copy.
    solid_2(other_name(solid_1()))
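
Aliasing can be pictured as mapping each solid definition to the set of instance names that use it. The sketch below is a simplified, hypothetical model: Invocation is a stand-in namedtuple for SolidInvocation and instance_names is not a Dagster function. It assumes that a definition gets a single instance under its own name when no dependency key mentions it, and otherwise one instance per key, named by the alias when one is given. Under that assumption, every solid named in a DependencyDefinition must resolve to one of these instance names.

```python
from collections import namedtuple

# Hypothetical stand-in for SolidInvocation: a definition name plus an optional alias.
Invocation = namedtuple("Invocation", ["name", "alias"], defaults=[None])


def instance_names(solid_def_names, dependency_keys):
    """Map each definition name to the instance names it appears under."""
    by_def = {}
    for key in dependency_keys:
        # Plain string keys behave like an invocation without an alias.
        inv = Invocation(key) if isinstance(key, str) else key
        by_def.setdefault(inv.name, set()).add(inv.alias or inv.name)
    # Definitions never mentioned in the keys get one instance under their own name.
    return {name: by_def.get(name, {name}) for name in solid_def_names}


keys = ["solid_1", Invocation("solid_1", alias="other_name"), "solid_2"]
print(instance_names(["solid_1", "solid_2"], keys))
```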