Ask AI

Source code for dagster._core.definitions.asset_check_spec

from enum import Enum
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, CoercibleToAssetKey
from dagster._serdes.serdes import whitelist_for_serdes

if TYPE_CHECKING:
    from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
    from dagster._core.definitions.assets import AssetsDefinition
    from dagster._core.definitions.declarative_automation.automation_condition import (
        AutomationCondition,
    )
    from dagster._core.definitions.source_asset import SourceAsset


[docs] @whitelist_for_serdes class AssetCheckSeverity(Enum): """Severity level for an AssetCheckResult. - WARN: a potential issue with the asset - ERROR: a definite issue with the asset Severity does not impact execution of the asset or downstream assets. """ WARN = "WARN" ERROR = "ERROR"
[docs] class AssetCheckSpec( NamedTuple( "_AssetCheckSpec", [ ("name", PublicAttr[str]), ("asset_key", PublicAttr[AssetKey]), ("description", PublicAttr[Optional[str]]), ("additional_deps", PublicAttr[Iterable["AssetDep"]]), ( "blocking", # intentionally not public, see https://github.com/dagster-io/dagster/issues/20659 bool, ), ("metadata", PublicAttr[Optional[Mapping[str, Any]]]), ("automation_condition", Optional["AutomationCondition[AssetCheckKey]"]), ], ) ): """Defines information about an asset check, except how to execute it. AssetCheckSpec is often used as an argument to decorators that decorator a function that can execute multiple checks - e.g. `@asset`, and `@multi_asset`. It defines one of the checks that will be executed inside that function. Args: name (str): Name of the check. asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]): The asset that the check applies to. description (Optional[str]): Description for the check. additional_deps (Optional[Iterable[AssetDep]]): Additional dependencies for the check. The check relies on these assets in some way, but the result of the check only applies to the asset specified by `asset`. For example, the check may test that `asset` has matching data with an asset in `additional_deps`. This field holds both `additional_deps` and `additional_ins` passed to @asset_check. metadata (Optional[Mapping[str, Any]]): A dict of static metadata for this asset check. """ def __new__( cls, name: str, *, asset: Union[CoercibleToAssetKey, "AssetsDefinition", "SourceAsset"], description: Optional[str] = None, additional_deps: Optional[Iterable["CoercibleToAssetDep"]] = None, blocking: bool = False, metadata: Optional[Mapping[str, Any]] = None, automation_condition: Optional["AutomationCondition[AssetCheckKey]"] = None, ): from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) asset_key = AssetKey.from_coercible_or_definition(asset) additional_asset_deps = coerce_to_deps_and_check_duplicates( additional_deps, AssetCheckKey(asset_key, name) ) for dep in additional_asset_deps: if dep.asset_key == asset_key: raise ValueError( f"Asset check {name} for asset {asset_key.to_string()} cannot have an additional " f"dependency on asset {asset_key.to_string()}." ) return super().__new__( cls, name=check.str_param(name, "name"), asset_key=asset_key, description=check.opt_str_param(description, "description"), additional_deps=additional_asset_deps, blocking=check.bool_param(blocking, "blocking"), metadata=check.opt_mapping_param(metadata, "metadata", key_type=str), automation_condition=check.opt_inst_param( automation_condition, "automation_condition", AutomationCondition ), ) def get_python_identifier(self) -> str: """Returns a string uniquely identifying the asset check, that uses only the characters allowed in a Python identifier. """ return f"{self.asset_key.to_python_identifier()}_{self.name}".replace(".", "_") @property def key(self) -> AssetCheckKey: return AssetCheckKey(self.asset_key, self.name) def replace_key(self, key: AssetCheckKey) -> "AssetCheckSpec": return self._replace(asset_key=key.asset_key, name=key.name)