Ask AI

Source code for dagster._core.definitions.load_asset_checks_from_modules

import inspect
from importlib import import_module
from types import ModuleType
from typing import Iterable, Optional, Sequence, Set, cast

import dagster._check as check
from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
from dagster._core.definitions.asset_key import (
    CoercibleToAssetKeyPrefix,
    check_opt_coercible_to_asset_key_prefix_param,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.load_assets_from_modules import (
    find_modules_in_package,
    find_objects_in_module_of_types,
    prefix_assets,
)


def _checks_from_modules(modules: Iterable[ModuleType]) -> Sequence[AssetChecksDefinition]:
    checks = []
    ids: Set[int] = set()
    for module in modules:
        for c in find_objects_in_module_of_types(module, AssetsDefinition):
            if has_only_asset_checks(c) and id(c) not in ids:
                checks.append(cast(AssetChecksDefinition, c))
                ids.add(id(c))
    return checks


def _checks_with_attributes(
    checks_defs: Sequence[AssetChecksDefinition],
    asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
) -> Sequence[AssetChecksDefinition]:
    if asset_key_prefix:
        modified_checks, _ = prefix_assets(checks_defs, asset_key_prefix, [], None)
        return [
            AssetChecksDefinition.create(
                keys_by_input_name=c.keys_by_input_name,
                node_def=c.op,
                check_specs_by_output_name=c.check_specs_by_output_name,
                resource_defs=c.resource_defs,
                can_subset=c.can_subset,
            )
            for c in modified_checks
        ]
    else:
        return checks_defs


[docs] def load_asset_checks_from_modules( modules: Iterable[ModuleType], asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ) -> Sequence[AssetChecksDefinition]: """Constructs a list of asset checks from the given modules. This is most often used in conjunction with a call to `load_assets_from_modules`. Args: modules (Iterable[ModuleType]): The Python modules to look for checks inside. asset_key_prefix (Optional[Union[str, Sequence[str]]]): The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_modules. Returns: Sequence[AssetChecksDefinition]: A list containing asset checks defined in the given modules. """ asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param( asset_key_prefix, "asset_key_prefix" ) return _checks_with_attributes(_checks_from_modules(modules), asset_key_prefix=asset_key_prefix)
[docs] def load_asset_checks_from_current_module( asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ) -> Sequence[AssetChecksDefinition]: """Constructs a list of asset checks from the module where this function is called. This is most often used in conjunction with a call to `load_assets_from_current_module`. Args: asset_key_prefix (Optional[Union[str, Sequence[str]]]): The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_current_module. Returns: Sequence[AssetChecksDefinition]: A list containing asset checks defined in the current module. """ caller = inspect.stack()[1] module = inspect.getmodule(caller[0]) if module is None: check.failed("Could not find a module for the caller") asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param( asset_key_prefix, "asset_key_prefix" ) return _checks_with_attributes( _checks_from_modules([module]), asset_key_prefix=asset_key_prefix )
[docs] def load_asset_checks_from_package_module( package_module: ModuleType, asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None ) -> Sequence[AssetChecksDefinition]: """Constructs a list of asset checks from all sub-modules of the given package module. This is most often used in conjunction with a call to `load_assets_from_package_module`. Args: package_module (ModuleType): The Python module to look for checks inside. asset_key_prefix (Optional[Union[str, Sequence[str]]]): The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_package_module. Returns: Sequence[AssetChecksDefinition]: A list containing asset checks defined in the package. """ asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param( asset_key_prefix, "asset_key_prefix" ) return _checks_with_attributes( _checks_from_modules(find_modules_in_package(package_module)), asset_key_prefix=asset_key_prefix, )
[docs] def load_asset_checks_from_package_name( package_name: str, asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None ) -> Sequence[AssetChecksDefinition]: """Constructs a list of asset checks from all sub-modules of the given package. This is most often used in conjunction with a call to `load_assets_from_package_name`. Args: package_name (str): The name of the Python package to look for checks inside. asset_key_prefix (Optional[Union[str, Sequence[str]]]): The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_package_name. Returns: Sequence[AssetChecksDefinition]: A list containing asset checks defined in the package. """ asset_key_prefix = check_opt_coercible_to_asset_key_prefix_param( asset_key_prefix, "asset_key_prefix" ) package_module = import_module(package_name) return _checks_with_attributes( _checks_from_modules(find_modules_in_package(package_module)), asset_key_prefix=asset_key_prefix, )