import io
import os
import shutil
import uuid
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import BinaryIO, ContextManager, Iterator, Optional, TextIO, Union
from typing_extensions import TypeAlias
import dagster._check as check
from dagster._annotations import public
from dagster._config import Field, StringSource
from dagster._core.definitions.resource_definition import dagster_maintained_resource, resource
from dagster._core.execution.context.init import InitResourceContext
from dagster._core.instance import DagsterInstance
from dagster._core.storage.temp_file_manager import TempfileManager
from dagster._utils import mkdir_p
IOStream: TypeAlias = Union[TextIO, BinaryIO]
[docs]
class FileHandle(ABC):
"""A reference to a file as manipulated by a FileManager.
Subclasses may handle files that are resident on the local file system, in an object store, or
in any arbitrary place where a file can be stored.
This exists to handle the very common case where you wish to write a computation that reads,
transforms, and writes files, but where you also want the same code to work in local development
as well as on a cluster where the files will be stored in a globally available object store
such as S3.
"""
@public
@property
@abstractmethod
def path_desc(self) -> str:
"""A representation of the file path for display purposes only."""
raise NotImplementedError()
[docs]
class LocalFileHandle(FileHandle):
"""A reference to a file on a local filesystem."""
def __init__(self, path: str):
self._path = check.str_param(path, "path")
@public
@property
def path(self) -> str:
"""The file's path."""
return self._path
@public
@property
def path_desc(self) -> str:
"""A representation of the file path for display purposes only."""
return self._path
[docs]
class FileManager(ABC):
"""Base class for all file managers in dagster.
The file manager is an interface that can be implemented by resources to provide abstract
access to a file system such as local disk, S3, or other cloud storage.
For examples of usage, see the documentation of the concrete file manager implementations.
"""
[docs]
@public
@abstractmethod
def copy_handle_to_local_temp(self, file_handle: FileHandle) -> str:
"""Copy a file represented by a file handle to a temp file.
In an implementation built around an object store such as S3, this method would be expected
to download the file from S3 to local filesystem in a location assigned by the standard
library's :py:mod:`python:tempfile` module.
Temp files returned by this method are *not* guaranteed to be reusable across solid
boundaries. For files that must be available across solid boundaries, use the
:py:meth:`~dagster._core.storage.file_manager.FileManager.read`,
:py:meth:`~dagster._core.storage.file_manager.FileManager.read_data`,
:py:meth:`~dagster._core.storage.file_manager.FileManager.write`, and
:py:meth:`~dagster._core.storage.file_manager.FileManager.write_data` methods.
Args:
file_handle (FileHandle): The handle to the file to make available as a local temp file.
Returns:
str: Path to the local temp file.
"""
raise NotImplementedError()
[docs]
@public
@abstractmethod
def delete_local_temp(self) -> None:
"""Delete all local temporary files created by previous calls to
:py:meth:`~dagster._core.storage.file_manager.FileManager.copy_handle_to_local_temp`.
Should typically only be called by framework implementors.
"""
raise NotImplementedError()
[docs]
@public
@abstractmethod
def read(self, file_handle: FileHandle, mode: str = "rb") -> ContextManager[IOStream]:
"""Return a file-like stream for the file handle.
This may incur an expensive network call for file managers backed by object stores
such as S3.
Args:
file_handle (FileHandle): The file handle to make available as a stream.
mode (str): The mode in which to open the file. Default: ``"rb"``.
Returns:
Union[TextIO, BinaryIO]: A file-like stream.
"""
raise NotImplementedError()
[docs]
@public
@abstractmethod
def read_data(self, file_handle: FileHandle) -> bytes:
"""Return the bytes for a given file handle. This may incur an expensive network
call for file managers backed by object stores such as s3.
Args:
file_handle (FileHandle): The file handle for which to return bytes.
Returns:
bytes: Bytes for a given file handle.
"""
raise NotImplementedError()
[docs]
@public
@abstractmethod
def write(self, file_obj: IOStream, mode: str = "wb", ext: Optional[str] = None) -> FileHandle:
"""Write the bytes contained within the given file object into the file manager.
Args:
file_obj (Union[TextIO, StringIO]): A file-like object.
mode (Optional[str]): The mode in which to write the file into the file manager.
Default: ``"wb"``.
ext (Optional[str]): For file managers that support file extensions, the extension with
which to write the file. Default: ``None``.
Returns:
FileHandle: A handle to the newly created file.
"""
raise NotImplementedError()
[docs]
@public
@abstractmethod
def write_data(self, data: bytes, ext: Optional[str] = None) -> FileHandle:
"""Write raw bytes into the file manager.
Args:
data (bytes): The bytes to write into the file manager.
ext (Optional[str]): For file managers that support file extensions, the extension with
which to write the file. Default: ``None``.
Returns:
FileHandle: A handle to the newly created file.
"""
raise NotImplementedError()
[docs]
@dagster_maintained_resource
@resource(config_schema={"base_dir": Field(StringSource, is_required=False)})
def local_file_manager(init_context: InitResourceContext) -> "LocalFileManager":
"""FileManager that provides abstract access to a local filesystem.
By default, files will be stored in `<local_artifact_storage>/storage/file_manager` where
`<local_artifact_storage>` can be configured the ``dagster.yaml`` file in ``$DAGSTER_HOME``.
Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.
Examples:
.. code-block:: python
import tempfile
from dagster import job, local_file_manager, op
@op(required_resource_keys={"file_manager"})
def write_files(context):
fh_1 = context.resources.file_manager.write_data(b"foo")
with tempfile.NamedTemporaryFile("w+") as fd:
fd.write("bar")
fd.seek(0)
fh_2 = context.resources.file_manager.write(fd, mode="w", ext=".txt")
return (fh_1, fh_2)
@op(required_resource_keys={"file_manager"})
def read_files(context, file_handles):
fh_1, fh_2 = file_handles
assert context.resources.file_manager.read_data(fh_2) == b"bar"
fd = context.resources.file_manager.read(fh_2, mode="r")
assert fd.read() == "foo"
fd.close()
@job(resource_defs={"file_manager": local_file_manager})
def files_pipeline():
read_files(write_files())
Or to specify the file directory:
.. code-block:: python
@job(
resource_defs={
"file_manager": local_file_manager.configured({"base_dir": "/my/base/dir"})
}
)
def files_pipeline():
read_files(write_files())
"""
return LocalFileManager(
base_dir=init_context.resource_config.get(
"base_dir",
os.path.join(init_context.instance.storage_directory(), "file_manager"), # type: ignore # (possible none)
)
)
def check_file_like_obj(obj: object) -> None:
check.invariant(obj and hasattr(obj, "read") and hasattr(obj, "write"))
class LocalFileManager(FileManager):
def __init__(self, base_dir: str):
self.base_dir = base_dir
self._base_dir_ensured = False
self._temp_file_manager = TempfileManager()
@staticmethod
def for_instance(instance: DagsterInstance, run_id: str) -> "LocalFileManager":
check.inst_param(instance, "instance", DagsterInstance)
return LocalFileManager(instance.file_manager_directory(run_id))
def ensure_base_dir_exists(self) -> None:
if self._base_dir_ensured:
return
mkdir_p(self.base_dir)
self._base_dir_ensured = True
def copy_handle_to_local_temp(self, file_handle: FileHandle) -> str:
check.inst_param(file_handle, "file_handle", FileHandle)
with self.read(file_handle, "rb") as handle_obj: # type: ignore # (??)
temp_file_obj = self._temp_file_manager.tempfile()
temp_file_obj.write(handle_obj.read())
temp_name = temp_file_obj.name
temp_file_obj.close()
return temp_name
@contextmanager
def read(self, file_handle: LocalFileHandle, mode: str = "rb") -> Iterator[IOStream]:
check.inst_param(file_handle, "file_handle", LocalFileHandle)
check.str_param(mode, "mode")
check.param_invariant(mode in {"r", "rb"}, "mode")
encoding = None if mode == "rb" else "utf8"
with open(file_handle.path, mode, encoding=encoding) as file_obj:
yield file_obj # type: ignore # (??)
def read_data(self, file_handle: LocalFileHandle) -> bytes:
with self.read(file_handle, mode="rb") as file_obj:
return file_obj.read() # type: ignore # (??)
def write_data(self, data: bytes, ext: Optional[str] = None):
check.inst_param(data, "data", bytes)
return self.write(io.BytesIO(data), mode="wb", ext=ext)
def write(
self, file_obj: IOStream, mode: str = "wb", ext: Optional[str] = None
) -> LocalFileHandle:
check_file_like_obj(file_obj)
check.opt_str_param(ext, "ext")
self.ensure_base_dir_exists()
dest_file_path = os.path.join(
self.base_dir, str(uuid.uuid4()) + (("." + ext) if ext is not None else "")
)
encoding = None if "b" in mode else "utf8"
with open(dest_file_path, mode, encoding=encoding) as dest_file_obj:
shutil.copyfileobj(file_obj, dest_file_obj) # type: ignore # (??)
return LocalFileHandle(dest_file_path)
def delete_local_temp(self) -> None:
self._temp_file_manager.close()