Source code for dagster._core.launcher.base
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, NamedTuple, Optional
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.dagster_run import DagsterRun
from dagster._serdes import whitelist_for_serdes
if TYPE_CHECKING:
from dagster._core.workspace.context import BaseWorkspaceRequestContext
class LaunchRunContext(NamedTuple):
"""Context available within a run launcher's launch_run call."""
dagster_run: DagsterRun
workspace: Optional["BaseWorkspaceRequestContext"]
@property
def job_code_origin(self) -> Optional[JobPythonOrigin]:
return self.dagster_run.job_code_origin
class ResumeRunContext(NamedTuple):
"""Context available within a run launcher's resume_run call."""
dagster_run: DagsterRun
workspace: Optional["BaseWorkspaceRequestContext"]
resume_attempt_number: Optional[int] = None
@property
def job_code_origin(self) -> Optional[JobPythonOrigin]:
return self.dagster_run.job_code_origin
@whitelist_for_serdes
class WorkerStatus(Enum):
RUNNING = "RUNNING"
NOT_FOUND = "NOT_FOUND"
FAILED = "FAILED"
SUCCESS = "SUCCESS"
UNKNOWN = "UNKNOWN"
class CheckRunHealthResult(NamedTuple):
"""Result of a check_run_worker_health call."""
status: WorkerStatus
msg: Optional[str] = None
transient: Optional[bool] = None
run_worker_id: Optional[str] = None # Identifier for a particular run worker
def __str__(self) -> str:
return f"{self.status.value}: '{self.msg}'"
[docs]
class RunLauncher(ABC, MayHaveInstanceWeakref[T_DagsterInstance]):
@abstractmethod
def launch_run(self, context: LaunchRunContext) -> None:
"""Launch a run.
This method should begin the execution of the specified run, and may emit engine events.
Runs should be created in the instance (e.g., by calling
``DagsterInstance.create_run()``) *before* this method is called, and
should be in the ``PipelineRunStatus.STARTING`` state. Typically, this method will
not be invoked directly, but should be invoked through ``DagsterInstance.launch_run()``.
Args:
context (LaunchRunContext): information about the launch - every run launcher
will need the PipelineRun, and some run launchers may need information from the
BaseWorkspaceRequestContext from which the run was launched.
"""
@abstractmethod
def terminate(self, run_id: str) -> bool:
"""Terminates a process.
Returns False is the process was already terminated. Returns true if
the process was alive and was successfully terminated
"""
def dispose(self) -> None:
"""Do any resource cleanup that should happen when the DagsterInstance is
cleaning itself up.
"""
def join(self, timeout: int = 30) -> None:
pass
@property
def supports_check_run_worker_health(self) -> bool:
"""Whether the run launcher supports check_run_worker_health."""
return False
def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
raise NotImplementedError(
"This run launcher does not support run monitoring. Please disable it on your instance."
)
def get_run_worker_debug_info(
self, run: DagsterRun, include_container_logs: Optional[bool] = True
) -> Optional[str]:
return None
@property
def supports_resume_run(self) -> bool:
"""Whether the run launcher supports resume_run."""
return False
def resume_run(self, context: ResumeRunContext) -> None:
raise NotImplementedError(
"This run launcher does not support resuming runs. If using "
"run monitoring, set max_resume_run_attempts to 0."
)