Ask AI

Source code for dagster._core.definitions.result

from typing import Mapping, NamedTuple, Optional, Sequence

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.data_version import DataVersion
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.metadata import RawMetadataMapping


class AssetResult(
    NamedTuple(
        "_AssetResult",
        [
            ("asset_key", PublicAttr[Optional[AssetKey]]),
            ("metadata", PublicAttr[Optional[RawMetadataMapping]]),
            ("check_results", PublicAttr[Sequence[AssetCheckResult]]),
            ("data_version", PublicAttr[Optional[DataVersion]]),
            ("tags", PublicAttr[Optional[Mapping[str, str]]]),
        ],
    )
):
    """Base class for MaterializeResult and ObserveResult."""

    def __new__(
        cls,
        *,  # enforce kwargs
        asset_key: Optional[CoercibleToAssetKey] = None,
        metadata: Optional[RawMetadataMapping] = None,
        check_results: Optional[Sequence[AssetCheckResult]] = None,
        data_version: Optional[DataVersion] = None,
        tags: Optional[Mapping[str, str]] = None,
    ):
        from dagster._core.definitions.events import validate_asset_event_tags

        asset_key = AssetKey.from_coercible(asset_key) if asset_key else None

        return super().__new__(
            cls,
            asset_key=asset_key,
            metadata=check.opt_nullable_mapping_param(
                metadata,
                "metadata",
                key_type=str,
            ),
            check_results=check.opt_sequence_param(
                check_results, "check_results", of_type=AssetCheckResult
            ),
            data_version=check.opt_inst_param(data_version, "data_version", DataVersion),
            tags=validate_asset_event_tags(tags),
        )

    def check_result_named(self, check_name: str) -> AssetCheckResult:
        for check_result in self.check_results:
            if check_result.check_name == check_name:
                return check_result

        check.failed(f"Could not find check result named {check_name}")


[docs] class MaterializeResult(AssetResult): """An object representing a successful materialization of an asset. These can be returned from @asset and @multi_asset decorated functions to pass metadata or specify specific assets were materialized. Attributes: asset_key (Optional[AssetKey]): Optional in @asset, required in @multi_asset to discern which asset this refers to. metadata (Optional[RawMetadataMapping]): Metadata to record with the corresponding AssetMaterialization event. check_results (Optional[Sequence[AssetCheckResult]]): Check results to record with the corresponding AssetMaterialization event. data_version (Optional[DataVersion]): The data version of the asset that was observed. tags (Optional[Mapping[str, str]]): Tags to record with the corresponding AssetMaterialization event. """
[docs] @experimental class ObserveResult(AssetResult): """An object representing a successful observation of an asset. These can be returned from an @observable_source_asset decorated function to pass metadata. Attributes: asset_key (Optional[AssetKey]): The asset key. Optional to include. metadata (Optional[RawMetadataMapping]): Metadata to record with the corresponding AssetObservation event. check_results (Optional[Sequence[AssetCheckResult]]): Check results to record with the corresponding AssetObservation event. data_version (Optional[DataVersion]): The data version of the asset that was observed. tags (Optional[Mapping[str, str]]): Tags to record with the corresponding AssetObservation event. """