Ask AI

Source code for dagster._core.execution.execute_in_process_result

from typing import Any, Mapping, Optional, Sequence

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions import JobDefinition, NodeHandle
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.utils import DEFAULT_OUTPUT
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.events import DagsterEvent
from dagster._core.execution.execution_result import ExecutionResult
from dagster._core.execution.plan.outputs import StepOutputHandle
from dagster._core.storage.dagster_run import DagsterRun


class ExecuteInProcessResult(ExecutionResult):
    """Result of a run executed by one of the in-process testing APIs.

    Instances are not meant to be constructed by users. They expose whether the
    run succeeded, the events it emitted, and the outputs it produced.

    The following APIs return this object:

    - :py:meth:`dagster.GraphDefinition.execute_in_process`
    - :py:meth:`dagster.JobDefinition.execute_in_process`
    - :py:meth:`dagster.materialize_to_memory`
    - :py:meth:`dagster.materialize`
    """

    # NOTE: `_handle` is declared here but is not assigned by `__init__` below.
    _handle: NodeHandle
    _event_list: Sequence[DagsterEvent]
    _dagster_run: DagsterRun
    _output_capture: Mapping[StepOutputHandle, Any]
    _job_def: JobDefinition

    def __init__(
        self,
        event_list: Sequence[DagsterEvent],
        dagster_run: DagsterRun,
        output_capture: Optional[Mapping[StepOutputHandle, Any]],
        job_def: JobDefinition,
    ):
        """Store the artifacts of a finished in-process run.

        Args:
            event_list (Sequence[DagsterEvent]): Events emitted during the run.
            dagster_run (DagsterRun): The run that was executed.
            output_capture (Optional[Mapping[StepOutputHandle, Any]]): Captured
                output values keyed by step output handle. Validated with
                ``check.opt_mapping_param`` before being stored.
            job_def (JobDefinition): The job definition that was executed.
        """
        self._job_def = job_def
        self._event_list = event_list
        self._dagster_run = dagster_run
        self._output_capture = check.opt_mapping_param(
            output_capture,
            "output_capture",
            key_type=StepOutputHandle,
        )

    @public
    @property
    def job_def(self) -> JobDefinition:
        """JobDefinition: The job definition this result was produced from."""
        return self._job_def

    @public
    @property
    def dagster_run(self) -> DagsterRun:
        """DagsterRun: The run record for this execution."""
        return self._dagster_run

    @public
    @property
    def all_events(self) -> Sequence[DagsterEvent]:
        """List[DagsterEvent]: Every dagster event emitted during execution."""
        return self._event_list

    @public
    @property
    def run_id(self) -> str:
        """str: The run ID of the executed :py:class:`DagsterRun`."""
        return self.dagster_run.run_id

    def _get_output_for_handle(self, handle: NodeHandle, output_name: str) -> Any:
        """Look up the captured value(s) of one output of one node.

        Args:
            handle (NodeHandle): Handle of the node; its string form is compared
                against the step keys of the captured outputs.
            output_name (str): Name of the output to look up.

        Returns:
            Any: The captured value itself when an exact step-key match without
            a mapping key is found; otherwise a dict of the matching values,
            keyed by mapping key or by the bracketed part of the step key.

        Raises:
            DagsterInvariantViolationError: If no captured output matches.
        """
        node_key = str(handle)
        mapped_prefix = f"{node_key}["
        collected = {}
        matched = False

        for captured_handle, captured_value in self._output_capture.items():
            # Both match cases below require the output name to agree.
            if captured_handle.output_name != output_name:
                continue

            captured_step_key = captured_handle.step_key
            if captured_step_key.startswith(mapped_prefix):
                # Mapped step: the step key has the form
                # "step_key[upstream_mapped_output_name]"; key the value by the
                # text between the first "[" and the first "]".
                matched = True
                open_idx = captured_step_key.find("[")
                close_idx = captured_step_key.find("]")
                collected[captured_step_key[open_idx + 1 : close_idx]] = captured_value
            elif captured_step_key == node_key:
                # Exact step-key match.
                matched = True
                if not captured_handle.mapping_key:
                    # Without a mapping key, return this value immediately.
                    return captured_value
                collected[captured_handle.mapping_key] = captured_value

        if not matched:
            raise DagsterInvariantViolationError(
                f"No outputs found for output '{output_name}' from node '{handle}'."
            )
        return collected

    @public
    def output_for_node(self, node_str: str, output_name: str = DEFAULT_OUTPUT) -> Any:
        """Retrieves the value of a named output of a node from the in-process run.

        Args:
            node_str (str): Name of the op/graph whose output should be retrieved.
                For a node nested inside another graph, use the syntax
                `outer_graph.inner_node`.
            output_name (Optional[str]): Name of the output on the op/graph to
                retrieve. Defaults to `result`, the default output name in dagster.

        Returns:
            Any: The value of the retrieved output.
        """
        return super().output_for_node(node_str, output_name=output_name)

    @public
    def asset_value(self, asset_key: CoercibleToAssetKey) -> Any:
        """Retrieves the value of an asset materialized during execution of the job.

        Args:
            asset_key (CoercibleToAssetKey): The key of the asset to retrieve.

        Returns:
            Any: The value of the retrieved asset.
        """
        resolved_key = AssetKey.from_coercible(asset_key)
        output_handle = self._job_def.asset_layer.node_output_handle_for_asset(resolved_key)
        return self.output_for_node(
            node_str=str(output_handle.node_handle),
            output_name=output_handle.output_name,
        )

    @public
    def output_value(self, output_name: str = DEFAULT_OUTPUT) -> Any:
        """Retrieves an output of the top-level job, if an output is returned.

        Args:
            output_name (Optional[str]): The name of the output to retrieve.
                Defaults to `result`, the default output name in dagster.

        Returns:
            Any: The value of the retrieved output.
        """
        return super().output_value(output_name=output_name)