Internals


Please note that internal APIs are likely to be in much greater flux pre-1.0 than user-facing APIs, particularly if not exported in the top-level dagster module.


If you find yourself consulting these docs because you are writing custom components and plug-ins, please get in touch with the core team on our Slack. We’re curious what you’re up to, happy to help, excited for new community contributions, and eager to make the system as easy to work with as possible – including for teams who are looking to customize it.

Logging

class dagster.DagsterLogManager[source]

Centralized dispatch for logging from user code.

Handles the construction of uniform structured log messages and passes them through to the underlying loggers.

An instance of the log manager is made available to solids as context.log. Users should not initialize instances of the log manager directly. To configure custom loggers, set the logger_defs on a ModeDefinition for a pipeline.

The log manager supports standard convenience methods like those exposed by the Python standard library logging module (i.e., within the body of a solid, context.log.{debug, info, warning, warn, error, critical, fatal}).

The underlying integer API can also be called directly using, e.g. context.log.log(5, msg), and the log manager will delegate to the log method defined on each of the loggers it manages.

User-defined custom log levels are not supported, and calls to, e.g., context.log.trace or context.log.notice will result in hard exceptions at runtime.
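
For example, within a solid body (a minimal sketch; the solid and the extra key-value pairs are illustrative):

from dagster import solid

@solid
def load_orders(context):
    # Convenience methods mirror the stdlib logging API.
    context.log.info('Starting load')
    # Additional key-value pairs may be attached to an individual message.
    context.log.debug('Fetched batch', batch_size=500, source='orders.csv')
    # The integer API delegates to the log method of each managed logger.
    context.log.log(20, 'Same message at the INFO level')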

critical(msg, **kwargs)[source]

Log at the logging.CRITICAL level.

See debug().

debug(msg, **kwargs)[source]

Log at the logging.DEBUG level.

The message will be automatically adorned with contextual information about the name of the pipeline, the name of the solid, etc., so it is generally unnecessary to include this type of information in the log message.

You can optionally pass additional key-value pairs to an individual log message using the kwargs to this method.

Parameters
  • msg (str) – The message to log.

  • **kwargs (Optional[Any]) – Any additional key-value pairs for only this log message.

error(msg, **kwargs)[source]

Log at the logging.ERROR level.

See debug().

fatal(msg, **kwargs)

Alias for critical()

info(msg, **kwargs)[source]

Log at the logging.INFO level.

See debug().

log(level, msg, **kwargs)[source]

Invoke the underlying loggers for a given integer log level.

Parameters
  • level (int) – An integer representing a Python logging level.

  • msg (str) – The message to log.

warn(msg, **kwargs)

Alias for warning()

warning(msg, **kwargs)[source]

Log at the logging.WARNING level.

See debug().

with_tags(**new_tags)[source]

Add new tags in “new_tags” to the set of tags attached to this log manager instance, and return a new DagsterLogManager with the merged set of tags.

Parameters

new_tags (Dict[str, str]) – Dictionary of tags to add.

Returns

A new DagsterLogManager namedtuple with updated tags for the same run ID and loggers.

Return type

DagsterLogManager
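
For example (a minimal sketch; the tag name is illustrative):

partition_log = context.log.with_tags(data_partition='2020-03-01')
partition_log.info('Loaded partition')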


Executors

@dagster.executor(name=None, config_schema=None, required_resource_keys=None)[source]

Define an executor.

The decorated function should accept an InitExecutorContext and return an instance of Executor.

Parameters
  • name (Optional[str]) – The name of the executor.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.executor_config.

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the executor.
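
For example, an executor definition might wrap a custom Executor subclass as follows (a sketch; MyCustomExecutor is a hypothetical class like the one sketched under Executor below):

from dagster import executor

@executor(
    name='my_custom_executor',
    config_schema={'max_concurrent': int},
)
def my_custom_executor(init_context):
    # init_context is an InitExecutorContext; the validated config is
    # available at init_context.executor_config.
    return MyCustomExecutor(
        max_concurrent=init_context.executor_config['max_concurrent']
    )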

class dagster.ExecutorDefinition(name, config_schema=None, executor_creation_fn=None, required_resource_keys=None, _configured_config_mapping_fn=None)[source]
Parameters
  • name (Optional[str]) – The name of the executor.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.executor_config.

  • executor_creation_fn (Optional[Callable]) – Should accept an InitExecutorContext and return an instance of Executor

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the executor.

  • _configured_config_mapping_fn – This argument is for internal use only. Users should not specify this field. To preconfigure an executor, use the configured() API.

class dagster.InitExecutorContext[source]

Executor-specific initialization context.

pipeline

The pipeline to be executed.

Type

IPipeline

mode_def

The mode in which the pipeline is to be executed.

Type

ModeDefinition

executor_def

The definition of the executor currently being constructed.

Type

ExecutorDefinition

pipeline_run

Configuration for this pipeline run.

Type

PipelineRun

environment_config

The parsed environment configuration for this pipeline run.

Type

EnvironmentConfig

executor_config

The parsed config passed to the executor.

Type

dict

system_storage_def

The system storage definition.

Type

SystemStorageDefinition

intermediate_storage_def

The intermediate storage definition.

Type

Optional[IntermediateStorageDefinition]

instance

The current instance.

Type

DagsterInstance

class dagster.Executor[source]
abstract execute(pipeline_context, execution_plan)[source]

For the given context and execution plan, orchestrate a series of sub plan executions in a way that satisfies the whole plan being executed.

Parameters
  • pipeline_context (SystemPipelineExecutionContext) – The pipeline execution context.

  • execution_plan (ExecutionPlan) – The plan to execute.

Returns

A stream of dagster events.

abstract property retries

The Retries state / policy for this instance of the Executor. Executors should allow this to be controlled via configuration if possible.

Returns: Retries
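
A concrete executor implements both of these members. Schematically (a sketch only; the orchestration logic itself is elided):

from dagster import Executor

class MyCustomExecutor(Executor):
    def __init__(self, max_concurrent=1, retries=None):
        self._max_concurrent = max_concurrent
        self._retries = retries

    @property
    def retries(self):
        # The Retries policy, typically built from executor config.
        return self._retries

    def execute(self, pipeline_context, execution_plan):
        # A real implementation iterates over the steps in execution_plan,
        # runs up to self._max_concurrent at a time, and yields DagsterEvents
        # as they occur.
        raise NotImplementedError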


System Storage

@dagster.system_storage(required_resource_keys, name=None, is_persistent=True, config_schema=None)[source]

Creates a system storage definition.

The decorated function will be passed as the system_storage_creation_fn to a SystemStorageDefinition.

Parameters
  • name (str) – The name of the system storage.

  • is_persistent (bool) – Whether the storage definition persists in a way that can cross process/node boundaries. Execution with, for example, the multiprocess executor, or within the context of dagster-airflow, requires a persistent storage mode.

  • required_resource_keys (Set[str]) – The resources that this storage needs at runtime to function.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.system_storage_config.
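
For example (a sketch; build_intermediate_storage and build_file_manager are hypothetical helpers standing in for a real backend):

from dagster import system_storage
from dagster.core.storage.system_storage import SystemStorageData

@system_storage(
    name='my_fs_storage',
    is_persistent=True,
    required_resource_keys=set(),
    config_schema={'base_dir': str},
)
def my_fs_storage(init_context):
    base_dir = init_context.system_storage_config['base_dir']
    # The helpers below are stand-ins for whatever your backend requires.
    return SystemStorageData(
        intermediate_storage=build_intermediate_storage(base_dir),
        file_manager=build_file_manager(base_dir),
    )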

class dagster.SystemStorageDefinition(name, is_persistent, required_resource_keys, config_schema=None, system_storage_creation_fn=None, _configured_config_mapping_fn=None)[source]

Defines run metadata and intermediate data storage behaviors.

Example storage definitions are the default mem_system_storage(), which stores all intermediates and run data in memory, and fs_system_storage(), which stores all that data in the local filesystem. By default, storage definitions can be configured on a per-pipeline run basis by setting the storage.in_memory and storage.filesystem keys in pipeline run configuration respectively.

It’s possible to write additional system storage definitions, such as the dagster_aws.s3_system_storage. Library authors can write system storages to support additional cloud providers, and users can write custom system storages to support their own infrastructure needs.

Storage definitions can be made available to pipelines by setting the system_storage_defs on a ModeDefinition attached to the pipeline definition. This will determine the config schema of the storage key in the pipeline run configuration.

Parameters
  • name (str) – Name of the storage mode.

  • is_persistent (bool) – Whether the storage is persistent in a way that can cross process/node boundaries. Execution with, for example, the multiprocess executor, or with dagster-airflow, requires a persistent storage mode.

  • required_resource_keys (Set[str]) – The resources that this storage needs at runtime to function.

  • config_schema (Optional[ConfigSchema]) – The schema for the storage’s configuration schema. Configuration data passed in this schema will be made available to the system_storage_creation_fn under init_context.system_storage_config.

  • system_storage_creation_fn – (Callable[[InitSystemStorageContext], SystemStorageData]) Called to construct the storage. This function should consume the init context and emit a SystemStorageData.

  • _configured_config_mapping_fn – This argument is for internal use only. Users should not specify this field. To preconfigure a system storage, use the configured() API.

class dagster.InitSystemStorageContext[source]

System storage-specific initialization context.

pipeline_def

The definition of the pipeline in context.

Type

PipelineDefinition

mode_def

The definition of the mode in context.

Type

ModeDefinition

system_storage_def

The definition of the system storage to be constructed.

Type

SystemStorageDefinition

pipeline_run

The pipeline run in context.

Type

PipelineRun

instance

The instance.

Type

DagsterInstance

environment_config

The environment config.

Type

EnvironmentConfig

type_storage_plugin_registry

Registry containing custom type storage plugins.

Type

TypeStoragePluginRegistry

resources

Resources available in context.

Type

Any

system_storage_config

The system storage-specific configuration data provided by the environment config. The schema for this data is defined by the config_schema argument to SystemStorageDefinition.

Type

Dict[str, Any]

class dagster.core.storage.system_storage.SystemStorageData(intermediate_storage, file_manager)[source]

Represents an instance of system storage.

intermediate_storage

An intermediates manager.

Type

IntermediateStorage

file_manager

A file manager.

Type

FileManager

class dagster.core.storage.file_manager.FileManager[source]

The base class for all file managers in dagster. The file manager is a user-facing abstraction that allows a Dagster user to pass files between solids, and it is responsible for marshalling those files to and from the nodes where the actual Dagster computations occur.

If the user does their file manipulations using this abstraction, it is straightforward to write a pipeline that executes both:

  1. in a local development environment with no external dependencies, where files are available directly on the filesystem, and

  2. in a cluster environment where those files would need to be on a distributed filesystem (e.g., HDFS) or an object store (e.g., S3).

The business logic remains constant, and the appropriate implementation of the file manager is swapped in based on the system storage specified by the operator.

abstract copy_handle_to_local_temp(file_handle)[source]

Take a file handle and make it available as a local temp file. Returns a path.

In an implementation like an S3FileManager, this would download the file from S3 to the local filesystem, into temp files created (typically) by the Python tempfile module.

These temp files are not guaranteed to be available across solid boundaries. For files that must work across solid boundaries, use the read, read_data, write, and write_data methods on this class.

Parameters

file_handle (FileHandle) – The file handle to make available as a local temp file.

Returns

Path to the temp file.

Return type

str

abstract delete_local_temp()[source]

Delete all the local temporary files created by copy_handle_to_local_temp. This should typically only be called by framework implementors.

abstract read(file_handle, mode='rb')[source]

Return a file-like stream for the file handle. Defaults to binary mode read. This may incur an expensive network call for file managers backed by object stores such as s3.

Parameters
  • file_handle (FileHandle) – The file handle to make available as a stream.

  • mode (str) – The mode in which to open the file. Default: 'rb'.

Returns

A file-like stream.

Return type

Union[IOBytes, IOString]

abstract read_data(file_handle)[source]

Return the bytes for a given file handle. This may incur an expensive network call for file managers backed by object stores such as s3.

Parameters

file_handle (FileHandle) – The file handle for which to return bytes.

Returns

Bytes for a given file handle.

Return type

bytes

abstract write(file_obj, mode='wb', ext=None)[source]

Write the bytes contained within the given file_obj into the file manager. This returns a FileHandle corresponding to the newly created file.

File managers typically return a subclass of FileHandle appropriate for their implementation: e.g., a LocalFileManager returns a LocalFileHandle, an S3FileManager returns an S3FileHandle, and so forth.

Parameters
  • file_obj (Union[IOBytes, IOString]) – A file-like object.

  • mode (Optional[str]) – The mode in which to write the file into storage. Default: 'wb'.

  • ext (Optional[str]) – For file managers that support file extensions, the extension with which to write the file. Default: None.

Returns

A handle to the newly created file.

Return type

FileHandle

abstract write_data(data, ext=None)[source]

Write raw bytes into storage.

Parameters
  • data (bytes) – The bytes to write into storage.

  • ext (Optional[str]) – For file managers that support file extensions, the extension with which to write the file. Default: None.

Returns

A handle to the newly created file.

Return type

FileHandle
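
As a usage sketch, a solid can round-trip data through the file manager, assuming the active system storage exposes one on the solid context as context.file_manager:

from dagster import solid

@solid
def archive_report(context, report_text: str) -> str:
    # Persist raw bytes and get back a FileHandle for the newly created file.
    handle = context.file_manager.write_data(report_text.encode('utf-8'), ext='txt')
    # Read the bytes back through the same abstraction.
    return context.file_manager.read_data(handle).decode('utf-8')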

class dagster.core.storage.file_manager.LocalFileManager(base_dir)[source]

Intermediate Storage

@dagster.intermediate_storage(required_resource_keys=None, name=None, is_persistent=True, config_schema=None)[source]

Creates an intermediate storage definition.

The decorated function will be passed as the intermediate_storage_creation_fn to an IntermediateStorageDefinition.

Parameters
  • name (str) – The name of the intermediate storage.

  • is_persistent (bool) – Whether the storage is persistent in a way that can cross process/node boundaries. Re-execution with, for example, the multiprocess executor, or with dagster-airflow, requires a persistent storage mode.

  • required_resource_keys (Optional[Set[str]]) – The resources that this storage needs at runtime to function.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.intermediate_storage_config.

class dagster.IntermediateStorageDefinition(name, is_persistent, required_resource_keys, config_schema=None, intermediate_storage_creation_fn=None, _configured_config_mapping_fn=None)[source]

Defines intermediate data storage behaviors.

Parameters
  • name (str) – Name of the storage mode.

  • is_persistent (bool) – Whether the storage is persistent in a way that can cross process/node boundaries. Re-execution with, for example, the multiprocess executor, or with dagster-airflow, requires a persistent storage mode.

  • required_resource_keys (Optional[Set[str]]) – The resources that this storage needs at runtime to function.

  • config_schema (Optional[ConfigSchema]) – The schema for the storage’s configuration schema. Configuration data passed in this schema will be made available to the intermediate_storage_creation_fn under init_context.intermediate_storage_config.

  • intermediate_storage_creation_fn – (Callable[[InitIntermediateStorageContext], IntermediateStorage]) Called to construct the storage. This function should consume the init context and emit an IntermediateStorage.

  • _configured_config_mapping_fn – This argument is for internal use only. Users should not specify this field. To preconfigure an intermediate storage, use the configured() API.

class dagster.InitIntermediateStorageContext[source]

Intermediate storage-specific initialization context.

pipeline_def

The definition of the pipeline in context.

Type

PipelineDefinition

mode_def

The definition of the mode in context.

Type

ModeDefinition

intermediate_storage_def

The definition of the intermediate storage to be constructed.

Type

IntermediateStorageDefinition

pipeline_run

The pipeline run in context.

Type

PipelineRun

instance

The instance.

Type

DagsterInstance

environment_config

The environment config.

Type

EnvironmentConfig

type_storage_plugin_registry

Registry containing custom type storage plugins.

Type

TypeStoragePluginRegistry

resources

Resources available in context.

Type

Any

intermediate_storage_config

The intermediate storage-specific configuration data provided by the environment config. The schema for this data is defined by the config_schema argument to IntermediateStorageDefinition.

Type

Dict[str, Any]


Instance

class dagster.DagsterInstance(instance_type, local_artifact_storage, run_storage, event_storage, compute_log_manager, schedule_storage=None, scheduler=None, run_coordinator=None, run_launcher=None, settings=None, ref=None)[source]

Core abstraction for managing Dagster’s access to storage and other resources.

Use DagsterInstance.get() to grab the current DagsterInstance, which will load based on the values in the dagster.yaml file in $DAGSTER_HOME if set, and otherwise fall back to an ephemeral in-memory set of components.

Configuration of this class should be done by setting values in $DAGSTER_HOME/dagster.yaml. For example, to use Postgres for run and event log storage, you can write a dagster.yaml such as the following:
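
(A representative fragment; the dagster_postgres module paths and config keys shown here are assumptions that may vary by version.)

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_url: "postgresql://user:password@localhost:5432/my_database"

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_url: "postgresql://user:password@localhost:5432/my_database"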

Parameters
  • instance_type (InstanceType) – Indicates whether the instance is ephemeral or persistent. Users should not attempt to set this value directly or in their dagster.yaml files.

  • local_artifact_storage (LocalArtifactStorage) – The local artifact storage is used to configure storage for any artifacts that require a local disk, such as schedules, or when using the filesystem system storage to manage files and intermediates. By default, this will be a dagster.core.storage.root.LocalArtifactStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • run_storage (RunStorage) – The run storage is used to store metadata about ongoing and past pipeline runs. By default, this will be a dagster.core.storage.runs.SqliteRunStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • event_storage (EventLogStorage) – Used to store the structured event logs generated by pipeline runs. By default, this will be a dagster.core.storage.event_log.SqliteEventLogStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • compute_log_manager (ComputeLogManager) – The compute log manager handles stdout and stderr logging for solid compute functions. By default, this will be a dagster.core.storage.local_compute_log_manager.LocalComputeLogManager. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • run_coordinator (RunCoordinator) – A run coordinator may be used to manage the execution of pipeline runs.

  • run_launcher (Optional[RunLauncher]) – Optionally, a run launcher may be used to enable a Dagster instance to launch pipeline runs, e.g. on a remote Kubernetes cluster, in addition to running them locally.

  • settings (Optional[Dict]) – Specifies certain per-instance settings, such as feature flags. These are set in the dagster.yaml under a set of whitelisted keys.

  • ref (Optional[InstanceRef]) – Used by internal machinery to pass instances across process boundaries.

get_addresses_for_step_output_versions(step_output_versions)[source]

For each given step output, finds whether an output exists with the given version, and returns its address if it does.

Parameters

step_output_versions (Dict[(str, StepOutputHandle), str]) – (pipeline name, step output handle) -> version.

Returns

(pipeline name, step output handle) -> address.

For each step output, an address if there is one and None otherwise.

Return type

Dict[(str, StepOutputHandle), str]

launch_run(run_id, external_pipeline)[source]

Launch a pipeline run.

This method is typically called using instance.submit_run rather than being invoked directly. This method delegates to the RunLauncher, if any, configured on the instance, and will call its implementation of RunLauncher.launch_run() to begin the execution of the specified run. Runs should be created in the instance (e.g., by calling DagsterInstance.create_run()) before this method is called, and should be in the PipelineRunStatus.NOT_STARTED state.

Parameters

run_id (str) – The id of the run to launch.

report_engine_event(message, pipeline_run, engine_event_data=None, cls=None, step_key=None)[source]

Report an EngineEvent that occurred outside of a pipeline execution context.

resolve_memoized_execution_plan(execution_plan, run_config, mode)[source]
Returns

Execution plan configured to only run unmemoized steps.

Return type

ExecutionPlan

submit_run(run_id, external_pipeline)[source]

Submit a pipeline run to the coordinator.

This method delegates to the RunCoordinator, configured on the instance, and will call its implementation of RunCoordinator.submit_run() to send the run to the coordinator for execution. Runs should be created in the instance (e.g., by calling DagsterInstance.create_run()) before this method is called, and should be in the PipelineRunStatus.NOT_STARTED state. They also must have a non-null ExternalPipelineOrigin.

Parameters

run_id (str) – The id of the run.

class dagster.core.instance.InstanceRef[source]

Serializable representation of a DagsterInstance.

Users should not instantiate this class directly.

class dagster.serdes.ConfigurableClass[source]

Abstract mixin for classes that can be loaded from config.

This supports a powerful plugin pattern which avoids both a) a lengthy, hard-to-synchronize list of conditional imports / optional extras_require in dagster core and b) a magic directory or file in which third parties can place plugin packages. Instead, the intention is to make, e.g., run storage pluggable with a config chunk like:

run_storage:
    module: very_cool_package.run_storage
    class: SplendidRunStorage
    config:
        magic_word: "quux"

This same pattern should eventually be viable for other system components, e.g. engines.

The ConfigurableClass mixin provides the necessary hooks for classes to be instantiated from an instance of ConfigurableClassData.

Pieces of the Dagster system which we wish to make pluggable in this way should consume a config type such as:

{'module': str, 'class': str, 'config': Field(Permissive())}

abstract classmethod config_type()[source]

dagster.ConfigType: The config type against which to validate a config yaml fragment serialized in an instance of ConfigurableClassData.

abstract static from_config_value(inst_data, config_value)[source]

New up an instance of the ConfigurableClass from a validated config value.

Called by ConfigurableClassData.rehydrate.

Parameters

config_value (dict) – The validated config value to use. Typically this should be the value attribute of an EvaluateValueResult.

A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass’s constructor:

@staticmethod
def from_config_value(inst_data, config_value):
    return MyConfigurableClass(inst_data=inst_data, **config_value)

abstract property inst_data

Subclasses must be able to return their inst_data as a property if they have been constructed through the from_config_value code path.
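
Putting these hooks together, the SplendidRunStorage from the example above might be sketched as follows (the actual storage behavior, and the RunStorage base class it would also need to implement, are elided):

from dagster.serdes import ConfigurableClass

class SplendidRunStorage(ConfigurableClass):
    def __init__(self, magic_word, inst_data=None):
        self._magic_word = magic_word
        self._inst_data = inst_data

    @property
    def inst_data(self):
        return self._inst_data

    @classmethod
    def config_type(cls):
        # The schema for the 'config' fragment shown above.
        return {'magic_word': str}

    @staticmethod
    def from_config_value(inst_data, config_value):
        return SplendidRunStorage(inst_data=inst_data, **config_value)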

class dagster.serdes.ConfigurableClassData[source]

Serializable tuple describing where to find a class and the config fragment that should be used to instantiate it.

Users should not instantiate this class directly.

Classes intended to be serialized in this way should implement the dagster.serdes.ConfigurableClass mixin.

class dagster.core.storage.root.LocalArtifactStorage(base_dir, inst_data=None)[source]
classmethod config_type()[source]

dagster.ConfigType: The config type against which to validate a config yaml fragment serialized in an instance of ConfigurableClassData.

static from_config_value(inst_data, config_value)[source]

New up an instance of the ConfigurableClass from a validated config value.

Called by ConfigurableClassData.rehydrate.

Parameters

config_value (dict) – The validated config value to use. Typically this should be the value attribute of an EvaluateValueResult.

A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass’s constructor:

@staticmethod
def from_config_value(inst_data, config_value):
    return MyConfigurableClass(inst_data=inst_data, **config_value)

property inst_data

Subclasses must be able to return their inst_data as a property if they have been constructed through the from_config_value code path.


Run storage

class dagster.PipelineRun[source]

Serializable internal representation of a pipeline run, as stored in a RunStorage.

class dagster.core.storage.runs.RunStorage[source]

Abstract base class for storing pipeline run history.

Note that run storages using SQL databases as backing stores should implement SqlRunStorage.

Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of concrete subclasses of this class should be done by setting values in that file.

class dagster.core.storage.runs.SqlRunStorage[source]

Base class for SQL-based run storages.

class dagster.core.storage.runs.SqliteRunStorage(conn_string, inst_data=None)[source]

SQLite-backed run storage.

Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.

This is the default run storage when none is specified in the dagster.yaml.

To explicitly specify SQLite for run storage, you can add a block such as the following to your dagster.yaml:

run_storage:
  module: dagster.core.storage.runs
  class: SqliteRunStorage
  config:
    base_dir: /path/to/dir

The base_dir param tells the run storage where on disk to store the database.

See also: dagster_postgres.PostgresRunStorage.


Event log storage

class dagster.core.storage.event_log.EventLogStorage[source]

Abstract base class for storing structured event logs from pipeline runs.

Note that event log storages using SQL databases as backing stores should implement SqlEventLogStorage.

Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of concrete subclasses of this class should be done by setting values in that file.

class dagster.core.storage.event_log.SqlEventLogStorage[source]

Base class for SQL-backed event log storages.

class dagster.core.storage.event_log.SqliteEventLogStorage(base_dir, inst_data=None)[source]

SQLite-backed event log storage.

Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.

This is the default event log storage when none is specified in the dagster.yaml.

To explicitly specify SQLite for event log storage, you can add a block such as the following to your dagster.yaml:

event_log_storage:
  module: dagster.core.storage.event_log
  class: SqliteEventLogStorage
  config:
    base_dir: /path/to/dir

The base_dir param tells the event log storage where on disk to store the databases. To improve concurrent performance, event logs are stored in a separate SQLite database for each run.

See also: dagster_postgres.PostgresEventLogStorage.


Compute log manager

class dagster.core.storage.compute_log_manager.ComputeLogManager[source]

Abstract base class for storing unstructured compute logs (stdout/stderr) from the compute steps of pipeline solids.

class dagster.core.storage.local_compute_log_manager.LocalComputeLogManager(base_dir, inst_data=None)[source]

Stores copies of stdout & stderr for each compute step locally on disk.

See also: dagster_aws.S3ComputeLogManager.


Run launcher

class dagster.core.launcher.RunLauncher[source]

class dagster.core.launcher.DefaultRunLauncher(inst_data=None)[source]

Launches runs against running GRPC servers.

See also: dagster_k8s.K8sRunLauncher.


Scheduling

class dagster.core.scheduler.Scheduler[source]

Abstract base class for a scheduler. This component is responsible for interfacing with an external system, such as cron, to ensure repeated execution according to the definition of a schedule.

class dagster_cron.cron_scheduler.SystemCronScheduler(inst_data=None)[source]

Scheduler implementation that uses the local system's cron. Only works on Unix systems that have cron.

Enable this scheduler by adding it to your dagster.yaml in $DAGSTER_HOME.
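
For example (a minimal fragment, assuming no additional config is required):

scheduler:
  module: dagster_cron.cron_scheduler
  class: SystemCronScheduler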

class dagster.core.storage.schedules.ScheduleStorage[source]

Abstract class for managing persistence of scheduler artifacts.

class dagster.core.storage.schedules.SqlScheduleStorage[source]

Base class for SQL-backed schedule storage.

class dagster.core.storage.schedules.SqliteScheduleStorage(conn_string, inst_data=None)[source]

Local SQLite-backed schedule storage.

See also: dagster_postgres.PostgresScheduleStorage.


Exception handling

dagster.core.errors.user_code_error_boundary(error_cls, msg_fn, control_flow_exceptions=None, **kwargs)[source]

Wraps the execution of user-space code in an error boundary. This places a uniform policy around any user code invoked by the framework. This ensures that all user errors are wrapped in an exception derived from DagsterUserCodeExecutionError, and that the original stack trace of the user error is preserved, so that it can be reported without confusing framework code in the stack trace, if a tool author wishes to do so. This has been especially helpful in a notebooking context.

Examples:

with user_code_error_boundary(
    # Pass a class that inherits from DagsterUserCodeExecutionError
    DagstermillExecutionError,
    # Pass a function that produces a message
    lambda: 'Error occurred during the execution of Dagstermill solid '
    '{solid_name}: {notebook_path}'.format(
        solid_name=name, notebook_path=notebook_path
    ),
):
    call_user_provided_function()

Architecture

Details of internal architecture, captured at a specific point in time. These are expected to be useful only to people working on dagster core or on complex libraries/integrations.

Pipeline Execution Flow - March 2020 (0.7.6)

[Diagram: pipeline execution flow (execution_flow.png)]