Source code for dagster._core.storage.root
import os
from tempfile import TemporaryDirectory
from typing import Optional
from typing_extensions import TypedDict
from dagster import (
StringSource,
_check as check,
)
from dagster._config.config_schema import UserConfigSchema
from dagster._serdes import ConfigurableClass, ConfigurableClassData
# Shape of the config mapping used to build a LocalArtifactStorage:
# a single required key, "base_dir", holding a string.
LocalArtifactStorageConfig = TypedDict("LocalArtifactStorageConfig", {"base_dir": str})
[docs]
class LocalArtifactStorage(ConfigurableClass):
    """Artifact storage locations derived from a single base directory.

    The path-returning members below only compute paths with ``os.path.join``
    relative to ``base_dir``.
    """

    def __init__(self, base_dir: str, inst_data: Optional[ConfigurableClassData] = None):
        """Store the base directory and the (optional) configurable-class data.

        Args:
            base_dir: Directory under which all other paths are resolved.
            inst_data: Optional ``ConfigurableClassData``; validated with
                ``check.opt_inst_param``.
        """
        self._base_dir = base_dir
        validated_inst_data = check.opt_inst_param(
            inst_data, "inst_data", ConfigurableClassData
        )
        self._inst_data = validated_inst_data

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """The ``ConfigurableClassData`` given at construction, or ``None``."""
        return self._inst_data

    @property
    def base_dir(self) -> str:
        """The base directory given at construction."""
        return self._base_dir

    @property
    def storage_dir(self) -> str:
        """Path of the ``storage`` directory under ``base_dir``."""
        root = self.base_dir
        return os.path.join(root, "storage")

    @property
    def schedules_dir(self) -> str:
        """Path of the ``schedules`` directory under ``base_dir``."""
        root = self.base_dir
        return os.path.join(root, "schedules")

    def file_manager_dir(self, run_id: str) -> str:
        """Return ``<base_dir>/storage/<run_id>/files`` for the given run.

        Args:
            run_id: Run identifier; validated as a string by ``check.str_param``.
        """
        check.str_param(run_id, "run_id")
        segments = (self.base_dir, "storage", run_id, "files")
        return os.path.join(*segments)

    @classmethod
    def from_config_value(
        cls, inst_data: Optional[ConfigurableClassData], config_value: LocalArtifactStorageConfig
    ) -> "LocalArtifactStorage":
        """Build a ``LocalArtifactStorage`` from a config mapping.

        The entries of ``config_value`` are expanded as keyword arguments to the
        constructor alongside ``inst_data``.
        """
        constructor_kwargs = dict(config_value)
        return LocalArtifactStorage(inst_data=inst_data, **constructor_kwargs)

    @classmethod
    def config_type(cls) -> UserConfigSchema:
        """Return the config schema: one ``base_dir`` entry typed ``StringSource``."""
        schema = {"base_dir": StringSource}
        return schema

    def dispose(self):
        """Do nothing; this implementation is a no-op."""
        return None
class TemporaryLocalArtifactStorage(LocalArtifactStorage):
    """Used by ephemeral DagsterInstances, defers directory creation til
    access since many uses of ephemeral instance do not require artifact directory.
    """

    def __init__(self):
        """Start with no temporary directory; one is created on first access."""
        # The parent initializer is deliberately not invoked: it expects a
        # concrete base_dir, while this class produces its directory lazily
        # through the overridden base_dir property.
        self._temp_dir = None
        # Set explicitly so the inherited inst_data property returns None
        # instead of raising AttributeError on the missing attribute.
        self._inst_data = None

    @property
    def base_dir(self) -> str:
        """Path of the backing temporary directory, created on first access."""
        if self._temp_dir is None:
            self._temp_dir = TemporaryDirectory()
        return self._temp_dir.name

    def dispose(self) -> None:
        """Clean up the temporary directory if one was ever created."""
        if self._temp_dir is not None:
            self._temp_dir.cleanup()