Ask AI

Source code for dagster._core.definitions.config

from typing import Any, Callable, Mapping, NamedTuple, Optional, Union, cast

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._builtins import BuiltinEnum
from dagster._config import (
    ConfigType,
    is_supported_config_python_builtin,
    process_config,
    resolve_defaults,
    validate_config,
)
from dagster._core.definitions.definition_config_schema import (
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)
from dagster._core.errors import DagsterInvalidConfigError

ConfigMappingFn: TypeAlias = Callable[[Any], Any]


def is_callable_valid_config_arg(config: Union[Callable[..., Any], Mapping[str, object]]) -> bool:
    return BuiltinEnum.contains(config) or is_supported_config_python_builtin(config)


[docs] class ConfigMapping( NamedTuple( "_ConfigMapping", [ ("config_fn", Callable[[Any], Any]), ("config_schema", IDefinitionConfigSchema), ("receive_processed_config_values", Optional[bool]), ], ) ): """Defines a config mapping for a graph (or job). By specifying a config mapping function, you can override the configuration for the child ops and graphs contained within a graph. Config mappings require the configuration schema to be specified as ``config_schema``, which will be exposed as the configuration schema for the graph, as well as a configuration mapping function, ``config_fn``, which maps the config provided to the graph to the config that will be provided to the child nodes. Args: config_fn (Callable[[dict], dict]): The function that will be called to map the graph config to a config appropriate for the child nodes. config_schema (ConfigSchema): The schema of the graph config. receive_processed_config_values (Optional[bool]): If true, config values provided to the config_fn will be converted to their dagster types before being passed in. For example, if this value is true, enum config passed to config_fn will be actual enums, while if false, then enum config passed to config_fn will be strings. """ def __new__( cls, config_fn: ConfigMappingFn, config_schema: Optional[Any] = None, receive_processed_config_values: Optional[bool] = None, ): return super(ConfigMapping, cls).__new__( cls, config_fn=check.callable_param(config_fn, "config_fn"), config_schema=convert_user_facing_definition_config_schema(config_schema), receive_processed_config_values=check.opt_bool_param( receive_processed_config_values, "receive_processed_config_values" ), ) def resolve_from_unvalidated_config(self, config: Any) -> Any: """Validates config against outer config schema, and calls mapping against validated config.""" receive_processed_config_values = check.opt_bool_param( self.receive_processed_config_values, "receive_processed_config_values", default=True ) if receive_processed_config_values: outer_evr = process_config( self.config_schema.config_type, config, ) else: outer_evr = validate_config( self.config_schema.config_type, config, ) if not outer_evr.success: raise DagsterInvalidConfigError( "Error in config mapping ", outer_evr.errors, config, ) outer_config = outer_evr.value if not receive_processed_config_values: outer_config = resolve_defaults( cast(ConfigType, self.config_schema.config_type), outer_config, ).value return self.config_fn(outer_config) def resolve_from_validated_config(self, config: Any) -> Any: if self.receive_processed_config_values is not None: check.failed( "`receive_processed_config_values` parameter has been set, but only applies to " "unvalidated config." ) return self.config_fn(config)