Ask AI

Source code for dagster._core.definitions.resource_definition

from functools import update_wrapper
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Callable,
    Dict,
    Iterator,
    Mapping,
    Optional,
    Union,
    cast,
    overload,
)

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import experimental_param, public
from dagster._core.decorator_utils import format_docstring_for_description
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.configurable import AnonymousConfigurableDefinition
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidInvocationError
from dagster._utils import IHasInternalInit

from ..decorator_utils import (
    get_function_params,
    has_at_least_one_parameter,
    is_required_param,
    positional_arg_name_list,
    validate_expected_params,
)
from .definition_config_schema import (
    CoercableToConfigSchema,
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)
from .resource_invocation import resource_invocation_result
from .resource_requirement import (
    RequiresResources,
    ResourceDependencyRequirement,
    ResourceRequirement,
)
from .scoped_resources_builder import (  # re-exported
    IContainsGenerator as IContainsGenerator,
    Resources as Resources,
    ScopedResourcesBuilder as ScopedResourcesBuilder,
)

if TYPE_CHECKING:
    from dagster._core.execution.resources_init import InitResourceContext

ResourceFunctionWithContext: TypeAlias = Callable[["InitResourceContext"], Any]
ResourceFunctionWithoutContext: TypeAlias = Callable[[], Any]
ResourceFunction: TypeAlias = Union[
    ResourceFunctionWithContext,
    ResourceFunctionWithoutContext,
]


[docs]@experimental_param(param="version") class ResourceDefinition(AnonymousConfigurableDefinition, RequiresResources, IHasInternalInit): """Core class for defining resources. Resources are scoped ways to make external resources (like database connections) available to ops and assets during job execution and to clean up after execution resolves. If resource_fn yields once rather than returning (in the manner of functions decorable with :py:func:`@contextlib.contextmanager <python:contextlib.contextmanager>`) then the body of the function after the yield will be run after execution resolves, allowing users to write their own teardown/cleanup logic. Depending on your executor, resources may be instantiated and cleaned up more than once in a job execution. Args: resource_fn (Callable[[InitResourceContext], Any]): User-provided function to instantiate the resource, which will be made available to executions keyed on the ``context.resources`` object. config_schema (Optional[ConfigSchema): The schema for the config. If set, Dagster will check that config provided for the resource matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the resource. description (Optional[str]): A human-readable description of the resource. required_resource_keys: (Optional[Set[str]]) Keys for the resources required by this resource. A DagsterInvariantViolationError will be raised during initialization if dependencies are cyclic. version (Optional[str]): (Experimental) The version of the resource's definition fn. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs. """ def __init__( self, resource_fn: ResourceFunction, config_schema: CoercableToConfigSchema = None, description: Optional[str] = None, required_resource_keys: Optional[AbstractSet[str]] = None, version: Optional[str] = None, ): self._resource_fn = check.callable_param(resource_fn, "resource_fn") self._config_schema = convert_user_facing_definition_config_schema(config_schema) self._description = check.opt_str_param(description, "description") self._required_resource_keys = check.opt_set_param( required_resource_keys, "required_resource_keys" ) self._version = check.opt_str_param(version, "version") # this attribute will be updated by the @dagster_maintained_resource and @dagster_maintained_io_manager decorators self._dagster_maintained = False self._hardcoded_resource_type = None @staticmethod def dagster_internal_init( *, resource_fn: ResourceFunction, config_schema: CoercableToConfigSchema, description: Optional[str], required_resource_keys: Optional[AbstractSet[str]], version: Optional[str], ) -> "ResourceDefinition": return ResourceDefinition( resource_fn=resource_fn, config_schema=config_schema, description=description, required_resource_keys=required_resource_keys, version=version, ) @property def resource_fn(self) -> ResourceFunction: return self._resource_fn @property def config_schema(self) -> IDefinitionConfigSchema: return self._config_schema @public @property def description(self) -> Optional[str]: """A human-readable description of the resource.""" return self._description @public @property def version(self) -> Optional[str]: """A string which can be used to identify a particular code version of a resource definition.""" return self._version @public @property def required_resource_keys(self) -> AbstractSet[str]: """A set of the resource keys that this resource depends on. These keys will be made available to the resource's init context during execution, and the resource will not be instantiated until all required resources are available. """ return self._required_resource_keys def _is_dagster_maintained(self) -> bool: return self._dagster_maintained
[docs] @public @staticmethod def none_resource(description: Optional[str] = None) -> "ResourceDefinition": """A helper function that returns a none resource. Args: description ([Optional[str]]): The description of the resource. Defaults to None. Returns: [ResourceDefinition]: A resource that does nothing. """ return ResourceDefinition.hardcoded_resource(value=None, description=description)
[docs] @public @staticmethod def hardcoded_resource(value: Any, description: Optional[str] = None) -> "ResourceDefinition": """A helper function that creates a ``ResourceDefinition`` with a hardcoded object. Args: value (Any): The value that will be accessible via context.resources.resource_name. description ([Optional[str]]): The description of the resource. Defaults to None. Returns: [ResourceDefinition]: A hardcoded resource. """ resource_def = ResourceDefinition( resource_fn=lambda _init_context: value, description=description ) # Make sure telemetry info gets passed in to hardcoded resources if hasattr(value, "_is_dagster_maintained"): resource_def._dagster_maintained = value._is_dagster_maintained() # noqa: SLF001 resource_def._hardcoded_resource_type = type(value) # noqa: SLF001 return resource_def
[docs] @public @staticmethod def mock_resource(description: Optional[str] = None) -> "ResourceDefinition": """A helper function that creates a ``ResourceDefinition`` which wraps a ``mock.MagicMock``. Args: description ([Optional[str]]): The description of the resource. Defaults to None. Returns: [ResourceDefinition]: A resource that creates the magic methods automatically and helps you mock existing resources. """ from unittest import mock return ResourceDefinition( resource_fn=lambda _init_context: mock.MagicMock(), description=description )
[docs] @public @staticmethod def string_resource(description: Optional[str] = None) -> "ResourceDefinition": """Creates a ``ResourceDefinition`` which takes in a single string as configuration and returns this configured string to any ops or assets which depend on it. Args: description ([Optional[str]]): The description of the string resource. Defaults to None. Returns: [ResourceDefinition]: A resource that takes in a single string as configuration and returns that string. """ return ResourceDefinition( resource_fn=lambda init_context: init_context.resource_config, config_schema=str, description=description, )
def copy_for_configured( self, description: Optional[str], config_schema: CoercableToConfigSchema, ) -> "ResourceDefinition": resource_def = ResourceDefinition.dagster_internal_init( config_schema=config_schema, description=description or self.description, resource_fn=self.resource_fn, required_resource_keys=self.required_resource_keys, version=self.version, ) resource_def._dagster_maintained = self._is_dagster_maintained() # noqa: SLF001 return resource_def def __call__(self, *args, **kwargs): from dagster._core.execution.context.init import UnboundInitResourceContext if has_at_least_one_parameter(self.resource_fn): if len(args) + len(kwargs) == 0: raise DagsterInvalidInvocationError( "Resource initialization function has context argument, but no context was" " provided when invoking." ) if len(args) + len(kwargs) > 1: raise DagsterInvalidInvocationError( "Initialization of resource received multiple arguments. Only a first " "positional context parameter should be provided when invoking." ) context_param_name = get_function_params(self.resource_fn)[0].name if args: check.opt_inst_param(args[0], context_param_name, UnboundInitResourceContext) return resource_invocation_result( self, cast(Optional[UnboundInitResourceContext], args[0]) ) else: if context_param_name not in kwargs: raise DagsterInvalidInvocationError( f"Resource initialization expected argument '{context_param_name}'." ) check.opt_inst_param( kwargs[context_param_name], context_param_name, UnboundInitResourceContext ) return resource_invocation_result( self, cast(Optional[UnboundInitResourceContext], kwargs[context_param_name]) ) elif len(args) + len(kwargs) > 0: raise DagsterInvalidInvocationError( "Attempted to invoke resource with argument, but underlying function has no context" " argument. Either specify a context argument on the resource function, or remove" " the passed-in argument." ) else: return resource_invocation_result(self, None) def get_resource_requirements( self, outer_context: Optional[object] = None ) -> Iterator[ResourceRequirement]: source_key = cast(str, outer_context) for resource_key in sorted(list(self.required_resource_keys)): yield ResourceDependencyRequirement(key=resource_key, source_key=source_key)
def dagster_maintained_resource( resource_def: ResourceDefinition, ) -> ResourceDefinition: resource_def._dagster_maintained = True # noqa: SLF001 return resource_def class _ResourceDecoratorCallable: def __init__( self, config_schema: Optional[Mapping[str, Any]] = None, description: Optional[str] = None, required_resource_keys: Optional[AbstractSet[str]] = None, version: Optional[str] = None, ): self.config_schema = config_schema # checked by underlying definition self.description = check.opt_str_param(description, "description") self.version = check.opt_str_param(version, "version") self.required_resource_keys = check.opt_set_param( required_resource_keys, "required_resource_keys" ) def __call__(self, resource_fn: ResourceFunction) -> ResourceDefinition: check.callable_param(resource_fn, "resource_fn") any_name = ["*"] if has_at_least_one_parameter(resource_fn) else [] params = get_function_params(resource_fn) missing_positional = validate_expected_params(params, any_name) if missing_positional: raise DagsterInvalidDefinitionError( f"@resource decorated function '{resource_fn.__name__}' expects a single " "positional argument." ) extras = params[len(any_name) :] required_extras = list(filter(is_required_param, extras)) if required_extras: raise DagsterInvalidDefinitionError( f"@resource decorated function '{resource_fn.__name__}' expects only a single" " positional required argument. Got required extra params" f" {', '.join(positional_arg_name_list(required_extras))}" ) resource_def = ResourceDefinition.dagster_internal_init( resource_fn=resource_fn, config_schema=self.config_schema, description=self.description or format_docstring_for_description(resource_fn), version=self.version, required_resource_keys=self.required_resource_keys, ) # `update_wrapper` typing cannot currently handle a Union of Callables correctly update_wrapper(resource_def, wrapped=resource_fn) # type: ignore return resource_def @overload def resource(config_schema: ResourceFunction) -> ResourceDefinition: ... @overload def resource( config_schema: CoercableToConfigSchema = ..., description: Optional[str] = ..., required_resource_keys: Optional[AbstractSet[str]] = ..., version: Optional[str] = ..., ) -> Callable[[ResourceFunction], "ResourceDefinition"]: ...
[docs]def resource( config_schema: Union[ResourceFunction, CoercableToConfigSchema] = None, description: Optional[str] = None, required_resource_keys: Optional[AbstractSet[str]] = None, version: Optional[str] = None, ) -> Union[Callable[[ResourceFunction], "ResourceDefinition"], "ResourceDefinition"]: """Define a resource. The decorated function should accept an :py:class:`InitResourceContext` and return an instance of the resource. This function will become the ``resource_fn`` of an underlying :py:class:`ResourceDefinition`. If the decorated function yields once rather than returning (in the manner of functions decorable with :py:func:`@contextlib.contextmanager <python:contextlib.contextmanager>`) then the body of the function after the yield will be run after execution resolves, allowing users to write their own teardown/cleanup logic. Args: config_schema (Optional[ConfigSchema]): The schema for the config. Configuration data available in `init_context.resource_config`. If not set, Dagster will accept any config provided. description(Optional[str]): A human-readable description of the resource. version (Optional[str]): (Experimental) The version of a resource function. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs. required_resource_keys (Optional[Set[str]]): Keys for the resources required by this resource. """ # This case is for when decorator is used bare, without arguments. # E.g. @resource versus @resource() if callable(config_schema) and not is_callable_valid_config_arg(config_schema): return _ResourceDecoratorCallable()(config_schema) def _wrap(resource_fn: ResourceFunction) -> "ResourceDefinition": return _ResourceDecoratorCallable( config_schema=cast(Optional[Dict[str, Any]], config_schema), description=description, required_resource_keys=required_resource_keys, version=version, )(resource_fn) return _wrap
[docs]def make_values_resource(**kwargs: Any) -> ResourceDefinition: """A helper function that creates a ``ResourceDefinition`` to take in user-defined values. This is useful for sharing values between ops. Args: **kwargs: Arbitrary keyword arguments that will be passed to the config schema of the returned resource definition. If not set, Dagster will accept any config provided for the resource. For example: .. code-block:: python @op(required_resource_keys={"globals"}) def my_op(context): print(context.resources.globals["my_str_var"]) @job(resource_defs={"globals": make_values_resource(my_str_var=str, my_int_var=int)}) def my_job(): my_op() Returns: ResourceDefinition: A resource that passes in user-defined values. """ return ResourceDefinition( resource_fn=lambda init_context: init_context.resource_config, config_schema=kwargs or Any, )