[Legacy] Pipelines

As of Dagster 0.13.0, we recommend using Jobs as an alternative to Pipelines.

Pipeline definitions

@dagster.pipeline(name=None, description=None, mode_defs=None, preset_defs=None, tags=None, hook_defs=None, input_defs=None, output_defs=None, config_schema=None, config_fn=None, solid_retry_policy=None, version_strategy=None)

Create a pipeline with the specified parameters from the decorated composition function.

Using this decorator allows you to build up the dependency graph of the pipeline by writing a function that invokes solids and passes the output to other solids.

Parameters
  • name (Optional[str]) – The name of the pipeline. Must be unique within any RepositoryDefinition containing the pipeline.

  • description (Optional[str]) – A human-readable description of the pipeline.

  • mode_defs (Optional[List[ModeDefinition]]) – The set of modes in which this pipeline can operate. Modes are used to attach resources, custom loggers, custom system storage options, and custom executors to a pipeline. Modes can be used, e.g., to vary available resource and logging implementations between local test and production runs.

  • preset_defs (Optional[List[PresetDefinition]]) – A set of preset collections of configuration options that may be used to execute a pipeline. A preset consists of a run config, an optional subset of solids to execute, and a mode selection. Presets can be used to ship common combinations of options to pipeline end users in Python code, and can be selected by tools like Dagit.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution run of the pipeline. Values that are not strings will be JSON-encoded and must meet the criterion that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • hook_defs (Optional[Set[HookDefinition]]) – A set of hook definitions applied to the pipeline. When a hook is applied to a pipeline, it will be attached to all solid instances within the pipeline.

  • solid_retry_policy (Optional[RetryPolicy]) – The default retry policy for all solids in this pipeline. Only used if retry policy is not defined on the solid definition or solid invocation.

  • version_strategy (Optional[VersionStrategy]) – The version strategy to use with this pipeline. Providing a VersionStrategy will enable memoization on the pipeline.
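The JSON round-trip rule for tags can be checked before a run is launched. The helper below is a hypothetical sketch, not part of Dagster; it applies exactly the stated criterion, so values such as tuples, sets, or dicts with non-string keys are flagged even though some of them can be serialized.

```python
import json
from typing import Any, Dict, List


def invalid_tag_keys(tags: Dict[str, Any]) -> List[str]:
    """Return the keys whose values break json.loads(json.dumps(value)) == value.

    String values are exempt because they are used as-is rather than JSON-encoded.
    """
    bad = []
    for key, value in tags.items():
        if isinstance(value, str):
            continue
        try:
            round_tripped = json.loads(json.dumps(value))
        except (TypeError, ValueError):
            bad.append(key)  # not JSON-serializable at all, e.g. a set
            continue
        if round_tripped != value:
            bad.append(key)  # serializable, but comes back different, e.g. tuple -> list
    return bad


tags = {"team": "data", "priority": 2, "owners": ["ana", "bo"], "window": (0, 10)}
print(invalid_tag_keys(tags))  # ['window']
```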

Example

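from dagster import Output, OutputDefinition, lambda_solid, pipeline, solid


@solid(output_defs=[OutputDefinition(int, "two"), OutputDefinition(int, "four")])
def emit_two_four(_):
    # A generator solid yields one Output per declared output name.
    yield Output(2, "two")
    yield Output(4, "four")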


@lambda_solid
def add_one(num: int) -> int:
    return num + 1


@lambda_solid
def mult_two(num: int) -> int:
    return num * 2


@pipeline
def math_pipeline():
    two, four = emit_two_four()
    add_one(two)
    mult_two(four)

class dagster.PipelineDefinition(solid_defs=None, name=None, description=None, dependencies=None, mode_defs=None, preset_defs=None, tags=None, hook_defs=None, solid_retry_policy=None, graph_def=None, _parent_pipeline_def=None, version_strategy=None)

Defines a Dagster pipeline.

A pipeline is made up of

  • Solids, each of which is a single functional unit of data computation.

  • Dependencies, which determine how the values produced by solids as their outputs flow from one solid to another. This tells Dagster how to arrange solids, and potentially multiple aliased instances of solids, into a directed, acyclic graph (DAG) of compute.

  • Modes, which can be used to attach resources, custom loggers, custom system storage options, and custom executors to a pipeline, and to switch between them.

  • Presets, which can be used to ship common combinations of pipeline config options in Python code, and to switch between them.

Parameters
  • solid_defs (List[SolidDefinition]) – The set of solids used in this pipeline.

  • name (str) – The name of the pipeline. Must be unique within any RepositoryDefinition containing the pipeline.

  • description (Optional[str]) – A human-readable description of the pipeline.

  • dependencies (Optional[Dict[Union[str, NodeInvocation], Dict[str, DependencyDefinition]]]) – A structure that declares the dependencies of each solid’s inputs on the outputs of other solids in the pipeline. Keys of the top level dict are either the string names of solids in the pipeline or, in the case of aliased solids, NodeInvocations. Values of the top level dict are themselves dicts, which map input names belonging to the solid or aliased solid to DependencyDefinitions.

  • mode_defs (Optional[List[ModeDefinition]]) – The set of modes in which this pipeline can operate. Modes are used to attach resources, custom loggers, custom system storage options, and custom executors to a pipeline. Modes can be used, e.g., to vary available resource and logging implementations between local test and production runs.

  • preset_defs (Optional[List[PresetDefinition]]) – A set of preset collections of configuration options that may be used to execute a pipeline. A preset consists of a run config, an optional subset of solids to execute, and a mode selection. Presets can be used to ship common combinations of options to pipeline end users in Python code, and can be selected by tools like Dagit.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution run of the pipeline. Values that are not strings will be JSON-encoded and must meet the criterion that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • hook_defs (Optional[AbstractSet[HookDefinition]]) – A set of hook definitions applied to the pipeline. When a hook is applied to a pipeline, it will be attached to all solid instances within the pipeline.

  • solid_retry_policy (Optional[RetryPolicy]) – The default retry policy for all solids in this pipeline. Only used if retry policy is not defined on the solid definition or solid invocation.

  • _parent_pipeline_def (INTERNAL ONLY) – Used for tracking pipelines created using solid subsets.
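The fallback rule for solid_retry_policy amounts to picking the most specific policy that is set. The sketch below models that rule with a hypothetical dataclass instead of dagster.RetryPolicy, and it assumes that an invocation-level policy takes priority over a definition-level one, which the parameter description does not spell out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchRetryPolicy:
    """Hypothetical stand-in for dagster.RetryPolicy; only max_retries is modeled."""

    max_retries: int = 1


def effective_retry_policy(
    invocation: Optional[SketchRetryPolicy],
    definition: Optional[SketchRetryPolicy],
    pipeline_default: Optional[SketchRetryPolicy],
) -> Optional[SketchRetryPolicy]:
    # Assumed precedence: solid invocation, then solid definition, then the
    # pipeline-wide solid_retry_policy. The first one that is set wins.
    for policy in (invocation, definition, pipeline_default):
        if policy is not None:
            return policy
    return None


default = SketchRetryPolicy(max_retries=3)
print(effective_retry_policy(None, None, default))
print(effective_retry_policy(None, SketchRetryPolicy(max_retries=1), default))
```

The pipeline default only takes effect for solids that set no policy of their own.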

Examples

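from dagster import (
    DependencyDefinition,
    InputDefinition,
    Int,
    ModeDefinition,
    PipelineDefinition,
    PresetDefinition,
    resource,
    solid,
)


@solid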
def return_one(_):
    return 1


@solid(input_defs=[InputDefinition('num')], required_resource_keys={'op'})
def apply_op(context, num):
    return context.resources.op(num)

@resource(config_schema=Int)
def adder_resource(init_context):
    return lambda x: x + init_context.resource_config


add_mode = ModeDefinition(
    name='add_mode',
    resource_defs={'op': adder_resource},
    description='Mode that adds things',
)


add_three_preset = PresetDefinition(
    name='add_three_preset',
    run_config={'resources': {'op': {'config': 3}}},
    mode='add_mode',
)


pipeline_def = PipelineDefinition(
    name='basic',
    solid_defs=[return_one, apply_op],
    dependencies={'apply_op': {'num': DependencyDefinition('return_one')}},
    mode_defs=[add_mode],
    preset_defs=[add_three_preset],
)
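Because dependencies maps each solid's inputs to upstream outputs, it defines a DAG, and any valid run order is a topological order of it. The sketch below derives one with the standard library's graphlib (Python 3.9+) and a namedtuple standing in for DependencyDefinition; the extra report solid is hypothetical. It only illustrates the shape of the structure and is not how Dagster builds its execution plan.

```python
from collections import namedtuple
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical stand-in for DependencyDefinition: the upstream node and its output.
Dep = namedtuple("Dep", ["node", "output"], defaults=["result"])

# Same shape as the example's `dependencies`, plus a hypothetical `report` solid.
dependencies = {
    "apply_op": {"num": Dep("return_one")},
    "report": {"value": Dep("apply_op")},
}


def execution_order(deps):
    """List solids so that each one appears after every solid it depends on."""
    sorter = TopologicalSorter()
    for solid_name, inputs in deps.items():
        # Each input's upstream node is a predecessor of this solid.
        sorter.add(solid_name, *(dep.node for dep in inputs.values()))
    return list(sorter.static_order())  # raises graphlib.CycleError on a cycle


print(execution_order(dependencies))
```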

Executing pipelines

dagster.execute_pipeline(pipeline, run_config=None, mode=None, preset=None, tags=None, solid_selection=None, instance=None, raise_on_error=True)

Execute a pipeline synchronously.

Users will typically call this API when testing pipeline execution, or running standalone scripts.

Parameters
  • pipeline (Union[IPipeline, PipelineDefinition]) – The pipeline to execute.

  • run_config (Optional[dict]) – The configuration that parametrizes this run, as a dict.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True, since this is the most useful behavior in test.

  • solid_selection (Optional[List[str]]) –

    A list of solid selection queries (including single solid names) to execute. For example:

    • ['some_solid']: select some_solid itself.

    • ['*some_solid']: select some_solid and all its ancestors (upstream dependencies).

    • ['*some_solid+++']: select some_solid, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid', 'other_solid_a', 'other_solid_b+']: select some_solid and all its ancestors, other_solid_a itself, and other_solid_b and its direct child solids.

Returns

The result of pipeline execution.

Return type

PipelineExecutionResult

For a version that yields events as they are produced rather than returning a single result, see execute_pipeline_iterator().
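Read literally, the solid_selection examples say that a leading * walks upstream without limit, each trailing + walks one more level downstream, and a list of queries selects the union of the matches. The sketch below implements that reading over a hypothetical in-memory DAG. It is not Dagster's selection parser, and it additionally assumes a symmetric leading + and trailing *, which the examples above do not show.

```python
import re
from typing import Dict, List, Set

# Hypothetical DAG: each solid maps to the solids that consume its outputs.
CHILDREN: Dict[str, List[str]] = {
    "a": ["b"],
    "b": ["some_solid"],
    "some_solid": ["c"],
    "c": ["d"],
    "d": ["e"],
    "e": ["f"],
    "f": [],
}


def _reachable(start: str, edges: Dict[str, List[str]], depth: float) -> Set[str]:
    """Nodes reachable from `start` in at most `depth` hops (float('inf') = no limit)."""
    seen: Set[str] = set()
    frontier, level = [start], 0
    while frontier and level < depth:
        frontier = [n for node in frontier for n in edges.get(node, []) if n not in seen]
        seen.update(frontier)
        level += 1
    return seen


def select(queries: List[str], children: Dict[str, List[str]]) -> Set[str]:
    """Union of the solids matched by each query, e.g. '*name' or 'name+++'."""
    parents: Dict[str, List[str]] = {}
    for node, kids in children.items():
        for kid in kids:
            parents.setdefault(kid, []).append(node)

    selected: Set[str] = set()
    for query in queries:
        match = re.fullmatch(r"(\*|\+*)(\w+)(\*|\+*)", query)
        if match is None:
            raise ValueError(f"unsupported query: {query!r}")
        up, name, down = match.groups()
        up_depth = float("inf") if up == "*" else len(up)
        down_depth = float("inf") if down == "*" else len(down)
        selected |= {name}
        selected |= _reachable(name, parents, up_depth)
        selected |= _reachable(name, children, down_depth)
    return selected


print(sorted(select(["*some_solid+++"], CHILDREN)))
```

Here '*some_solid+++' picks up a and b upstream plus c, d, and e downstream, but not f, which is four levels down.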

dagster.execute_pipeline_iterator(pipeline, run_config=None, mode=None, preset=None, tags=None, solid_selection=None, instance=None)

Execute a pipeline iteratively.

Rather than package up the result of running a pipeline into a single object, like execute_pipeline(), this function yields the stream of events resulting from pipeline execution.

This is intended to allow the caller to handle these events on a streaming basis in whatever way is appropriate.

Parameters
  • pipeline (Union[IPipeline, PipelineDefinition]) – The pipeline to execute.

  • run_config (Optional[dict]) – The configuration that parametrizes this run, as a dict.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • solid_selection (Optional[List[str]]) –

    A list of solid selection queries (including single solid names) to execute. For example:

    • ['some_solid']: select some_solid itself.

    • ['*some_solid']: select some_solid and all its ancestors (upstream dependencies).

    • ['*some_solid+++']: select some_solid, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid', 'other_solid_a', 'other_solid_b+']: select some_solid and all its ancestors, other_solid_a itself, and other_solid_b and its direct child solids.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

Returns

The stream of events resulting from pipeline execution.

Return type

Iterator[DagsterEvent]
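The difference from execute_pipeline() is the difference between a Python generator and a finished list: the caller sees each event as soon as it is produced and can stop reading early. The sketch below shows that consumption pattern with a hypothetical FakeEvent type and a fake event source; the event type strings are illustrative and the objects are stand-ins, not Dagster's DagsterEvent API.

```python
from typing import Iterator, NamedTuple, Optional


class FakeEvent(NamedTuple):
    """Hypothetical stand-in for DagsterEvent with just two fields."""

    event_type: str
    step_key: Optional[str] = None


def fake_run() -> Iterator[FakeEvent]:
    # A generator hands out events one at a time instead of building a list first.
    yield FakeEvent("PIPELINE_START")
    yield FakeEvent("STEP_SUCCESS", "return_one")
    yield FakeEvent("STEP_FAILURE", "apply_op")
    yield FakeEvent("PIPELINE_FAILURE")


def first_failed_step(events: Iterator[FakeEvent]) -> Optional[str]:
    """Handle events as they arrive and stop reading at the first step failure."""
    for event in events:
        print("received", event.event_type)
        if event.event_type == "STEP_FAILURE":
            return event.step_key
    return None


stream = fake_run()
print(first_failed_step(stream))
```

After first_failed_step returns, the remaining events are still unread in stream.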

class dagster.PipelineExecutionResult(pipeline_def, run_id, event_list, reconstruct_context, output_capture=None)

The result of executing a pipeline.

Returned by execute_pipeline(). Users should not instantiate this class directly.

output_for_solid(handle_str, output_name='result')

Get the output of a solid by its solid handle string and output name.

Parameters
  • handle_str (str) – The string handle for the solid.

  • output_name (str) – Optional. The name of the output. Defaults to DEFAULT_OUTPUT, i.e. 'result'.

Returns

The output value for the handle and output_name.

result_for_handle(handle)

Get the result of a solid by its solid handle.

This allows indexing into top-level solids to retrieve the results of children of composite solids.

Parameters

handle (Union[str,NodeHandle]) – The handle for the solid.

Returns

The result of the given solid.

Return type

Union[CompositeSolidExecutionResult, SolidExecutionResult]

result_for_solid(name)

Get the result of a top level solid.

Parameters

name (str) – The name of the top-level solid or aliased solid for which to retrieve the result.

Returns

The result of the solid execution within the pipeline.

Return type

Union[CompositeSolidExecutionResult, SolidExecutionResult]

property solid_result_list

The results for each top level solid.

Type

List[Union[CompositeSolidExecutionResult, SolidExecutionResult]]

property step_event_list

The full list of events generated by steps in the execution.

Excludes events generated by the pipeline lifecycle, e.g., PIPELINE_START.

Type

List[DagsterEvent]

property success

Whether all steps in the execution were successful.

Type

bool

dagster.default_executors List[ExecutorDefinition]

The default executors available on any ModeDefinition that does not provide custom executors. These are currently [in_process_executor, multiprocess_executor].

Re-executing pipelines

dagster.reexecute_pipeline(pipeline, parent_run_id, run_config=None, step_selection=None, mode=None, preset=None, tags=None, instance=None, raise_on_error=True)

Reexecute an existing pipeline run.

Users will typically call this API when testing pipeline reexecution, or running standalone scripts.

Parameters
  • pipeline (Union[IPipeline, PipelineDefinition]) – The pipeline to execute.

  • parent_run_id (str) – The id of the previous run to reexecute. The run must exist in the instance.

  • run_config (Optional[dict]) – The configuration that parametrizes this run, as a dict.

  • step_selection (Optional[List[str]]) –

    A list of step selection queries (including single step keys) to execute. For example:

    • ['some_solid']: select the execution step some_solid itself.

    • ['*some_solid']: select the step some_solid and all its ancestors (upstream dependencies).

    • ['*some_solid+++']: select the step some_solid, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid', 'other_solid_a', 'other_solid_b+']: select some_solid and all its ancestors, other_solid_a itself, and other_solid_b and its direct child execution steps.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True, since this is the most useful behavior in test.

Returns

The result of pipeline execution.

Return type

PipelineExecutionResult

For a version that yields events as they are produced rather than returning a single result, see reexecute_pipeline_iterator().

dagster.reexecute_pipeline_iterator(pipeline, parent_run_id, run_config=None, step_selection=None, mode=None, preset=None, tags=None, instance=None)

Reexecute a pipeline iteratively.

Rather than package up the result of running a pipeline into a single object, like reexecute_pipeline(), this function yields the stream of events resulting from pipeline reexecution.

This is intended to allow the caller to handle these events on a streaming basis in whatever way is appropriate.

Parameters
  • pipeline (Union[IPipeline, PipelineDefinition]) – The pipeline to execute.

  • parent_run_id (str) – The id of the previous run to reexecute. The run must exist in the instance.

  • run_config (Optional[dict]) – The configuration that parametrizes this run, as a dict.

  • step_selection (Optional[List[str]]) –

    A list of step selection queries (including single step keys) to execute. For example:

    • ['some_solid']: select the execution step some_solid itself.

    • ['*some_solid']: select the step some_solid and all its ancestors (upstream dependencies).

    • ['*some_solid+++']: select the step some_solid, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid', 'other_solid_a', 'other_solid_b+']: select some_solid and all its ancestors, other_solid_a itself, and other_solid_b and its direct child execution steps.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

Returns

The stream of events resulting from pipeline reexecution.

Return type

Iterator[DagsterEvent]

Reconstructable pipelines

dagster.reconstructable(target)

Create a ReconstructablePipeline from a function that returns a PipelineDefinition/JobDefinition, or a function decorated with @pipeline/@job.

When your pipeline/job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the pipeline/job on the other side of the process boundary.

Passing a job created with GraphDefinition.to_job to reconstructable() requires you to wrap that job’s definition in a module-scoped function and pass that function instead:

from dagster import graph, reconstructable

@graph
def my_graph():
    ...

def define_my_job():
    return my_graph.to_job()

reconstructable(define_my_job)

This function implements a very conservative strategy for reconstruction, so that its behavior is easy to predict, but as a consequence it is not able to reconstruct certain kinds of pipelines or jobs, such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.

If you need to reconstruct objects constructed in these ways, you should use build_reconstructable_pipeline() instead, which allows you to specify your own reconstruction strategy.
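The limitation comes from how a function can be located in a fresh process: by importing its module and looking up its name. The sketch below uses plain importlib to show why that works for a module-scoped function but not for lambdas or functions defined inside other functions. It illustrates the idea only; it is not Dagster's reconstruction code, find_by_name and make_nested are hypothetical, and it does not cover the REPL or notebook case.

```python
import importlib
import json


def find_by_name(module_name: str, qualname: str):
    """Look a function up the way a fresh process would: import the module, then getattr."""
    return getattr(importlib.import_module(module_name), qualname, None)


def make_nested():
    def inner():
        return 1

    return inner


nested = make_nested()
anonymous = lambda: 1  # noqa: E731 (a lambda on purpose, to show its qualname)

# A module-scoped function can be found again from (module, name)...
print(find_by_name(json.dumps.__module__, json.dumps.__qualname__) is json.dumps)
# ...but nested functions and lambdas have qualnames that name no module attribute.
print(nested.__qualname__)
print(anonymous.__qualname__)
```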

Examples:

from dagster import graph, job, reconstructable

@job
def foo_job():
    ...

reconstructable_foo_job = reconstructable(foo_job)


@graph
def foo():
    ...

def make_bar_job():
    return foo.to_job()

reconstructable_bar_job = reconstructable(make_bar_job)

dagster.core.definitions.reconstructable.build_reconstructable_pipeline(reconstructor_module_name, reconstructor_function_name, reconstructable_args=None, reconstructable_kwargs=None, reconstructor_working_directory=None)

Create a dagster.core.definitions.reconstructable.ReconstructablePipeline.

When your pipeline must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the pipeline on the other side of the process boundary.

This function allows you to use the strategy of your choice for reconstructing pipelines, so that you can reconstruct certain kinds of pipelines that are not supported by reconstructable(), such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.

If you need to reconstruct pipelines constructed in these ways, use this function instead of reconstructable().

Parameters
  • reconstructor_module_name (str) – The name of the module containing the function to use to reconstruct the pipeline.

  • reconstructor_function_name (str) – The name of the function to use to reconstruct the pipeline.

  • reconstructable_args (Tuple) – Args to the function to use to reconstruct the pipeline. Values of the tuple must be JSON serializable.

  • reconstructable_kwargs (Dict[str, Any]) – Kwargs to the function to use to reconstruct the pipeline. Values of the dict must be JSON serializable.
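build_reconstructable_pipeline records names and JSON-serializable arguments rather than the pipeline object itself, which is what lets it handle factories that reconstructable() cannot. The sketch below mimics that idea with hypothetical make_pointer and rebuild helpers and a standard-library function standing in for the reconstructor function; it does not reproduce Dagster's actual serialization format.

```python
import importlib
import json
from typing import Any, Dict, Optional, Sequence


def make_pointer(
    module_name: str,
    function_name: str,
    args: Sequence[Any] = (),
    kwargs: Optional[Dict[str, Any]] = None,
) -> str:
    """Serialize what another process needs to call the function again."""
    # json.dumps raises TypeError if any argument is not JSON-serializable.
    return json.dumps(
        {"module": module_name, "function": function_name,
         "args": list(args), "kwargs": kwargs or {}}
    )


def rebuild(pointer: str) -> Any:
    """Receiving side: import the module, find the function, call it."""
    spec = json.loads(pointer)
    fn = getattr(importlib.import_module(spec["module"]), spec["function"])
    return fn(*spec["args"], **spec["kwargs"])


# json.dumps stands in for a pipeline factory such as reconstruct_pipeline.
pointer = make_pointer("json", "dumps", [{"b": 1, "a": 2}], {"sort_keys": True})
print(rebuild(pointer))
```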

Examples:

# module: mymodule

from dagster import pipeline
from dagster.core.definitions.reconstructable import build_reconstructable_pipeline


class PipelineFactory:
    def make_pipeline(self, *args, **kwargs):
        # args/kwargs would typically parametrize the pipeline being built.
        @pipeline
        def _pipeline():
            ...

        return _pipeline


def reconstruct_pipeline(*args, **kwargs):
    # Must accept the same args and kwargs that are passed to
    # build_reconstructable_pipeline below.
    factory = PipelineFactory()
    return factory.make_pipeline(*args, **kwargs)


factory = PipelineFactory()

# Placeholders: replace with JSON-serializable values.
foo_pipeline_args = (...,...)

foo_pipeline_kwargs = {...:...}

foo_pipeline = factory.make_pipeline(*foo_pipeline_args, **foo_pipeline_kwargs)

reconstructable_foo_pipeline = build_reconstructable_pipeline(
    'mymodule',
    'reconstruct_pipeline',
    foo_pipeline_args,
    foo_pipeline_kwargs,
)

class dagster.core.definitions.reconstructable.ReconstructablePipeline(repository, pipeline_name, solid_selection_str=None, solids_to_execute=None)

Defines a reconstructable pipeline. When your pipeline/job must cross process boundaries, Dagster must know how to reconstruct the pipeline/job on the other side of the process boundary.

Parameters
  • repository (ReconstructableRepository) – The reconstructable representation of the repository the pipeline/job belongs to.

  • pipeline_name (str) – The name of the pipeline/job.

  • solid_selection_str (Optional[str]) – The string value of a comma-separated list of user-input solid/op selection queries. None if no selection is specified, i.e. the entire pipeline/job will be run.

  • solids_to_execute (Optional[FrozenSet[str]]) – A set of solid/op names to execute. None if no selection is specified, i.e. the entire pipeline/job will be run.

Dependencies and aliases

class dagster.DependencyDefinition(solid=None, output='result', description=None, node=None)

Represents an edge in the DAG of nodes (ops or graphs) forming a job.

This object is used at the leaves of a dictionary structure that represents the complete dependency structure of a job. The keys of that structure identify the dependent node and its dependent input, so this object only contains information about the dependee (the upstream node and output).

Concretely, if the input named ‘input’ of op_b depends on the output named ‘result’ of op_a, and the input named ‘other_input’ of op_b depends on the output named ‘other_result’ of graph_a, the structure will look as follows:

dependency_structure = {
    'op_b': {
        'input': DependencyDefinition('op_a', 'result'),
        'other_input': DependencyDefinition('graph_a', 'other_result'),
    }
}

In general, users should prefer not to construct this class directly or use the JobDefinition API that requires instances of this class. Instead, use the @job API:

@job
def the_job():
    node_b(node_a())

Parameters
  • solid (str) – (legacy) The name of the solid that is depended on, that is, from which the value passed between the two nodes originates.

  • output (Optional[str]) – The name of the output that is depended on. (default: “result”)

  • description (Optional[str]) – Human-readable description of this dependency.

  • node (str) – The name of the node (op or graph) that is depended on, that is, from which the value passed between the two nodes originates.

class dagster.MultiDependencyDefinition(dependencies)

Represents a fan-in edge in the DAG of op instances forming a job.

This object is used only when an input of type List[T] is assembled by fanning-in multiple upstream outputs of type T.

This object is used at the leaves of a dictionary structure that represents the complete dependency structure of a job or pipeline. The keys of that structure identify the dependent op or graph and its dependent input, so this object only contains information about the dependees.

Concretely, if the input named ‘input’ of op_c depends on the outputs named ‘result’ of op_a and op_b, this structure will look as follows:

dependency_structure = {
    'op_c': {
        'input': MultiDependencyDefinition(
            [
                DependencyDefinition('op_a', 'result'),
                DependencyDefinition('op_b', 'result')
            ]
        )
    }
}

In general, users should prefer not to construct this class directly or use the JobDefinition API that requires instances of this class. Instead, use the @job API:

@job
def the_job():
    op_c(op_a(), op_b())

Parameters

dependencies (List[Union[DependencyDefinition, Type[MappedInputPlaceHolder]]]) – List of upstream dependencies fanned in to this input.
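How a fan-in input differs from an ordinary one can be shown by resolving inputs from already-computed outputs. The sketch below uses hypothetical NamedTuple stand-ins for DependencyDefinition and MultiDependencyDefinition; in this sketch the list order simply follows the order of the dependencies list, which is an assumption rather than documented Dagster behavior.

```python
from typing import Any, Dict, List, NamedTuple, Tuple, Union


class Dep(NamedTuple):
    """Hypothetical stand-in for DependencyDefinition."""

    node: str
    output: str = "result"


class MultiDep(NamedTuple):
    """Hypothetical stand-in for MultiDependencyDefinition."""

    dependencies: List[Dep]


Structure = Dict[str, Dict[str, Union[Dep, MultiDep]]]


def gather_inputs(node: str, structure: Structure,
                  outputs: Dict[Tuple[str, str], Any]) -> Dict[str, Any]:
    """Resolve each input of `node` from already-computed (node, output) values."""
    resolved: Dict[str, Any] = {}
    for input_name, dep in structure.get(node, {}).items():
        if isinstance(dep, MultiDep):
            # Fan-in: a single list input built from several upstream outputs.
            resolved[input_name] = [outputs[(d.node, d.output)] for d in dep.dependencies]
        else:
            resolved[input_name] = outputs[(dep.node, dep.output)]
    return resolved


structure: Structure = {
    "op_c": {"input": MultiDep([Dep("op_a"), Dep("op_b")])},
}
outputs = {("op_a", "result"): 1, ("op_b", "result"): 2}
print(gather_inputs("op_c", structure, outputs))  # {'input': [1, 2]}
```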

dagster.SolidInvocation

alias of dagster.core.definitions.dependency.NodeInvocation

Pipeline configuration

Run Config Schema

The run_config used by execute_pipeline() and execute_pipeline_iterator() has the following schema:

{
  # configuration for execution, required if executors require config
  execution: {
    # the name of one, and only one available executor, typically 'in_process' or 'multiprocess'
    __executor_name__: {
      # executor-specific config, if required or permitted
      config: {
        ...
      }
    }
  },

  # configuration for loggers, required if loggers require config
  loggers: {
    # the name of an available logger
    __logger_name__: {
      # logger-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },

  # configuration for resources, required if resources require config
  resources: {
    # the name of a resource
    __resource_name__: {
      # resource-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },

  # configuration for solids, required if solids require config
  solids: {

    # these keys align with the names of the solids, or their alias in this pipeline
    __solid_name__: {

      # pass any data that was defined via config_schema
      config: ...,

      # configurably specify input values, keyed by input name
      inputs: {
        __input_name__: {
          # if a dagster_type_loader is specified, that schema must be satisfied here;
          # scalar, built-in types will generally allow their values to be specified directly:
          value: ...
        }
      },

    }
  },

}
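As a concrete instance of the schema, the dict below fills in every top-level section. The executor and logger keys (max_concurrent, log_level) are assumed to match the built-in multiprocess executor and console logger config and should be checked against your Dagster version; the resource, solid, and input names are hypothetical. The small helper flags top-level keys that the schema above does not define.

```python
from typing import Any, Dict, Set

# Hypothetical run config following the schema above. The solid and input
# names are illustrative; "op" mirrors the resource in the PipelineDefinition example.
run_config: Dict[str, Any] = {
    "execution": {"multiprocess": {"config": {"max_concurrent": 4}}},
    "loggers": {"console": {"config": {"log_level": "DEBUG"}}},
    "resources": {"op": {"config": 3}},
    "solids": {
        "load_numbers": {
            "config": {"limit": 10},
            "inputs": {"path": {"value": "numbers.csv"}},
        }
    },
}

ALLOWED_TOP_LEVEL = {"execution", "loggers", "resources", "solids"}


def unknown_sections(config: Dict[str, Any]) -> Set[str]:
    """Return top-level keys that the schema above does not define."""
    return set(config) - ALLOWED_TOP_LEVEL


print(unknown_sections(run_config))
```

A common slip this catches is writing "solid" instead of "solids" at the top level.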