Execution

Executing pipelines

dagster.execute_pipeline(pipeline, run_config=None, mode=None, preset=None, tags=None, solid_selection=None, instance=None, raise_on_error=True)

Execute a pipeline synchronously.

Users will typically call this API when testing pipeline execution, or running standalone scripts.

Parameters
  • pipeline (Union[ExecutablePipeline, PipelineDefinition]) – The pipeline to execute.

  • run_config (Optional[dict]) – The environment configuration that parametrizes this run, as a dict.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True, since this is the most useful behavior in tests.

  • solid_selection (Optional[List[str]]) – A list of solid selection queries (including single solid names) to execute. For example:

    • ['some_solid']: select "some_solid" itself.

    • ['*some_solid']: select "some_solid" and all its ancestors (upstream dependencies).

    • ['*some_solid+++']: select "some_solid", all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid', 'other_solid_a', 'other_solid_b+']: select "some_solid" and all its ancestors, "other_solid_a" itself, and "other_solid_b" and its direct child solids.

Returns

The result of pipeline execution.

Return type

PipelineExecutionResult

For a streaming version that yields events as they occur, see execute_pipeline_iterator().

This is the entrypoint for dagster CLI execution. For the dagster-graphql entrypoint, see dagster.core.execution.api.execute_plan().
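
The solid_selection query syntax can be modeled in a few lines of Python. This is a minimal sketch, not Dagster's implementation: it assumes a hypothetical representation of the pipeline as a dict from each solid name to its upstream solid names, and it handles only the documented forms (a bare name, a leading * for all ancestors, and trailing + characters for descendants up to that many levels).

```python
from collections import deque
from typing import Dict, List, Set

# Hypothetical graph representation: solid name -> names of its upstream solids.
Graph = Dict[str, List[str]]


def invert(graph: Graph) -> Graph:
    """Build the reverse mapping: solid name -> names of its downstream solids."""
    children: Graph = {name: [] for name in graph}
    for name, parents in graph.items():
        for parent in parents:
            children[parent].append(name)
    return children


def walk(start: str, edges: Graph, max_depth: int) -> Set[str]:
    """Collect nodes reachable from start within max_depth hops (-1 = no limit)."""
    found: Set[str] = set()
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if max_depth != -1 and depth >= max_depth:
            continue
        for nxt in edges[node]:
            if nxt not in found:
                found.add(nxt)
                queue.append((nxt, depth + 1))
    return found


def select_solids(graph: Graph, queries: List[str]) -> Set[str]:
    """Resolve queries of the form 'name', '*name', 'name++', '*name+++', ..."""
    children = invert(graph)
    selected: Set[str] = set()
    for query in queries:
        name = query.lstrip("*").rstrip("+")
        if name not in graph:
            raise ValueError(f"no solid named {name!r} (query {query!r})")
        selected.add(name)
        if query.startswith("*"):
            selected |= walk(name, graph, -1)  # every ancestor
        levels = len(query) - len(query.rstrip("+"))
        selected |= walk(name, children, levels)  # descendants, `levels` deep
    return selected


# a -> b -> c -> d -> e, and x -> c
pipeline_graph: Graph = {"a": [], "b": ["a"], "x": [], "c": ["b", "x"], "d": ["c"], "e": ["d"]}

print(sorted(select_solids(pipeline_graph, ["*c+"])))  # ['a', 'b', 'c', 'd', 'x']
```

The same query shapes apply to step keys such as some_solid.compute; only the node names differ.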

dagster.execute_pipeline_iterator(pipeline, run_config=None, mode=None, preset=None, tags=None, solid_selection=None, instance=None)

Execute a pipeline iteratively.

Rather than package up the result of running a pipeline into a single object, like execute_pipeline(), this function yields the stream of events resulting from pipeline execution.

This is intended to allow the caller to handle these events on a streaming basis in whatever way is appropriate.

Parameters
  • pipeline (Union[ExecutablePipeline, PipelineDefinition]) – The pipeline to execute.

  • run_config (Optional[dict]) – The environment configuration that parametrizes this run, as a dict.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • solid_selection (Optional[List[str]]) – A list of solid selection queries (including single solid names) to execute. For example:

    • ['some_solid']: select "some_solid" itself.

    • ['*some_solid']: select "some_solid" and all its ancestors (upstream dependencies).

    • ['*some_solid+++']: select "some_solid", all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid', 'other_solid_a', 'other_solid_b+']: select "some_solid" and all its ancestors, "other_solid_a" itself, and "other_solid_b" and its direct child solids.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

Returns

The stream of events resulting from pipeline execution.

Return type

Iterator[DagsterEvent]
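
Because execute_pipeline_iterator() yields events while the run is in progress, a caller can act on each one as it arrives, for example reporting a step failure before the run finishes. The sketch below runs without Dagster: FakeEvent and fake_event_stream() are hypothetical stand-ins for DagsterEvent objects and for a call to execute_pipeline_iterator(), and the only assumption carried over to real events is a string event_type_value field holding one of the DagsterEventType values.

```python
from typing import Iterable, Iterator, List, NamedTuple, Optional


class FakeEvent(NamedTuple):
    """Hypothetical stand-in for dagster.DagsterEvent; only two fields are modeled."""

    event_type_value: str  # one of the DagsterEventType string values
    step_key: Optional[str] = None


def fake_event_stream() -> Iterator[FakeEvent]:
    """Stands in for execute_pipeline_iterator(...): events are produced one at a time."""
    yield FakeEvent("PIPELINE_START")
    yield FakeEvent("STEP_START", "load.compute")
    yield FakeEvent("STEP_SUCCESS", "load.compute")
    yield FakeEvent("STEP_START", "transform.compute")
    yield FakeEvent("STEP_FAILURE", "transform.compute")
    yield FakeEvent("PIPELINE_FAILURE")


def failed_steps(events: Iterable[FakeEvent]) -> List[Optional[str]]:
    """Handle each event as it arrives and return the keys of failed steps."""
    failures: List[Optional[str]] = []
    for event in events:
        if event.event_type_value == "STEP_FAILURE":
            print(f"step failed: {event.step_key}")  # react immediately, mid-run
            failures.append(event.step_key)
        elif event.event_type_value in ("PIPELINE_SUCCESS", "PIPELINE_FAILURE"):
            break  # terminal pipeline event; stop reading the stream
    return failures


print(failed_steps(fake_event_stream()))
```

With Dagster installed, the same loop could consume execute_pipeline_iterator(my_pipeline) directly, where my_pipeline is a hypothetical pipeline; comparing event.event_type against DagsterEventType members would be the more idiomatic check.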

Re-executing pipelines

dagster.reexecute_pipeline(pipeline, parent_run_id, run_config=None, step_selection=None, mode=None, preset=None, tags=None, instance=None, raise_on_error=True, step_keys_to_execute=None)

Reexecute an existing pipeline run.

Users will typically call this API when testing pipeline reexecution, or running standalone scripts.

Parameters
  • pipeline (Union[ExecutablePipeline, PipelineDefinition]) – The pipeline to execute.

  • parent_run_id (str) – The id of the previous run to reexecute. The run must exist in the instance.

  • run_config (Optional[dict]) – The environment configuration that parametrizes this run, as a dict.

  • step_selection (Optional[List[str]]) – A list of step selection queries (including single step keys) to execute. For example:

    • ['some_solid.compute']: select the execution step "some_solid.compute" itself.

    • ['*some_solid.compute']: select the step "some_solid.compute" and all its ancestors (upstream dependencies).

    • ['*some_solid.compute+++']: select the step "some_solid.compute", all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid.compute', 'other_solid_a.compute', 'other_solid_b.compute+']: select "some_solid.compute" and all its ancestors, "other_solid_a.compute" itself, and "other_solid_b.compute" and its direct child execution steps.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True, since this is the most useful behavior in tests.

Returns

The result of pipeline execution.

Return type

PipelineExecutionResult

For a streaming version that yields events as they occur, see reexecute_pipeline_iterator().

dagster.reexecute_pipeline_iterator(pipeline, parent_run_id, run_config=None, step_selection=None, mode=None, preset=None, tags=None, instance=None, step_keys_to_execute=None)

Reexecute a pipeline iteratively.

Rather than package up the result of running a pipeline into a single object, like reexecute_pipeline(), this function yields the stream of events resulting from pipeline reexecution.

This is intended to allow the caller to handle these events on a streaming basis in whatever way is appropriate.

Parameters
  • pipeline (Union[ExecutablePipeline, PipelineDefinition]) – The pipeline to execute.

  • parent_run_id (str) – The id of the previous run to reexecute. The run must exist in the instance.

  • run_config (Optional[dict]) – The environment configuration that parametrizes this run, as a dict.

  • step_selection (Optional[List[str]]) – A list of step selection queries (including single step keys) to execute. For example:

    • ['some_solid.compute']: select the execution step "some_solid.compute" itself.

    • ['*some_solid.compute']: select the step "some_solid.compute" and all its ancestors (upstream dependencies).

    • ['*some_solid.compute+++']: select the step "some_solid.compute", all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_solid.compute', 'other_solid_a.compute', 'other_solid_b.compute+']: select "some_solid.compute" and all its ancestors, "other_solid_a.compute" itself, and "other_solid_b.compute" and its direct child execution steps.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

Returns

The stream of events resulting from pipeline reexecution.

Return type

Iterator[DagsterEvent]
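
When only some steps are re-executed, the selected steps still need their other inputs, and those have to come from outputs the parent run recorded; this is why re-execution depends on persistent intermediate storage. The following is a simplified, hypothetical model of that bookkeeping rather than Dagster's own logic: step keys map to the step keys they consume from, the selection is assumed to be already resolved from step_selection queries, and parent_outputs stands in for whatever the parent run persisted.

```python
from typing import Dict, List, Set

# Hypothetical representation: step key -> step keys whose outputs it consumes.
StepGraph = Dict[str, List[str]]


def plan_reexecution(
    steps: StepGraph, selected: Set[str], parent_outputs: Set[str]
) -> Dict[str, Set[str]]:
    """Decide which steps run again and which upstream outputs come from the parent run."""
    unknown = selected - set(steps)
    if unknown:
        raise ValueError(f"unknown step keys: {sorted(unknown)}")
    reused: Set[str] = set()
    for step in sorted(selected):
        for upstream in steps[step]:
            if upstream in selected:
                continue  # recomputed in this run
            if upstream not in parent_outputs:
                raise ValueError(
                    f"{step!r} needs {upstream!r}, which the parent run did not persist"
                )
            reused.add(upstream)
    return {"run": set(selected), "reuse_from_parent": reused}


steps: StepGraph = {
    "load.compute": [],
    "clean.compute": ["load.compute"],
    "train.compute": ["clean.compute"],
    "report.compute": ["train.compute", "clean.compute"],
}
# Suppose the parent run persisted every output except the failed report step's.
parent = {"load.compute", "clean.compute", "train.compute"}

plan = plan_reexecution(steps, {"report.compute"}, parent)
print(sorted(plan["reuse_from_parent"]))  # ['clean.compute', 'train.compute']
```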

Executing solids

dagster.execute_solid(solid_def, mode_def=None, input_values=None, tags=None, run_config=None, raise_on_error=True)

Execute a single solid in an ephemeral pipeline.

Intended to support unit tests. Input values may be passed directly, and no pipeline need be specified – an ephemeral pipeline will be constructed.

Parameters
  • solid_def (SolidDefinition) – The solid to execute.

  • mode_def (Optional[ModeDefinition]) – The mode within which to execute the solid. Use this if, e.g., custom resources, loggers, or executors are desired.

  • input_values (Optional[Dict[str, Any]]) – A dict of input names to input values, used to pass inputs to the solid directly. You may also use the run_config to configure any inputs that are configurable.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • run_config (Optional[dict]) – The environment configuration that parameterizes this execution, as a dict.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True, since this is the most useful behavior in tests.

Returns

The result of executing the solid.

Return type

Union[CompositeSolidExecutionResult, SolidExecutionResult]

dagster.execute_solid_within_pipeline(pipeline_def, solid_name, inputs=None, run_config=None, mode=None, preset=None, tags=None, instance=None)

Execute a single solid within an existing pipeline.

Intended to support tests. Input values may be passed directly.

Parameters
  • pipeline_def (PipelineDefinition) – The pipeline within which to execute the solid.

  • solid_name (str) – The name of the solid, or the aliased solid, to execute.

  • inputs (Optional[Dict[str, Any]]) – A dict of input names to input values, used to pass input values to the solid directly. You may also use the run_config to configure any inputs that are configurable.

  • run_config (Optional[dict]) – The environment configuration that parameterizes this execution, as a dict.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

Returns

The result of executing the solid.

Return type

Union[CompositeSolidExecutionResult, SolidExecutionResult]

dagster.execute_solids_within_pipeline(pipeline_def, solid_names, inputs=None, run_config=None, mode=None, preset=None, tags=None, instance=None)

Execute a set of solids within an existing pipeline.

Intended to support tests. Input values may be passed directly.

Parameters
  • pipeline_def (PipelineDefinition) – The pipeline within which to execute the solids.

  • solid_names (FrozenSet[str]) – A set of solid names (or solid aliases) to execute.

  • inputs (Optional[Dict[str, Dict[str, Any]]]) – A dict keyed on solid names, whose values are dicts of input names to input values, used to pass input values to the solids directly. You may also use the run_config to configure any inputs that are configurable.

  • run_config (Optional[dict]) – The environment configuration that parameterizes this execution, as a dict.

  • mode (Optional[str]) – The name of the pipeline mode to use. You may not set both mode and preset.

  • preset (Optional[str]) – The name of the pipeline preset to use. You may not set both mode and preset.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to pipeline logs.

  • instance (Optional[DagsterInstance]) – The instance to execute against. If this is None, an ephemeral instance will be used, and no artifacts will be persisted from the run.

Returns

The results of executing the solids, keyed by solid name.

Return type

Dict[str, Union[CompositeSolidExecutionResult, SolidExecutionResult]]

Execution context

class dagster.core.execution.context.compute.SolidExecutionContext(system_compute_execution_context)

The context object available to solid compute logic.

property instance

The current DagsterInstance.

property pdb

Allows pdb debugging from within the solid.

Example:

from dagster import solid

@solid
def debug_solid(context):
    # Pauses execution here and opens an interactive pdb prompt.
    context.pdb.set_trace()

property pipeline_run

The current PipelineRun.

property solid_config

The parsed config specific to this solid.

class dagster.core.execution.context.compute.AbstractComputeExecutionContext

Base class for solid execution contexts, implemented by SolidExecutionContext and DagstermillInNotebookExecutionContext.

abstract get_tag(key)

Implement this method to get a logging tag.

abstract has_tag(key)

Implement this method to check if a logging tag is set.

abstract property log

The log manager available in the execution context.

abstract property pipeline_def

The pipeline being executed.

abstract property resources

Resources available in the execution context.

abstract property run_id

The run id for the context.

abstract property solid

The solid corresponding to the execution step being executed.

abstract property solid_def

The solid definition corresponding to the execution step being executed.

class dagster.core.execution.context.compute.SystemComputeExecutionContext(execution_context_data, log_manager, step)

class dagster.reconstructable

Create a ReconstructablePipeline from a function that returns a PipelineDefinition, or from a function decorated with @pipeline.

When your pipeline must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the pipeline on the other side of the process boundary.

This function implements a very conservative strategy for reconstructing pipelines, so that its behavior is easy to predict, but as a consequence it is not able to reconstruct certain kinds of pipelines, such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.

If you need to reconstruct pipelines constructed in these ways, you should use build_reconstructable_pipeline() instead, which allows you to specify your own strategy for reconstructing a pipeline.

Examples:

from dagster import PipelineDefinition, pipeline, reconstructable

@pipeline
def foo_pipeline():
    ...

# A @pipeline-decorated function defined at module scope.
reconstructable_foo_pipeline = reconstructable(foo_pipeline)


def make_bar_pipeline():
    return PipelineDefinition(...)

# Pass the module-scope function that returns the PipelineDefinition.
reconstructable_bar_pipeline = reconstructable(make_bar_pipeline)
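
The restriction to module-level definitions follows from what another process can look up by name. As a rough illustration, not Dagster's code, the sketch below records a function as a hypothetical (module, name) pointer and re-imports it with importlib; lambdas and functions defined in nested scopes have no such importable name, which mirrors the cases reconstructable() cannot handle.

```python
import importlib
import json
from typing import Callable, Tuple

# Hypothetical pointer format: (importable module name, attribute name in that module).
CodePointer = Tuple[str, str]


def to_pointer(fn: Callable) -> CodePointer:
    """Describe fn so that a different process could import it again."""
    if fn.__name__ == "<lambda>":
        raise ValueError("a lambda has no importable name")
    if fn.__qualname__ != fn.__name__:
        # e.g. 'make_nested.<locals>.inner' for a function defined inside another function
        raise ValueError(f"{fn.__qualname__!r} is not defined at module scope")
    return fn.__module__, fn.__name__


def from_pointer(pointer: CodePointer) -> Callable:
    """What the receiving process would do: import the module, then fetch the name."""
    module_name, attr = pointer
    return getattr(importlib.import_module(module_name), attr)


def make_nested() -> Callable:
    def inner() -> None:
        return None

    return inner


print(to_pointer(json.dumps))  # ('json', 'dumps')
print(from_pointer(to_pointer(json.dumps)) is json.dumps)  # True
for candidate in (lambda: None, make_nested()):
    try:
        to_pointer(candidate)
    except ValueError as err:
        print("not reconstructable:", err)
```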

Pipeline and solid results

class dagster.PipelineExecutionResult(pipeline_def, run_id, event_list, reconstruct_context)

The result of executing a pipeline.

Returned by execute_pipeline(). Users should not instantiate this class.

class dagster.SolidExecutionResult(solid, step_events_by_kind, reconstruct_context)

Execution result for a leaf solid in a pipeline.

Users should not instantiate this class.

property compute_input_event_dict

All events of type STEP_INPUT, keyed by input name.

Type

Dict[str, DagsterEvent]

property compute_output_event_dict

All events of type STEP_OUTPUT, keyed by output name.

Type

Dict[str, DagsterEvent]

property compute_step_events

All events generated by execution of the solid compute function.

Type

List[DagsterEvent]

property compute_step_failure_event

The STEP_FAILURE event. Raises an error if the solid did not fail.

Type

DagsterEvent

property expectation_events_during_compute

All events of type STEP_EXPECTATION_RESULT.

Type

List[DagsterEvent]

property expectation_results_during_compute

All expectation results yielded by the solid.

Type

List[ExpectationResult]

property failure_data

Any data corresponding to this step’s failure, if it failed.

Type

Union[None, StepFailureData]

get_output_event_for_compute(output_name='result')

The STEP_OUTPUT event for the given output name.

Raises an error if it is not present.

Parameters

output_name (Optional[str]) – The name of the output. (default: ‘result’)

Returns

The corresponding event.

Return type

DagsterEvent

get_step_success_event()

DagsterEvent: The STEP_SUCCESS event. Raises an error if it is not present.

property input_events_during_compute

All events of type STEP_INPUT.

Type

List[DagsterEvent]

property materialization_events_during_compute

All events of type STEP_MATERIALIZATION.

Type

List[DagsterEvent]

property materializations_during_compute

All materializations yielded by the solid.

Type

List[Materialization]

property output_events_during_compute

All events of type STEP_OUTPUT.

Type

List[DagsterEvent]

output_value(output_name='result')

Get a computed output value.

Note that calling this method will reconstruct the pipeline context (including, e.g., resources) to retrieve materialized output values.

Parameters

output_name (str) – The output name for which to retrieve the value. (default: ‘result’)

Returns

None if execution did not succeed, otherwise the output value.

Return type

Union[None, Any]

property output_values

The computed output values.

Keys of this dictionary are output names, values are output values.

Returns None if execution did not succeed.

Note that accessing this property will reconstruct the pipeline context (including, e.g., resources) to retrieve materialized output values.

Type

Union[None, Dict[str, Any]]

property skipped

Whether solid execution was skipped.

Type

bool

property success

Whether solid execution was successful.

Type

bool

class dagster.CompositeSolidExecutionResult(solid, event_list, step_events_by_kind, reconstruct_context, handle=None)

Execution result for a composite solid in a pipeline.

Users should not instantiate this class.

class dagster.DagsterEvent

Events yielded by solid and pipeline execution.

Users should not instantiate this class.

property event_type

The type of this event.

Type

DagsterEventType

class dagster.DagsterEventType

The types of events that may be yielded by solid and pipeline execution.

ENGINE_EVENT = 'ENGINE_EVENT'
HOOK_COMPLETED = 'HOOK_COMPLETED'
HOOK_ERRORED = 'HOOK_ERRORED'
HOOK_SKIPPED = 'HOOK_SKIPPED'
OBJECT_STORE_OPERATION = 'OBJECT_STORE_OPERATION'
PIPELINE_FAILURE = 'PIPELINE_FAILURE'
PIPELINE_INIT_FAILURE = 'PIPELINE_INIT_FAILURE'
PIPELINE_START = 'PIPELINE_START'
PIPELINE_SUCCESS = 'PIPELINE_SUCCESS'
STEP_EXPECTATION_RESULT = 'STEP_EXPECTATION_RESULT'
STEP_FAILURE = 'STEP_FAILURE'
STEP_INPUT = 'STEP_INPUT'
STEP_MATERIALIZATION = 'STEP_MATERIALIZATION'
STEP_OUTPUT = 'STEP_OUTPUT'
STEP_RESTARTED = 'STEP_RESTARTED'
STEP_SKIPPED = 'STEP_SKIPPED'
STEP_START = 'STEP_START'
STEP_SUCCESS = 'STEP_SUCCESS'
STEP_UP_FOR_RETRY = 'STEP_UP_FOR_RETRY'

Pipeline configuration

Run Config Schema

The run_config used by execute_pipeline() and execute_pipeline_iterator() has the following schema:

{
  # configuration for execution, required if executors require config
  execution: {
    # the name of exactly one available executor, typically 'in_process' or 'multiprocess'
    __executor_name__: {
      # executor-specific config, if required or permitted
      config: {
        ...
      }
    }
  },

  # configuration for loggers, required if loggers require config
  loggers: {
    # the name of an available logger
    __logger_name__: {
      # logger-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },

  # configuration for resources, required if resources require config
  resources: {
    # the name of a resource
    __resource_name__: {
      # resource-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },

  # configuration for solids, required if solids require config
  solids: {

    # these keys align with the names of the solids, or their alias in this pipeline
    __solid_name__: {

      # pass any data that was defined via config_field
      config: ...,

      # configurably specify input values, keyed by input name
      inputs: {
        __input_name__: {
          # if a dagster_type_loader is specified, that schema must be satisfied here;
          # scalar, built-in types will generally allow their values to be specified directly:
          value: ...
        }
      },

      # configurably materialize output values
      outputs: {
        __output_name__: {
          # if a dagster_type_materializer is specified, that schema must be satisfied
          # here; pickleable types will generally allow output as follows:
          pickle: {
            path: String
          }
        }
      }
    }
  },

  # optionally use an available system storage for intermediates etc.
  storage: {
    # the name of exactly one available system storage, typically 'filesystem' or
    # 'in_memory'
    __storage_name__: {
      config: {
        ...
      }
    }
  }
}
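
In Python, the same schema is written as a nested dict and passed as run_config. In this sketch the solid name add_one, its delta config key, its num input, and the base_dir path are hypothetical; the multiprocess and filesystem keys are the built-in executor and system storage names documented in this section. The small helper checks the exactly-one rule that the schema comments state for execution and storage.

```python
from typing import Any, Dict

# Hypothetical run_config for a pipeline containing a solid named "add_one" that has
# a config key "delta" and a configurable scalar input "num".
run_config: Dict[str, Any] = {
    "execution": {
        # exactly one executor; "multiprocess" is one of the built-in executors
        "multiprocess": {"config": {"max_concurrent": 4}},
    },
    "storage": {
        # exactly one system storage; the base_dir path is a placeholder
        "filesystem": {"config": {"base_dir": "/tmp/dagster_storage"}},
    },
    "solids": {
        "add_one": {
            "config": {"delta": 1},
            "inputs": {"num": {"value": 3}},
        },
    },
}


def chosen_option(config: Dict[str, Any], section: str) -> str:
    """Return the single key under section, enforcing the exactly-one rule."""
    options = list(config[section])
    if len(options) != 1:
        raise ValueError(f"{section!r} must name exactly one option, got {options}")
    return options[0]


print(chosen_option(run_config, "execution"), chosen_option(run_config, "storage"))
```

With Dagster installed, a dict like this would be passed as execute_pipeline(my_pipeline, run_config=run_config), where my_pipeline is a hypothetical pipeline containing add_one.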

System Storage

dagster.mem_system_storage SystemStorageDefinition

The default in-memory system storage.

In most Dagster environments, this will be the default system storage. It is available by default on any ModeDefinition that does not provide custom system storages. To select it explicitly, include the following top-level fragment in config:

storage:
  in_memory:

dagster.fs_system_storage SystemStorageDefinition

The default filesystem system storage.

Filesystem system storage is available by default on any ModeDefinition that does not provide custom system storages. To select it, include a fragment such as the following in config:

storage:
  filesystem:
    config:
      base_dir: '/path/to/dir/'

You may omit the base_dir config value, in which case the filesystem storage will use the DagsterInstance-provided default.

dagster.default_system_storage_defs List[SystemStorageDefinition]

The default system storages available on any ModeDefinition that does not provide custom system storages. These are currently [mem_system_storage, fs_system_storage].

Intermediate Storage

dagster.mem_intermediate_storage IntermediateStorageDefinition

The default in-memory intermediate storage.

In-memory intermediate storage is the default on any pipeline run that does not configure any custom intermediate storage.

Keep in mind when using this storage that intermediates will not be persisted after the pipeline run ends. Use a persistent intermediate storage like fs_intermediate_storage() to persist intermediates and take advantage of advanced features like pipeline re-execution.

dagster.fs_intermediate_storage IntermediateStorageDefinition

The default filesystem intermediate storage.

Filesystem intermediate storage is available by default on any ModeDefinition that does not provide custom intermediate storages. To select it, include a fragment such as the following in config:

intermediate_storage:
  filesystem:
    config:
      base_dir: '/path/to/dir/'

You may omit the base_dir config value, in which case the filesystem storage will use the DagsterInstance-provided default.

dagster.default_intermediate_storage_defs List[IntermediateStorageDefinition]

The default intermediate storages available on any ModeDefinition that does not provide custom intermediate storages. These are currently [mem_intermediate_storage, fs_intermediate_storage].

Executors

dagster.in_process_executor ExecutorDefinition

The default in-process executor.

In most Dagster environments, this will be the default executor. It is available by default on any ModeDefinition that does not provide custom executors. To select it explicitly, include the following top-level fragment in config:

execution:
  in_process:

Execution priority can be configured using the dagster/priority tag via solid metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.

dagster.multiprocess_executor ExecutorDefinition

The default multiprocess executor.

This simple multiprocess executor is available by default on any ModeDefinition that does not provide custom executors. To select the multiprocess executor, include a fragment such as the following in your config:

execution:
  multiprocess:
    config:
      max_concurrent: 4

The max_concurrent arg is optional and tells the execution engine how many processes may run concurrently. By default, or if you set max_concurrent to 0, this is the return value of multiprocessing.cpu_count().

Execution priority can be configured using the dagster/priority tag via solid metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.
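
Together, max_concurrent and the dagster/priority tag can be thought of as deciding which ready steps start first. The toy scheduler below is a hypothetical model of the documented behavior, not the executor's code: it assumes each ready step exposes a dict of string tags, treats a missing dagster/priority tag as 0, starts higher priorities first, and falls back to multiprocessing.cpu_count() when max_concurrent is unset or 0. The in-process executor, which runs one step at a time, corresponds to a limit of 1.

```python
import multiprocessing
from typing import Dict, List, Optional

PRIORITY_TAG = "dagster/priority"


def resolve_max_concurrent(max_concurrent: Optional[int]) -> int:
    """Unset or 0 means one slot per CPU, as described for the multiprocess executor."""
    return max_concurrent or multiprocessing.cpu_count()


def step_priority(tags: Dict[str, str]) -> int:
    """Read the priority tag; steps without one get the default priority 0."""
    return int(tags.get(PRIORITY_TAG, "0"))


def next_batch(ready: Dict[str, Dict[str, str]], max_concurrent: Optional[int]) -> List[str]:
    """Choose the ready steps to launch now: highest priority first, up to the limit."""
    limit = resolve_max_concurrent(max_concurrent)
    # sorted() is stable even with reverse=True, so equal priorities keep their order.
    ordered = sorted(ready, key=lambda step: step_priority(ready[step]), reverse=True)
    return ordered[:limit]


# Hypothetical ready steps and their tags.
ready_steps: Dict[str, Dict[str, str]] = {
    "cleanup.compute": {PRIORITY_TAG: "-1"},
    "load.compute": {},
    "urgent.compute": {PRIORITY_TAG: "5"},
    "report.compute": {},
}
print(next_batch(ready_steps, max_concurrent=2))  # ['urgent.compute', 'load.compute']
print(next_batch(ready_steps, max_concurrent=1))  # ['urgent.compute']
```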

dagster.default_executors List[ExecutorDefinition]

The default executors available on any ModeDefinition that does not provide custom executors. These are currently [in_process_executor, multiprocess_executor].

Contexts

class dagster.SystemComputeExecutionContext(execution_context_data, log_manager, step)

class dagster.TypeCheckContext(execution_context_data, log_manager, dagster_type)

The context object available to a type check function on a DagsterType.

log

Centralized log dispatch from user code.

Type

DagsterLogManager

resources

An object whose attributes contain the resources available to this solid.

Type

Any

run_id

The id of this pipeline run.

Type

str

class dagster.HookContext(execution_context_data, log_manager, hook_def, step)

The context object available to a hook function on a DagsterEvent.

log

Centralized log dispatch from user code.

Type

DagsterLogManager

hook_def

The hook that the context object belongs to.

Type

HookDefinition

step

The compute step associated with the hook.

Type

ExecutionStep

solid

The solid instance associated with the hook.

Type

Solid

resources

Resources available in the hook context.

Type

Any

solid_config

The parsed config specific to this solid.

Type

Dict[str, Any]