Ask AI

Source code for dagster._core.definitions.utils

import keyword
import os
import re
from glob import glob
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
)

import yaml

import dagster._check as check
from dagster._core.definitions.asset_key import AssetCheckKey, EntityKey
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.utils import is_valid_email
from dagster._utils.warnings import deprecation_warning, disable_dagster_warnings
from dagster._utils.yaml_utils import merge_yaml_strings, merge_yamls

# presumably the name given to an output when none is specified explicitly — TODO confirm against callers
DEFAULT_OUTPUT = "result"
DEFAULT_GROUP_NAME = "default"  # asset group_name used when none is provided
# presumably the resource key under which the default IO manager is looked up — verify against callers
DEFAULT_IO_MANAGER_KEY = "io_manager"

# Names that may never be used for a definition: a fixed list of words reserved by
# Dagster plus every Python keyword.
DISALLOWED_NAMES = {
    "context",
    "conf",
    "config",
    "meta",
    "arg_dict",
    "dict",
    "input_arg_dict",
    "output_arg_dict",
    "int",
    "str",
    "float",
    "bool",
    "input",
    "output",
    "type",
    # just disallow all python keywords
    *keyword.kwlist,
}

# Character class matching any single character that is NOT allowed in a name.
INVALID_NAME_CHARS = r"[^A-Za-z0-9_]"
# A valid name is one or more ASCII letters, digits, or underscores.
# NOTE: `$` also matches just before a trailing newline, so applying this pattern with
# `match` accepts a string that ends in a newline, whereas `fullmatch` does not.
VALID_NAME_REGEX_STR = r"^[A-Za-z0-9_]+$"
VALID_NAME_REGEX = re.compile(VALID_NAME_REGEX_STR)

# Characters that may not appear anywhere in a title: percent sign, asterisk, double quote.
INVALID_TITLE_CHARACTERS_REGEX_STR = r"[\%\*\"]"
INVALID_TITLE_CHARACTERS_REGEX = re.compile(INVALID_TITLE_CHARACTERS_REGEX_STR)
# Maximum number of characters permitted in a title.
MAX_TITLE_LENGTH = 100

if TYPE_CHECKING:
    from dagster._core.definitions.asset_key import AssetKey
    from dagster._core.definitions.asset_selection import AssetSelection
    from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
    from dagster._core.definitions.base_asset_graph import BaseAssetGraph
    from dagster._core.definitions.declarative_automation.automation_condition import (
        AutomationCondition,
    )
    from dagster._core.definitions.sensor_definition import SensorDefinition
    from dagster._core.remote_representation.external import RemoteSensor


class NoValueSentinel:
    """Sentinel value to distinguish unset from None.

    The class defines no attributes or methods of its own; it exists only to serve
    as a marker.
    """


def has_valid_name_chars(name: str) -> bool:
    """Return whether ``name`` is made up solely of characters allowed in a name.

    The whole string must match ``VALID_NAME_REGEX``. ``fullmatch`` is used instead of
    ``match`` because the ``$`` anchor in that pattern also matches immediately before
    a trailing newline, so ``match`` would accept a name that ends in a newline.

    Args:
        name: The candidate name.

    Returns:
        ``True`` if every character of ``name`` is permitted and ``name`` is non-empty,
        ``False`` otherwise.
    """
    return VALID_NAME_REGEX.fullmatch(name) is not None


def check_valid_name(name: str, allow_list: Optional[List[str]] = None) -> str:
    """Validate ``name`` and return it unchanged.

    Args:
        name: The candidate name.
        allow_list: Names that are accepted as-is, bypassing every other check.

    Returns:
        ``name`` itself when it passes validation.

    Raises:
        DagsterInvalidDefinitionError: If ``name`` is one of ``DISALLOWED_NAMES`` or
            contains characters rejected by ``check_valid_chars``.
    """
    check.str_param(name, "name")

    # An empty or missing allow list exempts nothing.
    exempt_names = allow_list or ()
    if name in exempt_names:
        return name

    if name in DISALLOWED_NAMES:
        message = (
            f'"{name}" is not a valid name in Dagster. It conflicts with a Dagster or python'
            " reserved keyword."
        )
        raise DagsterInvalidDefinitionError(message)

    check_valid_chars(name)

    # Both checks above passed, so the combined predicate must agree.
    check.invariant(is_valid_name(name))
    return name


def check_valid_chars(name: str):
    """Raise if ``name`` contains characters that are not permitted in a name.

    Raises:
        DagsterInvalidDefinitionError: If ``has_valid_name_chars(name)`` is false.
    """
    if has_valid_name_chars(name):
        return

    message = (
        f'"{name}" is not a valid name in Dagster. Names must be in regex'
        f" {VALID_NAME_REGEX_STR}."
    )
    raise DagsterInvalidDefinitionError(message)


def is_valid_name(name: str) -> bool:
    """Return whether ``name`` is neither disallowed nor made of invalid characters."""
    check.str_param(name, "name")

    if name in DISALLOWED_NAMES:
        return False
    return has_valid_name_chars(name)


def is_valid_title_and_reason(title: Optional[str]) -> Tuple[bool, Optional[str]]:
    """Check a title and explain the failure, if any.

    Args:
        title: The title to check; ``None`` is always considered valid.

    Returns:
        ``(True, None)`` when the title is acceptable, otherwise ``(False, reason)``
        where ``reason`` is a human-readable description of the problem. The length
        limit is checked before the character restrictions.
    """
    check.opt_str_param(title, "title")

    if title is None:
        return True, None

    title_length = len(title)
    if title_length > MAX_TITLE_LENGTH:
        too_long = f'"{title}" ({title_length} characters) is not a valid title in Dagster. Titles must not be longer than {MAX_TITLE_LENGTH}.'
        return False, too_long

    if is_valid_title_chars(title):
        return True, None

    bad_chars = f'"{title}" is not a valid title in Dagster. Titles must not contain regex {INVALID_TITLE_CHARACTERS_REGEX_STR}.'
    return False, bad_chars


def check_valid_title(title: Optional[str]) -> Optional[str]:
    """Validate a title and return it unchanged.

    A title differs from a name: it is a descriptive string meant for display in the UI
    and is not used as an identifier for an object.

    Raises:
        DagsterInvariantViolationError: If ``is_valid_title_and_reason`` rejects the
            title; the exception carries the reason it reports.
    """
    ok, why_not = is_valid_title_and_reason(title)
    if ok:
        return title

    raise DagsterInvariantViolationError(why_not)


def is_valid_title(title: Optional[str]) -> bool:
    """Return only the validity flag from ``is_valid_title_and_reason``."""
    valid, _reason = is_valid_title_and_reason(title)
    return valid


def is_valid_title_chars(title: str):
    """Return ``True`` when no forbidden title character occurs anywhere in ``title``."""
    return INVALID_TITLE_CHARACTERS_REGEX.search(title) is None


def _kv_str(key: object, value: object) -> str:
    """Render one ``key="<repr of value>"`` fragment."""
    return '{}="{!r}"'.format(key, value)


def struct_to_string(name: str, **kwargs: object) -> str:
    """Render ``name`` and its keyword properties as ``name(k1="v1", k2="v2")``.

    Each value is rendered with ``repr`` and wrapped in double quotes.
    """
    # Sort the kwargs to ensure consistent representations across Python versions
    fragments = []
    for prop_name, prop_value in sorted(kwargs.items()):
        fragments.append(_kv_str(prop_name, prop_value))
    return "{}({})".format(name, ", ".join(fragments))


def validate_asset_owner(owner: str, key: "AssetKey") -> None:
    """Raise unless ``owner`` is an email address or a ``team:``-prefixed team name.

    Args:
        owner: The owner string to validate.
        key: The asset the owner belongs to; used only in the error message.

    Raises:
        DagsterInvalidDefinitionError: If ``owner`` is neither form.
    """
    if is_valid_email(owner):
        return

    # "team:" is five characters long, so a longer string has a non-empty team name.
    if owner.startswith("team:") and len(owner) > 5:
        return

    raise DagsterInvalidDefinitionError(
        f"Invalid owner '{owner}' for asset '{key}'. Owner must be an email address or a team "
        "name prefixed with 'team:'."
    )


def validate_group_name(group_name: Optional[str]) -> None:
    """Validate an asset group name; ``None`` is accepted and nothing is returned.

    Raises:
        DagsterInvalidDefinitionError: If ``group_name`` is the empty string, or if it
            contains characters rejected by ``check_valid_chars``.
    """
    if group_name == "":
        raise DagsterInvalidDefinitionError(
            "Empty asset group name was provided, which is not permitted. "
            "Set group_name=None to use the default group_name or set non-empty string"
        )

    if group_name:
        check_valid_chars(group_name)


def normalize_group_name(group_name: Optional[str]) -> str:
    """Validate ``group_name`` and fall back to ``DEFAULT_GROUP_NAME`` when none is given.

    Raises:
        DagsterInvalidDefinitionError: Propagated from ``validate_group_name``.
    """
    validate_group_name(group_name)
    if group_name:
        return group_name
    return DEFAULT_GROUP_NAME


def config_from_files(config_files: Sequence[str]) -> Mapping[str, Any]:
    """Constructs run config from YAML files.

    Args:
        config_files (List[str]): List of paths or glob patterns for yaml files
            to load and parse as the run config.

    Returns:
        Dict[str, Any]: A run config dictionary constructed from provided YAML files.

    Raises:
        DagsterInvariantViolationError: When a path or glob pattern produces no results,
            or when one of the YAML files is invalid and has a parse error.
    """
    config_files = check.opt_sequence_param(config_files, "config_files")

    filenames: List[str] = []
    for file_glob in config_files or []:
        globbed_files = glob(file_glob)
        if not globbed_files:
            raise DagsterInvariantViolationError(
                f'File or glob pattern "{file_glob}" for "config_files" produced no results.'
            )

        # Resolve symlinks so the merged file list holds canonical paths.
        filenames.extend(os.path.realpath(globbed_file) for globbed_file in globbed_files)

    try:
        run_config = merge_yamls(filenames)
    except yaml.YAMLError as err:
        raise DagsterInvariantViolationError(
            f"Encountered error attempting to parse yaml. Parsing files {filenames} "
            f"loaded by file/patterns {config_files}."
        ) from err

    return check.is_dict(cast(Dict[str, object], run_config), key_type=str)
def config_from_yaml_strings(yaml_strings: Sequence[str]) -> Mapping[str, Any]:
    """Static constructor for run configs from YAML strings.

    Args:
        yaml_strings (List[str]): List of yaml strings to parse as the run config.

    Returns:
        Dict[str, Any]: A run config dictionary constructed from the provided yaml strings.

    Raises:
        DagsterInvariantViolationError: When one of the YAML documents is invalid and has a
            parse error.
    """
    yaml_strings = check.sequence_param(yaml_strings, "yaml_strings", of_type=str)

    try:
        merged_config = merge_yaml_strings(yaml_strings)
    except yaml.YAMLError as err:
        message = f"Encountered error attempting to parse yaml. Parsing YAMLs {yaml_strings} "
        raise DagsterInvariantViolationError(message) from err

    typed_config = cast(Dict[str, object], merged_config)
    return check.is_dict(typed_config, key_type=str)
def config_from_pkg_resources(pkg_resource_defs: Sequence[Tuple[str, str]]) -> Mapping[str, Any]:
    """Load a run config from a package resource, using :py:func:`pkg_resources.resource_string`.

    Example:
        .. code-block:: python

            config_from_pkg_resources(
                pkg_resource_defs=[
                    ('dagster_examples.airline_demo.environments', 'local_base.yaml'),
                    ('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'),
                ],
            )

    Args:
        pkg_resource_defs (List[(str, str)]): List of pkg_resource modules/files to
            load as the run config.

    Returns:
        Dict[str, Any]: A run config dictionary constructed from the provided yaml strings.

    Raises:
        DagsterInvariantViolationError: When a resource cannot be located or decoded as
            UTF-8, or when one of the YAML documents is invalid and has a parse error.
    """
    import pkg_resources  # expensive, import only on use

    pkg_resource_defs = check.sequence_param(pkg_resource_defs, "pkg_resource_defs", of_type=tuple)

    yaml_strings = []
    try:
        for resource_def in pkg_resource_defs:
            raw_bytes = pkg_resources.resource_string(*resource_def)
            yaml_strings.append(raw_bytes.decode("utf-8"))
    except (ModuleNotFoundError, FileNotFoundError, UnicodeDecodeError) as err:
        raise DagsterInvariantViolationError(
            "Encountered error attempting to parse yaml. Loading YAMLs from "
            f"package resources {pkg_resource_defs}."
        ) from err

    return config_from_yaml_strings(yaml_strings=yaml_strings)
def resolve_automation_condition(
    automation_condition: Optional["AutomationCondition"],
    auto_materialize_policy: Optional["AutoMaterializePolicy"],
) -> Optional["AutomationCondition"]:
    """Collapse the two ways of specifying automation into a single condition.

    Args:
        automation_condition: The condition supplied directly, if any.
        auto_materialize_policy: The deprecated policy form, if any.

    Returns:
        ``automation_condition`` when no policy is given; otherwise the policy
        converted through ``to_automation_condition()``.

    Raises:
        DagsterInvariantViolationError: If both arguments are supplied.
    """
    if auto_materialize_policy is None:
        return automation_condition

    # The deprecation warning is emitted before the conflict check.
    deprecation_warning(
        "Parameter `auto_materialize_policy`",
        "1.9",
        additional_warn_text="Use `automation_condition` instead.",
    )
    if automation_condition is not None:
        raise DagsterInvariantViolationError(
            "Cannot supply both `automation_condition` and `auto_materialize_policy`"
        )
    return auto_materialize_policy.to_automation_condition()


T = TypeVar("T")


def dedupe_object_refs(objects: Optional[Iterable[T]]) -> Sequence[T]:
    """Dedupe definitions by reference equality.

    Returns a list that keeps one entry per distinct object identity, in order of
    first appearance; ``None`` yields an empty list.
    """
    if objects is None:
        return []

    by_identity = {}
    for candidate in objects:
        by_identity[id(candidate)] = candidate
    return list(by_identity.values())


def get_default_automation_condition_sensor(
    sensors: Sequence["SensorDefinition"],
    asset_graph: "BaseAssetGraph",
) -> Optional["SensorDefinition"]:
    """Build the default automation condition sensor, if one is needed.

    Given a list of existing sensors, returns an AutomationConditionSensorDefinition named
    ``DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME`` that targets the selection computed by
    ``get_default_automation_condition_sensor_selection``, or ``None`` when that selection
    is ``None`` or falsy.
    """
    from dagster._core.definitions.automation_condition_sensor_definition import (
        DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME,
        AutomationConditionSensorDefinition,
    )

    with disable_dagster_warnings():
        default_selection = get_default_automation_condition_sensor_selection(
            sensors, asset_graph
        )
        if not default_selection:
            return None
        return AutomationConditionSensorDefinition(
            DEFAULT_AUTOMATION_CONDITION_SENSOR_NAME, target=default_selection
        )


def get_default_automation_condition_sensor_selection(
    sensors: Sequence[Union["SensorDefinition", "RemoteSensor"]], asset_graph: "BaseAssetGraph"
) -> Optional["AssetSelection"]:
    """Compute the asset selection a default automation condition sensor should target.

    Collects every key in ``asset_graph`` that has an automation condition (or, for
    observable assets, an auto-observe interval), subtracts the keys already covered by
    existing automation sensors, and returns a selection for what remains.

    Returns:
        The selection for the default sensor, or ``None`` if every such key is already
        covered by an existing sensor.
    """
    from dagster._core.definitions.asset_selection import AssetSelection
    from dagster._core.definitions.sensor_definition import SensorType

    automation_sensor_types = (SensorType.AUTO_MATERIALIZE, SensorType.AUTOMATION)
    automation_condition_sensors = [s for s in sensors if s.sensor_type in automation_sensor_types]
    automation_condition_sensors.sort(key=lambda s: s.name)

    automation_condition_keys = {
        key
        for key in asset_graph.materializable_asset_keys | asset_graph.asset_check_keys
        if asset_graph.get(key).automation_condition is not None
    }

    has_auto_observe_keys = False
    for key in asset_graph.observable_asset_keys:
        # for backcompat, treat auto-observe assets as if they have a condition
        if (
            asset_graph.get(key).automation_condition is not None
            or asset_graph.get(key).auto_observe_interval_minutes is not None
        ):
            has_auto_observe_keys = True
            automation_condition_keys.add(key)

    # get the set of keys that are handled by an existing sensor
    covered_keys: Set[EntityKey] = set()
    for existing_sensor in automation_condition_sensors:
        existing_selection = check.not_none(existing_sensor.asset_selection)
        covered_keys.update(
            existing_selection.resolve(asset_graph)
            | existing_selection.resolve_checks(asset_graph)
        )

    default_sensor_keys = automation_condition_keys - covered_keys
    if not default_sensor_keys:
        # no additional sensor required
        return None

    # Start from AssetSelection.all and then subtract what other sensors already select,
    # rather than enumerating the uncovered keys one by one.
    default_sensor_asset_selection = AssetSelection.all(include_sources=has_auto_observe_keys)

    # if there are any asset checks, include checks in the selection
    if any(isinstance(key, AssetCheckKey) for key in default_sensor_keys):
        default_sensor_asset_selection |= AssetSelection.all_asset_checks()

    # remove any selections that are already covered
    for existing_sensor in automation_condition_sensors:
        default_sensor_asset_selection = default_sensor_asset_selection - check.not_none(
            existing_sensor.asset_selection
        )

    return default_sensor_asset_selection