Ask AI

Source code for dagster._core.storage.compute_log_manager

import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from enum import Enum
from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence, Tuple

from typing_extensions import Final, Self

import dagster._check as check
from dagster._core.captured_log_api import LogLineCursor
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance

MAX_BYTES_CHUNK_READ: Final = 4194304  # 4 MB


class ComputeIOType(Enum):
    STDOUT = "stdout"
    STDERR = "stderr"


class CapturedLogContext(
    NamedTuple(
        "_CapturedLogContext",
        [
            ("log_key", Sequence[str]),
            ("external_url", Optional[str]),
            ("external_stdout_url", Optional[str]),
            ("external_stderr_url", Optional[str]),
        ],
    )
):
    """Object representing the context in which logs are captured.  Can be used by external logging
    sidecar implementations to point the Dagster UI to an external url to view compute logs instead of a
    Dagster-managed location.
    """

    def __new__(
        cls,
        log_key: Sequence[str],
        external_stdout_url: Optional[str] = None,
        external_stderr_url: Optional[str] = None,
        external_url: Optional[str] = None,
    ):
        if external_url and (external_stdout_url or external_stderr_url):
            check.failed(
                "Cannot specify both `external_url` and one of"
                " `external_stdout_url`/`external_stderr_url`"
            )

        return super(CapturedLogContext, cls).__new__(
            cls,
            log_key,
            external_stdout_url=external_stdout_url,
            external_stderr_url=external_stderr_url,
            external_url=external_url,
        )


class CapturedLogData(
    NamedTuple(
        "_CapturedLogData",
        [
            ("log_key", Sequence[str]),
            ("stdout", Optional[bytes]),
            ("stderr", Optional[bytes]),
            ("cursor", Optional[str]),
        ],
    )
):
    """Object representing captured log data, either a partial chunk of the log data or the full
    capture.  Contains the raw bytes and optionally the cursor offset for the partial chunk.
    """

    def __new__(
        cls,
        log_key: Sequence[str],
        stdout: Optional[bytes] = None,
        stderr: Optional[bytes] = None,
        cursor: Optional[str] = None,
    ):
        return super(CapturedLogData, cls).__new__(cls, log_key, stdout, stderr, cursor)


class CapturedLogMetadata(
    NamedTuple(
        "_CapturedLogMetadata",
        [
            ("stdout_location", Optional[str]),
            ("stderr_location", Optional[str]),
            ("stdout_download_url", Optional[str]),
            ("stderr_download_url", Optional[str]),
        ],
    )
):
    """Object representing metadata info for the captured log data, containing a display string for
    the location of the log data and a URL for direct download of the captured log data.
    """

    def __new__(
        cls,
        stdout_location: Optional[str] = None,
        stderr_location: Optional[str] = None,
        stdout_download_url: Optional[str] = None,
        stderr_download_url: Optional[str] = None,
    ):
        return super(CapturedLogMetadata, cls).__new__(
            cls,
            stdout_location=stdout_location,
            stderr_location=stderr_location,
            stdout_download_url=stdout_download_url,
            stderr_download_url=stderr_download_url,
        )


class CapturedLogSubscription:
    def __init__(
        self,
        manager: "ComputeLogManager[T_DagsterInstance]",
        log_key: Sequence[str],
        cursor: Optional[str],
    ):
        self._manager = manager
        self._log_key = log_key
        self._cursor = cursor
        self._observer: Optional[Callable[[CapturedLogData], None]] = None
        self.is_complete = False

    def __call__(self, observer: Optional[Callable[[CapturedLogData], None]]) -> Self:
        self._observer = observer
        self.fetch()
        if self._manager.is_capture_complete(self._log_key):
            self.complete()
        return self

    @property
    def log_key(self) -> Sequence[str]:
        return self._log_key

    def dispose(self) -> None:
        self._observer = None
        self._manager.unsubscribe(self)

    def fetch(self) -> None:
        if not self._observer:
            return

        should_fetch = True
        while should_fetch:
            log_data = self._manager.get_log_data(
                self._log_key,
                self._cursor,
                max_bytes=MAX_BYTES_CHUNK_READ,
            )
            if not self._cursor or log_data.cursor != self._cursor:
                self._observer(log_data)
                self._cursor = log_data.cursor
            should_fetch = _has_max_data(log_data.stdout) or _has_max_data(log_data.stderr)

    def complete(self) -> None:
        self.is_complete = True


def _has_max_data(chunk: Optional[bytes]) -> bool:
    # function is used as predicate but does not actually return a boolean
    return chunk and len(chunk) >= MAX_BYTES_CHUNK_READ  # type: ignore


[docs] class ComputeLogManager(ABC, MayHaveInstanceWeakref[T_DagsterInstance]): """Abstract base class for capturing the unstructured logs (stdout/stderr) in the current process, stored / retrieved with a provided log_key. """ @abstractmethod @contextmanager def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, None, None]: """Context manager for capturing the stdout/stderr within the current process, and persisting it under the given log key. Args: log_key (List[String]): The log key identifying the captured logs """ @abstractmethod @contextmanager def open_log_stream( self, log_key: Sequence[str], io_type: ComputeIOType ) -> Iterator[Optional[IO[bytes]]]: """Context manager for providing an IO stream that enables the caller to write to a log stream managed by the captured log manager, to be read later using the given log key. Args: log_key (List[String]): The log key identifying the captured logs """ @abstractmethod def is_capture_complete(self, log_key: Sequence[str]) -> bool: """Flag indicating when the log capture for a given log key has completed. Args: log_key (List[String]): The log key identifying the captured logs Returns: Boolean """ @abstractmethod def get_log_data( self, log_key: Sequence[str], cursor: Optional[str] = None, max_bytes: Optional[int] = None, ) -> CapturedLogData: """Returns a chunk of the captured stdout logs for a given log key. Args: log_key (List[String]): The log key identifying the captured logs cursor (Optional[str]): A cursor representing the position of the log chunk to fetch max_bytes (Optional[int]): A limit on the size of the log chunk to fetch Returns: CapturedLogData """ @abstractmethod def get_log_metadata(self, log_key: Sequence[str]) -> CapturedLogMetadata: """Returns the metadata of the captured logs for a given log key, including displayable information on where the logs are persisted. Args: log_key (List[String]): The log key identifying the captured logs Returns: CapturedLogMetadata """ @abstractmethod def delete_logs( self, log_key: Optional[Sequence[str]] = None, prefix: Optional[Sequence[str]] = None ) -> None: """Deletes the captured logs for a given log key. Args: log_key(Optional[List[String]]): The log key of the logs to delete prefix(Optional[List[String]]): The prefix of the log keys to delete """ @abstractmethod def subscribe( self, log_key: Sequence[str], cursor: Optional[str] = None ) -> CapturedLogSubscription: """Registers an observable object for log data. Args: log_key (List[String]): The log key identifying the captured logs cursor (Optional[String]): The string cursor marking the position within the log stream Returns: CapturedLogSubscription """ def unsubscribe(self, subscription: CapturedLogSubscription) -> None: """Deregisters an observable object from receiving log updates. Args: subscription (CapturedLogSubscription): subscription object which manages when to send back data to the subscriber """ pass def dispose(self): pass def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]: """Legacy adapter to translate run_id/key to captured log manager-based log_key.""" return [run_id, "compute_logs", step_key] def get_log_keys_for_log_key_prefix( self, log_key_prefix: Sequence[str], io_type: ComputeIOType ) -> Sequence[Sequence[str]]: """Returns the logs keys for a given log key prefix. This is determined by looking at the directory defined by the log key prefix and creating a log_key for each file in the directory. """ raise NotImplementedError("Must implement get_log_keys_for_log_key_prefix") def _get_log_lines_for_log_key( self, log_key: Sequence[str], io_type: ComputeIOType ) -> Sequence[str]: """For a log key, gets the corresponding file, and splits the file into lines.""" log_data = self.get_log_data(log_key) if io_type == ComputeIOType.STDOUT: raw_logs = log_data.stdout.decode("utf-8") if log_data.stdout else "" else: raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else "" log_lines = raw_logs.split("\n") return log_lines def read_log_lines_for_log_key_prefix( self, log_key_prefix: Sequence[str], cursor: Optional[str], io_type: ComputeIOType ) -> Tuple[Sequence[str], Optional[LogLineCursor]]: """For a given directory defined by log_key_prefix that contains files, read the logs from the files as if they are a single continuous file. Reads env var DAGSTER_CAPTURED_LOG_CHUNK_SIZE lines at a time. Returns the lines read and the next cursor. Note that the has_more_now attribute of the cursor indicates if there are more logs that can be read immediately. If has_more_now if False, the process producing logs could still be running and dump more logs into the directory at a later time. """ num_lines = int(os.getenv("DAGSTER_CAPTURED_LOG_CHUNK_SIZE", "1000")) # find all of the log_keys to read from and sort them in the order to be read log_keys = sorted( self.get_log_keys_for_log_key_prefix(log_key_prefix, io_type=io_type), key=lambda x: "/".join(x), ) if len(log_keys) == 0: return [], None log_cursor = LogLineCursor.parse(cursor) if cursor else None if log_cursor is None: log_key_to_fetch_idx = 0 line_cursor = 0 else: log_key_to_fetch_idx = log_keys.index(log_cursor.log_key) line_cursor = log_cursor.line if line_cursor == -1: # line_cursor for -1 means the entirety of the file has been read, but the next file # didn't exist yet. So we see if a new file has been added. # if the next file doesn't exist yet, return if log_key_to_fetch_idx + 1 >= len(log_keys): return [], log_cursor log_key_to_fetch_idx += 1 line_cursor = 0 log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx], io_type=io_type) records = [] has_more = True while len(records) < num_lines: remaining_log_lines = log_lines[line_cursor:] remaining_lines_to_fetch = num_lines - len(records) if remaining_lines_to_fetch < len(remaining_log_lines): records.extend(remaining_log_lines[:remaining_lines_to_fetch]) line_cursor += remaining_lines_to_fetch else: records.extend(remaining_log_lines) line_cursor = -1 if line_cursor == -1: # we've read the entirety of the file, update the cursor if log_key_to_fetch_idx + 1 >= len(log_keys): # no more files to process has_more = False break log_key_to_fetch_idx += 1 line_cursor = 0 if len(records) < num_lines: # we still need more records, so fetch the next file log_lines = self._get_log_lines_for_log_key( log_keys[log_key_to_fetch_idx], io_type=io_type ) new_cursor = LogLineCursor( log_key=log_keys[log_key_to_fetch_idx], line=line_cursor, has_more_now=has_more ) return records, new_cursor