Ask AI

Source code for dagster._core.definitions.asset_out

from typing import Any, Mapping, NamedTuple, Optional, Sequence, Type, Union

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
)
from dagster._core.definitions.events import (
    AssetKey,
    CoercibleToAssetKey,
    CoercibleToAssetKeyPrefix,
)
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.input import NoValueSentinel
from dagster._core.definitions.output import Out
from dagster._core.definitions.utils import DEFAULT_IO_MANAGER_KEY, resolve_automation_condition
from dagster._core.types.dagster_type import DagsterType, resolve_dagster_type
from dagster._utils.tags import normalize_tags
from dagster._utils.warnings import disable_dagster_warnings


[docs] @experimental_param(param="owners") @experimental_param(param="tags") class AssetOut( NamedTuple( "_AssetOut", [ ("key", PublicAttr[Optional[AssetKey]]), ("key_prefix", PublicAttr[Optional[Sequence[str]]]), ("metadata", PublicAttr[Optional[Mapping[str, Any]]]), ("io_manager_key", PublicAttr[str]), ("description", PublicAttr[Optional[str]]), ("is_required", PublicAttr[bool]), ("dagster_type", PublicAttr[Union[DagsterType, Type[NoValueSentinel]]]), ("group_name", PublicAttr[Optional[str]]), ("code_version", PublicAttr[Optional[str]]), ("freshness_policy", PublicAttr[Optional[FreshnessPolicy]]), ("automation_condition", PublicAttr[Optional[AutomationCondition]]), ("backfill_policy", PublicAttr[Optional[BackfillPolicy]]), ("owners", PublicAttr[Optional[Sequence[str]]]), ("tags", PublicAttr[Mapping[str, str]]), ], ) ): """Defines one of the assets produced by a :py:func:`@multi_asset <multi_asset>`. Attributes: key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the concatenation of the key_prefix and the asset's name. When using ``@multi_asset``, the asset name defaults to the key of the "outs" dictionary Only one of the "key_prefix" and "key" arguments should be provided. key (Optional[Union[str, Sequence[str], AssetKey]]): The asset's key. Only one of the "key_prefix" and "key" arguments should be provided. dagster_type (Optional[Union[Type, DagsterType]]]): The type of this output. Should only be set if the correct type can not be inferred directly from the type signature of the decorated function. description (Optional[str]): Human-readable description of the output. is_required (bool): Whether the presence of this field is required. (default: True) io_manager_key (Optional[str]): The resource key of the IO manager used for this output. (default: "io_manager"). metadata (Optional[Dict[str, Any]]): A dict of the metadata for the output. For example, users can provide a file path if the data object will be stored in a filesystem, or provide information of a database table when it is going to load the data into the table. group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided, the name "default" is used. code_version (Optional[str]): The version of the code that generates this asset. freshness_policy (Optional[FreshnessPolicy]): (Deprecated) A policy which indicates how up to date this asset is intended to be. automation_condition (Optional[AutomationCondition]): AutomationCondition to apply to the specified asset. backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset. owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each string can be a user's email address, or a team name prefixed with `team:`, e.g. `team:finops`. tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not attached to runs of the asset. """ def __new__( cls, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, key: Optional[CoercibleToAssetKey] = None, dagster_type: Union[Type, DagsterType] = NoValueSentinel, description: Optional[str] = None, is_required: bool = True, io_manager_key: Optional[str] = None, metadata: Optional[Mapping[str, Any]] = None, group_name: Optional[str] = None, code_version: Optional[str] = None, freshness_policy: Optional[FreshnessPolicy] = None, automation_condition: Optional[AutomationCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, owners: Optional[Sequence[str]] = None, tags: Optional[Mapping[str, str]] = None, # TODO: FOU-243 auto_materialize_policy: Optional[AutoMaterializePolicy] = None, ): if isinstance(key_prefix, str): key_prefix = [key_prefix] return super(AssetOut, cls).__new__( cls, key=AssetKey.from_coercible(key) if key is not None else None, key_prefix=check.opt_list_param(key_prefix, "key_prefix", of_type=str), dagster_type=( NoValueSentinel if dagster_type is NoValueSentinel else resolve_dagster_type(dagster_type) ), description=check.opt_str_param(description, "description"), is_required=check.bool_param(is_required, "is_required"), io_manager_key=check.opt_str_param( io_manager_key, "io_manager_key", default=DEFAULT_IO_MANAGER_KEY ), metadata=check.opt_mapping_param(metadata, "metadata", key_type=str), group_name=check.opt_str_param(group_name, "group_name"), code_version=check.opt_str_param(code_version, "code_version"), freshness_policy=check.opt_inst_param( freshness_policy, "freshness_policy", FreshnessPolicy ), automation_condition=check.opt_inst_param( resolve_automation_condition(automation_condition, auto_materialize_policy), "automation_condition", AutomationCondition, ), backfill_policy=check.opt_inst_param( backfill_policy, "backfill_policy", BackfillPolicy ), owners=check.opt_sequence_param(owners, "owners", of_type=str), tags=normalize_tags(tags or {}, strict=True), ) def to_out(self) -> Out: return Out( dagster_type=self.dagster_type, description=self.description, metadata=self.metadata, is_required=self.is_required, io_manager_key=self.io_manager_key, code_version=self.code_version, ) def to_spec( self, key: AssetKey, deps: Sequence[AssetDep], additional_tags: Mapping[str, str] = {} ) -> AssetSpec: with disable_dagster_warnings(): return AssetSpec.dagster_internal_init( key=key, metadata=self.metadata, description=self.description, skippable=not self.is_required, group_name=self.group_name, code_version=self.code_version, freshness_policy=self.freshness_policy, automation_condition=self.automation_condition, owners=self.owners, tags={**additional_tags, **self.tags} if self.tags else additional_tags, deps=deps, auto_materialize_policy=None, partitions_def=None, kinds=None, ) @property def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]: return ( self.automation_condition.as_auto_materialize_policy() if self.automation_condition else None )