Ask AI

Source code for dagster._core.events

"""Structured representations of system events."""

import logging
import os
import sys
import uuid
from enum import Enum
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Dict,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Union,
    cast,
)

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions import (
    AssetKey,
    AssetMaterialization,
    AssetObservation,
    ExpectationResult,
    HookDefinition,
    NodeHandle,
)
from dagster._core.definitions.asset_check_evaluation import (
    AssetCheckEvaluation,
    AssetCheckEvaluationPlanned,
)
from dagster._core.definitions.events import AssetLineageInfo, ObjectStoreOperationType
from dagster._core.definitions.metadata import (
    MetadataFieldSerializer,
    MetadataValue,
    RawMetadataValue,
    normalize_metadata,
)
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.errors import DagsterInvariantViolationError, HookExecutionError
from dagster._core.execution.context.system import IPlanContext, IStepContext, StepExecutionContext
from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle
from dagster._core.execution.plan.inputs import StepInputData
from dagster._core.execution.plan.objects import StepFailureData, StepRetryData, StepSuccessData
from dagster._core.execution.plan.outputs import StepOutputData
from dagster._core.log_manager import DagsterLogManager
from dagster._core.storage.captured_log_manager import CapturedLogContext
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._serdes import (
    NamedTupleSerializer,
    whitelist_for_serdes,
)
from dagster._serdes.serdes import (
    EnumSerializer,
    UnpackContext,
    is_whitelisted_for_serdes_object,
)
from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster._utils.timing import format_duration

if TYPE_CHECKING:
    from dagster._core.definitions.events import ObjectStoreOperation
    from dagster._core.execution.plan.plan import ExecutionPlan
    from dagster._core.execution.plan.step import StepKind


EventSpecificData = Union[
    StepOutputData,
    StepFailureData,
    StepSuccessData,
    "StepMaterializationData",
    "StepExpectationResultData",
    StepInputData,
    "EngineEventData",
    "HookErroredData",
    StepRetryData,
    "JobFailureData",
    "JobCanceledData",
    "ObjectStoreOperationResultData",
    "HandledOutputData",
    "LoadedInputData",
    "ComputeLogsCaptureData",
    "AssetObservationData",
    "AssetMaterializationPlannedData",
    "AssetCheckEvaluation",
    "AssetCheckEvaluationPlanned",
]


[docs]class DagsterEventType(str, Enum): """The types of events that may be yielded by op and job execution.""" STEP_OUTPUT = "STEP_OUTPUT" STEP_INPUT = "STEP_INPUT" STEP_FAILURE = "STEP_FAILURE" STEP_START = "STEP_START" STEP_SUCCESS = "STEP_SUCCESS" STEP_SKIPPED = "STEP_SKIPPED" # The process carrying out step execution is starting/started. Shown as a # marker start/end in the Dagster UI. STEP_WORKER_STARTING = "STEP_WORKER_STARTING" STEP_WORKER_STARTED = "STEP_WORKER_STARTED" # Resource initialization for execution has started/succeede/failed. Shown # as a marker start/end in the Dagster UI. RESOURCE_INIT_STARTED = "RESOURCE_INIT_STARTED" RESOURCE_INIT_SUCCESS = "RESOURCE_INIT_SUCCESS" RESOURCE_INIT_FAILURE = "RESOURCE_INIT_FAILURE" STEP_UP_FOR_RETRY = "STEP_UP_FOR_RETRY" # "failed" but want to retry STEP_RESTARTED = "STEP_RESTARTED" ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION" ASSET_MATERIALIZATION_PLANNED = "ASSET_MATERIALIZATION_PLANNED" ASSET_OBSERVATION = "ASSET_OBSERVATION" STEP_EXPECTATION_RESULT = "STEP_EXPECTATION_RESULT" ASSET_CHECK_EVALUATION_PLANNED = "ASSET_CHECK_EVALUATION_PLANNED" ASSET_CHECK_EVALUATION = "ASSET_CHECK_EVALUATION" # We want to display RUN_* events in the Dagster UI and in our LogManager output, but in order to # support backcompat for our storage layer, we need to keep the persisted value to be strings # of the form "PIPELINE_*". We may have user code that pass in the DagsterEventType # enum values into storage APIs (like get_event_records, which takes in an EventRecordsFilter). RUN_ENQUEUED = "PIPELINE_ENQUEUED" RUN_DEQUEUED = "PIPELINE_DEQUEUED" RUN_STARTING = "PIPELINE_STARTING" # Launch is happening, execution hasn't started yet RUN_START = "PIPELINE_START" # Execution has started RUN_SUCCESS = "PIPELINE_SUCCESS" RUN_FAILURE = "PIPELINE_FAILURE" RUN_CANCELING = "PIPELINE_CANCELING" RUN_CANCELED = "PIPELINE_CANCELED" # Keep these legacy enum values around, to keep back-compatability for user code that might be # using these constants to filter event records PIPELINE_ENQUEUED = RUN_ENQUEUED PIPELINE_DEQUEUED = RUN_DEQUEUED PIPELINE_STARTING = RUN_STARTING PIPELINE_START = RUN_START PIPELINE_SUCCESS = RUN_SUCCESS PIPELINE_FAILURE = RUN_FAILURE PIPELINE_CANCELING = RUN_CANCELING PIPELINE_CANCELED = RUN_CANCELED OBJECT_STORE_OPERATION = "OBJECT_STORE_OPERATION" ASSET_STORE_OPERATION = "ASSET_STORE_OPERATION" LOADED_INPUT = "LOADED_INPUT" HANDLED_OUTPUT = "HANDLED_OUTPUT" ENGINE_EVENT = "ENGINE_EVENT" HOOK_COMPLETED = "HOOK_COMPLETED" HOOK_ERRORED = "HOOK_ERRORED" HOOK_SKIPPED = "HOOK_SKIPPED" ALERT_START = "ALERT_START" ALERT_SUCCESS = "ALERT_SUCCESS" ALERT_FAILURE = "ALERT_FAILURE" LOGS_CAPTURED = "LOGS_CAPTURED"
EVENT_TYPE_TO_DISPLAY_STRING = { DagsterEventType.PIPELINE_ENQUEUED: "RUN_ENQUEUED", DagsterEventType.PIPELINE_DEQUEUED: "RUN_DEQUEUED", DagsterEventType.PIPELINE_STARTING: "RUN_STARTING", DagsterEventType.PIPELINE_START: "RUN_START", DagsterEventType.PIPELINE_SUCCESS: "RUN_SUCCESS", DagsterEventType.PIPELINE_FAILURE: "RUN_FAILURE", DagsterEventType.PIPELINE_CANCELING: "RUN_CANCELING", DagsterEventType.PIPELINE_CANCELED: "RUN_CANCELED", } STEP_EVENTS = { DagsterEventType.STEP_INPUT, DagsterEventType.STEP_START, DagsterEventType.STEP_OUTPUT, DagsterEventType.STEP_FAILURE, DagsterEventType.STEP_SUCCESS, DagsterEventType.STEP_SKIPPED, DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, DagsterEventType.STEP_EXPECTATION_RESULT, DagsterEventType.ASSET_CHECK_EVALUATION, DagsterEventType.OBJECT_STORE_OPERATION, DagsterEventType.HANDLED_OUTPUT, DagsterEventType.LOADED_INPUT, DagsterEventType.STEP_RESTARTED, DagsterEventType.STEP_UP_FOR_RETRY, } FAILURE_EVENTS = { DagsterEventType.RUN_FAILURE, DagsterEventType.STEP_FAILURE, DagsterEventType.RUN_CANCELED, } PIPELINE_EVENTS = { DagsterEventType.RUN_ENQUEUED, DagsterEventType.RUN_DEQUEUED, DagsterEventType.RUN_STARTING, DagsterEventType.RUN_START, DagsterEventType.RUN_SUCCESS, DagsterEventType.RUN_FAILURE, DagsterEventType.RUN_CANCELING, DagsterEventType.RUN_CANCELED, } HOOK_EVENTS = { DagsterEventType.HOOK_COMPLETED, DagsterEventType.HOOK_ERRORED, DagsterEventType.HOOK_SKIPPED, } ALERT_EVENTS = { DagsterEventType.ALERT_START, DagsterEventType.ALERT_SUCCESS, DagsterEventType.ALERT_FAILURE, } MARKER_EVENTS = { DagsterEventType.ENGINE_EVENT, DagsterEventType.STEP_WORKER_STARTING, DagsterEventType.STEP_WORKER_STARTED, DagsterEventType.RESOURCE_INIT_STARTED, DagsterEventType.RESOURCE_INIT_SUCCESS, DagsterEventType.RESOURCE_INIT_FAILURE, } EVENT_TYPE_TO_PIPELINE_RUN_STATUS = { DagsterEventType.RUN_START: DagsterRunStatus.STARTED, DagsterEventType.RUN_SUCCESS: DagsterRunStatus.SUCCESS, DagsterEventType.RUN_FAILURE: DagsterRunStatus.FAILURE, DagsterEventType.RUN_ENQUEUED: DagsterRunStatus.QUEUED, DagsterEventType.RUN_STARTING: DagsterRunStatus.STARTING, DagsterEventType.RUN_CANCELING: DagsterRunStatus.CANCELING, DagsterEventType.RUN_CANCELED: DagsterRunStatus.CANCELED, } PIPELINE_RUN_STATUS_TO_EVENT_TYPE = {v: k for k, v in EVENT_TYPE_TO_PIPELINE_RUN_STATUS.items()} # These are the only events currently supported in `EventLogStorage.store_event_batch` BATCH_WRITABLE_EVENTS = { DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, } ASSET_EVENTS = { DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, DagsterEventType.ASSET_MATERIALIZATION_PLANNED, } ASSET_CHECK_EVENTS = { DagsterEventType.ASSET_CHECK_EVALUATION, DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED, } class RunFailureReasonSerializer(EnumSerializer): def unpack(self, value: str): try: RunFailureReason(value) except ValueError: return RunFailureReason.UNKNOWN return super().unpack(value) @whitelist_for_serdes(serializer=RunFailureReasonSerializer) class RunFailureReason(Enum): UNEXPECTED_TERMINATION = "UNEXPECTED_TERMINATION" RUN_EXCEPTION = "RUN_EXCEPTION" STEP_FAILURE = "STEP_FAILURE" JOB_INITIALIZATION_FAILURE = "JOB_INITIALIZATION_FAILURE" UNKNOWN = "UNKNOWN" def _assert_type( method: str, expected_type: Union[DagsterEventType, Sequence[DagsterEventType]], actual_type: DagsterEventType, ) -> None: _expected_type = ( [expected_type] if isinstance(expected_type, DagsterEventType) else expected_type ) check.invariant( actual_type in _expected_type, f"{method} only callable when event_type is" f" {','.join([t.value for t in _expected_type])}, called on {actual_type}", ) def _validate_event_specific_data( event_type: DagsterEventType, event_specific_data: Optional["EventSpecificData"] ) -> Optional["EventSpecificData"]: if event_type == DagsterEventType.STEP_OUTPUT: check.inst_param(event_specific_data, "event_specific_data", StepOutputData) elif event_type == DagsterEventType.STEP_FAILURE: check.inst_param(event_specific_data, "event_specific_data", StepFailureData) elif event_type == DagsterEventType.STEP_SUCCESS: check.inst_param(event_specific_data, "event_specific_data", StepSuccessData) elif event_type == DagsterEventType.ASSET_MATERIALIZATION: check.inst_param(event_specific_data, "event_specific_data", StepMaterializationData) elif event_type == DagsterEventType.STEP_EXPECTATION_RESULT: check.inst_param(event_specific_data, "event_specific_data", StepExpectationResultData) elif event_type == DagsterEventType.STEP_INPUT: check.inst_param(event_specific_data, "event_specific_data", StepInputData) elif event_type in ( DagsterEventType.ENGINE_EVENT, DagsterEventType.STEP_WORKER_STARTING, DagsterEventType.STEP_WORKER_STARTED, DagsterEventType.RESOURCE_INIT_STARTED, DagsterEventType.RESOURCE_INIT_SUCCESS, DagsterEventType.RESOURCE_INIT_FAILURE, ): check.inst_param(event_specific_data, "event_specific_data", EngineEventData) elif event_type == DagsterEventType.HOOK_ERRORED: check.inst_param(event_specific_data, "event_specific_data", HookErroredData) elif event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: check.inst_param( event_specific_data, "event_specific_data", AssetMaterializationPlannedData ) elif event_type == DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED: check.inst_param(event_specific_data, "event_specific_data", AssetCheckEvaluationPlanned) elif event_type == DagsterEventType.ASSET_CHECK_EVALUATION: check.inst_param(event_specific_data, "event_specific_data", AssetCheckEvaluation) return event_specific_data def generate_event_batch_id(): return str(uuid.uuid4()) def log_step_event( step_context: IStepContext, event: "DagsterEvent", batch_metadata: Optional["DagsterEventBatchMetadata"], ) -> None: event_type = DagsterEventType(event.event_type_value) log_level = logging.ERROR if event_type in FAILURE_EVENTS else logging.DEBUG step_context.log.log_dagster_event( level=log_level, msg=event.message or f"{event_type} for step {step_context.step.key}", dagster_event=event, batch_metadata=batch_metadata, ) def log_job_event(job_context: IPlanContext, event: "DagsterEvent") -> None: event_type = DagsterEventType(event.event_type_value) log_level = logging.ERROR if event_type in FAILURE_EVENTS else logging.DEBUG job_context.log.log_dagster_event( level=log_level, msg=event.message or f"{event_type} for pipeline {job_context.job_name}", dagster_event=event, ) def log_resource_event(log_manager: DagsterLogManager, event: "DagsterEvent") -> None: event_specific_data = cast(EngineEventData, event.event_specific_data) log_level = logging.ERROR if event_specific_data.error else logging.DEBUG log_manager.log_dagster_event(level=log_level, msg=event.message or "", dagster_event=event) class DagsterEventSerializer(NamedTupleSerializer["DagsterEvent"]): def before_unpack(self, context, unpacked_dict: Any) -> Dict[str, Any]: event_type_value, event_specific_data = _handle_back_compat( unpacked_dict["event_type_value"], unpacked_dict.get("event_specific_data") ) unpacked_dict["event_type_value"] = event_type_value unpacked_dict["event_specific_data"] = event_specific_data return unpacked_dict def handle_unpack_error( self, exc: Exception, context: UnpackContext, storage_dict: Dict[str, Any], ) -> "DagsterEvent": event_type_value, _ = _handle_back_compat( storage_dict["event_type_value"], storage_dict.get("event_specific_data") ) step_key = storage_dict.get("step_key") orig_message = storage_dict.get("message") new_message = ( f"Could not deserialize event of type {event_type_value}. This event may have been" " written by a newer version of Dagster." + (f' Original message: "{orig_message}"' if orig_message else "") ) return DagsterEvent( event_type_value=DagsterEventType.ENGINE_EVENT.value, job_name=storage_dict["pipeline_name"], message=new_message, step_key=step_key, event_specific_data=EngineEventData( error=serializable_error_info_from_exc_info(sys.exc_info()) ), ) class DagsterEventBatchMetadata(NamedTuple): id: str is_end: bool
[docs]@whitelist_for_serdes( serializer=DagsterEventSerializer, storage_field_names={ "node_handle": "solid_handle", "job_name": "pipeline_name", }, ) class DagsterEvent( NamedTuple( "_DagsterEvent", [ ("event_type_value", str), ("job_name", str), ("step_handle", Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]]), ("node_handle", Optional[NodeHandle]), ("step_kind_value", Optional[str]), ("logging_tags", Optional[Mapping[str, str]]), ("event_specific_data", Optional["EventSpecificData"]), ("message", Optional[str]), ("pid", Optional[int]), ("step_key", Optional[str]), ], ) ): """Events yielded by op and job execution. Users should not instantiate this class. Attributes: event_type_value (str): Value for a DagsterEventType. job_name (str) node_handle (NodeHandle) step_kind_value (str): Value for a StepKind. logging_tags (Dict[str, str]) event_specific_data (Any): Type must correspond to event_type_value. message (str) pid (int) step_key (Optional[str]): DEPRECATED """ @staticmethod def from_step( event_type: "DagsterEventType", step_context: IStepContext, event_specific_data: Optional["EventSpecificData"] = None, message: Optional[str] = None, batch_metadata: Optional["DagsterEventBatchMetadata"] = None, ) -> "DagsterEvent": event = DagsterEvent( event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, job_name=step_context.job_name, step_handle=step_context.step.handle, node_handle=step_context.step.node_handle, step_kind_value=step_context.step.kind.value, logging_tags=step_context.event_tags, event_specific_data=_validate_event_specific_data(event_type, event_specific_data), message=check.opt_str_param(message, "message"), pid=os.getpid(), ) log_step_event(step_context, event, batch_metadata) return event @staticmethod def from_job( event_type: DagsterEventType, job_context: IPlanContext, message: Optional[str] = None, event_specific_data: Optional["EventSpecificData"] = None, step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None, ) -> "DagsterEvent": check.opt_inst_param( step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle) ) event = DagsterEvent( event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, job_name=job_context.job_name, message=check.opt_str_param(message, "message"), event_specific_data=_validate_event_specific_data(event_type, event_specific_data), step_handle=step_handle, pid=os.getpid(), ) log_job_event(job_context, event) return event @staticmethod def from_resource( event_type: DagsterEventType, job_name: str, execution_plan: "ExecutionPlan", log_manager: DagsterLogManager, message: Optional[str] = None, event_specific_data: Optional["EngineEventData"] = None, ) -> "DagsterEvent": event = DagsterEvent( event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, job_name=job_name, message=check.opt_str_param(message, "message"), event_specific_data=_validate_event_specific_data( DagsterEventType.ENGINE_EVENT, event_specific_data ), step_handle=execution_plan.step_handle_for_single_step_plans(), pid=os.getpid(), ) log_resource_event(log_manager, event) return event def __new__( cls, event_type_value: str, job_name: str, step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None, node_handle: Optional[NodeHandle] = None, step_kind_value: Optional[str] = None, logging_tags: Optional[Mapping[str, str]] = None, event_specific_data: Optional["EventSpecificData"] = None, message: Optional[str] = None, pid: Optional[int] = None, # legacy step_key: Optional[str] = None, ): # old events may contain node_handle but not step_handle if node_handle is not None and step_handle is None: step_handle = StepHandle(node_handle) # Legacy events may have step_key set directly, preserve those to stay in sync # with legacy execution plan snapshots. if step_handle is not None and step_key is None: step_key = step_handle.to_key() return super(DagsterEvent, cls).__new__( cls, check.str_param(event_type_value, "event_type_value"), check.str_param(job_name, "job_name"), check.opt_inst_param( step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle) ), check.opt_inst_param(node_handle, "node_handle", NodeHandle), check.opt_str_param(step_kind_value, "step_kind_value"), check.opt_mapping_param(logging_tags, "logging_tags"), _validate_event_specific_data(DagsterEventType(event_type_value), event_specific_data), check.opt_str_param(message, "message"), check.opt_int_param(pid, "pid"), check.opt_str_param(step_key, "step_key"), ) @property def node_name(self) -> str: check.invariant(self.node_handle is not None) node_handle = cast(NodeHandle, self.node_handle) return node_handle.name @public @property def event_type(self) -> DagsterEventType: """DagsterEventType: The type of this event.""" return DagsterEventType(self.event_type_value) @public @property def is_step_event(self) -> bool: """bool: If this event relates to a specific step.""" return self.event_type in STEP_EVENTS @public @property def is_hook_event(self) -> bool: """bool: If this event relates to the execution of a hook.""" return self.event_type in HOOK_EVENTS @property def is_alert_event(self) -> bool: return self.event_type in ALERT_EVENTS @property def step_kind(self) -> "StepKind": from dagster._core.execution.plan.step import StepKind return StepKind(self.step_kind_value) @public @property def is_step_success(self) -> bool: """bool: If this event is of type STEP_SUCCESS.""" return self.event_type == DagsterEventType.STEP_SUCCESS @public @property def is_successful_output(self) -> bool: """bool: If this event is of type STEP_OUTPUT.""" return self.event_type == DagsterEventType.STEP_OUTPUT @public @property def is_step_start(self) -> bool: """bool: If this event is of type STEP_START.""" return self.event_type == DagsterEventType.STEP_START @public @property def is_step_failure(self) -> bool: """bool: If this event is of type STEP_FAILURE.""" return self.event_type == DagsterEventType.STEP_FAILURE @public @property def is_resource_init_failure(self) -> bool: """bool: If this event is of type RESOURCE_INIT_FAILURE.""" return self.event_type == DagsterEventType.RESOURCE_INIT_FAILURE @public @property def is_step_skipped(self) -> bool: """bool: If this event is of type STEP_SKIPPED.""" return self.event_type == DagsterEventType.STEP_SKIPPED @public @property def is_step_up_for_retry(self) -> bool: """bool: If this event is of type STEP_UP_FOR_RETRY.""" return self.event_type == DagsterEventType.STEP_UP_FOR_RETRY @public @property def is_step_restarted(self) -> bool: """bool: If this event is of type STEP_RESTARTED.""" return self.event_type == DagsterEventType.STEP_RESTARTED @property def is_job_success(self) -> bool: return self.event_type == DagsterEventType.RUN_SUCCESS @property def is_job_failure(self) -> bool: return self.event_type == DagsterEventType.RUN_FAILURE @property def is_run_failure(self) -> bool: return self.event_type == DagsterEventType.RUN_FAILURE @public @property def is_failure(self) -> bool: """bool: If this event represents the failure of a run or step.""" return self.event_type in FAILURE_EVENTS @property def is_job_event(self) -> bool: return self.event_type in PIPELINE_EVENTS @public @property def is_engine_event(self) -> bool: """bool: If this event is of type ENGINE_EVENT.""" return self.event_type == DagsterEventType.ENGINE_EVENT @public @property def is_handled_output(self) -> bool: """bool: If this event is of type HANDLED_OUTPUT.""" return self.event_type == DagsterEventType.HANDLED_OUTPUT @public @property def is_loaded_input(self) -> bool: """bool: If this event is of type LOADED_INPUT.""" return self.event_type == DagsterEventType.LOADED_INPUT @public @property def is_step_materialization(self) -> bool: """bool: If this event is of type ASSET_MATERIALIZATION.""" return self.event_type == DagsterEventType.ASSET_MATERIALIZATION @public @property def is_expectation_result(self) -> bool: """bool: If this event is of type STEP_EXPECTATION_RESULT.""" return self.event_type == DagsterEventType.STEP_EXPECTATION_RESULT @public @property def is_asset_observation(self) -> bool: """bool: If this event is of type ASSET_OBSERVATION.""" return self.event_type == DagsterEventType.ASSET_OBSERVATION @public @property def is_asset_materialization_planned(self) -> bool: """bool: If this event is of type ASSET_MATERIALIZATION_PLANNED.""" return self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED @public @property def asset_key(self) -> Optional[AssetKey]: """Optional[AssetKey]: For events that correspond to a specific asset_key / partition (ASSET_MATERIALIZTION, ASSET_OBSERVATION, ASSET_MATERIALIZATION_PLANNED), returns that asset key. Otherwise, returns None. """ if self.event_type == DagsterEventType.ASSET_MATERIALIZATION: return self.step_materialization_data.materialization.asset_key elif self.event_type == DagsterEventType.ASSET_OBSERVATION: return self.asset_observation_data.asset_observation.asset_key elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return self.asset_materialization_planned_data.asset_key else: return None @public @property def partition(self) -> Optional[str]: """Optional[AssetKey]: For events that correspond to a specific asset_key / partition (ASSET_MATERIALIZTION, ASSET_OBSERVATION, ASSET_MATERIALIZATION_PLANNED), returns that partition. Otherwise, returns None. """ if self.event_type == DagsterEventType.ASSET_MATERIALIZATION: return self.step_materialization_data.materialization.partition elif self.event_type == DagsterEventType.ASSET_OBSERVATION: return self.asset_observation_data.asset_observation.partition elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return self.asset_materialization_planned_data.partition else: return None @property def partitions_subset(self) -> Optional[PartitionsSubset]: if self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return self.asset_materialization_planned_data.partitions_subset return None @property def step_input_data(self) -> "StepInputData": _assert_type("step_input_data", DagsterEventType.STEP_INPUT, self.event_type) return cast(StepInputData, self.event_specific_data) @property def step_output_data(self) -> StepOutputData: _assert_type("step_output_data", DagsterEventType.STEP_OUTPUT, self.event_type) return cast(StepOutputData, self.event_specific_data) @property def step_success_data(self) -> "StepSuccessData": _assert_type("step_success_data", DagsterEventType.STEP_SUCCESS, self.event_type) return cast(StepSuccessData, self.event_specific_data) @property def step_failure_data(self) -> "StepFailureData": _assert_type("step_failure_data", DagsterEventType.STEP_FAILURE, self.event_type) return cast(StepFailureData, self.event_specific_data) @property def step_retry_data(self) -> "StepRetryData": _assert_type("step_retry_data", DagsterEventType.STEP_UP_FOR_RETRY, self.event_type) return cast(StepRetryData, self.event_specific_data) @property def step_materialization_data(self) -> "StepMaterializationData": _assert_type( "step_materialization_data", DagsterEventType.ASSET_MATERIALIZATION, self.event_type ) return cast(StepMaterializationData, self.event_specific_data) @property def asset_observation_data(self) -> "AssetObservationData": _assert_type("asset_observation_data", DagsterEventType.ASSET_OBSERVATION, self.event_type) return cast(AssetObservationData, self.event_specific_data) @property def asset_materialization_planned_data(self) -> "AssetMaterializationPlannedData": _assert_type( "asset_materialization_planned", DagsterEventType.ASSET_MATERIALIZATION_PLANNED, self.event_type, ) return cast(AssetMaterializationPlannedData, self.event_specific_data) @property def asset_check_planned_data(self) -> "AssetCheckEvaluationPlanned": _assert_type( "asset_check_planned", DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED, self.event_type, ) return cast(AssetCheckEvaluationPlanned, self.event_specific_data) @property def step_expectation_result_data(self) -> "StepExpectationResultData": _assert_type( "step_expectation_result_data", DagsterEventType.STEP_EXPECTATION_RESULT, self.event_type, ) return cast(StepExpectationResultData, self.event_specific_data) @property def materialization(self) -> AssetMaterialization: _assert_type( "step_materialization_data", DagsterEventType.ASSET_MATERIALIZATION, self.event_type ) return cast(StepMaterializationData, self.event_specific_data).materialization @property def asset_check_evaluation_data(self) -> AssetCheckEvaluation: _assert_type( "asset_check_evaluation", DagsterEventType.ASSET_CHECK_EVALUATION, self.event_type ) return cast(AssetCheckEvaluation, self.event_specific_data) @property def job_failure_data(self) -> "JobFailureData": _assert_type("job_failure_data", DagsterEventType.RUN_FAILURE, self.event_type) return cast(JobFailureData, self.event_specific_data) @property def engine_event_data(self) -> "EngineEventData": _assert_type( "engine_event_data", [ DagsterEventType.ENGINE_EVENT, DagsterEventType.RESOURCE_INIT_STARTED, DagsterEventType.RESOURCE_INIT_SUCCESS, DagsterEventType.RESOURCE_INIT_FAILURE, DagsterEventType.STEP_WORKER_STARTED, DagsterEventType.STEP_WORKER_STARTING, ], self.event_type, ) return cast(EngineEventData, self.event_specific_data) @property def hook_completed_data(self) -> Optional["EventSpecificData"]: _assert_type("hook_completed_data", DagsterEventType.HOOK_COMPLETED, self.event_type) return self.event_specific_data @property def hook_errored_data(self) -> "HookErroredData": _assert_type("hook_errored_data", DagsterEventType.HOOK_ERRORED, self.event_type) return cast(HookErroredData, self.event_specific_data) @property def hook_skipped_data(self) -> Optional["EventSpecificData"]: _assert_type("hook_skipped_data", DagsterEventType.HOOK_SKIPPED, self.event_type) return self.event_specific_data @property def logs_captured_data(self) -> "ComputeLogsCaptureData": _assert_type("logs_captured_data", DagsterEventType.LOGS_CAPTURED, self.event_type) return cast(ComputeLogsCaptureData, self.event_specific_data) @staticmethod def step_output_event( step_context: StepExecutionContext, step_output_data: StepOutputData ) -> "DagsterEvent": output_def = step_context.op.output_def_named( step_output_data.step_output_handle.output_name ) return DagsterEvent.from_step( event_type=DagsterEventType.STEP_OUTPUT, step_context=step_context, event_specific_data=step_output_data, message=( 'Yielded output "{output_name}"{mapping_clause} of type' ' "{output_type}".{type_check_clause}'.format( output_name=step_output_data.step_output_handle.output_name, output_type=output_def.dagster_type.display_name, type_check_clause=( ( " Warning! Type check failed." if not step_output_data.type_check_data.success else " (Type check passed)." ) if step_output_data.type_check_data else " (No type check)." ), mapping_clause=( f' mapping key "{step_output_data.step_output_handle.mapping_key}"' if step_output_data.step_output_handle.mapping_key else "" ), ) ), ) @staticmethod def step_failure_event( step_context: IStepContext, step_failure_data: "StepFailureData", message=None, ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.STEP_FAILURE, step_context=step_context, event_specific_data=step_failure_data, message=(message or f'Execution of step "{step_context.step.key}" failed.'), ) @staticmethod def step_retry_event( step_context: IStepContext, step_retry_data: "StepRetryData" ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.STEP_UP_FOR_RETRY, step_context=step_context, event_specific_data=step_retry_data, message=( 'Execution of step "{step_key}" failed and has requested a retry{wait_str}.'.format( step_key=step_context.step.key, wait_str=( f" in {step_retry_data.seconds_to_wait} seconds" if step_retry_data.seconds_to_wait else "" ), ) ), ) @staticmethod def step_input_event( step_context: StepExecutionContext, step_input_data: "StepInputData" ) -> "DagsterEvent": input_def = step_context.op_def.input_def_named(step_input_data.input_name) return DagsterEvent.from_step( event_type=DagsterEventType.STEP_INPUT, step_context=step_context, event_specific_data=step_input_data, message='Got input "{input_name}" of type "{input_type}".{type_check_clause}'.format( input_name=step_input_data.input_name, input_type=input_def.dagster_type.display_name, type_check_clause=( ( " Warning! Type check failed." if not step_input_data.type_check_data.success else " (Type check passed)." ) if step_input_data.type_check_data else " (No type check)." ), ), ) @staticmethod def step_start_event(step_context: IStepContext) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.STEP_START, step_context=step_context, message=f'Started execution of step "{step_context.step.key}".', ) @staticmethod def step_restarted_event(step_context: IStepContext, previous_attempts: int) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.STEP_RESTARTED, step_context=step_context, message=f'Started re-execution (attempt # {previous_attempts + 1}) of step "{step_context.step.key}".', ) @staticmethod def step_success_event( step_context: IStepContext, success: "StepSuccessData" ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.STEP_SUCCESS, step_context=step_context, event_specific_data=success, message=f'Finished execution of step "{step_context.step.key}" in {format_duration(success.duration_ms)}.', ) @staticmethod def step_skipped_event(step_context: IStepContext) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.STEP_SKIPPED, step_context=step_context, message=f'Skipped execution of step "{step_context.step.key}".', ) @staticmethod def asset_materialization( step_context: IStepContext, materialization: AssetMaterialization, batch_metadata: Optional[DagsterEventBatchMetadata] = None, ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.ASSET_MATERIALIZATION, step_context=step_context, event_specific_data=StepMaterializationData(materialization), message=( materialization.description if materialization.description else "Materialized value{label_clause}.".format( label_clause=f" {materialization.label}" if materialization.label else "" ) ), batch_metadata=batch_metadata, ) @staticmethod def asset_observation( step_context: IStepContext, observation: AssetObservation, batch_metadata: Optional[DagsterEventBatchMetadata] = None, ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.ASSET_OBSERVATION, step_context=step_context, event_specific_data=AssetObservationData(observation), batch_metadata=batch_metadata, ) @staticmethod def asset_check_evaluation( step_context: IStepContext, asset_check_evaluation: AssetCheckEvaluation ) -> "DagsterEvent": return DagsterEvent.from_step( event_type=DagsterEventType.ASSET_CHECK_EVALUATION, step_context=step_context, event_specific_data=asset_check_evaluation, ) @staticmethod def step_expectation_result( step_context: IStepContext, expectation_result: ExpectationResult ) -> "DagsterEvent": def _msg(): if expectation_result.description: return expectation_result.description return "Expectation{label_clause} {result_verb}".format( label_clause=" " + expectation_result.label if expectation_result.label else "", result_verb="passed" if expectation_result.success else "failed", ) return DagsterEvent.from_step( event_type=DagsterEventType.STEP_EXPECTATION_RESULT, step_context=step_context, event_specific_data=StepExpectationResultData(expectation_result), message=_msg(), ) @staticmethod def step_concurrency_blocked( step_context: IStepContext, concurrency_key: str, initial=True ) -> "DagsterEvent": message = ( f"Step blocked by concurrency limit for key {concurrency_key}" if initial else f"Step still blocked by concurrency limit for key {concurrency_key}" ) return DagsterEvent.from_step( event_type=DagsterEventType.ENGINE_EVENT, step_context=step_context, message=message, event_specific_data=EngineEventData(metadata={"concurrency_key": concurrency_key}), ) @staticmethod def job_start(job_context: IPlanContext) -> "DagsterEvent": return DagsterEvent.from_job( DagsterEventType.RUN_START, job_context, message=f'Started execution of run for "{job_context.job_name}".', ) @staticmethod def job_success(job_context: IPlanContext) -> "DagsterEvent": return DagsterEvent.from_job( DagsterEventType.RUN_SUCCESS, job_context, message=f'Finished execution of run for "{job_context.job_name}".', ) @staticmethod def job_failure( job_context_or_name: Union[IPlanContext, str], context_msg: str, failure_reason: RunFailureReason, error_info: Optional[SerializableErrorInfo] = None, ) -> "DagsterEvent": check.str_param(context_msg, "context_msg") if isinstance(job_context_or_name, IPlanContext): return DagsterEvent.from_job( DagsterEventType.RUN_FAILURE, job_context_or_name, message=( f'Execution of run for "{job_context_or_name.job_name}" failed. {context_msg}' ), event_specific_data=JobFailureData(error_info, failure_reason=failure_reason), ) else: # when the failure happens trying to bring up context, the job_context hasn't been # built and so can't use from_pipeline check.str_param(job_context_or_name, "pipeline_name") event = DagsterEvent( event_type_value=DagsterEventType.RUN_FAILURE.value, job_name=job_context_or_name, event_specific_data=JobFailureData(error_info, failure_reason=failure_reason), message=f'Execution of run for "{job_context_or_name}" failed. {context_msg}', pid=os.getpid(), ) return event @staticmethod def job_canceled( job_context: IPlanContext, error_info: Optional[SerializableErrorInfo] = None ) -> "DagsterEvent": return DagsterEvent.from_job( DagsterEventType.RUN_CANCELED, job_context, message=f'Execution of run for "{job_context.job_name}" canceled.', event_specific_data=JobCanceledData( check.opt_inst_param(error_info, "error_info", SerializableErrorInfo) ), ) @staticmethod def step_worker_starting( step_context: IStepContext, message: str, metadata: Mapping[str, MetadataValue], ) -> "DagsterEvent": return DagsterEvent.from_step( DagsterEventType.STEP_WORKER_STARTING, step_context, message=message, event_specific_data=EngineEventData( metadata=metadata, marker_start="step_process_start" ), ) @staticmethod def step_worker_started( log_manager: DagsterLogManager, job_name: str, message: str, metadata: Mapping[str, MetadataValue], step_key: Optional[str], ) -> "DagsterEvent": event = DagsterEvent( DagsterEventType.STEP_WORKER_STARTED.value, job_name=job_name, message=message, event_specific_data=EngineEventData(metadata=metadata, marker_end="step_process_start"), pid=os.getpid(), step_key=step_key, ) log_manager.log_dagster_event( level=logging.DEBUG, msg=message, dagster_event=event, ) return event @staticmethod def resource_init_start( job_name: str, execution_plan: "ExecutionPlan", log_manager: DagsterLogManager, resource_keys: AbstractSet[str], ) -> "DagsterEvent": return DagsterEvent.from_resource( DagsterEventType.RESOURCE_INIT_STARTED, job_name=job_name, execution_plan=execution_plan, log_manager=log_manager, message="Starting initialization of resources [{}].".format( ", ".join(sorted(resource_keys)) ), event_specific_data=EngineEventData(metadata={}, marker_start="resources"), ) @staticmethod def resource_init_success( job_name: str, execution_plan: "ExecutionPlan", log_manager: DagsterLogManager, resource_instances: Mapping[str, Any], resource_init_times: Mapping[str, str], ) -> "DagsterEvent": metadata = {} for key in resource_instances.keys(): metadata[key] = MetadataValue.python_artifact(resource_instances[key].__class__) metadata[f"{key}:init_time"] = resource_init_times[key] return DagsterEvent.from_resource( DagsterEventType.RESOURCE_INIT_SUCCESS, job_name=job_name, execution_plan=execution_plan, log_manager=log_manager, message="Finished initialization of resources [{}].".format( ", ".join(sorted(resource_init_times.keys())) ), event_specific_data=EngineEventData( metadata=metadata, marker_end="resources", ), ) @staticmethod def resource_init_failure( job_name: str, execution_plan: "ExecutionPlan", log_manager: DagsterLogManager, resource_keys: AbstractSet[str], error: SerializableErrorInfo, ) -> "DagsterEvent": return DagsterEvent.from_resource( DagsterEventType.RESOURCE_INIT_FAILURE, job_name=job_name, execution_plan=execution_plan, log_manager=log_manager, message="Initialization of resources [{}] failed.".format(", ".join(resource_keys)), event_specific_data=EngineEventData( metadata={}, marker_end="resources", error=error, ), ) @staticmethod def resource_teardown_failure( job_name: str, execution_plan: "ExecutionPlan", log_manager: DagsterLogManager, resource_keys: AbstractSet[str], error: SerializableErrorInfo, ) -> "DagsterEvent": return DagsterEvent.from_resource( DagsterEventType.ENGINE_EVENT, job_name=job_name, execution_plan=execution_plan, log_manager=log_manager, message="Teardown of resources [{}] failed.".format(", ".join(resource_keys)), event_specific_data=EngineEventData( metadata={}, marker_start=None, marker_end=None, error=error, ), ) @staticmethod def engine_event( plan_context: IPlanContext, message: str, event_specific_data: Optional["EngineEventData"] = None, ) -> "DagsterEvent": if isinstance(plan_context, IStepContext): return DagsterEvent.from_step( DagsterEventType.ENGINE_EVENT, step_context=plan_context, event_specific_data=event_specific_data, message=message, ) else: return DagsterEvent.from_job( DagsterEventType.ENGINE_EVENT, plan_context, message, event_specific_data=event_specific_data, ) @staticmethod def object_store_operation( step_context: IStepContext, object_store_operation_result: "ObjectStoreOperation" ) -> "DagsterEvent": object_store_name = ( f"{object_store_operation_result.object_store_name} " if object_store_operation_result.object_store_name else "" ) serialization_strategy_modifier = ( f" using {object_store_operation_result.serialization_strategy_name}" if object_store_operation_result.serialization_strategy_name else "" ) value_name = object_store_operation_result.value_name if ( ObjectStoreOperationType(object_store_operation_result.op) == ObjectStoreOperationType.SET_OBJECT ): message = ( f"Stored intermediate object for output {value_name} in " f"{object_store_name}object store{serialization_strategy_modifier}." ) elif ( ObjectStoreOperationType(object_store_operation_result.op) == ObjectStoreOperationType.GET_OBJECT ): message = ( f"Retrieved intermediate object for input {value_name} in " f"{object_store_name}object store{serialization_strategy_modifier}." ) elif ( ObjectStoreOperationType(object_store_operation_result.op) == ObjectStoreOperationType.CP_OBJECT ): message = f"Copied intermediate object for input {value_name} from {object_store_operation_result.key} to {object_store_operation_result.dest_key}" else: message = "" return DagsterEvent.from_step( DagsterEventType.OBJECT_STORE_OPERATION, step_context, event_specific_data=ObjectStoreOperationResultData( op=object_store_operation_result.op, value_name=value_name, address=object_store_operation_result.key, metadata={"key": MetadataValue.path(object_store_operation_result.key)}, version=object_store_operation_result.version, mapping_key=object_store_operation_result.mapping_key, ), message=message, ) @staticmethod def handled_output( step_context: IStepContext, output_name: str, manager_key: str, message_override: Optional[str] = None, metadata: Optional[Mapping[str, MetadataValue]] = None, ) -> "DagsterEvent": message = f'Handled output "{output_name}" using IO manager "{manager_key}"' return DagsterEvent.from_step( event_type=DagsterEventType.HANDLED_OUTPUT, step_context=step_context, event_specific_data=HandledOutputData( output_name=output_name, manager_key=manager_key, metadata=metadata if metadata else {}, ), message=message_override or message, ) @staticmethod def loaded_input( step_context: IStepContext, input_name: str, manager_key: str, upstream_output_name: Optional[str] = None, upstream_step_key: Optional[str] = None, message_override: Optional[str] = None, metadata: Optional[Mapping[str, MetadataValue]] = None, ) -> "DagsterEvent": message = f'Loaded input "{input_name}" using input manager "{manager_key}"' if upstream_output_name: message += f', from output "{upstream_output_name}" of step "{upstream_step_key}"' return DagsterEvent.from_step( event_type=DagsterEventType.LOADED_INPUT, step_context=step_context, event_specific_data=LoadedInputData( input_name=input_name, manager_key=manager_key, upstream_output_name=upstream_output_name, upstream_step_key=upstream_step_key, metadata=metadata if metadata else {}, ), message=message_override or message, ) @staticmethod def hook_completed( step_context: StepExecutionContext, hook_def: HookDefinition ) -> "DagsterEvent": event_type = DagsterEventType.HOOK_COMPLETED event = DagsterEvent( event_type_value=event_type.value, job_name=step_context.job_name, step_handle=step_context.step.handle, node_handle=step_context.step.node_handle, step_kind_value=step_context.step.kind.value, logging_tags=step_context.event_tags, message=( f'Finished the execution of hook "{hook_def.name}" triggered for' f' "{step_context.op.name}".' ), ) step_context.log.log_dagster_event( level=logging.DEBUG, msg=event.message or "", dagster_event=event ) return event @staticmethod def hook_errored( step_context: StepExecutionContext, error: HookExecutionError ) -> "DagsterEvent": event_type = DagsterEventType.HOOK_ERRORED event = DagsterEvent( event_type_value=event_type.value, job_name=step_context.job_name, step_handle=step_context.step.handle, node_handle=step_context.step.node_handle, step_kind_value=step_context.step.kind.value, logging_tags=step_context.event_tags, event_specific_data=_validate_event_specific_data( event_type, HookErroredData( error=serializable_error_info_from_exc_info(error.original_exc_info) ), ), ) step_context.log.log_dagster_event(level=logging.ERROR, msg=str(error), dagster_event=event) return event @staticmethod def hook_skipped( step_context: StepExecutionContext, hook_def: HookDefinition ) -> "DagsterEvent": event_type = DagsterEventType.HOOK_SKIPPED event = DagsterEvent( event_type_value=event_type.value, job_name=step_context.job_name, step_handle=step_context.step.handle, node_handle=step_context.step.node_handle, step_kind_value=step_context.step.kind.value, logging_tags=step_context.event_tags, message=( f'Skipped the execution of hook "{hook_def.name}". It did not meet its triggering ' f'condition during the execution of "{step_context.op.name}".' ), ) step_context.log.log_dagster_event( level=logging.DEBUG, msg=event.message or "", dagster_event=event ) return event @staticmethod def legacy_compute_log_step_event(step_context: StepExecutionContext): step_key = step_context.step.key return DagsterEvent.from_step( DagsterEventType.LOGS_CAPTURED, step_context, message=f"Started capturing logs for step: {step_key}.", event_specific_data=ComputeLogsCaptureData( step_keys=[step_key], file_key=step_key, ), ) @staticmethod def capture_logs( job_context: IPlanContext, step_keys: Sequence[str], log_key: Sequence[str], log_context: CapturedLogContext, ): file_key = log_key[-1] return DagsterEvent.from_job( DagsterEventType.LOGS_CAPTURED, job_context, message=f"Started capturing logs in process (pid: {os.getpid()}).", event_specific_data=ComputeLogsCaptureData( step_keys=step_keys, file_key=file_key, external_stdout_url=log_context.external_stdout_url, external_stderr_url=log_context.external_stderr_url, external_url=log_context.external_url, ), ) @staticmethod def build_asset_materialization_planned_event( job_name: str, step_key: str, asset_materialization_planned_data: "AssetMaterializationPlannedData", ) -> "DagsterEvent": """Constructs an asset materialization planned event, to be logged by the caller.""" event = DagsterEvent( event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, job_name=job_name, message=( f"{job_name} intends to materialize asset {asset_materialization_planned_data.asset_key.to_string()}" ), event_specific_data=asset_materialization_planned_data, step_key=step_key, ) return event
def get_step_output_event( events: Sequence[DagsterEvent], step_key: str, output_name: Optional[str] = "result" ) -> Optional["DagsterEvent"]: check.sequence_param(events, "events", of_type=DagsterEvent) check.str_param(step_key, "step_key") check.str_param(output_name, "output_name") for event in events: if ( event.event_type == DagsterEventType.STEP_OUTPUT and event.step_key == step_key and event.step_output_data.output_name == output_name ): return event return None @whitelist_for_serdes class AssetObservationData( NamedTuple("_AssetObservation", [("asset_observation", AssetObservation)]) ): def __new__(cls, asset_observation: AssetObservation): return super(AssetObservationData, cls).__new__( cls, asset_observation=check.inst_param( asset_observation, "asset_observation", AssetObservation ), ) @whitelist_for_serdes class StepMaterializationData( NamedTuple( "_StepMaterializationData", [ ("materialization", AssetMaterialization), ("asset_lineage", Sequence[AssetLineageInfo]), ], ) ): def __new__( cls, materialization: AssetMaterialization, asset_lineage: Optional[Sequence[AssetLineageInfo]] = None, ): return super(StepMaterializationData, cls).__new__( cls, materialization=check.inst_param( materialization, "materialization", AssetMaterialization ), asset_lineage=check.opt_sequence_param( asset_lineage, "asset_lineage", of_type=AssetLineageInfo ), ) @whitelist_for_serdes class AssetMaterializationPlannedData( NamedTuple( "_AssetMaterializationPlannedData", [ ("asset_key", AssetKey), ("partition", Optional[str]), ("partitions_subset", Optional["PartitionsSubset"]), ], ) ): def __new__( cls, asset_key: AssetKey, partition: Optional[str] = None, partitions_subset: Optional["PartitionsSubset"] = None, ): if partitions_subset and partition: raise DagsterInvariantViolationError( "Cannot provide both partition and partitions_subset" ) if partitions_subset: check.opt_inst_param(partitions_subset, "partitions_subset", PartitionsSubset) check.invariant( is_whitelisted_for_serdes_object(partitions_subset), "partitions_subset must be serializable", ) return super(AssetMaterializationPlannedData, cls).__new__( cls, asset_key=check.inst_param(asset_key, "asset_key", AssetKey), partition=check.opt_str_param(partition, "partition"), partitions_subset=partitions_subset, ) @whitelist_for_serdes class StepExpectationResultData( NamedTuple( "_StepExpectationResultData", [ ("expectation_result", ExpectationResult), ], ) ): def __new__(cls, expectation_result: ExpectationResult): return super(StepExpectationResultData, cls).__new__( cls, expectation_result=check.inst_param( expectation_result, "expectation_result", ExpectationResult ), ) @whitelist_for_serdes( storage_field_names={"metadata": "metadata_entries"}, field_serializers={"metadata": MetadataFieldSerializer}, ) class ObjectStoreOperationResultData( NamedTuple( "_ObjectStoreOperationResultData", [ ("op", ObjectStoreOperationType), ("value_name", Optional[str]), ("metadata", Mapping[str, MetadataValue]), ("address", Optional[str]), ("version", Optional[str]), ("mapping_key", Optional[str]), ], ) ): def __new__( cls, op: ObjectStoreOperationType, value_name: Optional[str] = None, metadata: Optional[Mapping[str, MetadataValue]] = None, address: Optional[str] = None, version: Optional[str] = None, mapping_key: Optional[str] = None, ): return super(ObjectStoreOperationResultData, cls).__new__( cls, op=cast(ObjectStoreOperationType, check.str_param(op, "op")), value_name=check.opt_str_param(value_name, "value_name"), metadata=normalize_metadata( check.opt_mapping_param(metadata, "metadata", key_type=str) ), address=check.opt_str_param(address, "address"), version=check.opt_str_param(version, "version"), mapping_key=check.opt_str_param(mapping_key, "mapping_key"), ) @whitelist_for_serdes( storage_field_names={"metadata": "metadata_entries"}, field_serializers={"metadata": MetadataFieldSerializer}, ) class EngineEventData( NamedTuple( "_EngineEventData", [ ("metadata", Mapping[str, MetadataValue]), ("error", Optional[SerializableErrorInfo]), ("marker_start", Optional[str]), ("marker_end", Optional[str]), ], ) ): # serdes log # * added optional error # * added marker_start / marker_end # def __new__( cls, metadata: Optional[Mapping[str, RawMetadataValue]] = None, error: Optional[SerializableErrorInfo] = None, marker_start: Optional[str] = None, marker_end: Optional[str] = None, ): return super(EngineEventData, cls).__new__( cls, metadata=normalize_metadata( check.opt_mapping_param(metadata, "metadata", key_type=str) ), error=check.opt_inst_param(error, "error", SerializableErrorInfo), marker_start=check.opt_str_param(marker_start, "marker_start"), marker_end=check.opt_str_param(marker_end, "marker_end"), ) @staticmethod def in_process( pid: int, step_keys_to_execute: Optional[Sequence[str]] = None ) -> "EngineEventData": return EngineEventData( metadata={ "pid": MetadataValue.text(str(pid)), **( {"step_keys": MetadataValue.text(str(step_keys_to_execute))} if step_keys_to_execute else {} ), } ) @staticmethod def multiprocess( pid: int, step_keys_to_execute: Optional[Sequence[str]] = None ) -> "EngineEventData": return EngineEventData( metadata={ "pid": MetadataValue.text(str(pid)), **( {"step_keys": MetadataValue.text(str(step_keys_to_execute))} if step_keys_to_execute else {} ), } ) @staticmethod def interrupted(steps_interrupted: Sequence[str]) -> "EngineEventData": return EngineEventData( metadata={"steps_interrupted": MetadataValue.text(str(steps_interrupted))} ) @staticmethod def engine_error(error: SerializableErrorInfo) -> "EngineEventData": return EngineEventData(metadata={}, error=error) @whitelist_for_serdes(storage_name="PipelineFailureData") class JobFailureData( NamedTuple( "_JobFailureData", [ ("error", Optional[SerializableErrorInfo]), ("failure_reason", Optional[RunFailureReason]), ], ) ): def __new__( cls, error: Optional[SerializableErrorInfo], failure_reason: Optional[RunFailureReason] = None, ): return super(JobFailureData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo), failure_reason=check.opt_inst_param(failure_reason, "failure_reason", RunFailureReason), ) @whitelist_for_serdes(storage_name="PipelineCanceledData") class JobCanceledData( NamedTuple( "_JobCanceledData", [ ("error", Optional[SerializableErrorInfo]), ], ) ): def __new__(cls, error: Optional[SerializableErrorInfo]): return super(JobCanceledData, cls).__new__( cls, error=check.opt_inst_param(error, "error", SerializableErrorInfo) ) @whitelist_for_serdes class HookErroredData( NamedTuple( "_HookErroredData", [ ("error", SerializableErrorInfo), ], ) ): def __new__(cls, error: SerializableErrorInfo): return super(HookErroredData, cls).__new__( cls, error=check.inst_param(error, "error", SerializableErrorInfo) ) @whitelist_for_serdes( storage_field_names={"metadata": "metadata_entries"}, field_serializers={"metadata": MetadataFieldSerializer}, ) class HandledOutputData( NamedTuple( "_HandledOutputData", [ ("output_name", str), ("manager_key", str), ("metadata", Mapping[str, MetadataValue]), ], ) ): def __new__( cls, output_name: str, manager_key: str, metadata: Optional[Mapping[str, MetadataValue]] = None, ): return super(HandledOutputData, cls).__new__( cls, output_name=check.str_param(output_name, "output_name"), manager_key=check.str_param(manager_key, "manager_key"), metadata=normalize_metadata( check.opt_mapping_param(metadata, "metadata", key_type=str) ), ) @whitelist_for_serdes( storage_field_names={"metadata": "metadata_entries"}, field_serializers={"metadata": MetadataFieldSerializer}, ) class LoadedInputData( NamedTuple( "_LoadedInputData", [ ("input_name", str), ("manager_key", str), ("upstream_output_name", Optional[str]), ("upstream_step_key", Optional[str]), ("metadata", Mapping[str, MetadataValue]), ], ) ): def __new__( cls, input_name: str, manager_key: str, upstream_output_name: Optional[str] = None, upstream_step_key: Optional[str] = None, metadata: Optional[Mapping[str, MetadataValue]] = None, ): return super(LoadedInputData, cls).__new__( cls, input_name=check.str_param(input_name, "input_name"), manager_key=check.str_param(manager_key, "manager_key"), upstream_output_name=check.opt_str_param(upstream_output_name, "upstream_output_name"), upstream_step_key=check.opt_str_param(upstream_step_key, "upstream_step_key"), metadata=normalize_metadata( check.opt_mapping_param(metadata, "metadata", key_type=str) ), ) @whitelist_for_serdes(storage_field_names={"file_key": "log_key"}) class ComputeLogsCaptureData( NamedTuple( "_ComputeLogsCaptureData", [ ("file_key", str), # renamed log_key => file_key to avoid confusion ("step_keys", Sequence[str]), ("external_url", Optional[str]), ("external_stdout_url", Optional[str]), ("external_stderr_url", Optional[str]), ], ) ): def __new__( cls, file_key: str, step_keys: Sequence[str], external_url: Optional[str] = None, external_stdout_url: Optional[str] = None, external_stderr_url: Optional[str] = None, ): return super(ComputeLogsCaptureData, cls).__new__( cls, file_key=check.str_param(file_key, "file_key"), step_keys=check.opt_list_param(step_keys, "step_keys", of_type=str), external_url=check.opt_str_param(external_url, "external_url"), external_stdout_url=check.opt_str_param(external_stdout_url, "external_stdout_url"), external_stderr_url=check.opt_str_param(external_stderr_url, "external_stderr_url"), ) ################################################################################################### # THE GRAVEYARD # # -|- -|- -|- # | | | # _-'~~~~~`-_ . _-'~~~~~`-_ _-'~~~~~`-_ # .' '. .' '. .' '. # | R I P | | R I P | | R I P | # | | | | | | # | Synthetic | | Asset | | Pipeline | # | Process | | Store | | Init | # | Events | | Operations | | Failures | # | | | | | | ################################################################################################### # Old data structures referenced below # class AssetStoreOperationData(NamedTuple): # op: str # step_key: str # output_name: str # asset_store_key: str # # # class AssetStoreOperationType(Enum): # SET_ASSET = "SET_ASSET" # GET_ASSET = "GET_ASSET" # # # class PipelineInitFailureData(NamedTuple): # error: SerializableErrorInfo def _handle_back_compat( event_type_value: str, event_specific_data: Optional[Dict[str, Any]], ) -> Tuple[str, Optional[Dict[str, Any]]]: # transform old specific process events in to engine events if event_type_value in [ "PIPELINE_PROCESS_START", "PIPELINE_PROCESS_STARTED", "PIPELINE_PROCESS_EXITED", ]: return "ENGINE_EVENT", {"__class__": "EngineEventData"} # changes asset store ops in to get/set asset elif event_type_value == "ASSET_STORE_OPERATION": assert ( event_specific_data is not None ), "ASSET_STORE_OPERATION event must have specific data" if event_specific_data["op"] in ( "GET_ASSET", '{"__enum__": "AssetStoreOperationType.GET_ASSET"}', ): return ( "LOADED_INPUT", { "__class__": "LoadedInputData", "input_name": event_specific_data["output_name"], "manager_key": event_specific_data["asset_store_key"], }, ) if event_specific_data["op"] in ( "SET_ASSET", '{"__enum__": "AssetStoreOperationType.SET_ASSET"}', ): return ( "HANDLED_OUTPUT", { "__class__": "HandledOutputData", "output_name": event_specific_data["output_name"], "manager_key": event_specific_data["asset_store_key"], }, ) # previous name for ASSET_MATERIALIZATION was STEP_MATERIALIZATION if event_type_value == "STEP_MATERIALIZATION": assert event_specific_data is not None, "STEP_MATERIALIZATION event must have specific data" return "ASSET_MATERIALIZATION", event_specific_data # transform PIPELINE_INIT_FAILURE to PIPELINE_FAILURE if event_type_value == "PIPELINE_INIT_FAILURE": assert ( event_specific_data is not None ), "PIPELINE_INIT_FAILURE event must have specific data" return "PIPELINE_FAILURE", { "__class__": "PipelineFailureData", "error": event_specific_data.get("error"), } return event_type_value, event_specific_data