Source code for dagster._core.executor.base
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Iterator
from dagster import _check as check
from dagster._annotations import public
from dagster._core.execution.plan.objects import StepFailureData, StepRetryData
from dagster._core.execution.retries import RetryMode
from dagster._utils.error import SerializableErrorInfo
if TYPE_CHECKING:
from dagster._core.events import DagsterEvent
from dagster._core.execution.context.system import IStepContext, PlanOrchestrationContext
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.execution.plan.state import KnownExecutionState
[docs]
class Executor(ABC):
[docs]
@public
@abstractmethod
def execute(
self, plan_context: "PlanOrchestrationContext", execution_plan: "ExecutionPlan"
) -> Iterator["DagsterEvent"]:
"""For the given context and execution plan, orchestrate a series of sub plan executions in a way that satisfies the whole plan being executed.
Args:
plan_context (PlanOrchestrationContext): The plan's orchestration context.
execution_plan (ExecutionPlan): The plan to execute.
Returns:
A stream of dagster events.
"""
@public
@property
@abstractmethod
def retries(self) -> RetryMode:
"""Whether retries are enabled or disabled for this instance of the executor.
Executors should allow this to be controlled via configuration if possible.
Returns: RetryMode
"""
def get_failure_or_retry_event_after_crash(
self,
step_context: "IStepContext",
err_info: SerializableErrorInfo,
known_state: "KnownExecutionState",
):
from dagster._core.events import DagsterEvent
# determine the retry policy for the step if needed
retry_policy = step_context.op_retry_policy
retry_state = known_state.get_retry_state()
previous_attempt_count = retry_state.get_attempt_count(step_context.step.key)
should_retry = (
retry_policy
and not step_context.retry_mode.disabled
and previous_attempt_count < retry_policy.max_retries
)
if should_retry:
return DagsterEvent.step_retry_event(
step_context,
StepRetryData(
error=err_info,
seconds_to_wait=check.not_none(retry_policy).calculate_delay(
previous_attempt_count + 1
),
),
)
else:
return DagsterEvent.step_failure_event(
step_context=step_context,
step_failure_data=StepFailureData(error=err_info, user_failure_data=None),
)