Ask AI

Source code for dagster._core.storage.input_manager

from abc import ABC, abstractmethod
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Callable, Optional, Union, cast, overload

from typing_extensions import TypeAlias, TypeGuard

import dagster._check as check
from dagster._core.decorator_utils import has_at_least_one_parameter
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.definition_config_schema import (
    CoercableToConfigSchema,
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)
from dagster._core.definitions.resource_definition import ResourceDefinition, ResourceFunction

if TYPE_CHECKING:
    from dagster._core.execution.context.input import InputContext

InputLoadFn: TypeAlias = Union[
    Callable[["InputContext"], object],
    Callable[[], object],
]


[docs]class InputManager(ABC): """Base interface for classes that are responsible for loading solid inputs.""" @abstractmethod def load_input(self, context: "InputContext") -> object: """The user-defined read method that loads an input to a solid. Args: context (InputContext): The input context. Returns: Any: The data object. """
class IInputManagerDefinition: @property @abstractmethod def input_config_schema(self) -> IDefinitionConfigSchema: """The schema for per-input configuration for inputs that are managed by this input manager. """
[docs]class InputManagerDefinition(ResourceDefinition, IInputManagerDefinition): """Definition of an input manager resource. Input managers load op inputs. An InputManagerDefinition is a :py:class:`ResourceDefinition` whose resource_fn returns an :py:class:`InputManager`. The easiest way to create an InputManagerDefinition is with the :py:func:`@input_manager <input_manager>` decorator. """ def __init__( self, resource_fn: ResourceFunction, config_schema: Optional[CoercableToConfigSchema] = None, description: Optional[str] = None, input_config_schema: Optional[CoercableToConfigSchema] = None, required_resource_keys: Optional[AbstractSet[str]] = None, version: Optional[str] = None, ): self._input_config_schema = convert_user_facing_definition_config_schema( input_config_schema ) super(InputManagerDefinition, self).__init__( resource_fn=resource_fn, config_schema=config_schema, description=description, required_resource_keys=required_resource_keys, version=version, ) @property def input_config_schema(self) -> IDefinitionConfigSchema: return self._input_config_schema def copy_for_configured( self, description: Optional[str], config_schema: CoercableToConfigSchema, ) -> "InputManagerDefinition": return InputManagerDefinition( config_schema=config_schema, description=description or self.description, resource_fn=self.resource_fn, required_resource_keys=self.required_resource_keys, input_config_schema=self.input_config_schema, )
@overload def input_manager( config_schema: InputLoadFn, ) -> InputManagerDefinition: ... @overload def input_manager( config_schema: Optional[CoercableToConfigSchema] = None, description: Optional[str] = None, input_config_schema: Optional[CoercableToConfigSchema] = None, required_resource_keys: Optional[AbstractSet[str]] = None, version: Optional[str] = None, ) -> Callable[[InputLoadFn], InputManagerDefinition]: ...
[docs]def input_manager( config_schema: Union[InputLoadFn, Optional[CoercableToConfigSchema]] = None, description: Optional[str] = None, input_config_schema: Optional[CoercableToConfigSchema] = None, required_resource_keys: Optional[AbstractSet[str]] = None, version: Optional[str] = None, ) -> Union[InputManagerDefinition, Callable[[InputLoadFn], InputManagerDefinition]]: """Define an input manager. Input managers load op inputs, either from upstream outputs or by providing default values. The decorated function should accept a :py:class:`InputContext` and resource config, and return a loaded object that will be passed into one of the inputs of an op. The decorator produces an :py:class:`InputManagerDefinition`. Args: config_schema (Optional[ConfigSchema]): The schema for the resource-level config. If not set, Dagster will accept any config provided. description (Optional[str]): A human-readable description of the resource. input_config_schema (Optional[ConfigSchema]): A schema for the input-level config. Each input that uses this input manager can be configured separately using this config. If not set, Dagster will accept any config provided. required_resource_keys (Optional[Set[str]]): Keys for the resources required by the input manager. version (Optional[str]): (Experimental) the version of the input manager definition. **Examples:** .. code-block:: python from dagster import input_manager, op, job, In @input_manager def csv_loader(_): return read_csv("some/path") @op(ins={"input1": In(input_manager_key="csv_loader_key")}) def my_op(_, input1): do_stuff(input1) @job(resource_defs={"csv_loader_key": csv_loader}) def my_job(): my_op() @input_manager(config_schema={"base_dir": str}) def csv_loader(context): return read_csv(context.resource_config["base_dir"] + "/some/path") @input_manager(input_config_schema={"path": str}) def csv_loader(context): return read_csv(context.config["path"]) """ if _is_input_load_fn(config_schema): return _InputManagerDecoratorCallable()(config_schema) def _wrap(load_fn: InputLoadFn) -> InputManagerDefinition: return _InputManagerDecoratorCallable( config_schema=cast(CoercableToConfigSchema, config_schema), description=description, version=version, input_config_schema=input_config_schema, required_resource_keys=required_resource_keys, )(load_fn) return _wrap
def _is_input_load_fn(obj: Union[InputLoadFn, CoercableToConfigSchema]) -> TypeGuard[InputLoadFn]: return callable(obj) and not is_callable_valid_config_arg(obj) class InputManagerWrapper(InputManager): def __init__(self, load_fn: InputLoadFn): self._load_fn = load_fn def load_input(self, context: "InputContext") -> object: # the @input_manager decorated function (self._load_fn) may return a direct value that # should be used or an instance of an InputManager. So we call self._load_fn and see if the # result is an InputManager. If so we call it's load_input method intermediate = ( # type-ignore because function being used as attribute self._load_fn(context) if has_at_least_one_parameter(self._load_fn) else self._load_fn() # type: ignore # (strict type guard) ) if isinstance(intermediate, InputManager): return intermediate.load_input(context) return intermediate class _InputManagerDecoratorCallable: def __init__( self, config_schema: CoercableToConfigSchema = None, description: Optional[str] = None, version: Optional[str] = None, input_config_schema: CoercableToConfigSchema = None, required_resource_keys: Optional[AbstractSet[str]] = None, ): self.config_schema = config_schema self.description = check.opt_str_param(description, "description") self.version = check.opt_str_param(version, "version") self.input_config_schema = input_config_schema self.required_resource_keys = required_resource_keys def __call__(self, load_fn: InputLoadFn) -> InputManagerDefinition: check.callable_param(load_fn, "load_fn") def _resource_fn(_): return InputManagerWrapper(load_fn) input_manager_def = InputManagerDefinition( resource_fn=_resource_fn, config_schema=self.config_schema, description=self.description, version=self.version, input_config_schema=self.input_config_schema, required_resource_keys=self.required_resource_keys, ) # `update_wrapper` typing cannot currently handle a Union of Callables correctly update_wrapper(input_manager_def, wrapped=load_fn) # type: ignore return input_manager_def