from typing import Any, Mapping, Optional, Sequence, Type, Union
import dagster._check as check
from dagster._annotations import (
experimental_param,
hidden_param,
only_allow_hidden_params_in_kwargs,
public,
)
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.events import (
AssetKey,
CoercibleToAssetKey,
CoercibleToAssetKeyPrefix,
)
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.input import NoValueSentinel
from dagster._core.definitions.output import Out
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.utils import resolve_automation_condition
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.types.dagster_type import DagsterType
from dagster._utils.tags import normalize_tags
from dagster._utils.warnings import disable_dagster_warnings
# Unfortunate, since AssetOut stores nearly all of the properties of
# an AssetSpec except for the key, we use a sentinel value to represent
# an unspecified key
EMPTY_ASSET_KEY_SENTINEL = AssetKey([])
[docs]
@experimental_param(param="owners")
@experimental_param(param="tags")
@hidden_param(
param="freshness_policy",
breaking_version="1.10.0",
additional_warn_text="use freshness checks instead",
)
@hidden_param(
param="auto_materialize_policy",
breaking_version="1.10.0",
additional_warn_text="use `automation_condition` instead",
)
class AssetOut:
"""Defines one of the assets produced by a :py:func:`@multi_asset <multi_asset>`."""
_spec: AssetSpec
key_prefix: Optional[Sequence[str]]
dagster_type: Union[Type, DagsterType]
is_required: bool
io_manager_key: Optional[str]
backfill_policy: Optional[BackfillPolicy]
def __init__(
self,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
key: Optional[CoercibleToAssetKey] = None,
dagster_type: Union[Type, DagsterType] = NoValueSentinel,
description: Optional[str] = None,
is_required: bool = True,
io_manager_key: Optional[str] = None,
metadata: Optional[Mapping[str, Any]] = None,
group_name: Optional[str] = None,
code_version: Optional[str] = None,
automation_condition: Optional[AutomationCondition] = None,
backfill_policy: Optional[BackfillPolicy] = None,
owners: Optional[Sequence[str]] = None,
tags: Optional[Mapping[str, str]] = None,
**kwargs,
):
"""Defines an asset produced by a :py:func:`@multi_asset <multi_asset>`.
Args:
key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the
concatenation of the key_prefix and the asset's name. When using ``@multi_asset``, the
asset name defaults to the key of the "outs" dictionary Only one of the "key_prefix" and
"key" arguments should be provided.
key (Optional[Union[str, Sequence[str], AssetKey]]): The asset's key. Only one of the
"key_prefix" and "key" arguments should be provided.
dagster_type (Optional[Union[Type, DagsterType]]]):
The type of this output. Should only be set if the correct type can not
be inferred directly from the type signature of the decorated function.
description (Optional[str]): Human-readable description of the output.
is_required (bool): Whether the presence of this field is required. (default: True)
io_manager_key (Optional[str]): The resource key of the IO manager used for this output.
(default: "io_manager").
metadata (Optional[Dict[str, Any]]): A dict of the metadata for the output.
For example, users can provide a file path if the data object will be stored in a
filesystem, or provide information of a database table when it is going to load the data
into the table.
group_name (Optional[str]): A string name used to organize multiple assets into groups. If
not provided, the name "default" is used.
code_version (Optional[str]): The version of the code that generates this asset.
freshness_policy (Optional[FreshnessPolicy]): (Deprecated) A policy which indicates how up
to date this asset is intended to be.
automation_condition (Optional[AutomationCondition]): AutomationCondition to apply to the
specified asset.
backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset.
owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each
string can be a user's email address, or a team name prefixed with `team:`,
e.g. `team:finops`.
tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not
attached to runs of the asset.
"""
# Accept a hidden "spec" argument to allow for the AssetOut to be constructed from an AssetSpec
# directly. This is used in the AssetOut.from_spec method.
spec = kwargs.get("spec")
if spec:
del kwargs["spec"]
only_allow_hidden_params_in_kwargs(AssetOut, kwargs)
if isinstance(key_prefix, str):
key_prefix = [key_prefix]
check.invariant(
not (key_prefix and key), "Only one of key_prefix and key should be provided"
)
auto_materialize_policy = kwargs.get("auto_materialize_policy")
freshness_policy = kwargs.get("freshness_policy")
has_any_spec_args = any(
[
key,
description,
metadata,
group_name,
code_version,
automation_condition,
auto_materialize_policy,
freshness_policy,
owners,
tags,
]
)
check.invariant(
not has_any_spec_args or not spec,
"Cannot provide both spec and spec-related arguments (key, description, metadata, etc.)",
)
with disable_dagster_warnings():
# This is a bit of a hack, since technically AssetOut does not hold all of the
# properties of an AssetSpec - chiefly, it is missing a key. Still, this reduces
# the amount of code duplication storing each of the properties in both AssetOut
# and AssetSpec, and allows us to implement the from_spec method below.
self._spec = spec or AssetSpec(
key=AssetKey.from_coercible(key) if key is not None else EMPTY_ASSET_KEY_SENTINEL,
description=check.opt_str_param(description, "description"),
metadata=check.opt_mapping_param(metadata, "metadata", key_type=str),
group_name=check.opt_str_param(group_name, "group_name"),
code_version=check.opt_str_param(code_version, "code_version"),
automation_condition=check.opt_inst_param(
resolve_automation_condition(automation_condition, auto_materialize_policy),
"automation_condition",
AutomationCondition,
),
freshness_policy=check.opt_inst_param(
freshness_policy, "freshness_policy", FreshnessPolicy
),
owners=check.opt_sequence_param(owners, "owners", of_type=str),
tags=normalize_tags(tags or {}, strict=True),
)
self.key_prefix = key_prefix
self.dagster_type = dagster_type
self.is_required = is_required
self.io_manager_key = io_manager_key
self.backfill_policy = backfill_policy
@property
def key(self) -> Optional[AssetKey]:
return self._spec.key if self._spec.key != EMPTY_ASSET_KEY_SENTINEL else None
@property
def metadata(self) -> Optional[Mapping[str, Any]]:
return self._spec.metadata
@property
def description(self) -> Optional[str]:
return self._spec.description
@property
def group_name(self) -> Optional[str]:
return self._spec.group_name
@property
def code_version(self) -> Optional[str]:
return self._spec.code_version
@property
def freshness_policy(self) -> Optional[FreshnessPolicy]:
return self._spec.freshness_policy
@property
def automation_condition(self) -> Optional[AutomationCondition]:
return self._spec.automation_condition
@property
def owners(self) -> Optional[Sequence[str]]:
return self._spec.owners
@property
def tags(self) -> Optional[Mapping[str, str]]:
return self._spec.tags
def to_out(self) -> Out:
return Out(
dagster_type=self.dagster_type,
description=self.description,
metadata=self.metadata,
is_required=self.is_required,
io_manager_key=self.io_manager_key,
code_version=self.code_version,
)
def to_spec(
self,
key: AssetKey,
deps: Sequence[AssetDep],
additional_tags: Mapping[str, str] = {},
partitions_def: Optional[PartitionsDefinition] = ...,
) -> AssetSpec:
return self._spec.replace_attributes(
key=key,
tags={**additional_tags, **self.tags} if self.tags else additional_tags,
deps=[*self._spec.deps, *deps],
partitions_def=partitions_def if partitions_def is not None else ...,
)
[docs]
@public
@staticmethod
def from_spec(
spec: AssetSpec,
dagster_type: Union[Type, DagsterType] = NoValueSentinel,
is_required: bool = True,
io_manager_key: Optional[str] = None,
backfill_policy: Optional[BackfillPolicy] = None,
) -> "AssetOut":
"""Builds an AssetOut from the passed spec.
Args:
spec (AssetSpec): The spec to build the AssetOut from.
dagster_type (Optional[Union[Type, DagsterType]]): The type of this output. Should only
be set if the correct type can not be inferred directly from the type signature of
the decorated function.
is_required (bool): Whether the presence of this field is required. (default: True)
io_manager_key (Optional[str]): The resource key of the IO manager used for this output.
(default: "io_manager").
backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified
asset.
Returns:
AssetOut: The AssetOut built from the spec.
"""
if spec.deps:
raise DagsterInvalidDefinitionError(
"Currently, cannot build AssetOut from spec with deps"
)
return AssetOut(
spec=spec,
dagster_type=dagster_type,
is_required=is_required,
io_manager_key=io_manager_key,
backfill_policy=backfill_policy,
)
@property
def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]:
return (
self.automation_condition.as_auto_materialize_policy()
if self.automation_condition
else None
)