# Source code for dagster._core.storage.noop_compute_log_manager

from contextlib import contextmanager
from typing import IO, Any, Generator, Mapping, Optional, Sequence

from typing_extensions import Self

import dagster._check as check
from dagster._core.storage.captured_log_manager import (
    CapturedLogContext,
    CapturedLogData,
    CapturedLogManager,
    CapturedLogMetadata,
    CapturedLogSubscription,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData

from .compute_log_manager import (
    MAX_BYTES_FILE_READ,
    ComputeIOType,
    ComputeLogFileData,
    ComputeLogManager,
)


class NoOpComputeLogManager(CapturedLogManager, ComputeLogManager, ConfigurableClass):
    """When enabled for a Dagster instance, stdout and stderr will not be available for any step."""

    def __init__(self, inst_data: Optional[ConfigurableClassData] = None):
        """Store the optional ``ConfigurableClassData`` this manager was built from.

        Args:
            inst_data: Configuration data for this instance, or ``None``.
        """
        self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """Return the ``ConfigurableClassData`` passed to the constructor (may be ``None``)."""
        return self._inst_data

    @classmethod
    def config_type(cls):
        """Return the config schema for this manager: an empty mapping (no options)."""
        return {}

    @classmethod
    def from_config_value(
        cls, inst_data: ConfigurableClassData, config_value: Mapping[str, Any]
    ) -> Self:
        """Build an instance from ``inst_data``, forwarding ``config_value`` as keyword arguments."""
        return cls(inst_data=inst_data, **config_value)

    # ------------------------------------------------------------------
    # ComputeLogManager interface: every operation is a no-op.
    # ------------------------------------------------------------------

    def enabled(self, _dagster_run, _step_key) -> bool:
        """Return ``False`` unconditionally; both arguments are ignored."""
        return False

    def _watch_logs(self, dagster_run, step_key=None) -> None:
        """Do nothing."""
        pass

    def get_local_path(self, run_id: str, key: str, io_type: ComputeIOType) -> str:
        """Always raise ``NotImplementedError``.

        Raises:
            NotImplementedError: On every call.
        """
        raise NotImplementedError()

    def is_watch_completed(self, run_id, key) -> bool:
        """Return ``True`` unconditionally; both arguments are ignored."""
        return True

    def on_watch_start(self, dagster_run, step_key) -> None:
        """Do nothing."""
        pass

    def on_watch_finish(self, dagster_run, step_key) -> None:
        """Do nothing."""
        pass

    def download_url(self, run_id, key, io_type) -> None:
        """Return ``None`` unconditionally."""
        return None

    def read_logs_file(
        self, run_id, key, io_type, cursor=0, max_bytes=MAX_BYTES_FILE_READ
    ) -> ComputeLogFileData:
        """Return an empty ``ComputeLogFileData`` placeholder.

        ``run_id``, ``cursor`` and ``max_bytes`` are accepted but not used: the
        result always has ``data=None``, ``cursor=0``, ``size=0`` and
        ``download_url=None``. Only ``path`` varies, formatted as
        ``f"{key}.{io_type}"``.
        """
        return ComputeLogFileData(
            path=f"{key}.{io_type}", data=None, cursor=0, size=0, download_url=None
        )

    def on_subscribe(self, subscription) -> None:
        """Do nothing."""
        pass

    def on_unsubscribe(self, subscription) -> None:
        """Do nothing."""
        pass

    # ------------------------------------------------------------------
    # CapturedLogManager interface: every operation is a no-op.
    # ------------------------------------------------------------------

    @contextmanager
    def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, None, None]:
        """Yield a ``CapturedLogContext`` built from ``log_key``; nothing else is done."""
        yield CapturedLogContext(log_key=log_key)

    def is_capture_complete(self, log_key: Sequence[str]) -> bool:
        """Return ``True`` unconditionally; ``log_key`` is ignored."""
        return True

    @contextmanager
    def open_log_stream(
        self, log_key: Sequence[str], io_type: ComputeIOType
    ) -> Generator[Optional[IO], None, None]:
        """Yield ``None`` in place of a stream; both arguments are ignored."""
        yield None

    def get_log_data(
        self,
        log_key: Sequence[str],
        cursor: Optional[str] = None,
        max_bytes: Optional[int] = None,
    ) -> CapturedLogData:
        """Return a ``CapturedLogData`` built from ``log_key`` only.

        ``cursor`` and ``max_bytes`` are accepted but not used.
        """
        return CapturedLogData(log_key=log_key)

    def get_log_metadata(self, log_key: Sequence[str]) -> CapturedLogMetadata:
        """Return a ``CapturedLogMetadata`` constructed with no arguments."""
        return CapturedLogMetadata()

    def delete_logs(
        self, log_key: Optional[Sequence[str]] = None, prefix: Optional[Sequence[str]] = None
    ) -> None:
        """Do nothing."""
        pass

    def subscribe(
        self, log_key: Sequence[str], cursor: Optional[str] = None
    ) -> CapturedLogSubscription:
        """Return a new ``CapturedLogSubscription`` for this manager, ``log_key`` and ``cursor``."""
        return CapturedLogSubscription(self, log_key, cursor)

    def unsubscribe(self, subscription: CapturedLogSubscription) -> None:
        """Do nothing."""
        pass