from enum import Enum
from functools import cached_property
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Iterable,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
)
import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param, public
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.utils import (
resolve_automation_condition,
validate_asset_owner,
validate_group_name,
)
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.storage.tags import KIND_PREFIX
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.internal_init import IHasInternalInit
from dagster._utils.tags import normalize_tags
if TYPE_CHECKING:
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset
# (which currently ends up on the Output associated with the asset key)
# and encodes the execution type of the asset. "Unexecutable" assets are assets
# that cannot be materialized in Dagster, but can have events in the event
# log keyed off of them, making Dagster usable as an observability and lineage tool
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"
# SYSTEM_METADATA_KEY_IO_MANAGER_KEY lives on the metadata of an asset without a node def and
# determines the io_manager_key that can be used to load it. This is necessary because IO manager
# keys are otherwise encoded inside OutputDefinitions within NodeDefinitions.
SYSTEM_METADATA_KEY_IO_MANAGER_KEY = "dagster/io_manager_key"
# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of
# external assets resulting from a source asset conversion. It contains the
# `auto_observe_interval_minutes` value from the source asset and is consulted
# in the auto-materialize daemon. It should eventually be eliminated in favor
# of an implementation of `auto_observe_interval_minutes` in terms of
# `AutoMaterializeRule`.
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes"
# SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET lives on the metadata of external assets that are
# created for undefined but referenced assets during asset graph normalization. For example, in the
# below definitions, `foo` is referenced as an upstream dependency of `bar` but has no
# corresponding definition:
#
#
#     @asset(deps=["foo"])
#     def bar(context: AssetExecutionContext):
#         ...
#
#     defs=Definitions(assets=[bar])
#
# During normalization we create a "stub" definition for `foo` and attach this metadata to it.
@whitelist_for_serdes
class AssetExecutionType(Enum):
    """Enumerates the execution types an asset can have.

    Every member's value is its own name as a string. The class is decorated with
    ``whitelist_for_serdes``.
    """

    # NOTE(review): presumably assets whose execution produces observations — confirm.
    OBSERVATION = "OBSERVATION"
    # Assets that cannot be materialized in Dagster but can still have events in the
    # event log keyed off of them (e.g. externally materialized assets).
    UNEXECUTABLE = "UNEXECUTABLE"
    # NOTE(review): presumably assets whose execution produces materializations — confirm.
    MATERIALIZATION = "MATERIALIZATION"
def validate_kind_tags(kinds: Optional[AbstractSet[str]]) -> None:
    """Check that an asset does not declare more kinds than are supported.

    Args:
        kinds: The set of kinds (or kind tag keys) to validate. ``None`` means there is
            nothing to validate.

    Raises:
        DagsterInvalidDefinitionError: If ``kinds`` contains more than three entries.
    """
    # Nothing supplied: trivially valid.
    if kinds is None:
        return
    # Up to three kinds are accepted.
    if len(kinds) <= 3:
        return
    raise DagsterInvalidDefinitionError("Assets can have at most three kinds currently.")
@experimental_param(param="owners")
@experimental_param(param="tags")
@experimental_param(param="kinds")
class AssetSpec(
    NamedTuple(
        "_AssetSpec",
        [
            ("key", PublicAttr[AssetKey]),
            ("deps", PublicAttr[Iterable["AssetDep"]]),
            ("description", PublicAttr[Optional[str]]),
            ("metadata", PublicAttr[Mapping[str, Any]]),
            ("group_name", PublicAttr[Optional[str]]),
            ("skippable", PublicAttr[bool]),
            ("code_version", PublicAttr[Optional[str]]),
            ("freshness_policy", PublicAttr[Optional[FreshnessPolicy]]),
            ("automation_condition", PublicAttr[Optional[AutomationCondition]]),
            ("owners", PublicAttr[Sequence[str]]),
            ("tags", PublicAttr[Mapping[str, str]]),
            ("partitions_def", PublicAttr[Optional[PartitionsDefinition]]),
        ],
    ),
    IHasInternalInit,
):
    """Specifies the core attributes of an asset, except for the function that materializes or
    observes it.

    An asset spec plus any materialization or observation function for the asset constitutes an
    "asset definition".

    Attributes:
        key (AssetKey): The unique identifier for this asset.
        deps (Iterable[AssetDep]): The upstream assets that materializing this asset depends
            on. The constructor accepts an iterable of values coercible to ``AssetDep`` (or
            None) and normalizes them with ``coerce_to_deps_and_check_duplicates``.
        description (Optional[str]): Human-readable description of this asset.
        metadata (Mapping[str, Any]): A dict of static metadata for this asset.
            For example, users can provide information about the database table this
            asset corresponds to.
        group_name (Optional[str]): A string name used to organize multiple assets into groups. If
            not provided, the name "default" is used.
        skippable (bool): Whether this asset can be omitted during materialization, causing
            downstream dependencies to skip.
        code_version (Optional[str]): The version of the code for this specific asset,
            overriding the code version of the materialization function.
        freshness_policy (Optional[FreshnessPolicy]): (Deprecated) A policy which indicates how up
            to date this asset is intended to be.
        automation_condition (Optional[AutomationCondition]): The automation condition to apply
            to this asset. If the constructor is given the legacy ``auto_materialize_policy``
            argument, it is resolved into this field via ``resolve_automation_condition``.
        owners (Sequence[str]): A list of strings representing owners of the asset. Each
            string can be a user's email address, or a team name prefixed with `team:`,
            e.g. `team:finops`.
        tags (Mapping[str, str]): Tags for filtering and organizing. These tags are not
            attached to runs of the asset. Any ``kinds`` passed to the constructor are stored
            here as tags keyed by ``KIND_PREFIX`` followed by the kind, with an empty value.
        partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
            compose the asset.

    The following are not stored fields but are derived from them:
        kinds (Set[str]): The kinds of the asset, recovered from the kind-prefixed tags. These
            will be made visible in the Dagster UI.
        auto_materialize_policy (Optional[AutoMaterializePolicy]): The legacy policy view of
            ``automation_condition``.
        partition_mappings (Mapping[AssetKey, PartitionMapping]): The partition mappings
            declared on ``deps``, keyed by upstream asset key.
    """

    def __new__(
        cls,
        key: CoercibleToAssetKey,
        *,
        deps: Optional[Iterable["CoercibleToAssetDep"]] = None,
        description: Optional[str] = None,
        metadata: Optional[Mapping[str, Any]] = None,
        skippable: bool = False,
        group_name: Optional[str] = None,
        code_version: Optional[str] = None,
        freshness_policy: Optional[FreshnessPolicy] = None,
        automation_condition: Optional[AutomationCondition] = None,
        owners: Optional[Sequence[str]] = None,
        tags: Optional[Mapping[str, str]] = None,
        kinds: Optional[Set[str]] = None,
        # TODO: FOU-243
        auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
        partitions_def: Optional[PartitionsDefinition] = None,
    ):
        """Validate and normalize the supplied attributes and build the spec.

        Raises:
            DagsterInvalidDefinitionError: If more than three kinds (counting both the
                ``kinds`` argument and kind-prefixed entries in ``tags``) are present.
        """
        # Imported here rather than at module level; asset_dep is only imported under
        # TYPE_CHECKING at the top of this module.
        from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates

        key = AssetKey.from_coercible(key)
        asset_deps = coerce_to_deps_and_check_duplicates(deps, key)

        validate_group_name(group_name)

        owners = check.opt_sequence_param(owners, "owners", of_type=str)
        for owner in owners:
            validate_asset_owner(owner, key)

        # Kinds are persisted as tags with a reserved prefix and an empty value, so that
        # the `kinds` property can later be rebuilt from `tags` alone.
        tags_with_kinds = {
            **(normalize_tags(tags or {}, strict=True)),
            **{f"{KIND_PREFIX}{kind}": "" for kind in kinds or []},
        }

        # Validate the combined set: kind tags may arrive through `tags` as well as `kinds`.
        kind_tags = {
            tag_key for tag_key in tags_with_kinds.keys() if tag_key.startswith(KIND_PREFIX)
        }
        validate_kind_tags(kind_tags)

        return super().__new__(
            cls,
            key=key,
            deps=asset_deps,
            description=check.opt_str_param(description, "description"),
            metadata=check.opt_mapping_param(metadata, "metadata", key_type=str),
            skippable=check.bool_param(skippable, "skippable"),
            group_name=check.opt_str_param(group_name, "group_name"),
            code_version=check.opt_str_param(code_version, "code_version"),
            freshness_policy=check.opt_inst_param(
                freshness_policy,
                "freshness_policy",
                FreshnessPolicy,
            ),
            # The legacy auto_materialize_policy argument is folded into automation_condition.
            automation_condition=check.opt_inst_param(
                resolve_automation_condition(automation_condition, auto_materialize_policy),
                "automation_condition",
                AutomationCondition,
            ),
            owners=owners,
            tags=tags_with_kinds,
            partitions_def=check.opt_inst_param(
                partitions_def, "partitions_def", PartitionsDefinition
            ),
        )

    @staticmethod
    def dagster_internal_init(
        *,
        key: CoercibleToAssetKey,
        deps: Optional[Iterable["CoercibleToAssetDep"]],
        description: Optional[str],
        metadata: Optional[Mapping[str, Any]],
        skippable: bool,
        group_name: Optional[str],
        code_version: Optional[str],
        freshness_policy: Optional[FreshnessPolicy],
        automation_condition: Optional[AutomationCondition],
        owners: Optional[Sequence[str]],
        tags: Optional[Mapping[str, str]],
        kinds: Optional[Set[str]],
        auto_materialize_policy: Optional[AutoMaterializePolicy],
        partitions_def: Optional[PartitionsDefinition],
    ) -> "AssetSpec":
        """Construct an AssetSpec with every argument spelled out explicitly.

        ``auto_materialize_policy`` must be None here; callers are expected to supply
        ``automation_condition`` instead. The argument is not forwarded to the constructor.
        """
        check.invariant(
            auto_materialize_policy is None,
            "auto_materialize_policy must be None; pass automation_condition instead.",
        )
        return AssetSpec(
            key=key,
            deps=deps,
            description=description,
            metadata=metadata,
            skippable=skippable,
            group_name=group_name,
            code_version=code_version,
            freshness_policy=freshness_policy,
            automation_condition=automation_condition,
            owners=owners,
            tags=tags,
            kinds=kinds,
            partitions_def=partitions_def,
        )

    @cached_property
    def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:
        """Partition mappings of the deps that define one, keyed by the dep's asset key."""
        return {
            dep.asset_key: dep.partition_mapping
            for dep in self.deps
            if dep.partition_mapping is not None
        }

    @property
    def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
        """The automation condition expressed as an AutoMaterializePolicy, or None if unset."""
        # TODO: FOU-243
        return (
            self.automation_condition.as_auto_materialize_policy()
            if self.automation_condition
            else None
        )

    @cached_property
    def kinds(self) -> Set[str]:
        """The asset's kinds, recovered by stripping KIND_PREFIX from kind-prefixed tag keys."""
        return {tag[len(KIND_PREFIX) :] for tag in self.tags if tag.startswith(KIND_PREFIX)}

    @public
    def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec":
        """Returns a copy of this AssetSpec with an extra metadata value that dictates which I/O
        manager to use to load the contents of this asset in downstream computations.

        Args:
            io_manager_key (str): The I/O manager key. This will be used as the value for the
                "dagster/io_manager_key" metadata key.

        Returns:
            AssetSpec
        """
        return self._replace(
            metadata={**self.metadata, SYSTEM_METADATA_KEY_IO_MANAGER_KEY: io_manager_key}
        )
def replace_attributes(
    spec: AssetSpec,
    *,
    key: CoercibleToAssetKey = ...,
    deps: Optional[Iterable["CoercibleToAssetDep"]] = ...,
    description: Optional[str] = ...,
    metadata: Optional[Mapping[str, Any]] = ...,
    skippable: bool = ...,
    group_name: Optional[str] = ...,
    code_version: Optional[str] = ...,
    freshness_policy: Optional[FreshnessPolicy] = ...,
    automation_condition: Optional[AutomationCondition] = ...,
    owners: Optional[Sequence[str]] = ...,
    tags: Optional[Mapping[str, str]] = ...,
    kinds: Optional[Set[str]] = ...,
    auto_materialize_policy: Optional[AutoMaterializePolicy] = ...,
    partitions_def: Optional[PartitionsDefinition] = ...,
) -> "AssetSpec":
    """Returns a new AssetSpec with the specified attributes replaced.

    ``...`` (Ellipsis) is the "not supplied" sentinel, so that ``None`` can be passed
    explicitly to clear an optional attribute. Attributes left as ``...`` keep the value
    they have on ``spec``.

    Args:
        spec: The spec to copy.
        tags: Replacement tags. When omitted, the spec's existing non-kind tags are kept.
        kinds: Replacement kinds. When omitted, the spec's existing kinds are kept.
        auto_materialize_policy: Legacy alternative to ``automation_condition``. A non-None
            value is resolved into an automation condition exactly as the ``AssetSpec``
            constructor does; ``None`` or omitted leaves the automation condition as
            determined by ``automation_condition``.
        (remaining keyword arguments): Replacement values for the like-named attributes.

    Returns:
        AssetSpec: A new spec; ``spec`` itself is not modified.
    """
    # Kinds are stored among the tags; separate them so that `tags` and `kinds` can be
    # replaced independently and are recombined by the constructor.
    current_tags_without_kinds = {
        tag_key: tag_value
        for tag_key, tag_value in spec.tags.items()
        if not tag_key.startswith(KIND_PREFIX)
    }

    # dagster_internal_init requires auto_materialize_policy to be None, and
    # spec.auto_materialize_policy is merely a view derived from spec.automation_condition.
    # So never forward a policy: carry the automation condition instead, converting an
    # explicitly supplied policy the same way the constructor would.
    if auto_materialize_policy is ... or auto_materialize_policy is None:
        resolved_automation_condition = (
            automation_condition if automation_condition is not ... else spec.automation_condition
        )
    else:
        resolved_automation_condition = resolve_automation_condition(
            None if automation_condition is ... else automation_condition,
            auto_materialize_policy,
        )

    return spec.dagster_internal_init(
        key=key if key is not ... else spec.key,
        deps=deps if deps is not ... else spec.deps,
        description=description if description is not ... else spec.description,
        metadata=metadata if metadata is not ... else spec.metadata,
        skippable=skippable if skippable is not ... else spec.skippable,
        group_name=group_name if group_name is not ... else spec.group_name,
        code_version=code_version if code_version is not ... else spec.code_version,
        freshness_policy=freshness_policy if freshness_policy is not ... else spec.freshness_policy,
        automation_condition=resolved_automation_condition,
        owners=owners if owners is not ... else spec.owners,
        tags=tags if tags is not ... else current_tags_without_kinds,
        kinds=kinds if kinds is not ... else spec.kinds,
        auto_materialize_policy=None,
        partitions_def=partitions_def if partitions_def is not ... else spec.partitions_def,
    )
def merge_attributes(
    spec: AssetSpec,
    *,
    deps: Iterable["CoercibleToAssetDep"] = ...,
    metadata: Mapping[str, Any] = ...,
    owners: Sequence[str] = ...,
    tags: Mapping[str, str] = ...,
    kinds: Set[str] = ...,
) -> "AssetSpec":
    """Returns a new AssetSpec with the specified attributes merged with the current attributes.

    ``...`` (Ellipsis) is the "not supplied" sentinel; omitted arguments contribute nothing.

    Args:
        spec: The spec to copy.
        deps: Additional deps, appended after the spec's existing deps.
        metadata: Additional metadata; on key collision the supplied value wins.
        owners: Additional owners, appended after the spec's existing owners.
        tags: Additional tags; on key collision the supplied value wins.
        kinds: Additional kinds, unioned with the spec's existing kinds.

    Returns:
        AssetSpec: A new spec; ``spec`` itself is not modified. All attributes other than
        those listed above are carried over unchanged.
    """
    # Kinds are stored among the tags; separate them so that `tags` and `kinds` can be
    # merged independently and are recombined by the constructor.
    current_tags_without_kinds = {
        tag_key: tag_value
        for tag_key, tag_value in spec.tags.items()
        if not tag_key.startswith(KIND_PREFIX)
    }
    return spec.dagster_internal_init(
        key=spec.key,
        deps=[*spec.deps, *(deps if deps is not ... else ())],
        description=spec.description,
        metadata={**spec.metadata, **(metadata if metadata is not ... else {})},
        skippable=spec.skippable,
        group_name=spec.group_name,
        code_version=spec.code_version,
        freshness_policy=spec.freshness_policy,
        automation_condition=spec.automation_condition,
        owners=[*spec.owners, *(owners if owners is not ... else ())],
        tags={**current_tags_without_kinds, **(tags if tags is not ... else {})},
        kinds={*spec.kinds, *(kinds if kinds is not ... else ())},
        # dagster_internal_init requires this to be None. The policy is only a view derived
        # from automation_condition, which is already forwarded above, so nothing is lost.
        auto_materialize_policy=None,
        partitions_def=spec.partitions_def,
    )