import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from functools import cached_property
from queue import Queue
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
Literal,
Mapping,
Optional,
Sequence,
TypedDict,
Union,
cast,
)
from dagster_pipes import (
DAGSTER_PIPES_CONTEXT_ENV_VAR,
DAGSTER_PIPES_MESSAGES_ENV_VAR,
PIPES_METADATA_TYPE_INFER,
Method,
PipesContextData,
PipesDataProvenance,
PipesException,
PipesExtras,
PipesMessage,
PipesMetadataType,
PipesMetadataValue,
PipesOpenedData,
PipesParams,
PipesTimeWindow,
_env_var_to_cli_argument,
encode_param,
)
from typing_extensions import TypeAlias
import dagster._check as check
from dagster import DagsterEvent
from dagster._annotations import public
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.data_version import DataProvenance, DataVersion
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.result import MaterializeResult
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
has_one_dimension_time_window_partitioning,
)
from dagster._core.errors import DagsterInvariantViolationError, DagsterPipesExecutionError
from dagster._core.events import EngineEventData
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.execution.context.invocation import BaseDirectExecutionContext
from dagster._core.execution.context.op_execution_context import OpExecutionContext
from dagster._utils.error import (
ExceptionInfo,
SerializableErrorInfo,
serializable_error_info_from_exc_info,
)
if TYPE_CHECKING:
from dagster._core.pipes.client import PipesMessageReader
PipesExecutionResult: TypeAlias = Union[MaterializeResult, AssetCheckResult]
class PipesLaunchedData(TypedDict):
"""Payload generated in the orchestration process after external process startup
containing arbitrary information about the external process.
"""
extras: Mapping[str, Any]
[docs]
class PipesMessageHandler:
"""Class to process :py:obj:`PipesMessage` objects received from a pipes process.
Args:
context (Union[OpExecutionContext, AssetExecutionContext]): The context for the executing op/asset.
message_reader (PipesMessageReader): The message reader used to read messages from the
external process.
"""
# In the future it may make sense to merge PipesMessageReader and PipesMessageHandler, or
# otherwise adjust their relationship. The current interaction between the two is a bit awkward,
# but it would also be awkward to have a monolith that users extend.
def __init__(
self,
context: Union[OpExecutionContext, AssetExecutionContext],
message_reader: "PipesMessageReader",
) -> None:
self._context = context
self._message_reader = message_reader
# Queue is thread-safe
self._result_queue: Queue[PipesExecutionResult] = Queue()
self._extra_msg_queue: Queue[Any] = Queue()
# Only read by the main thread after all messages are handled, so no need for a lock
self._received_opened_msg = False
self._messages_include_stdio_logs = False
self._received_closed_msg = False
self._opened_payload: Optional[PipesOpenedData] = None
@contextmanager
def handle_messages(self) -> Iterator[PipesParams]:
with self._message_reader.read_messages(self) as params:
yield params
def get_reported_results(self) -> Sequence[PipesExecutionResult]:
return tuple(self._result_queue.queue)
def get_custom_messages(self) -> Sequence[Any]:
return tuple(self._extra_msg_queue.queue)
@property
def received_opened_message(self) -> bool:
return self._received_opened_msg
@property
def received_closed_message(self) -> bool:
return self._received_closed_msg
def _resolve_metadata(
self, metadata: Mapping[str, PipesMetadataValue]
) -> Mapping[str, MetadataValue]:
return {
k: self._resolve_metadata_value(v["raw_value"], v["type"]) for k, v in metadata.items()
}
@staticmethod
def _resolve_metadata_value(value: Any, metadata_type: PipesMetadataType) -> MetadataValue:
if metadata_type == PIPES_METADATA_TYPE_INFER:
return normalize_metadata_value(value)
elif metadata_type == "text":
return MetadataValue.text(value)
elif metadata_type == "url":
return MetadataValue.url(value)
elif metadata_type == "path":
return MetadataValue.path(value)
elif metadata_type == "notebook":
return MetadataValue.notebook(value)
elif metadata_type == "json":
return MetadataValue.json(value)
elif metadata_type == "md":
return MetadataValue.md(value)
elif metadata_type == "float":
return MetadataValue.float(value)
elif metadata_type == "int":
return MetadataValue.int(value)
elif metadata_type == "bool":
return MetadataValue.bool(value)
elif metadata_type == "dagster_run":
return MetadataValue.dagster_run(value)
elif metadata_type == "asset":
return MetadataValue.asset(AssetKey.from_user_string(value))
elif metadata_type == "table":
return MetadataValue.table(value)
elif metadata_type == "null":
return MetadataValue.null()
else:
check.failed(f"Unexpected metadata type {metadata_type}")
# Type ignores because we currently validate in individual handlers
def handle_message(self, message: PipesMessage) -> None:
if self._received_closed_msg:
self._context.log.warning(
f"[pipes] unexpected message received after closed: `{message}`"
)
method = cast(Method, message["method"])
if method == "opened":
self._handle_opened(message["params"]) # type: ignore
elif method == "closed":
self._handle_closed(message["params"])
elif method == "report_asset_materialization":
self._handle_report_asset_materialization(**message["params"]) # type: ignore
elif method == "report_asset_check":
self._handle_report_asset_check(**message["params"]) # type: ignore
elif method == "log":
self._handle_log(**message["params"]) # type: ignore
elif method == "log_external_stream":
self._handle_log_external_stream(**message["params"]) # type: ignore
elif method == "report_custom_message":
self._handle_extra_message(**message["params"]) # type: ignore
else:
raise DagsterPipesExecutionError(f"Unknown message method: {message['method']}")
def _handle_opened(self, opened_payload: PipesOpenedData) -> None:
self._received_opened_msg = True
self._context.log.info("[pipes] external process successfully opened dagster pipes.")
self._message_reader.on_opened(opened_payload)
def _handle_closed(self, params: Optional[Mapping[str, Any]]) -> None:
self._received_closed_msg = True
if params and "exception" in params:
err_info = _ser_err_from_pipes_exc(params["exception"])
# report as an engine event to provide structured exception data
DagsterEvent.engine_event(
self._context.get_step_execution_context(),
"[pipes] external process pipes closed with exception",
EngineEventData(error=err_info),
)
def _handle_report_asset_materialization(
self,
asset_key: str,
metadata: Optional[Mapping[str, PipesMetadataValue]],
data_version: Optional[str],
) -> None:
check.str_param(asset_key, "asset_key")
check.opt_str_param(data_version, "data_version")
metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
resolved_asset_key = AssetKey.from_escaped_user_string(asset_key)
resolved_metadata = self._resolve_metadata(metadata)
resolved_data_version = None if data_version is None else DataVersion(data_version)
result = MaterializeResult(
asset_key=resolved_asset_key,
metadata=resolved_metadata,
data_version=resolved_data_version,
)
self._result_queue.put(result)
def _handle_report_asset_check(
self,
asset_key: str,
check_name: str,
passed: bool,
severity: str,
metadata: Mapping[str, PipesMetadataValue],
) -> None:
check.str_param(asset_key, "asset_key")
check.str_param(check_name, "check_name")
check.bool_param(passed, "passed")
check.literal_param(severity, "severity", [x.value for x in AssetCheckSeverity])
metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
resolved_asset_key = AssetKey.from_escaped_user_string(asset_key)
resolved_metadata = self._resolve_metadata(metadata)
resolved_severity = AssetCheckSeverity(severity)
result = AssetCheckResult(
asset_key=resolved_asset_key,
check_name=check_name,
passed=passed,
severity=resolved_severity,
metadata=resolved_metadata,
)
self._result_queue.put(result)
def _handle_log(self, message: str, level: str = "info") -> None:
check.str_param(message, "message")
self._context.log.log(level, message)
def _handle_log_external_stream(
self, stream: Literal["stdout", "stderr"], text: str, extras: Optional[PipesExtras] = None
):
if stream == "stdout":
sys.stdout.write(text)
elif stream == "stderr":
sys.stderr.write(text)
else:
raise DagsterInvariantViolationError(f"Unexpected stream: {stream}")
def _handle_extra_message(self, payload: Any):
self._extra_msg_queue.put(payload)
def report_pipes_framework_exception(self, origin: str, exc_info: ExceptionInfo):
# use an engine event to provide structured exception, this gives us an event with
# * the context of where the exception happened ([pipes]...)
# * the exception class, message, and stack trace as well as any chained exception
DagsterEvent.engine_event(
self._context.get_step_execution_context(),
f"[pipes] framework exception occurred in {origin}",
EngineEventData(
error=serializable_error_info_from_exc_info(exc_info),
),
)
def on_launched(self, launched_payload: PipesLaunchedData) -> None:
"""Hook that is called if `PipesSession.report_launched()` is called."""
self._context.log.info("[pipes] additional launch_payload reported")
self._message_reader.on_launched(launched_payload)
[docs]
@dataclass
class PipesSession:
"""Object representing a pipes session.
A pipes session is defined by a pair of :py:class:`PipesContextInjector` and
:py:class:`PipesMessageReader` objects. At the opening of the session, the context injector
writes context data to an externally accessible location, and the message reader starts
monitoring an externally accessible location. These locations are encoded in parameters stored
on a `PipesSession` object.
During the session, an external process should be started and the parameters injected into its
environment. The typical way to do this is to call :py:meth:`PipesSession.get_bootstrap_env_vars`
and pass the result as environment variables.
During execution, results (e.g. asset materializations) are reported by the external process and
buffered on the `PipesSession` object. The buffer can periodically be cleared and yielded to
Dagster machinery by calling `yield from PipesSession.get_results()`.
When the external process exits, the session can be closed. Closing consists of handling any
unprocessed messages written by the external process and cleaning up any resources used for
context injection and message reading.
Args:
context_data (PipesContextData): The context for the executing op/asset.
message_handler (PipesMessageHandler): The message handler to use for processing messages
context_injector_params (PipesParams): Parameters yielded by the context injector,
indicating the location from which the external process should load context data.
message_reader_params (PipesParams): Parameters yielded by the message reader, indicating
the location to which the external process should write messages.
created_at (datetime): The time at which the session was created. Useful as cutoff for
reading logs.
"""
context_data: PipesContextData
message_handler: PipesMessageHandler
context_injector_params: PipesParams
message_reader_params: PipesParams
context: Union[OpExecutionContext, AssetExecutionContext]
created_at: datetime = field(default_factory=datetime.now)
@cached_property
def default_remote_invocation_info(self) -> Dict[str, str]:
return {**self.context.dagster_run.dagster_execution_info}
[docs]
@public
def get_bootstrap_env_vars(self) -> Mapping[str, str]:
"""Encode context injector and message reader params as environment variables.
Passing environment variables is the typical way to expose the pipes I/O parameters
to a pipes process.
Returns:
Mapping[str, str]: Environment variables to pass to the external process. The values are
serialized as json, compressed with gzip, and then base-64-encoded.
"""
return {
param_name: encode_param(param_value)
for param_name, param_value in self.get_bootstrap_params().items()
}
[docs]
@public
def get_bootstrap_cli_arguments(self) -> Mapping[str, str]:
"""Encode context injector and message reader params as CLI arguments.
Passing CLI arguments is an alternative way to expose the pipes I/O parameters to a pipes process.
Using environment variables should be preferred when possible.
Returns:
Mapping[str, str]: CLI arguments pass to the external process. The values are
serialized as json, compressed with zlib, and then base64-encoded.
"""
return {
_env_var_to_cli_argument(param_name): encode_param(param_value)
for param_name, param_value in self.get_bootstrap_params().items()
}
[docs]
@public
def get_bootstrap_params(self) -> Mapping[str, Any]:
"""Get the params necessary to bootstrap a launched pipes process. These parameters are typically
are as environment variable. See `get_bootstrap_env_vars`. It is the context injector's
responsibility to decide how to pass these parameters to the external environment.
Returns:
Mapping[str, str]: Parameters to pass to the external process and their corresponding
values that must be passed by the context injector.
"""
return {
DAGSTER_PIPES_CONTEXT_ENV_VAR: self.context_injector_params,
DAGSTER_PIPES_MESSAGES_ENV_VAR: self.message_reader_params,
}
[docs]
@public
def get_results(
self,
*,
implicit_materializations: bool = True,
metadata: Optional[Mapping[str, MetadataValue]] = None,
) -> Sequence[PipesExecutionResult]:
""":py:class:`PipesExecutionResult` objects reported from the external process,
potentially modified by Pipes.
Args:
implicit_materializations (bool): Create MaterializeResults for expected assets
even was nothing is reported from the external process.
metadata (Optional[Mapping[str, MetadataValue]]): Arbitrary metadata that will be attached to all
results generated by the invocation. Useful for attaching information to
asset materializations and checks that is available via the external process launch API
but not in the external process itself (e.g. a job_id param returned by the launch API call).
Returns:
Sequence[PipesExecutionResult]: Result reported by external process.
"""
metadata = metadata or {}
reported = self.message_handler.get_reported_results()
reported = tuple(self._inject_metadata_into_results(reported, metadata))
if not implicit_materializations:
return reported
reported_keys = set(
result.asset_key for result in reported if isinstance(result, MaterializeResult)
)
implicit = [
MaterializeResult(asset_key=key)
for key in self.context.selected_asset_keys
if key not in reported_keys
]
implicit = self._inject_metadata_into_results(implicit, metadata)
return (*reported, *implicit)
[docs]
@public
def get_reported_results(self) -> Sequence[PipesExecutionResult]:
""":py:class:`PipesExecutionResult` objects only explicitly received from the external process.
Returns:
Sequence[PipesExecutionResult]: Result reported by external process.
"""
return self.message_handler.get_reported_results()
[docs]
@public
def get_custom_messages(self) -> Sequence[Any]:
"""Get the sequence of deserialized JSON data that was reported from the external process using
`report_custom_message`.
Returns: Sequence[Any]
"""
return self.message_handler.get_custom_messages()
def report_launched(self, launched_payload: PipesLaunchedData):
"""Submit arbitrary information about the launched process to Pipes.
This hook is not necessarily called in every pipes session. It is useful primarily when we wish to
condition the behavior of Pipes on some parameter that is only available after
external process launch (such as a run id in the external system). The code calling `open_pipes_session()`
is responsible for calling `PipesSession.report_launched()`.
Passes the `launched_payload` to the message reader's `on_launched` method.
"""
self.message_handler.on_launched(launched_payload)
def _inject_metadata_into_results(
self, results: Sequence[PipesExecutionResult], metadata: Mapping[str, MetadataValue]
) -> Sequence[PipesExecutionResult]:
results_with_metadata = []
for result in results:
result = result._replace(metadata={**(result.metadata or {}), **metadata}) # noqa: PLW2901
results_with_metadata.append(result)
return results_with_metadata
def build_external_execution_context_data(
context: Union[OpExecutionContext, AssetExecutionContext],
extras: Optional[PipesExtras],
) -> "PipesContextData":
asset_keys = (
[_convert_asset_key(key) for key in sorted(context.selected_asset_keys)]
if context.has_assets_def
else None
)
code_version_by_asset_key = (
{
_convert_asset_key(key): context.assets_def.code_versions_by_key[key]
for key in context.selected_asset_keys
}
if context.has_assets_def
else None
)
provenance_by_asset_key = (
{
_convert_asset_key(key): _convert_data_provenance(context.get_asset_provenance(key))
for key in context.selected_asset_keys
}
if context.has_assets_def
else None
)
partition_key = context.partition_key if context.has_partition_key else None
partition_key_range = context.partition_key_range if context.has_partition_key else None
partition_time_window = (
context.partition_time_window
if context.has_partition_key
and has_one_dimension_time_window_partitioning(
context.get_step_execution_context().run_partitions_def
)
else None
)
return PipesContextData(
asset_keys=asset_keys,
code_version_by_asset_key=code_version_by_asset_key,
provenance_by_asset_key=provenance_by_asset_key,
partition_key=partition_key,
partition_key_range=(
_convert_partition_key_range(partition_key_range) if partition_key_range else None
),
partition_time_window=(
_convert_time_window(partition_time_window) if partition_time_window else None
),
run_id=context.run_id,
job_name=None if isinstance(context, BaseDirectExecutionContext) else context.job_name,
retry_number=0 if isinstance(context, BaseDirectExecutionContext) else context.retry_number,
extras=extras or {},
)
def _convert_asset_key(asset_key: AssetKey) -> str:
r"""Convert asset key to Pipes-compatible string representation.
This includes escaping forward slashes (/) with backslashes (\).
"""
return asset_key.to_escaped_user_string()
def _convert_data_provenance(
provenance: Optional[DataProvenance],
) -> Optional["PipesDataProvenance"]:
return (
None
if provenance is None
else PipesDataProvenance(
code_version=provenance.code_version,
input_data_versions={
_convert_asset_key(k): v.value for k, v in provenance.input_data_versions.items()
},
is_user_provided=provenance.is_user_provided,
)
)
def _convert_time_window(
time_window: TimeWindow,
) -> "PipesTimeWindow":
return PipesTimeWindow(
start=time_window.start.isoformat(),
end=time_window.end.isoformat(),
)
def _convert_partition_key_range(
partition_key_range: PartitionKeyRange,
) -> "PipesTimeWindow":
return PipesTimeWindow(
start=partition_key_range.start,
end=partition_key_range.end,
)
def _ser_err_from_pipes_exc(exc: PipesException):
return SerializableErrorInfo(
message=exc["message"],
stack=exc["stack"],
cls_name=exc["name"],
cause=_ser_err_from_pipes_exc(exc["cause"]) if exc["cause"] else None,
context=_ser_err_from_pipes_exc(exc["context"]) if exc["context"] else None,
)