Ask AI

Source code for dagster._serdes.config_class

import importlib
from abc import ABC, abstractmethod
from typing import (
    TYPE_CHECKING,
    Any,
    Mapping,
    NamedTuple,
    Optional,
    Type,
    TypeVar,
    Union,
    cast,
    overload,
)

from typing_extensions import Self

import dagster._check as check
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes
from dagster._utils import convert_dagster_submodule_name
from dagster._utils.yaml_utils import load_run_config_yaml

if TYPE_CHECKING:
    from dagster._config.config_schema import UserConfigSchema

# This should have a bound of ConfigurableClass, but the type checker has difficulty with putting
# the ConfigurableClass interface on our storage classes. The concrete implementations of these
# classes end up implementing the ConfigurableClass interface without inheriting from it, so we
# don't actually bound this var.
T_ConfigurableClass = TypeVar("T_ConfigurableClass")


class ConfigurableClassDataSerializer(NamedTupleSerializer["ConfigurableClassData"]):
    def pack_items(self, *args, **kwargs):
        for k, v in super().pack_items(*args, **kwargs):
            if k == "module_name":
                yield k, convert_dagster_submodule_name(v, "public")  # pyright: ignore[reportArgumentType]
            else:
                yield k, v


[docs] @whitelist_for_serdes(serializer=ConfigurableClassDataSerializer) class ConfigurableClassData( NamedTuple( "_ConfigurableClassData", [ ("module_name", str), ("class_name", str), ("config_yaml", str), ], ) ): """Serializable tuple describing where to find a class and the config fragment that should be used to instantiate it. Users should not instantiate this class directly. Classes intended to be serialized in this way should implement the :py:class:`dagster.serdes.ConfigurableClass` mixin. """ def __new__(cls, module_name: str, class_name: str, config_yaml: str): return super(ConfigurableClassData, cls).__new__( cls, convert_dagster_submodule_name(check.str_param(module_name, "module_name"), "private"), check.str_param(class_name, "class_name"), check.str_param(config_yaml, "config_yaml"), ) @property def config_dict(self) -> Mapping[str, Any]: return check.is_dict(load_run_config_yaml(self.config_yaml), key_type=str) def info_dict(self) -> Mapping[str, Any]: return { "module": self.module_name, "class": self.class_name, "config": self.config_dict, } @overload def rehydrate(self, as_type: None = ...) -> "ConfigurableClass": ... @overload def rehydrate(self, as_type: Type[T_ConfigurableClass]) -> T_ConfigurableClass: ... def rehydrate( self, as_type: Optional[Type[T_ConfigurableClass]] = None ) -> Union["ConfigurableClass", T_ConfigurableClass]: from dagster._config import process_config, resolve_to_config_type from dagster._core.errors import DagsterInvalidConfigError try: module = importlib.import_module(self.module_name) except ModuleNotFoundError: check.failed( f"Couldn't import module {self.module_name} when attempting to load the " f"configurable class {self.module_name}.{self.class_name}" ) try: # All rehydrated classes are expected to implement the ConfigurableClass interface and # will error when we call `klass.from_config_value` and `klass.config_type` below if # they do not. However, not all rehydrated classes actually have `ConfigurableClass` as # an ancestor due to some subtleties around multiple abstract classes that cause an # error when `ConfigurableClass` is added as an ancestor to storage classes. klass = cast(Type[ConfigurableClass], getattr(module, self.class_name)) except AttributeError: check.failed( f"Couldn't find class {self.class_name} in module when attempting to load the " f"configurable class {self.module_name}.{self.class_name}" ) if not issubclass(klass, as_type or ConfigurableClass): raise check.CheckError( klass, f"class {self.class_name} in module {self.module_name}", ConfigurableClass, ) config_dict = self.config_dict result = process_config(resolve_to_config_type(klass.config_type()), config_dict) if not result.success: raise DagsterInvalidConfigError( f"Errors whilst loading configuration for {klass.config_type()}.", result.errors, config_dict, ) return klass.from_config_value(self, check.not_none(result.value))
[docs] class ConfigurableClass(ABC): """Abstract mixin for classes that can be loaded from config. This supports a powerful plugin pattern which avoids both a) a lengthy, hard-to-synchronize list of conditional imports / optional extras_requires in dagster core and b) a magic directory or file in which third parties can place plugin packages. Instead, the intention is to make, e.g., run storage, pluggable with a config chunk like: .. code-block:: yaml run_storage: module: very_cool_package.run_storage class: SplendidRunStorage config: magic_word: "quux" This same pattern should eventually be viable for other system components, e.g. engines. The ``ConfigurableClass`` mixin provides the necessary hooks for classes to be instantiated from an instance of ``ConfigurableClassData``. Pieces of the Dagster system which we wish to make pluggable in this way should consume a config type such as: .. code-block:: python {'module': str, 'class': str, 'config': Field(Permissive())} """ @property @abstractmethod def inst_data(self) -> Optional[ConfigurableClassData]: """Subclass must be able to return the inst_data as a property if it has been constructed through the from_config_value code path. """ @classmethod @abstractmethod def config_type(cls) -> "UserConfigSchema": """Get the config type against which to validate a config yaml fragment. The only place config values matching this type are used is inside `from_config_value`. This is an alternative constructor for a class. It is a common pattern for the config type to match constructor arguments, so `from_config_value` The config type against which to validate a config yaml fragment serialized in an instance of ``ConfigurableClassData``. """ ... # We need to raise `NotImplementedError` here because nothing prevents abstract class # methods from being called. raise NotImplementedError(f"{cls.__name__} must implement the config_type classmethod") @classmethod @abstractmethod def from_config_value( cls, inst_data: ConfigurableClassData, config_value: Mapping[str, Any] ) -> Self: """Create an instance of the ConfigurableClass from a validated config value. The config value used here should be derived from the accompanying `inst_data` argument. `inst_data` contains the yaml-serialized config-- this must be parsed and validated/normalized, then passed to this method for object instantiation. This is done in ConfigurableClassData.rehydrate. Args: config_value (dict): The validated config value to use. Typically this should be the ``value`` attribute of a :py:class:`~dagster._core.types.evaluator.evaluation.EvaluateValueResult`. A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass's constructor: .. code-block:: python @classmethod def from_config_value(cls, inst_data, config_value): return MyConfigurableClass(inst_data=inst_data, **config_value) """
def class_from_code_pointer(module_name: str, class_name: str) -> Type[object]: try: module = importlib.import_module(module_name) except ModuleNotFoundError: check.failed( "Couldn't import module {module_name} when attempting to load the class {klass}".format( module_name=module_name, klass=module_name + "." + class_name, ) ) try: return getattr(module, class_name) except AttributeError: check.failed( "Couldn't find class {class_name} in module when attempting to load the " "class {klass}".format( class_name=class_name, klass=module_name + "." + class_name, ) )