Ask AI

Source code for dagster._core.run_coordinator.default_run_coordinator

import logging
from typing import Mapping, Optional

from typing_extensions import Self

import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import ConfigurableClass, ConfigurableClassData

from .base import RunCoordinator, SubmitRunContext


[docs]class DefaultRunCoordinator(RunCoordinator, ConfigurableClass): """Immediately send runs to the run launcher.""" def __init__(self, inst_data: Optional[ConfigurableClassData] = None): self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData) self._logger = logging.getLogger("dagster.run_coordinator.default_run_coordinator") super().__init__() @property def inst_data(self) -> Optional[ConfigurableClassData]: return self._inst_data @classmethod def config_type(cls) -> UserConfigSchema: return {} @classmethod def from_config_value( cls, inst_data: Optional[ConfigurableClassData], config_value: Mapping[str, object] ) -> Self: return cls(inst_data=inst_data, **config_value) def submit_run(self, context: SubmitRunContext) -> DagsterRun: dagster_run = context.dagster_run if dagster_run.status == DagsterRunStatus.NOT_STARTED: self._instance.launch_run(dagster_run.run_id, context.workspace) else: self._logger.warning( f"submit_run called for run {dagster_run.run_id} with status " f"{dagster_run.status.value}, skipping launch." ) run = self._instance.get_run_by_id(dagster_run.run_id) if run is None: check.failed(f"Failed to reload run {dagster_run.run_id}") return run def cancel_run(self, run_id: str) -> bool: return self._instance.run_launcher.terminate(run_id)