from typing import TYPE_CHECKING, Iterable, NamedTuple, Optional, Sequence, Union
import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.partition_mapping import (
PartitionMapping,
warn_if_partition_mapping_not_builtin,
)
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._utils.warnings import deprecation_warning
if TYPE_CHECKING:
from dagster._core.definitions.assets import AssetsDefinition
# Every kind of value that AssetDep.from_coercible accepts as a dependency.
# "AssetsDefinition" is a string forward reference because that class is only
# imported under TYPE_CHECKING; "AssetDep" is one because the class is defined
# further down in this module.
CoercibleToAssetDep = Union[
CoercibleToAssetKey, AssetSpec, "AssetsDefinition", SourceAsset, "AssetDep"
]
class AssetDep(
    NamedTuple(
        "_AssetDep",
        [
            ("asset_key", PublicAttr[AssetKey]),
            ("partition_mapping", PublicAttr[Optional[PartitionMapping]]),
        ],
    )
):
    """Specifies a dependency on an upstream asset.

    The ``asset`` constructor argument is resolved to an ``AssetKey`` and stored
    on the instance as ``asset_key``.

    Attributes:
        asset (Union[AssetKey, str, AssetSpec, AssetsDefinition, SourceAsset]): The upstream asset to depend on.
        partition_mapping (Optional[PartitionMapping]): Defines what partitions to depend on in
            the upstream asset. If not provided and the upstream asset is partitioned, defaults to
            the default partition mapping for the partitions definition, which typically maps
            partition keys to the same partition keys in upstream assets.

    Examples:
        .. code-block:: python

            upstream_asset = AssetSpec("upstream_asset")
            downstream_asset = AssetSpec(
                "downstream_asset",
                deps=[
                    AssetDep(
                        upstream_asset,
                        partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
                    )
                ]
            )
    """

    def __new__(
        cls,
        asset: Union[CoercibleToAssetKey, AssetSpec, "AssetsDefinition", SourceAsset],
        *,
        partition_mapping: Optional[PartitionMapping] = None,
    ):
        """Validate the arguments and build the dependency.

        Args:
            asset: The upstream asset, given as a key, a string, a list of
                strings, or an object exposing a single asset key.
            partition_mapping: Optional mapping describing which upstream
                partitions are depended on.

        Raises:
            DagsterInvalidDefinitionError: If ``asset`` is an AssetsDefinition
                with more than one key.
        """
        # Imported here rather than at module level; the module-level import is
        # guarded by TYPE_CHECKING.
        from dagster._core.definitions.assets import AssetsDefinition

        if isinstance(asset, list):
            check.list_param(asset, "asset", of_type=str)
        else:
            check.inst_param(
                asset, "asset", (AssetKey, str, AssetSpec, AssetsDefinition, SourceAsset)
            )

        if isinstance(asset, AssetsDefinition) and len(asset.keys) > 1:
            # Only AssetsDefinition with a single asset can be passed
            raise DagsterInvalidDefinitionError(
                "Cannot create an AssetDep from a multi_asset AssetsDefinition."
                " Instead, specify dependencies on the assets created by the multi_asset"
                f" via AssetKeys or strings. For the multi_asset {asset.node_def.name}, the"
                f" available keys are: {asset.keys}."
            )

        asset_key = _get_asset_key(asset)

        # Validate the type first so the warning helper never sees an object
        # that is not a PartitionMapping.
        partition_mapping = check.opt_inst_param(
            partition_mapping,
            "partition_mapping",
            PartitionMapping,
        )
        # Compare against None explicitly: a mapping object that happens to be
        # falsy (e.g. one backed by an empty tuple) must still be inspected.
        if partition_mapping is not None:
            warn_if_partition_mapping_not_builtin(partition_mapping)

        return super().__new__(
            cls,
            asset_key=asset_key,
            partition_mapping=partition_mapping,
        )

    @staticmethod
    def from_coercible(arg: "CoercibleToAssetDep") -> "AssetDep":
        """Return ``arg`` as an AssetDep, wrapping it only when necessary."""
        # if arg is AssetDep, return the original object to retain partition_mapping
        return arg if isinstance(arg, AssetDep) else AssetDep(asset=arg)
def _get_asset_key(arg: "CoercibleToAssetDep") -> AssetKey:
    """Resolve any dependency-like value to the AssetKey it refers to.

    Objects that carry their own key (AssetsDefinition, SourceAsset, AssetSpec)
    contribute ``.key``; an AssetDep contributes ``.asset_key``; everything else
    is handed to ``AssetKey.from_coercible``.
    """
    from dagster._core.definitions.assets import AssetsDefinition

    keyed_types = (AssetsDefinition, SourceAsset, AssetSpec)
    if isinstance(arg, keyed_types):
        return arg.key

    if isinstance(arg, AssetDep):
        return arg.asset_key

    return AssetKey.from_coercible(arg)
def coerce_to_deps_and_check_duplicates(
    coercible_to_asset_deps: Optional[Iterable["CoercibleToAssetDep"]],
    key: Optional[Union[AssetKey, AssetCheckKey]],
) -> Sequence[AssetDep]:
    """Normalize a collection of dependency-like values into AssetDep objects.

    Args:
        coercible_to_asset_deps: The raw dependencies. A falsy value yields an
            empty list. A bare AssetKey is still accepted (with a deprecation
            warning) and treated as a one-element collection.
        key: The asset or asset-check key the dependencies belong to; used only
            to word the error message.

    Returns:
        One AssetDep per distinct upstream asset key, ordered by first
        appearance.

    Raises:
        DagsterInvariantViolationError: If the same upstream asset key appears
            more than once with non-equal AssetDep values.
    """
    from dagster._core.definitions.assets import AssetsDefinition

    if not coercible_to_asset_deps:
        return []

    # when AssetKey was a plain NamedTuple, it also happened to be Iterable[CoercibleToAssetKey]
    # so continue to support it here
    if isinstance(coercible_to_asset_deps, AssetKey):
        deprecation_warning(
            subject="Passing a single AssetKey to deps",
            breaking_version="1.10.0",
        )
        coercible_to_asset_deps = [coercible_to_asset_deps]

    # Phase 1: flatten. A multi-asset AssetsDefinition stands for each of its
    # keys; every other entry is carried through unchanged.
    flattened = []
    for candidate in coercible_to_asset_deps:
        is_multi_asset = isinstance(candidate, AssetsDefinition) and len(candidate.keys) > 1
        flattened.extend(candidate.keys if is_multi_asset else (candidate,))

    # Phase 2: deduplicate by asset key. A dict keyed on AssetKey is used
    # instead of a set of AssetDep because MultiPartitionMappings hold an
    # internal dictionary and therefore cannot be hashed. Repeating an asset is
    # tolerated only when the repeated dependency compares equal (i.e. it does
    # not carry a different PartitionMapping).
    deps_by_key = {}
    for candidate in flattened:
        resolved = AssetDep.from_coercible(candidate)
        upstream_key = resolved.asset_key
        previous = deps_by_key.get(upstream_key)
        if previous is not None and resolved != previous:
            key_msg = f"for spec {key}." if key else "per asset."
            raise DagsterInvariantViolationError(
                f"Cannot set a dependency on asset {upstream_key} more than once {key_msg}"
            )
        deps_by_key[upstream_key] = resolved

    return list(deps_by_key.values())