Source code for dagster.core.executor.init

from collections import namedtuple

from dagster import check
from dagster.core.definitions import (
    ExecutorDefinition,
    IPipeline,
    IntermediateStorageDefinition,
    ModeDefinition,
)
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.system_config.objects import EnvironmentConfig


[docs]class InitExecutorContext( namedtuple( "InitExecutorContext", "pipeline mode_def executor_def pipeline_run environment_config " "executor_config intermediate_storage_def instance", ) ): """Executor-specific initialization context. Attributes: pipeline (IPipeline): The pipeline to be executed. mode_def (ModeDefinition): The mode in which the pipeline is to be executed. executor_def (ExecutorDefinition): The definition of the executor currently being constructed. pipeline_run (PipelineRun): Configuration for this pipeline run. environment_config (EnvironmentConfig): The parsed environment configuration for this pipeline run. executor_config (dict): The parsed config passed to the executor. intermediate_storage_def (Optional[IntermediateStorageDefinition]): The intermediate storage definition. instance (DagsterInstance): The current instance. """ def __new__( cls, pipeline, mode_def, executor_def, pipeline_run, environment_config, executor_config, instance, intermediate_storage_def=None, ): return super(InitExecutorContext, cls).__new__( cls, pipeline=check.inst_param(pipeline, "pipeline", IPipeline), mode_def=check.inst_param(mode_def, "mode_def", ModeDefinition), executor_def=check.inst_param(executor_def, "executor_def", ExecutorDefinition), pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun), environment_config=check.inst_param( environment_config, "environment_config", EnvironmentConfig ), executor_config=check.dict_param(executor_config, executor_config, key_type=str), intermediate_storage_def=check.opt_inst_param( intermediate_storage_def, "intermediate_storage_def", IntermediateStorageDefinition ), instance=check.inst_param(instance, "instance", DagsterInstance), ) @property def pipeline_def(self): return self.pipeline.get_definition()