
Source code for dagster._core.pipes.utils

import datetime
import json
import os
import sys
import tempfile
import time
import warnings
from abc import ABC, abstractmethod
from contextlib import contextmanager
from threading import Event, Thread
from typing import IO, Dict, Iterator, Optional, Sequence, Tuple, TypeVar, Union

from dagster_pipes import (
    PIPES_PROTOCOL_VERSION_FIELD,
    PipesContextData,
    PipesDefaultContextLoader,
    PipesDefaultMessageWriter,
    PipesExtras,
    PipesOpenedData,
    PipesParams,
)

from dagster import (
    OpExecutionContext,
    _check as check,
)
from dagster._core.errors import DagsterInvariantViolationError, DagsterPipesExecutionError
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.pipes.client import PipesContextInjector, PipesLaunchedData, PipesMessageReader
from dagster._core.pipes.context import (
    PipesMessageHandler,
    PipesSession,
    build_external_execution_context_data,
)
from dagster._utils import tail_file

TCursor = TypeVar("TCursor")


_CONTEXT_INJECTOR_FILENAME = "context"
_MESSAGE_READER_FILENAME = "messages"


class PipesFileContextInjector(PipesContextInjector):
    """Context injector that injects context data into the external process by writing it to a
    specified file.

    Args:
        path (str): The path of a file to which to write context data. The file will be deleted on
            close of the pipes session.
    """

    def __init__(self, path: str):
        self._path = check.str_param(path, "path")

    @contextmanager
    def inject_context(self, context_data: "PipesContextData") -> Iterator[PipesParams]:
        """Inject context to external environment by writing it to a file as JSON and exposing the
        path to the file.

        Args:
            context_data (PipesContextData): The context data to inject.

        Yields:
            PipesParams: A dict of parameters that can be used by the external process to locate
                and load the injected context data.
        """
        with open(self._path, "w") as input_stream:
            json.dump(context_data, input_stream)
        try:
            yield {PipesDefaultContextLoader.FILE_PATH_KEY: self._path}
        finally:
            if os.path.exists(self._path):
                os.remove(self._path)

    def no_messages_debug_text(self) -> str:
        return f"Attempted to inject context via file {self._path}"

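# Example (illustrative sketch, not part of this module): a PipesFileContextInjector can be
# paired with `open_pipes_session` (defined below) to launch a subprocess. The script path and
# context file path here are hypothetical.
#
#     import subprocess
#
#     @asset
#     def my_asset(context: AssetExecutionContext):
#         with open_pipes_session(
#             context=context,
#             context_injector=PipesFileContextInjector("/tmp/pipes_context.json"),
#             message_reader=PipesTempFileMessageReader(),
#         ) as session:
#             # The bootstrap env vars tell the external process where to find the
#             # injected context file (via PipesDefaultContextLoader.FILE_PATH_KEY).
#             subprocess.run(
#                 ["python", "/path/to/script.py"],
#                 env={**os.environ, **session.get_bootstrap_env_vars()},
#                 check=True,
#             )
#         yield from session.get_results()
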
class PipesTempFileContextInjector(PipesContextInjector):
    """Context injector that injects context data into the external process by writing it to an
    automatically-generated temporary file.
    """

    @contextmanager
    def inject_context(self, context: "PipesContextData") -> Iterator[PipesParams]:
        """Inject context to external environment by writing it to an automatically-generated
        temporary file as JSON and exposing the path to the file.

        Args:
            context (PipesContextData): The context data to inject.

        Yields:
            PipesParams: A dict of parameters that can be used by the external process to locate
                and load the injected context data.
        """
        with tempfile.TemporaryDirectory() as tempdir:
            with PipesFileContextInjector(
                os.path.join(tempdir, _CONTEXT_INJECTOR_FILENAME)
            ).inject_context(context) as params:
                yield params

    def no_messages_debug_text(self) -> str:
        return "Attempted to inject context via a temporary file."

class PipesEnvContextInjector(PipesContextInjector):
    """Context injector that injects context data into the external process by injecting it
    directly into the external process environment."""

    @contextmanager
    def inject_context(
        self,
        context_data: "PipesContextData",
    ) -> Iterator[PipesParams]:
        """Inject context to external environment by embedding it directly in the parameters that
        will be passed to the external process (typically as environment variables).

        Args:
            context_data (PipesContextData): The context data to inject.

        Yields:
            PipesParams: A dict of parameters that can be used by the external process to locate
                and load the injected context data.
        """
        yield {PipesDefaultContextLoader.DIRECT_KEY: context_data}

    def no_messages_debug_text(self) -> str:
        return "Attempted to inject context directly, typically as an environment variable."

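# Example (illustrative sketch): with PipesEnvContextInjector, the yielded params embed the full
# context data under PipesDefaultContextLoader.DIRECT_KEY, so no shared filesystem or blob store
# is needed; the session serializes these params into bootstrap environment variables that
# `open_dagster_pipes` in the external process decodes directly. Roughly:
#
#     injector = PipesEnvContextInjector()
#     with injector.inject_context(context_data) as params:
#         assert params == {PipesDefaultContextLoader.DIRECT_KEY: context_data}
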
class PipesFileMessageReader(PipesMessageReader):
    """Message reader that reads messages by tailing a specified file.

    Args:
        path (str): The path of the file to which messages will be written. The file will be
            deleted on close of the pipes session.
    """

    def __init__(self, path: str):
        self._path = check.str_param(path, "path")
        # Initialize here so the attribute exists even if `on_launched` is never called.
        self.launched_payload: Optional[PipesLaunchedData] = None

    def on_launched(self, params: PipesLaunchedData) -> None:
        self.launched_payload = params

    @contextmanager
    def read_messages(
        self,
        handler: "PipesMessageHandler",
    ) -> Iterator[PipesParams]:
        """Set up a thread to read streaming messages from the external process by tailing the
        target file.

        Args:
            handler (PipesMessageHandler): Object to process incoming messages.

        Yields:
            PipesParams: A dict of parameters that specifies where a pipes process should write
                pipes protocol messages.
        """
        is_session_closed = Event()
        thread = None
        try:
            open(self._path, "w").close()  # create the file
            thread = Thread(
                target=self._reader_thread,
                args=(handler, is_session_closed),
                daemon=True,
            )
            thread.start()
            yield {PipesDefaultMessageWriter.FILE_PATH_KEY: self._path}
        finally:
            is_session_closed.set()
            if thread:
                thread.join()
            if os.path.exists(self._path):
                os.remove(self._path)

    def _reader_thread(self, handler: "PipesMessageHandler", is_session_closed: Event) -> None:
        try:
            for line in tail_file(self._path, lambda: is_session_closed.is_set()):
                message = json.loads(line)
                handler.handle_message(message)
        except:
            handler.report_pipes_framework_exception(
                f"{self.__class__.__name__} reader thread",
                sys.exc_info(),
            )
            raise

    def no_messages_debug_text(self) -> str:
        return f"Attempted to read messages from file {self._path}."

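# Example (illustrative sketch): the file tailed by PipesFileMessageReader is expected to contain
# one JSON-encoded pipes message per line, each carrying PIPES_PROTOCOL_VERSION_FIELD, roughly:
#
#     {"__dagster_pipes_version": "0.1", "method": "report_asset_materialization", "params": {...}}
#
# In practice the external process should use the dagster-pipes package to write this format
# rather than emitting it by hand.
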
class PipesTempFileMessageReader(PipesMessageReader):
    """Message reader that reads messages by tailing an automatically-generated temporary file."""

    @contextmanager
    def read_messages(
        self,
        handler: "PipesMessageHandler",
    ) -> Iterator[PipesParams]:
        """Set up a thread to read streaming messages from the external process by tailing an
        automatically-generated temporary file.

        Args:
            handler (PipesMessageHandler): Object to process incoming messages.

        Yields:
            PipesParams: A dict of parameters that specifies where a pipes process should write
                pipes protocol messages.
        """
        with tempfile.TemporaryDirectory() as tempdir:
            with PipesFileMessageReader(
                os.path.join(tempdir, _MESSAGE_READER_FILENAME)
            ).read_messages(handler) as params:
                yield params

    def no_messages_debug_text(self) -> str:
        return "Attempted to read messages from a local temporary file."

# Time in seconds to wait between attempts when polling for some condition. Default value that is
# used in several places.
DEFAULT_SLEEP_INTERVAL = 1

# Wait up to this many seconds for threads to finish executing during cleanup. Note that this must
# be longer than WAIT_FOR_LOGS_TIMEOUT.
THREAD_WAIT_TIMEOUT = 120

# Buffer period after the external process has completed before we attempt to close out the log
# thread. The purpose of this is to allow an external system to update logs to their final state,
# which may take some time after the external process has completed. This is subtly different from
# `WAIT_FOR_LOGS_TIMEOUT`, which is the amount of time we will wait for targeted log files to exist
# at all. In contrast, `WAIT_FOR_LOGS_AFTER_EXECUTION_INTERVAL` is used to guard against the case
# where log files exist but are incomplete immediately after the external process has completed.
WAIT_FOR_LOGS_AFTER_EXECUTION_INTERVAL = 10

# Number of seconds to wait after an external process has completed to close
# out the log thread. This is necessary because the log thread contains two
# points at which it waits indefinitely: (1) for the `opened` message to be
# received; (2) for the log files to exist (which may not occur until some time
# after the external process has completed).
WAIT_FOR_LOGS_TIMEOUT = 60


class PipesThreadedMessageReader(PipesMessageReader):
    """A base class for message readers that read messages and logs in background threads.

    Args:
        interval (float): The interval in seconds at which to poll for messages.
        log_readers (Optional[Sequence[PipesLogReader]]): A set of log readers to use to read logs.
    """

    interval: float
    log_readers: Dict[str, "PipesLogReader"]
    opened_payload: Optional[PipesOpenedData]
    launched_payload: Optional[PipesLaunchedData]

    def __init__(
        self,
        interval: float = 10,
        log_readers: Optional[Sequence["PipesLogReader"]] = None,
    ):
        self.interval = interval
        self.log_readers = {
            str(i): reader
            for i, reader in enumerate(
                check.opt_sequence_param(log_readers, "log_readers", of_type=PipesLogReader)
            )
        }
        self.opened_payload = None
        self.launched_payload = None

    @contextmanager
    def read_messages(
        self,
        handler: "PipesMessageHandler",
    ) -> Iterator[PipesParams]:
        """Set up a thread to read streaming messages by periodically reading message chunks from
        a target location.

        Args:
            handler (PipesMessageHandler): Object to process incoming messages.

        Yields:
            PipesParams: A dict of parameters that specifies where a pipes process should write
                pipes protocol message chunks.
        """
        with self.get_params() as params:
            is_session_closed = Event()
            messages_thread = None
            logs_thread = None
            try:
                messages_thread = Thread(
                    target=self._messages_thread,
                    args=(handler, params, is_session_closed),
                    daemon=True,
                )
                messages_thread.start()
                logs_thread = Thread(
                    target=self._logs_thread,
                    args=(handler, params, is_session_closed, messages_thread),
                    daemon=True,
                )
                logs_thread.start()
                yield params
            finally:
                is_session_closed.set()
                if messages_thread:
                    _join_thread(messages_thread, "messages")
                if logs_thread:
                    _join_thread(logs_thread, "logs")

    def on_opened(self, opened_payload: PipesOpenedData) -> None:
        self.opened_payload = opened_payload

    def on_launched(self, launched_payload: PipesLaunchedData) -> None:
        self.launched_payload = launched_payload

    def add_log_reader(self, log_reader: "PipesLogReader") -> None:
        """Can be used to attach extra log readers to the message reader.

        Typically called when the target for reading logs is not known until after the external
        process has started (for example, when the target depends on an external job_id). The log
        reader will eventually be started by the PipesThreadedMessageReader.
        """
        key = str(len(self.log_readers))
        self.log_readers[key] = log_reader

    @abstractmethod
    def messages_are_readable(self, params: PipesParams) -> bool: ...

    @abstractmethod
    @contextmanager
    def get_params(self) -> Iterator[PipesParams]:
        """Yield a set of parameters to be passed to a message writer in a pipes process.

        Yields:
            PipesParams: A dict of parameters that specifies where a pipes process should write
                pipes protocol message chunks.
        """

    @abstractmethod
    def download_messages(
        self, cursor: Optional[TCursor], params: PipesParams
    ) -> Optional[Tuple[TCursor, str]]:
        """Download a chunk of messages from the target location.

        Args:
            cursor (Optional[Any]): Cursor specifying the start location from which to download
                messages in a stream. The format of the value varies with the message reader
                implementation. It might be an integer index for a line in a log file, or a
                timestamp for a message in a time-indexed stream.
            params (PipesParams): A dict of parameters that specifies where to download messages
                from.
        """
        ...

    def _messages_thread(
        self,
        handler: "PipesMessageHandler",
        params: PipesParams,
        is_session_closed: Event,
    ) -> None:
        try:
            start_or_last_download = datetime.datetime.now()
            session_closed_at = None
            cursor = None
            can_read_messages = False
            # Main loop to read messages. At every step, we:
            # - exit early if we have received the closed message
            # - consume params from the launched_payload if possible
            # - check if we can start reading messages (e.g. log files are available)
            # - download a chunk of messages and process them
            # - if is_session_closed is set, exit the loop after waiting for
            #   WAIT_FOR_LOGS_AFTER_EXECUTION_INTERVAL
            while True:
                # If we have received the closed message, we can exit, since the message reader
                # has been started and the external process has completed.
                if handler.received_closed_message:
                    return

                if not can_read_messages:
                    # This branch will be executed until we can read messages.
                    # Check for new params in case they have been updated.
                    params = {**params, **(self.launched_payload or {})}
                    can_read_messages = self.messages_are_readable(params)

                now = datetime.datetime.now()
                if (
                    now - start_or_last_download
                ).seconds > self.interval or is_session_closed.is_set():
                    if can_read_messages:
                        start_or_last_download = now
                        result = self.download_messages(cursor, params)
                        if result is not None:
                            cursor, chunk = result
                            for line in chunk.split("\n"):
                                try:
                                    message = json.loads(line)
                                    if PIPES_PROTOCOL_VERSION_FIELD in message.keys():
                                        handler.handle_message(message)
                                except json.JSONDecodeError:
                                    pass

                time.sleep(DEFAULT_SLEEP_INTERVAL)

                if is_session_closed.is_set():
                    if session_closed_at is None:
                        session_closed_at = datetime.datetime.now()

                    # After the external process has completed, we don't want to exit immediately.
                    if (
                        datetime.datetime.now() - session_closed_at
                    ).seconds > WAIT_FOR_LOGS_AFTER_EXECUTION_INTERVAL:
                        if not can_read_messages:
                            self._log_unstartable_warning(handler, params)
                        return
        except:
            handler.report_pipes_framework_exception(
                f"{self.__class__.__name__} messages thread",
                sys.exc_info(),
            )
            raise

    def _log_unstartable_warning(self, handler: PipesMessageHandler, params: PipesParams) -> None:
        if self.launched_payload is not None:
            handler._context.log.warning(  # noqa: SLF001
                f"[pipes] Target of {self.__class__.__name__} is not readable after receiving"
                " extra params from the external process (`on_launched` has been called)."
            )
        else:
            handler._context.log.warning(  # noqa: SLF001
                f"[pipes] Target of {self.__class__.__name__} is not readable."
            )

    def _logs_thread(
        self,
        handler: "PipesMessageHandler",
        params: PipesParams,
        is_session_closed: Event,
        messages_thread: Thread,
    ) -> None:
        wait_for_logs_start = None
        # Loop over all log readers and start them if the target is readable, which typically
        # means a file exists at the target location. Different execution environments may write
        # logs at different times (e.g., some may write logs periodically during execution, while
        # others may only write logs after the process has completed).
        try:
            unstarted_log_readers = {**self.log_readers}
            while True:
                if self.opened_payload is not None:
                    params = {**params, **self.opened_payload}

                # Periodically check for new readers, which may be added after the external
                # process has started, and add them to the unstarted log readers.
                for key in self.log_readers:
                    if key not in unstarted_log_readers:
                        unstarted_log_readers[key] = self.log_readers[key]

                for key in list(unstarted_log_readers.keys()):
                    if unstarted_log_readers[key].target_is_readable(params):
                        reader = unstarted_log_readers.pop(key)
                        reader.start(params, is_session_closed)

                # In some cases logs might not be written out until after the external process has
                # exited. That will leave us in this state, where some log readers have not been
                # started even though the external process is finished. We start a timer and wait
                # for up to WAIT_FOR_LOGS_TIMEOUT seconds for the logs to be written. If they are
                # not written after this amount of time has elapsed, we warn the user and bail.
                if is_session_closed.is_set():
                    if wait_for_logs_start is None:
                        wait_for_logs_start = datetime.datetime.now()
                    if not unstarted_log_readers:
                        return
                    elif (
                        unstarted_log_readers
                        and (datetime.datetime.now() - wait_for_logs_start).seconds
                        > WAIT_FOR_LOGS_TIMEOUT
                    ):
                        for key, log_reader in unstarted_log_readers.items():
                            warnings.warn(
                                log_reader.with_debug_info(
                                    f"[pipes] Attempted to read log for reader {log_reader.name}"
                                    " but log was still not written"
                                    f" {WAIT_FOR_LOGS_TIMEOUT} seconds after session close."
                                    f" Abandoning reader {key}."
                                )
                            )
                        return

                time.sleep(DEFAULT_SLEEP_INTERVAL)
        except Exception:
            handler.report_pipes_framework_exception(
                f"{self.__class__.__name__} logs thread",
                sys.exc_info(),
            )
            raise
        finally:
            for log_reader in self.log_readers.values():
                if log_reader.is_running():
                    log_reader.stop()

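# Example (illustrative sketch, assuming a hypothetical key-value store client `kv`): a minimal
# concrete PipesThreadedMessageReader that polls a single growing message blob, using a line
# offset as its cursor. Names such as `kv.exists` and `kv.read_text` are assumptions for
# illustration only.
#
#     class MyPollingMessageReader(PipesThreadedMessageReader):
#         @contextmanager
#         def get_params(self) -> Iterator[PipesParams]:
#             yield {"key": "run-messages"}  # passed to the message writer in the pipes process
#
#         def messages_are_readable(self, params: PipesParams) -> bool:
#             return kv.exists(params["key"])
#
#         def download_messages(
#             self, cursor: Optional[int], params: PipesParams
#         ) -> Optional[Tuple[int, str]]:
#             lines = kv.read_text(params["key"]).split("\n")
#             start = cursor or 0
#             if start >= len(lines):
#                 return None  # nothing new since the last poll
#             return len(lines), "\n".join(lines[start:])
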
class PipesBlobStoreMessageReader(PipesThreadedMessageReader):
    """Message reader that reads a sequence of message chunks written by an external process into a
    blob store such as S3, Azure blob storage, or GCS.

    The reader maintains a counter, starting at 1, that is synchronized with a message writer in
    some pipes process. The reader starts a thread that periodically attempts to read a chunk
    indexed by the counter at some location expected to be written by the pipes process. The chunk
    should be a file with each line corresponding to a JSON-encoded pipes message. When a chunk is
    successfully read, the messages are processed and the counter is incremented. The
    :py:class:`PipesBlobStoreMessageWriter` on the other end is expected to similarly increment a
    counter (starting from 1) on successful write, keeping counters on the read and write end in
    sync.

    If `log_readers` is passed, the message reader will start the passed log readers when the
    `opened` message is received from the external process.

    Args:
        interval (float): Interval in seconds between attempts to download a chunk.
        log_readers (Optional[Sequence[PipesLogReader]]): A set of log readers to use to read logs.
    """

    counter: int

    def __init__(
        self,
        interval: float = 10,
        log_readers: Optional[Sequence["PipesLogReader"]] = None,
    ):
        super().__init__(interval=interval, log_readers=log_readers)
        self.counter = 1

    @abstractmethod
    def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]: ...

    # For historical reasons, we keep the original interface of PipesBlobStoreMessageReader and
    # map the new cursor-based interface onto it. The old interface doesn't use the cursor
    # parameter; instead, it tracks progress in the `counter` attribute.
    def download_messages(
        self, cursor: Optional[int], params: PipesParams
    ) -> Optional[Tuple[int, str]]:
        chunk = self.download_messages_chunk(self.counter, params)
        if chunk:
            self.counter += 1
            return self.counter, chunk

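# Example (illustrative sketch, assuming an S3 bucket and boto3): a concrete blob store reader
# implements `get_params`, `messages_are_readable`, and `download_messages_chunk`. The chunk
# naming scheme (`{index}.json` under a prefix) mirrors what a blob store message writer might
# use, but is an assumption here, as are the bucket and prefix names.
#
#     import boto3
#
#     class MyS3MessageReader(PipesBlobStoreMessageReader):
#         def __init__(self, bucket: str, prefix: str, **kwargs):
#             super().__init__(**kwargs)
#             self.client = boto3.client("s3")
#             self.bucket = bucket
#             self.prefix = prefix
#
#         @contextmanager
#         def get_params(self) -> Iterator[PipesParams]:
#             yield {"bucket": self.bucket, "key_prefix": self.prefix}
#
#         def messages_are_readable(self, params: PipesParams) -> bool:
#             return True  # chunks simply won't exist until the writer creates them
#
#         def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]:
#             key = f"{params['key_prefix']}/{index}.json"
#             try:
#                 obj = self.client.get_object(Bucket=params["bucket"], Key=key)
#                 return obj["Body"].read().decode("utf-8")
#             except self.client.exceptions.NoSuchKey:
#                 return None
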
class PipesLogReader(ABC):
    @abstractmethod
    def start(self, params: PipesParams, is_session_closed: Event) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def is_running(self) -> bool: ...

    @abstractmethod
    def target_is_readable(self, params: PipesParams) -> bool: ...

    @property
    def name(self) -> str:
        """Override this to distinguish different log readers in error messages."""
        return self.__class__.__name__

    @property
    def debug_info(self) -> Optional[str]:
        """An optional message containing debug information about the log reader.

        It will be included in error messages when the log reader fails to start or throws an
        exception.
        """

    def with_debug_info(self, message: str) -> str:
        """Helper method to include debug info in error messages."""
        return f"{message}\nDebug info: {self.debug_info}" if self.debug_info else message


class PipesChunkedLogReader(PipesLogReader):
    """Reader for reading stdout/stderr logs from a blob store such as S3, Azure blob storage, or
    GCS.

    Args:
        interval (float): Interval in seconds between attempts to download a chunk.
        target_stream (IO[str]): The stream to which to write the logs. Typically `sys.stdout` or
            `sys.stderr`.
        debug_info (Optional[str]): An optional message containing debug information about the log
            reader.
    """

    def __init__(
        self, *, interval: float = 10, target_stream: IO[str], debug_info: Optional[str] = None
    ):
        self.interval = interval
        self.target_stream = target_stream
        self.thread: Optional[Thread] = None
        self._debug_info = debug_info

    @property
    def debug_info(self) -> Optional[str]:
        return self._debug_info

    @abstractmethod
    def download_log_chunk(self, params: PipesParams) -> Optional[str]: ...

    def start(self, params: PipesParams, is_session_closed: Event) -> None:
        self.thread = Thread(target=self._reader_thread, args=(params, is_session_closed))
        self.thread.start()

    def stop(self) -> None:
        if self.thread is None:
            raise DagsterInvariantViolationError(
                "Attempted to wait for log reader to finish, but it was never started."
            )
        _join_thread(self.thread, self.name)

    def is_running(self) -> bool:
        return self.thread is not None and self.thread.is_alive()

    def _reader_thread(
        self,
        params: PipesParams,
        is_session_closed: Event,
    ) -> None:
        start_or_last_download = datetime.datetime.now()
        after_execution_time_start = None
        while True:
            now = datetime.datetime.now()
            if (
                now - start_or_last_download
            ).seconds > self.interval or is_session_closed.is_set():
                start_or_last_download = now
                chunk = self.download_log_chunk(params)
                if chunk:
                    self.target_stream.write(chunk)
                # After execution is complete, we don't want to immediately exit, because it is
                # possible the external system will take some time to flush logs to the external
                # storage system. Only exit after WAIT_FOR_LOGS_AFTER_EXECUTION_INTERVAL seconds
                # have elapsed.
                elif is_session_closed.is_set():
                    if after_execution_time_start is None:
                        after_execution_time_start = datetime.datetime.now()
                    elif (
                        datetime.datetime.now() - after_execution_time_start
                    ).seconds > WAIT_FOR_LOGS_AFTER_EXECUTION_INTERVAL:
                        break
            time.sleep(self.interval)


def _join_thread(thread: Thread, thread_name: str) -> None:
    thread.join(timeout=THREAD_WAIT_TIMEOUT)
    if thread.is_alive():
        raise DagsterPipesExecutionError(f"Timed out waiting for {thread_name} thread to finish.")


def forward_only_logs_to_file(log_line: str, file: IO[str]):
    """Write the log line to the file if it is not a Pipes message."""
    try:
        message = json.loads(log_line)
        if PIPES_PROTOCOL_VERSION_FIELD in message.keys():
            return
        else:
            file.writelines((log_line, "\n"))
    except Exception:
        file.writelines((log_line, "\n"))


def extract_message_or_forward_to_file(
    handler: "PipesMessageHandler", log_line: str, file: IO[str]
):
    # exceptions as control flow, you love to see it
    try:
        message = json.loads(log_line)
        if PIPES_PROTOCOL_VERSION_FIELD in message.keys():
            handler.handle_message(message)
        else:
            file.writelines((log_line, "\n"))
    except Exception:
        # move non-message logs into the file for compute log capture
        file.writelines((log_line, "\n"))


def extract_message_or_forward_to_stdout(handler: "PipesMessageHandler", log_line: str):
    extract_message_or_forward_to_file(handler=handler, log_line=log_line, file=sys.stdout)


_FAIL_TO_YIELD_ERROR_MESSAGE = (
    "Did you forget to `yield from pipes_session.get_results()` or `return"
    " <PipesClient>.run(...).get_results`? If using `open_pipes_session`,"
    " `pipes_session.get_results` should be called once after the `open_pipes_session` block has"
    " exited to yield any remaining buffered results via `<PipesSession>.get_results()`."
    " If using `<PipesClient>.run`, you should always return"
    " `<PipesClient>.run(...).get_results()` or `<PipesClient>.run(...).get_materialize_result()`."
)

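# Example (illustrative sketch): `extract_message_or_forward_to_stdout` above is the kind of
# helper a client can use when an external process writes pipes messages interleaved with
# ordinary logs on stdout. A hypothetical launch loop might look like:
#
#     process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
#     assert process.stdout is not None
#     for line in process.stdout:
#         # JSON lines carrying PIPES_PROTOCOL_VERSION_FIELD are handled as messages;
#         # everything else is echoed to this process's stdout.
#         extract_message_or_forward_to_stdout(handler, line.rstrip("\n"))
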
@contextmanager
def open_pipes_session(
    context: Union[OpExecutionContext, AssetExecutionContext],
    context_injector: PipesContextInjector,
    message_reader: PipesMessageReader,
    extras: Optional[PipesExtras] = None,
) -> Iterator[PipesSession]:
    """Context manager that opens and closes a pipes session.

    This context manager should be used to wrap the launch of an external process using the pipes
    protocol to report results back to Dagster. The yielded :py:class:`PipesSession` should be
    used to (a) obtain the environment variables that need to be provided to the external process;
    (b) access results streamed back from the external process.

    This method is an alternative to :py:class:`PipesClient` subclasses for users who want more
    control over how pipes processes are launched. When using `open_pipes_session`, it is the
    user's responsibility to inject the message reader and context injector parameters available
    on the yielded `PipesSession` and pass them to the appropriate API when launching the external
    process. Typically these parameters should be set as environment variables.

    Args:
        context (Union[OpExecutionContext, AssetExecutionContext]): The context for the current
            op/asset execution.
        context_injector (PipesContextInjector): The context injector to use to inject context
            into the external process.
        message_reader (PipesMessageReader): The message reader to use to read messages from the
            external process.
        extras (Optional[PipesExtras]): Optional extras to pass to the external process via the
            injected context.

    Yields:
        PipesSession: Interface for interacting with the external process.

    .. code-block:: python

        import subprocess
        from dagster import open_pipes_session

        extras = {"foo": "bar"}

        @asset
        def ext_asset(context: AssetExecutionContext):
            with open_pipes_session(
                context=context,
                extras=extras,
                context_injector=PipesTempFileContextInjector(),
                message_reader=PipesTempFileMessageReader(),
            ) as pipes_session:
                process = subprocess.Popen(
                    ["/bin/python", "/path/to/script.py"],
                    env={**pipes_session.get_bootstrap_env_vars()},
                )

                while process.poll() is None:
                    yield from pipes_session.get_results()

            yield from pipes_session.get_results()
    """
    # If we are in the context of an asset, set up a defensive check to ensure the event stream
    # gets returned.
    if context.has_assets_def:
        context.set_requires_typed_event_stream(error_message=_FAIL_TO_YIELD_ERROR_MESSAGE)
    context_data = build_external_execution_context_data(context, extras)
    message_handler = PipesMessageHandler(context, message_reader)
    try:
        with context_injector.inject_context(
            context_data
        ) as ci_params, message_handler.handle_messages() as mr_params:
            yield PipesSession(
                context_data=context_data,
                message_handler=message_handler,
                context_injector_params=ci_params,
                message_reader_params=mr_params,
                context=context,
            )
    finally:
        if not message_handler.received_opened_message:
            context.log.warning(
                "[pipes] did not receive any messages from external process. Check stdout / stderr"
                " logs from the external process if"
                f" possible.\n{context_injector.__class__.__name__}:"
                f" {context_injector.no_messages_debug_text()}\n{message_reader.__class__.__name__}:"
                f" {message_reader.no_messages_debug_text()}\n"
            )
        elif not message_handler.received_closed_message:
            context.log.warning(
                "[pipes] did not receive closed message from external process. Buffered messages"
                " may have been discarded without being delivered. Use `open_dagster_pipes` as a"
                " context manager (a with block) to ensure that cleanup is successfully completed."
                " If that is not possible, manually call `PipesContext.close()` before process"
                " exit."
            )