Ask AI

Source code for dagster._core.storage.file_manager

import io
import os
import shutil
import uuid
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import BinaryIO, ContextManager, Iterator, Optional, TextIO, Union

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import public
from dagster._config import Field, StringSource
from dagster._core.definitions.resource_definition import dagster_maintained_resource, resource
from dagster._core.execution.context.init import InitResourceContext
from dagster._core.instance import DagsterInstance
from dagster._core.storage.temp_file_manager import TempfileManager
from dagster._utils import mkdir_p

IOStream: TypeAlias = Union[TextIO, BinaryIO]


class FileHandle(ABC):
    """Abstract reference to a file that a FileManager operates on.

    Concrete subclasses may point at files that live on the local file system, in an object
    store, or in any other place where a file can be stored.

    The abstraction covers the very common case of a computation that reads, transforms, and
    writes files, and that should run unchanged both in local development and on a cluster
    where files are kept in a globally available object store such as S3.
    """

    @public
    @property
    @abstractmethod
    def path_desc(self) -> str:
        """A description of the file's location, intended for display purposes only."""
        raise NotImplementedError
class LocalFileHandle(FileHandle):
    """Handle for a file that lives on a local filesystem."""

    def __init__(self, path: str):
        # Run the argument through dagster's str_param check before storing it.
        checked_path = check.str_param(path, "path")
        self._path = checked_path

    @public
    @property
    def path(self) -> str:
        """The path of the referenced file."""
        return self._path

    @public
    @property
    def path_desc(self) -> str:
        """A description of the file's location for display purposes only.

        For a local file this is simply the stored path.
        """
        return self._path
class FileManager(ABC):
    """Abstract base class for every file manager in dagster.

    A file manager is an interface that resources can implement to give uniform access to a
    file system such as local disk, S3, or other cloud storage.

    Usage examples live in the documentation of the concrete file manager implementations.
    """

    @public
    @abstractmethod
    def copy_handle_to_local_temp(self, file_handle: FileHandle) -> str:
        """Materialize the file behind a handle as a local temp file.

        An implementation backed by an object store such as S3 would be expected to download
        the file to the local filesystem, in a location assigned by the standard library's
        :py:mod:`python:tempfile` module.

        Temp files returned by this method are *not* guaranteed to be reusable across op
        boundaries. For files that must be available across op boundaries, use the
        :py:meth:`~dagster._core.storage.file_manager.FileManager.read`,
        :py:meth:`~dagster._core.storage.file_manager.FileManager.read_data`,
        :py:meth:`~dagster._core.storage.file_manager.FileManager.write`, and
        :py:meth:`~dagster._core.storage.file_manager.FileManager.write_data` methods.

        Args:
            file_handle (FileHandle): The handle to the file to make available as a local
                temp file.

        Returns:
            str: Path to the local temp file.
        """
        raise NotImplementedError

    @public
    @abstractmethod
    def delete_local_temp(self) -> None:
        """Remove every local temp file produced by earlier calls to
        :py:meth:`~dagster._core.storage.file_manager.FileManager.copy_handle_to_local_temp`.

        Should typically only be called by framework implementors.
        """
        raise NotImplementedError

    @public
    @abstractmethod
    def read(self, file_handle: FileHandle, mode: str = "rb") -> ContextManager[IOStream]:
        """Open the file behind a handle as a file-like stream.

        This may incur an expensive network call for file managers backed by object stores
        such as S3.

        Args:
            file_handle (FileHandle): The file handle to make available as a stream.
            mode (str): The mode in which to open the file. Default: ``"rb"``.

        Returns:
            ContextManager[Union[TextIO, BinaryIO]]: A context manager that provides the
            file-like stream.
        """
        raise NotImplementedError

    @public
    @abstractmethod
    def read_data(self, file_handle: FileHandle) -> bytes:
        """Fetch the full contents of the file behind a handle as bytes.

        This may incur an expensive network call for file managers backed by object stores
        such as S3.

        Args:
            file_handle (FileHandle): The file handle for which to return bytes.

        Returns:
            bytes: Bytes for the given file handle.
        """
        raise NotImplementedError

    @public
    @abstractmethod
    def write(self, file_obj: IOStream, mode: str = "wb", ext: Optional[str] = None) -> FileHandle:
        """Store the contents of a file-like object in the file manager.

        Args:
            file_obj (Union[TextIO, BinaryIO]): A file-like object.
            mode (Optional[str]): The mode in which to write the file into the file manager.
                Default: ``"wb"``.
            ext (Optional[str]): For file managers that support file extensions, the
                extension with which to write the file. Default: ``None``.

        Returns:
            FileHandle: A handle to the newly created file.
        """
        raise NotImplementedError

    @public
    @abstractmethod
    def write_data(self, data: bytes, ext: Optional[str] = None) -> FileHandle:
        """Store raw bytes in the file manager.

        Args:
            data (bytes): The bytes to write into the file manager.
            ext (Optional[str]): For file managers that support file extensions, the
                extension with which to write the file. Default: ``None``.

        Returns:
            FileHandle: A handle to the newly created file.
        """
        raise NotImplementedError
@dagster_maintained_resource
@resource(config_schema={"base_dir": Field(StringSource, is_required=False)})
def local_file_manager(init_context: InitResourceContext) -> "LocalFileManager":
    """FileManager that provides abstract access to a local filesystem.

    By default, files will be stored in `<local_artifact_storage>/storage/file_manager` where
    `<local_artifact_storage>` can be configured in the ``dagster.yaml`` file in
    ``$DAGSTER_HOME``.

    Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.

    Args:
        init_context (InitResourceContext): The resource initialization context. Its
            ``resource_config`` may carry an optional ``base_dir`` entry; when it is absent
            the directory is derived from ``init_context.instance``.

    Returns:
        LocalFileManager: A file manager rooted at the resolved base directory.

    Examples:
        .. code-block:: python

            import tempfile

            from dagster import job, local_file_manager, op


            @op(required_resource_keys={"file_manager"})
            def write_files(context):
                fh_1 = context.resources.file_manager.write_data(b"foo")

                with tempfile.NamedTemporaryFile("w+") as fd:
                    fd.write("bar")
                    fd.seek(0)
                    fh_2 = context.resources.file_manager.write(fd, mode="w", ext="txt")

                return (fh_1, fh_2)


            @op(required_resource_keys={"file_manager"})
            def read_files(context, file_handles):
                fh_1, fh_2 = file_handles
                assert context.resources.file_manager.read_data(fh_1) == b"foo"
                with context.resources.file_manager.read(fh_2, mode="r") as fd:
                    assert fd.read() == "bar"


            @job(resource_defs={"file_manager": local_file_manager})
            def files_pipeline():
                read_files(write_files())

        Or to specify the file directory:

        .. code-block:: python

            @job(
                resource_defs={
                    "file_manager": local_file_manager.configured({"base_dir": "/my/base/dir"})
                }
            )
            def files_pipeline():
                read_files(write_files())
    """
    base_dir = init_context.resource_config.get("base_dir")
    if base_dir is None:
        # Only consult the instance when no base_dir was configured. Computing this default
        # eagerly (as an argument to ``.get``) would dereference ``init_context.instance``,
        # which may be None, even when the caller supplied an explicit directory.
        base_dir = os.path.join(
            init_context.instance.storage_directory(),  # type: ignore  # (possible none)
            "file_manager",
        )
    return LocalFileManager(base_dir=base_dir)
def check_file_like_obj(obj: object) -> None:
    """Assert that ``obj`` looks like a readable and writable file object.

    The check is purely structural: the object must not be None and must expose both a
    ``read`` and a ``write`` attribute. Failures are reported through ``check.invariant``.

    Args:
        obj (object): The candidate file-like object.
    """
    # Test against None explicitly rather than relying on truthiness, so that a file-like
    # object that happens to evaluate as falsy is not rejected.
    check.invariant(obj is not None and hasattr(obj, "read") and hasattr(obj, "write"))


class LocalFileManager(FileManager):
    """FileManager implementation that stores files under a directory on local disk.

    Files written through the manager are placed in ``base_dir`` under a random UUID name.
    Temp copies requested via ``copy_handle_to_local_temp`` are obtained from a
    ``TempfileManager`` and released by ``delete_local_temp``.
    """

    def __init__(self, base_dir: str):
        """Create a manager rooted at ``base_dir``.

        The directory is not created here; it is created lazily on the first write.

        Args:
            base_dir (str): Directory in which written files are stored.
        """
        self.base_dir = base_dir
        # Remember whether base_dir has already been created so mkdir_p runs at most once.
        self._base_dir_ensured = False
        self._temp_file_manager = TempfileManager()

    @staticmethod
    def for_instance(instance: DagsterInstance, run_id: str) -> "LocalFileManager":
        """Build a manager rooted at the instance's file manager directory for ``run_id``.

        Args:
            instance (DagsterInstance): The instance that supplies the directory.
            run_id (str): The run whose file manager directory should be used.

        Returns:
            LocalFileManager: A manager rooted at ``instance.file_manager_directory(run_id)``.
        """
        check.inst_param(instance, "instance", DagsterInstance)
        return LocalFileManager(instance.file_manager_directory(run_id))

    def ensure_base_dir_exists(self) -> None:
        """Create ``base_dir`` via ``mkdir_p`` the first time this is called."""
        if self._base_dir_ensured:
            return

        mkdir_p(self.base_dir)

        self._base_dir_ensured = True

    def copy_handle_to_local_temp(self, file_handle: FileHandle) -> str:
        """Copy the file behind ``file_handle`` into a temp file and return its path.

        Args:
            file_handle (FileHandle): Handle of the file to copy.

        Returns:
            str: The ``name`` of the temp file obtained from the temp file manager.
        """
        check.inst_param(file_handle, "file_handle", FileHandle)
        with self.read(file_handle, "rb") as handle_obj:  # type: ignore  # (??)
            temp_file_obj = self._temp_file_manager.tempfile()
            try:
                # Stream the contents in chunks instead of loading the whole file into
                # memory with a single read().
                shutil.copyfileobj(handle_obj, temp_file_obj)  # type: ignore  # (??)
                return temp_file_obj.name
            finally:
                # Close the temp file object even if the copy fails part-way through.
                temp_file_obj.close()

    @contextmanager
    def read(self, file_handle: LocalFileHandle, mode: str = "rb") -> Iterator[IOStream]:
        """Open the file behind ``file_handle`` for reading.

        Args:
            file_handle (LocalFileHandle): Handle of the file to open.
            mode (str): Either ``"r"`` (text, decoded as utf8) or ``"rb"`` (binary).
                Default: ``"rb"``.

        Yields:
            Union[TextIO, BinaryIO]: The open file object; it is closed when the context
            exits.
        """
        check.inst_param(file_handle, "file_handle", LocalFileHandle)
        check.str_param(mode, "mode")
        check.param_invariant(mode in {"r", "rb"}, "mode")

        # Binary mode must not be given an encoding; text mode is always read as utf8.
        encoding = None if mode == "rb" else "utf8"
        with open(file_handle.path, mode, encoding=encoding) as file_obj:
            yield file_obj  # type: ignore  # (??)

    def read_data(self, file_handle: LocalFileHandle) -> bytes:
        """Return the full binary contents of the file behind ``file_handle``.

        Args:
            file_handle (LocalFileHandle): Handle of the file to read.

        Returns:
            bytes: The contents of the file.
        """
        with self.read(file_handle, mode="rb") as file_obj:
            return file_obj.read()  # type: ignore  # (??)

    def write_data(self, data: bytes, ext: Optional[str] = None) -> LocalFileHandle:
        """Write raw bytes to a new file in ``base_dir``.

        Args:
            data (bytes): The bytes to store.
            ext (Optional[str]): Optional file extension for the new file. Default: ``None``.

        Returns:
            LocalFileHandle: A handle to the newly created file.
        """
        check.inst_param(data, "data", bytes)
        return self.write(io.BytesIO(data), mode="wb", ext=ext)

    def write(
        self, file_obj: IOStream, mode: str = "wb", ext: Optional[str] = None
    ) -> LocalFileHandle:
        """Copy the contents of ``file_obj`` into a new, uniquely named file in ``base_dir``.

        Args:
            file_obj (Union[TextIO, BinaryIO]): Source file-like object; it must pass
                ``check_file_like_obj``.
            mode (str): Mode used to open the destination file. Modes containing ``"b"`` are
                opened without an encoding, all others as utf8 text. Default: ``"wb"``.
            ext (Optional[str]): Optional extension for the new file, with or without a
                leading dot. Default: ``None``.

        Returns:
            LocalFileHandle: A handle to the newly created file.
        """
        check_file_like_obj(file_obj)
        check.str_param(mode, "mode")
        check.opt_str_param(ext, "ext")

        self.ensure_base_dir_exists()

        suffix = ""
        if ext is not None:
            # Accept both "txt" and ".txt" so that a caller-supplied leading dot does not
            # produce a file name ending in "..txt".
            bare_ext = ext[1:] if ext.startswith(".") else ext
            suffix = "." + bare_ext

        dest_file_path = os.path.join(self.base_dir, str(uuid.uuid4()) + suffix)

        # Binary modes must not be given an encoding; text is always written as utf8.
        encoding = None if "b" in mode else "utf8"
        with open(dest_file_path, mode, encoding=encoding) as dest_file_obj:
            shutil.copyfileobj(file_obj, dest_file_obj)  # type: ignore  # (??)
            return LocalFileHandle(dest_file_path)

    def delete_local_temp(self) -> None:
        """Close the underlying temp file manager, releasing the temp files it handed out."""
        self._temp_file_manager.close()