Ask AI

Source code for dagster._core.definitions.asset_check_spec

from enum import Enum
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.asset_key import (
    AssetKey,
    CoercibleToAssetKey,
    CoercibleToAssetKeyPrefix,
)
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._serdes.serdes import whitelist_for_serdes

if TYPE_CHECKING:
    from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
    from dagster._core.definitions.assets import AssetsDefinition
    from dagster._core.definitions.source_asset import SourceAsset


[docs]@whitelist_for_serdes class AssetCheckSeverity(Enum): """Severity level for an AssetCheckResult. - WARN: a potential issue with the asset - ERROR: a definite issue with the asset Severity does not impact execution of the asset or downstream assets. """ WARN = "WARN" ERROR = "ERROR"
[docs]@whitelist_for_serdes(old_storage_names={"AssetCheckHandle"}) class AssetCheckKey(NamedTuple): """Check names are expected to be unique per-asset. Thus, this combination of asset key and check name uniquely identifies an asset check within a deployment. """ asset_key: PublicAttr[AssetKey] name: PublicAttr[str] @staticmethod def from_graphql_input(graphql_input: Mapping[str, Any]) -> "AssetCheckKey": return AssetCheckKey( asset_key=AssetKey.from_graphql_input(graphql_input["assetKey"]), name=graphql_input["name"], ) def with_asset_key_prefix(self, prefix: CoercibleToAssetKeyPrefix) -> "AssetCheckKey": return self._replace(asset_key=self.asset_key.with_prefix(prefix)) def to_user_string(self) -> str: return f"{self.asset_key.to_user_string()}:{self.name}"
[docs]class AssetCheckSpec( NamedTuple( "_AssetCheckSpec", [ ("name", PublicAttr[str]), ("asset_key", PublicAttr[AssetKey]), ("description", PublicAttr[Optional[str]]), ("additional_deps", PublicAttr[Iterable["AssetDep"]]), ( "blocking", # intentionally not public, see https://github.com/dagster-io/dagster/issues/20659 bool, ), ("metadata", PublicAttr[Optional[Mapping[str, Any]]]), ], ) ): """Defines information about an asset check, except how to execute it. AssetCheckSpec is often used as an argument to decorators that decorator a function that can execute multiple checks - e.g. `@asset`, and `@multi_asset`. It defines one of the checks that will be executed inside that function. Args: name (str): Name of the check. asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]): The asset that the check applies to. description (Optional[str]): Description for the check. additional_deps (Optional[Iterable[AssetDep]]): Additional dependencies for the check. The check relies on these assets in some way, but the result of the check only applies to the asset specified by `asset`. For example, the check may test that `asset` has matching data with an asset in `additional_deps`. This field holds both `additional_deps` and `additional_ins` passed to @asset_check. metadata (Optional[Mapping[str, Any]]): A dict of static metadata for this asset check. """ def __new__( cls, name: str, *, asset: Union[CoercibleToAssetKey, "AssetsDefinition", "SourceAsset"], description: Optional[str] = None, additional_deps: Optional[Iterable["CoercibleToAssetDep"]] = None, blocking: bool = False, metadata: Optional[RawMetadataMapping] = None, ): from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates asset_key = AssetKey.from_coercible_or_definition(asset) additional_asset_deps = coerce_to_deps_and_check_duplicates( additional_deps, AssetCheckKey(asset_key, name) ) for dep in additional_asset_deps: if dep.asset_key == asset_key: raise ValueError( f"Asset check {name} for asset {asset_key.to_string()} cannot have an additional " f"dependency on asset {asset_key.to_string()}." ) return super().__new__( cls, name=check.str_param(name, "name"), asset_key=asset_key, description=check.opt_str_param(description, "description"), additional_deps=additional_asset_deps, blocking=check.bool_param(blocking, "blocking"), metadata=check.opt_mapping_param(metadata, "metadata", key_type=str), ) def get_python_identifier(self) -> str: """Returns a string uniquely identifying the asset check, that uses only the characters allowed in a Python identifier. """ return f"{self.asset_key.to_python_identifier()}_{self.name}".replace(".", "_") @property def key(self) -> AssetCheckKey: return AssetCheckKey(self.asset_key, self.name) def with_asset_key_prefix(self, prefix: CoercibleToAssetKeyPrefix) -> "AssetCheckSpec": return self._replace(asset_key=self.asset_key.with_prefix(prefix))