from typing import Any, Callable, Mapping, NamedTuple, Optional, Union, cast
from typing_extensions import TypeAlias
import dagster._check as check
from dagster._builtins import BuiltinEnum
from dagster._config import (
ConfigType,
is_supported_config_python_builtin,
process_config,
resolve_defaults,
validate_config,
)
from dagster._core.definitions.definition_config_schema import (
IDefinitionConfigSchema,
convert_user_facing_definition_config_schema,
)
from dagster._core.errors import DagsterInvalidConfigError
ConfigMappingFn: TypeAlias = Callable[[Any], Any]
def is_callable_valid_config_arg(config: Union[Callable[..., Any], Mapping[str, object]]) -> bool:
return BuiltinEnum.contains(config) or is_supported_config_python_builtin(config)
[docs]
class ConfigMapping(
NamedTuple(
"_ConfigMapping",
[
("config_fn", Callable[[Any], Any]),
("config_schema", IDefinitionConfigSchema),
("receive_processed_config_values", Optional[bool]),
],
)
):
"""Defines a config mapping for a graph (or job).
By specifying a config mapping function, you can override the configuration for the child
ops and graphs contained within a graph.
Config mappings require the configuration schema to be specified as ``config_schema``, which will
be exposed as the configuration schema for the graph, as well as a configuration mapping
function, ``config_fn``, which maps the config provided to the graph to the config
that will be provided to the child nodes.
Args:
config_fn (Callable[[dict], dict]): The function that will be called
to map the graph config to a config appropriate for the child nodes.
config_schema (ConfigSchema): The schema of the graph config.
receive_processed_config_values (Optional[bool]): If true, config values provided to the config_fn
will be converted to their dagster types before being passed in. For example, if this
value is true, enum config passed to config_fn will be actual enums, while if false,
then enum config passed to config_fn will be strings.
"""
def __new__(
cls,
config_fn: ConfigMappingFn,
config_schema: Optional[Any] = None,
receive_processed_config_values: Optional[bool] = None,
):
return super(ConfigMapping, cls).__new__(
cls,
config_fn=check.callable_param(config_fn, "config_fn"),
config_schema=convert_user_facing_definition_config_schema(config_schema),
receive_processed_config_values=check.opt_bool_param(
receive_processed_config_values, "receive_processed_config_values"
),
)
def resolve_from_unvalidated_config(self, config: Any) -> Any:
"""Validates config against outer config schema, and calls mapping against validated config."""
receive_processed_config_values = check.opt_bool_param(
self.receive_processed_config_values, "receive_processed_config_values", default=True
)
if receive_processed_config_values:
outer_evr = process_config(
self.config_schema.config_type,
config,
)
else:
outer_evr = validate_config(
self.config_schema.config_type,
config,
)
if not outer_evr.success:
raise DagsterInvalidConfigError(
"Error in config mapping ",
outer_evr.errors,
config,
)
outer_config = outer_evr.value
if not receive_processed_config_values:
outer_config = resolve_defaults(
cast(ConfigType, self.config_schema.config_type),
outer_config,
).value
return self.config_fn(outer_config)
def resolve_from_validated_config(self, config: Any) -> Any:
if self.receive_processed_config_values is not None:
check.failed(
"`receive_processed_config_values` parameter has been set, but only applies to "
"unvalidated config."
)
return self.config_fn(config)