import typing
from enum import Enum as PythonEnum
from typing import TYPE_CHECKING, Dict, Iterator, Optional, Sequence, cast
import dagster._check as check
from dagster._annotations import public
from dagster._builtins import BuiltinEnum
from dagster._config import UserConfigSchema
from dagster._serdes import whitelist_for_serdes
if TYPE_CHECKING:
from dagster._config.snap import ConfigSchemaSnapshot, ConfigTypeSnap
@whitelist_for_serdes
class ConfigTypeKind(PythonEnum):
ANY = "ANY"
SCALAR = "SCALAR"
ENUM = "ENUM"
SELECTOR = "SELECTOR"
STRICT_SHAPE = "STRICT_SHAPE"
PERMISSIVE_SHAPE = "PERMISSIVE_SHAPE"
SCALAR_UNION = "SCALAR_UNION"
MAP = "MAP"
# Closed generic types
ARRAY = "ARRAY"
NONEABLE = "NONEABLE"
@staticmethod
def has_fields(kind: "ConfigTypeKind") -> bool:
check.inst_param(kind, "kind", ConfigTypeKind)
return kind == ConfigTypeKind.SELECTOR or ConfigTypeKind.is_shape(kind)
@staticmethod
def is_closed_generic(kind: "ConfigTypeKind") -> bool:
check.inst_param(kind, "kind", ConfigTypeKind)
return (
kind == ConfigTypeKind.ARRAY
or kind == ConfigTypeKind.NONEABLE
or kind == ConfigTypeKind.SCALAR_UNION
or kind == ConfigTypeKind.MAP
)
@staticmethod
def is_shape(kind: "ConfigTypeKind") -> bool:
check.inst_param(kind, "kind", ConfigTypeKind)
return kind == ConfigTypeKind.STRICT_SHAPE or kind == ConfigTypeKind.PERMISSIVE_SHAPE
@staticmethod
def is_selector(kind: "ConfigTypeKind") -> bool:
check.inst_param(kind, "kind", ConfigTypeKind)
return kind == ConfigTypeKind.SELECTOR
class ConfigType:
"""The class backing DagsterTypes as they are used processing configuration data."""
def __init__(
self,
key: str,
kind: ConfigTypeKind,
given_name: Optional[str] = None,
description: Optional[str] = None,
type_params: Optional[Sequence["ConfigType"]] = None,
):
self.key: str = check.str_param(key, "key")
self.kind: ConfigTypeKind = check.inst_param(kind, "kind", ConfigTypeKind)
self.given_name: Optional[str] = check.opt_str_param(given_name, "given_name")
self._description: Optional[str] = check.opt_str_param(description, "description")
self.type_params: Optional[Sequence[ConfigType]] = (
check.sequence_param(type_params, "type_params", of_type=ConfigType)
if type_params
else None
)
# memoized snap representation
self._snap: Optional["ConfigTypeSnap"] = None
@property
def description(self) -> Optional[str]:
return self._description
@staticmethod
def from_builtin_enum(builtin_enum: typing.Any) -> "ConfigType":
check.invariant(BuiltinEnum.contains(builtin_enum), "param must be member of BuiltinEnum")
return _CONFIG_MAP[builtin_enum]
def post_process(self, value):
"""Implement this in order to take a value provided by the user
and perform computation on it. This can be done to coerce data types,
fetch things from the environment (e.g. environment variables), or
to do custom validation. If the value is not valid, throw a
PostProcessingError. Otherwise return the coerced value.
"""
return value
def get_snapshot(self) -> "ConfigTypeSnap":
from dagster._config.snap import snap_from_config_type
if self._snap is None:
self._snap = snap_from_config_type(self)
return self._snap
def type_iterator(self) -> Iterator["ConfigType"]:
yield self
def get_schema_snapshot(self) -> "ConfigSchemaSnapshot":
from dagster._config.snap import ConfigSchemaSnapshot
return ConfigSchemaSnapshot(
all_config_snaps_by_key={ct.key: ct.get_snapshot() for ct in self.type_iterator()}
)
@whitelist_for_serdes
class ConfigScalarKind(PythonEnum):
INT = "INT"
STRING = "STRING"
FLOAT = "FLOAT"
BOOL = "BOOL"
# Scalars, Composites, Selectors, Lists, Optional, Any
class ConfigScalar(ConfigType):
def __init__(
self,
key: str,
given_name: Optional[str],
scalar_kind: ConfigScalarKind,
**kwargs: typing.Any,
):
self.scalar_kind = check.inst_param(scalar_kind, "scalar_kind", ConfigScalarKind)
super(ConfigScalar, self).__init__(
key, kind=ConfigTypeKind.SCALAR, given_name=given_name, **kwargs
)
class BuiltinConfigScalar(ConfigScalar):
def __init__(self, scalar_kind, description=None):
super(BuiltinConfigScalar, self).__init__(
key=type(self).__name__,
given_name=type(self).__name__,
scalar_kind=scalar_kind,
description=description,
)
class Int(BuiltinConfigScalar):
def __init__(self):
super(Int, self).__init__(scalar_kind=ConfigScalarKind.INT, description="")
class String(BuiltinConfigScalar):
def __init__(self):
super(String, self).__init__(scalar_kind=ConfigScalarKind.STRING, description="")
class Bool(BuiltinConfigScalar):
def __init__(self):
super(Bool, self).__init__(scalar_kind=ConfigScalarKind.BOOL, description="")
class Float(BuiltinConfigScalar):
def __init__(self):
super(Float, self).__init__(scalar_kind=ConfigScalarKind.FLOAT, description="")
def post_process(self, value):
return float(value)
class Any(ConfigType):
def __init__(self):
super(Any, self).__init__(
key="Any",
given_name="Any",
kind=ConfigTypeKind.ANY,
)
[docs]
class Noneable(ConfigType):
"""Defines a configuration type that is the union of ``NoneType`` and the type ``inner_type``.
Args:
inner_type (type):
The type of the values that this configuration type can contain.
**Examples:**
.. code-block:: python
config_schema={"name": Noneable(str)}
config={"name": "Hello"} # Ok
config={"name": None} # Ok
config={} # Error
"""
def __init__(self, inner_type: object):
from dagster._config.field import resolve_to_config_type
self.inner_type = cast(ConfigType, resolve_to_config_type(inner_type))
super(Noneable, self).__init__(
key=f"Noneable.{self.inner_type.key}",
kind=ConfigTypeKind.NONEABLE,
type_params=[self.inner_type],
)
def type_iterator(self) -> Iterator["ConfigType"]:
yield from self.inner_type.type_iterator()
yield from super().type_iterator()
[docs]
class Array(ConfigType):
"""Defines an array (list) configuration type that contains values of type ``inner_type``.
Args:
inner_type (type):
The type of the values that this configuration type can contain.
"""
def __init__(self, inner_type: object):
from dagster._config.field import resolve_to_config_type
self.inner_type = cast(ConfigType, resolve_to_config_type(inner_type))
super(Array, self).__init__(
key=f"Array.{self.inner_type.key}",
type_params=[self.inner_type],
kind=ConfigTypeKind.ARRAY,
)
@public
@property
def description(self) -> str:
"""A human-readable description of this Array type."""
return f"List of {self.key}"
def type_iterator(self) -> Iterator["ConfigType"]:
yield from self.inner_type.type_iterator()
yield from super().type_iterator()
[docs]
class EnumValue:
"""Define an entry in a :py:class:`Enum`.
Args:
config_value (str):
The string representation of the config to accept when passed.
python_value (Optional[Any]):
The python value to convert the enum entry in to. Defaults to the ``config_value``.
description (Optional[str]):
A human-readable description of the enum entry.
"""
def __init__(
self,
config_value: str,
python_value: Optional[object] = None,
description: Optional[str] = None,
):
self.config_value = check.str_param(config_value, "config_value")
self.python_value = config_value if python_value is None else python_value
self.description = check.opt_str_param(description, "description")
[docs]
class Enum(ConfigType):
"""Defines a enum configuration type that allows one of a defined set of possible values.
Args:
name (str):
The name of the enum configuration type.
enum_values (List[EnumValue]):
The set of possible values for the enum configuration type.
**Examples:**
.. code-block:: python
@op(
config_schema=Field(
Enum(
'CowboyType',
[
EnumValue('good'),
EnumValue('bad'),
EnumValue('ugly'),
]
)
)
)
def resolve_standoff(context):
# ...
"""
def __init__(self, name: str, enum_values: Sequence[EnumValue]):
check.str_param(name, "name")
super(Enum, self).__init__(key=name, given_name=name, kind=ConfigTypeKind.ENUM)
self.enum_values = check.sequence_param(enum_values, "enum_values", of_type=EnumValue)
self._valid_python_values = {ev.python_value for ev in enum_values}
check.invariant(len(self._valid_python_values) == len(enum_values))
self._valid_config_values = {ev.config_value for ev in enum_values}
check.invariant(len(self._valid_config_values) == len(enum_values))
@property
def config_values(self):
return [ev.config_value for ev in self.enum_values]
def is_valid_config_enum_value(self, config_value):
return config_value in self._valid_config_values
def post_process(self, value: typing.Any) -> typing.Any:
if isinstance(value, PythonEnum):
value = value.name
for ev in self.enum_values:
if ev.config_value == value:
return ev.python_value
check.failed(f"Should never reach this. config_value should be pre-validated. Got {value}")
@classmethod
def from_python_enum(cls, enum, name=None):
"""Create a Dagster enum corresponding to an existing Python enum.
Args:
enum (enum.EnumMeta):
The class representing the enum.
name (Optional[str]):
The name for the enum. If not present, `enum.__name__` will be used.
Example:
.. code-block:: python
class Color(enum.Enum):
RED = enum.auto()
GREEN = enum.auto()
BLUE = enum.auto()
@op(
config_schema={"color": Field(Enum.from_python_enum(Color))}
)
def select_color(context):
assert context.op_config["color"] == Color.RED
"""
if name is None:
name = enum.__name__
return cls(name, [EnumValue(v.name, python_value=v) for v in enum])
@classmethod
def from_python_enum_direct_values(cls, enum, name=None):
"""Create a Dagster enum corresponding to an existing Python enum, where the direct values are passed instead of symbolic values (IE, enum.symbol.value as opposed to enum.symbol).
This is necessary for internal usage, as the symbolic values are not serializable.
Args:
enum (enum.EnumMeta):
The class representing the enum.
name (Optional[str]):
The name for the enum. If not present, `enum.__name__` will be used.
Example:
.. code-block:: python
class Color(enum.Enum):
RED = enum.auto()
GREEN = enum.auto()
BLUE = enum.auto()
@op(
config_schema={"color": Field(Enum.from_python_enum(Color))}
)
def select_color(context):
assert context.op_config["color"] == Color.RED.value
"""
if name is None:
name = enum.__name__
return cls(name, [EnumValue(v.name, python_value=v.value) for v in enum])
[docs]
class ScalarUnion(ConfigType):
"""Defines a configuration type that accepts a scalar value OR a non-scalar value like a
:py:class:`~dagster.List`, :py:class:`~dagster.Dict`, or :py:class:`~dagster.Selector`.
This allows runtime scalars to be configured without a dictionary with the key ``value`` and
instead just use the scalar value directly. However this still leaves the option to
load scalars from a json or pickle file.
Args:
scalar_type (type):
The scalar type of values that this configuration type can hold. For example,
:py:class:`~python:int`, :py:class:`~python:float`, :py:class:`~python:bool`,
or :py:class:`~python:str`.
non_scalar_schema (ConfigSchema):
The schema of a non-scalar Dagster configuration type. For example, :py:class:`List`,
:py:class:`Dict`, or :py:class:`~dagster.Selector`.
key (Optional[str]):
The configuation type's unique key. If not set, then the key will be set to
``ScalarUnion.{scalar_type}-{non_scalar_schema}``.
**Examples:**
.. code-block:: yaml
graph:
transform_word:
inputs:
word:
value: foobar
becomes, optionally,
.. code-block:: yaml
graph:
transform_word:
inputs:
word: foobar
"""
def __init__(
self,
scalar_type: typing.Any,
non_scalar_schema: UserConfigSchema,
_key: Optional[str] = None,
):
from dagster._config.field import resolve_to_config_type
self.scalar_type = check.inst(resolve_to_config_type(scalar_type), ConfigType)
self.non_scalar_type = resolve_to_config_type(non_scalar_schema)
check.param_invariant(self.scalar_type.kind == ConfigTypeKind.SCALAR, "scalar_type")
check.param_invariant(
self.non_scalar_type.kind
in {ConfigTypeKind.STRICT_SHAPE, ConfigTypeKind.SELECTOR, ConfigTypeKind.ARRAY},
"non_scalar_type",
)
# https://github.com/dagster-io/dagster/issues/2133
key = check.opt_str_param(
_key, "_key", f"ScalarUnion.{self.scalar_type.key}-{self.non_scalar_type.key}"
)
super(ScalarUnion, self).__init__(
key=key,
kind=ConfigTypeKind.SCALAR_UNION,
type_params=[self.scalar_type, self.non_scalar_type],
)
def type_iterator(self) -> Iterator["ConfigType"]:
yield from self.scalar_type.type_iterator()
yield from self.non_scalar_type.type_iterator()
yield from super().type_iterator()
ConfigAnyInstance: Any = Any()
ConfigBoolInstance: Bool = Bool()
ConfigFloatInstance: Float = Float()
ConfigIntInstance: Int = Int()
ConfigStringInstance: String = String()
_CONFIG_MAP: Dict[typing.Any, ConfigType] = {
BuiltinEnum.ANY: ConfigAnyInstance,
BuiltinEnum.BOOL: ConfigBoolInstance,
BuiltinEnum.FLOAT: ConfigFloatInstance,
BuiltinEnum.INT: ConfigIntInstance,
BuiltinEnum.STRING: ConfigStringInstance,
}
_CONFIG_MAP_BY_NAME: Dict[str, ConfigType] = {
"Any": ConfigAnyInstance,
"Bool": ConfigBoolInstance,
"Float": ConfigFloatInstance,
"Int": ConfigIntInstance,
"String": ConfigStringInstance,
}
ALL_CONFIG_BUILTINS = set(_CONFIG_MAP.values())
def get_builtin_scalar_by_name(type_name: str):
if type_name not in _CONFIG_MAP_BY_NAME:
check.failed(f"Scalar {type_name} is not supported")
return _CONFIG_MAP_BY_NAME[type_name]