Source code for dagster.core.definitions.intermediate_storage

from functools import update_wrapper

from dagster import check
from dagster.core.definitions.configurable import ConfigurableDefinition

from .definition_config_schema import convert_user_facing_definition_config_schema
from .utils import check_valid_name


class IntermediateStorageDefinition(ConfigurableDefinition):
    """Defines intermediate data storage behaviors.

    Args:
        name (str): Name of the storage mode.
        is_persistent (bool): Whether the storage is persistent in a way that can cross
            process/node boundaries. Re-execution with, for example, the multiprocess executor,
            or with dagster-airflow, requires a persistent storage mode.
        required_resource_keys(Optional[Set[str]]): The resources that this storage needs at
            runtime to function. ``None`` (the default) or an empty set is stored as an empty
            frozenset.
        config_schema (Optional[ConfigSchema]): The schema for the storage's configuration schema.
            Configuration data passed in this schema will be made available to the
            ``intermediate_storage_creation_fn`` under ``init_context.intermediate_storage_config``.
        intermediate_storage_creation_fn: (Callable[[InitIntermediateStorageContext], IntermediateStorage])
            Called to construct the storage. This function should consume the init context and emit
            a :py:class:`IntermediateStorage`.
        description (Optional[str]): Optional description of the storage, exposed through the
            ``description`` property.
    """

    def __init__(
        self,
        name,
        is_persistent,
        required_resource_keys=None,
        config_schema=None,
        intermediate_storage_creation_fn=None,
        description=None,
    ):
        self._name = check_valid_name(name)
        self._is_persistent = check.bool_param(is_persistent, "is_persistent")
        self._config_schema = convert_user_facing_definition_config_schema(config_schema)
        self._intermediate_storage_creation_fn = check.opt_callable_param(
            intermediate_storage_creation_fn, "intermediate_storage_creation_fn"
        )
        # Any falsy value (None, empty set) is normalized to an empty set before
        # validation, then frozen so the stored keys cannot be mutated afterwards.
        self._required_resource_keys = frozenset(
            check.set_param(
                required_resource_keys or set(),
                "required_resource_keys",
                of_type=str,
            )
        )
        self._description = check.opt_str_param(description, "description")

    @property
    def name(self):
        """The validated name given at construction."""
        return self._name

    @property
    def description(self):
        """The description given at construction, or ``None``."""
        return self._description

    @property
    def is_persistent(self):
        """The ``is_persistent`` flag given at construction."""
        return self._is_persistent

    @property
    def config_schema(self):
        """The config schema after conversion by
        ``convert_user_facing_definition_config_schema``."""
        return self._config_schema

    @property
    def intermediate_storage_creation_fn(self):
        """The callable used to construct the storage, or ``None`` if not provided."""
        return self._intermediate_storage_creation_fn

    @property
    def required_resource_keys(self):
        """Frozenset of the resource keys given at construction."""
        return self._required_resource_keys

    def copy_for_configured(self, name, description, config_schema, _):
        """Return a new definition with the given ``config_schema``.

        ``name`` and ``description`` fall back to this definition's values when falsy;
        persistence, resource keys and the creation function are carried over unchanged.
        The fourth positional argument is ignored.
        """
        return IntermediateStorageDefinition(
            name=name or self.name,
            is_persistent=self.is_persistent,
            required_resource_keys=self.required_resource_keys,
            config_schema=config_schema,
            intermediate_storage_creation_fn=self.intermediate_storage_creation_fn,
            description=description or self.description,
        )
def intermediate_storage(
    required_resource_keys=None, name=None, is_persistent=True, config_schema=None
):
    """Creates an intermediate storage definition

    The decorated function will be passed as the ``intermediate_storage_creation_fn`` to a
    :py:class:`IntermediateStorageDefinition`.

    May be applied bare (``@intermediate_storage``) or called with arguments
    (``@intermediate_storage(...)``). In the bare form no other argument may be supplied.

    Args:
        name (str): The name of the intermediate storage.
        is_persistent (bool): Whether the storage is persistent in a way that can cross
            process/node boundaries. Re-execution with, for example, the multiprocess executor,
            or with dagster-airflow, requires a persistent storage mode.
        required_resource_keys (Optional[Set[str]]): The resources that this storage needs at
            runtime to function.
        config_schema (Optional[ConfigSchema]): The schema for the config. Configuration data
            available in `init_context.intermediate_storage_config`.
    """
    # A callable passed explicitly as ``name`` is treated as the function to decorate.
    if callable(name):
        check.invariant(is_persistent is True)
        check.invariant(config_schema is None)
        check.invariant(required_resource_keys is None)
        return _IntermediateStorageDecoratorCallable()(name)

    # Bare ``@intermediate_storage``: Python binds the decorated function to the FIRST
    # positional parameter, which is ``required_resource_keys``. Without this branch the
    # function would be validated as a set of resource keys and rejected.
    if callable(required_resource_keys):
        check.invariant(name is None)
        check.invariant(is_persistent is True)
        check.invariant(config_schema is None)
        return _IntermediateStorageDecoratorCallable()(required_resource_keys)

    return _IntermediateStorageDecoratorCallable(
        name=name,
        is_persistent=is_persistent,
        config_schema=config_schema,
        required_resource_keys=required_resource_keys,
    )
class _IntermediateStorageDecoratorCallable:
    """Holds the arguments given to ``intermediate_storage`` and, when called with a
    function, builds an :py:class:`IntermediateStorageDefinition` around it."""

    def __init__(
        self, name=None, is_persistent=True, config_schema=None, required_resource_keys=None
    ):
        self.name = check.opt_str_param(name, "name")
        self.is_persistent = check.bool_param(is_persistent, "is_persistent")
        self.config_schema = config_schema  # will be checked in definition
        self.required_resource_keys = check.opt_set_param(
            required_resource_keys, "required_resource_keys", of_type=str
        )

    def __call__(self, fn):
        """Wrap ``fn`` as the creation function of a new storage definition.

        The definition is named after ``self.name`` when one was supplied, otherwise
        after ``fn.__name__``.
        """
        check.callable_param(fn, "fn")

        # Resolve the name per call instead of writing it back to ``self.name``:
        # mutating the instance would make a reused decorator hand the first
        # function's name to every later function it decorates.
        storage_name = self.name or fn.__name__

        storage_def = IntermediateStorageDefinition(
            name=storage_name,
            is_persistent=self.is_persistent,
            config_schema=self.config_schema,
            intermediate_storage_creation_fn=fn,
            required_resource_keys=self.required_resource_keys,
        )

        # Copy the wrapped function's metadata (e.g. __name__, __doc__) onto the definition.
        update_wrapper(storage_def, wrapped=fn)

        return storage_def