Ask AI

Source code for dagster._core.definitions.load_assets_from_modules

import inspect
import pkgutil
from importlib import import_module
from types import ModuleType
from typing import (
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
)

import dagster._check as check
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.errors import DagsterInvalidDefinitionError

from .asset_key import (
    AssetKey,
    CoercibleToAssetKeyPrefix,
    check_opt_coercible_to_asset_key_prefix_param,
)
from .assets import AssetsDefinition
from .cacheable_assets import CacheableAssetsDefinition
from .source_asset import SourceAsset


def find_objects_in_module_of_types(module: ModuleType, types) -> Iterator:
    """Yields objects of the given type(s)."""
    for attr in dir(module):
        value = getattr(module, attr)
        if isinstance(value, types):
            yield value
        elif isinstance(value, list) and all(isinstance(el, types) for el in value):
            yield from value


def assets_from_modules(
    modules: Iterable[ModuleType], extra_source_assets: Optional[Sequence[SourceAsset]] = None
) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]:
    """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable
    assets from the given modules.

    Args:
        modules (Iterable[ModuleType]): The Python modules to look for assets inside.
        extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the
            group in addition to the source assets found in the modules.

    Returns:
        Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]]:
            A tuple containing a list of assets, a list of source assets, and a list of
            cacheable assets defined in the given modules.
    """
    asset_ids: Set[int] = set()
    asset_keys: Dict[AssetKey, ModuleType] = dict()
    source_assets: List[SourceAsset] = list(
        check.opt_sequence_param(extra_source_assets, "extra_source_assets", of_type=SourceAsset)
    )
    cacheable_assets: List[CacheableAssetsDefinition] = []
    assets: Dict[AssetKey, AssetsDefinition] = {}
    for module in modules:
        for asset in find_objects_in_module_of_types(
            module, (AssetsDefinition, SourceAsset, CacheableAssetsDefinition)
        ):
            asset = cast(Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition], asset)
            if id(asset) not in asset_ids:
                asset_ids.add(id(asset))
                if isinstance(asset, CacheableAssetsDefinition):
                    cacheable_assets.append(asset)
                else:
                    keys = asset.keys if isinstance(asset, AssetsDefinition) else [asset.key]
                    for key in keys:
                        if key in asset_keys:
                            modules_str = ", ".join(
                                set([asset_keys[key].__name__, module.__name__])
                            )
                            error_str = (
                                f"Asset key {key} is defined multiple times. Definitions found in"
                                f" modules: {modules_str}. "
                            )

                            if key in assets and isinstance(asset, AssetsDefinition):
                                if assets[key].node_def == asset.node_def:
                                    error_str += (
                                        "One possible cause of this bug is a call to with_resources"
                                        " outside of a repository definition, causing a duplicate"
                                        " asset definition."
                                    )

                            raise DagsterInvalidDefinitionError(error_str)
                        else:
                            asset_keys[key] = module
                            if isinstance(asset, AssetsDefinition):
                                assets[key] = asset
                    if isinstance(asset, SourceAsset):
                        source_assets.append(asset)
    return list(set(assets.values())), source_assets, cacheable_assets


[docs]def load_assets_from_modules( modules: Iterable[ModuleType], group_name: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, *, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets and source assets from the given modules. Args: modules (Iterable[ModuleType]): The Python modules to look for assets inside. group_name (Optional[str]): Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added. key_prefix (Optional[Union[str, Sequence[str]]]): Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended. freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded assets. auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to all the loaded assets. backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended. Returns: Sequence[Union[AssetsDefinition, SourceAsset]]: A list containing assets and source assets defined in the given modules. """ group_name = check.opt_str_param(group_name, "group_name") key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) auto_materialize_policy = check.opt_inst_param( auto_materialize_policy, "auto_materialize_policy", AutoMaterializePolicy ) backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) ( assets, source_assets, cacheable_assets, ) = assets_from_modules(modules) return assets_with_attributes( assets, source_assets, cacheable_assets, key_prefix=key_prefix, group_name=group_name, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, )
[docs]def load_assets_from_current_module( group_name: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, *, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets, source assets, and cacheable assets from the module where this function is called. Args: group_name (Optional[str]): Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added. key_prefix (Optional[Union[str, Sequence[str]]]): Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended. freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded assets. auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to all the loaded assets. backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended. Returns: Sequence[Union[AssetsDefinition, SourceAsset, CachableAssetsDefinition]]: A list containing assets, source assets, and cacheable assets defined in the module. """ caller = inspect.stack()[1] module = inspect.getmodule(caller[0]) if module is None: check.failed("Could not find a module for the caller") return load_assets_from_modules( [module], group_name=group_name, key_prefix=key_prefix, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, )
def assets_from_package_module( package_module: ModuleType, extra_source_assets: Optional[Sequence[SourceAsset]] = None, ) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: """Constructs three lists, a list of assets, a list of source assets, and a list of cacheable assets from the given package module. Args: package_module (ModuleType): The package module to looks for assets inside. extra_source_assets (Optional[Sequence[SourceAsset]]): Source assets to include in the group in addition to the source assets found in the modules. Returns: Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset], Sequence[CacheableAssetsDefinition]]: A tuple containing a list of assets, a list of source assets, and a list of cacheable assets defined in the given modules. """ return assets_from_modules( find_modules_in_package(package_module), extra_source_assets=extra_source_assets )
[docs]def load_assets_from_package_module( package_module: ModuleType, group_name: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, *, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets and source assets that includes all asset definitions, source assets, and cacheable assets in all sub-modules of the given package module. A package module is the result of importing a package. Args: package_module (ModuleType): The package module to looks for assets inside. group_name (Optional[str]): Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added. key_prefix (Optional[Union[str, Sequence[str]]]): Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended. freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded assets. auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to all the loaded assets. backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended. Returns: Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: A list containing assets, source assets, and cacheable assets defined in the module. """ group_name = check.opt_str_param(group_name, "group_name") key_prefix = check_opt_coercible_to_asset_key_prefix_param(key_prefix, "key_prefix") freshness_policy = check.opt_inst_param(freshness_policy, "freshness_policy", FreshnessPolicy) auto_materialize_policy = check.opt_inst_param( auto_materialize_policy, "auto_materialize_policy", AutoMaterializePolicy ) backfill_policy = check.opt_inst_param(backfill_policy, "backfill_policy", BackfillPolicy) ( assets, source_assets, cacheable_assets, ) = assets_from_package_module(package_module) return assets_with_attributes( assets, source_assets, cacheable_assets, key_prefix=key_prefix, group_name=group_name, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, )
[docs]def load_assets_from_package_name( package_name: str, group_name: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, *, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, source_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: """Constructs a list of assets, source assets, and cacheable assets that includes all asset definitions and source assets in all sub-modules of the given package. Args: package_name (str): The name of a Python package to look for assets inside. group_name (Optional[str]): Group name to apply to the loaded assets. The returned assets will be copies of the loaded objects, with the group name added. key_prefix (Optional[Union[str, Sequence[str]]]): Prefix to prepend to the keys of the loaded assets. The returned assets will be copies of the loaded objects, with the prefix prepended. freshness_policy (Optional[FreshnessPolicy]): FreshnessPolicy to apply to all the loaded assets. auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to all the loaded assets. backfill_policy (Optional[AutoMaterializePolicy]): BackfillPolicy to apply to all the loaded assets. source_key_prefix (bool): Prefix to prepend to the keys of loaded SourceAssets. The returned assets will be copies of the loaded objects, with the prefix prepended. Returns: Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: A list containing assets, source assets, and cacheable assets defined in the module. """ package_module = import_module(package_name) return load_assets_from_package_module( package_module, group_name=group_name, key_prefix=key_prefix, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, source_key_prefix=source_key_prefix, )
def find_modules_in_package(package_module: ModuleType) -> Iterable[ModuleType]: yield package_module if package_module.__file__: for _, modname, is_pkg in pkgutil.walk_packages( package_module.__path__, prefix=package_module.__name__ + "." ): submodule = import_module(modname) if is_pkg: yield from find_modules_in_package(submodule) else: yield submodule else: raise ValueError( f"Tried to find modules in package {package_module}, but its __file__ is None" ) def prefix_assets( assets_defs: Sequence[AssetsDefinition], key_prefix: CoercibleToAssetKeyPrefix, source_assets: Sequence[SourceAsset], source_key_prefix: Optional[CoercibleToAssetKeyPrefix], ) -> Tuple[Sequence[AssetsDefinition], Sequence[SourceAsset]]: """Given a list of assets, prefix the input and output asset keys and check specs with key_prefix. The prefix is not added to source assets. Input asset keys that reference other assets within assets_defs are "brought along" - i.e. prefixed as well. Example with a single asset: .. code-block:: python @asset def asset1(): ... result = prefixed_asset_key_replacements([asset_1], "my_prefix") assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) Example with dependencies within the list of assets: .. code-block:: python @asset def asset1(): ... @asset def asset2(asset1): ... result = prefixed_asset_key_replacements([asset1, asset2], "my_prefix") assert result.assets[0].asset_key == AssetKey(["my_prefix", "asset1"]) assert result.assets[1].asset_key == AssetKey(["my_prefix", "asset2"]) assert result.assets[1].dependency_keys == {AssetKey(["my_prefix", "asset1"])} """ asset_keys = {asset_key for assets_def in assets_defs for asset_key in assets_def.keys} check_target_keys = { key.asset_key for assets_def in assets_defs for key in assets_def.check_keys } source_asset_keys = {source_asset.key for source_asset in source_assets} if isinstance(key_prefix, str): key_prefix = [key_prefix] key_prefix = check.is_list(key_prefix, of_type=str) result_assets: List[AssetsDefinition] = [] for assets_def in assets_defs: output_asset_key_replacements = { asset_key: AssetKey([*key_prefix, *asset_key.path]) for asset_key in assets_def.keys } input_asset_key_replacements = {} for dep_asset_key in assets_def.keys_by_input_name.values(): if dep_asset_key in asset_keys or dep_asset_key in check_target_keys: input_asset_key_replacements[dep_asset_key] = AssetKey( [*key_prefix, *dep_asset_key.path] ) elif source_key_prefix and dep_asset_key in source_asset_keys: input_asset_key_replacements[dep_asset_key] = AssetKey( [*source_key_prefix, *dep_asset_key.path] ) check_specs_by_output_name = { output_name: check_spec.with_asset_key_prefix(key_prefix) for output_name, check_spec in assets_def.check_specs_by_output_name.items() } selected_asset_check_keys = { key.with_asset_key_prefix(key_prefix) for key in assets_def.check_keys } result_assets.append( assets_def.with_attributes( output_asset_key_replacements=output_asset_key_replacements, input_asset_key_replacements=input_asset_key_replacements, check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=selected_asset_check_keys, ) ) if source_key_prefix: result_source_assets = [ source_asset.with_attributes(key=AssetKey([*source_key_prefix, *source_asset.key.path])) for source_asset in source_assets ] else: result_source_assets = source_assets return result_assets, result_source_assets def assets_with_attributes( assets_defs: Sequence[AssetsDefinition], source_assets: Sequence[SourceAsset], cacheable_assets: Sequence[CacheableAssetsDefinition], key_prefix: Optional[Sequence[str]], group_name: Optional[str], freshness_policy: Optional[FreshnessPolicy], auto_materialize_policy: Optional[AutoMaterializePolicy], backfill_policy: Optional[BackfillPolicy], source_key_prefix: Optional[Sequence[str]], ) -> Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: # There is a tricky edge case here where if a non-cacheable asset depends on a cacheable asset, # and the assets are prefixed, the non-cacheable asset's dependency will not be prefixed since # at prefix-time it is not known that its dependency is one of the cacheable assets. # https://github.com/dagster-io/dagster/pull/10389#pullrequestreview-1170913271 if key_prefix: assets_defs, source_assets = prefix_assets( assets_defs, key_prefix, source_assets, source_key_prefix ) cacheable_assets = [ cached_asset.with_prefix_for_all(key_prefix) for cached_asset in cacheable_assets ] if group_name or freshness_policy or auto_materialize_policy or backfill_policy: assets_defs = [ asset.with_attributes( group_names_by_key=( {asset_key: group_name for asset_key in asset.keys} if group_name is not None else None ), freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, ) for asset in assets_defs ] if group_name: source_assets = [ source_asset.with_attributes(group_name=group_name) for source_asset in source_assets ] cacheable_assets = [ cached_asset.with_attributes_for_all( group_name, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, ) for cached_asset in cacheable_assets ] return [*assets_defs, *source_assets, *cacheable_assets]