Ask AI

Source code for dagster._core.definitions.freshness_checks.time_partition

from typing import Sequence, Union

from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition

from ..asset_checks import AssetChecksDefinition
from ..assets import AssetsDefinition, SourceAsset
from ..events import CoercibleToAssetKey
from .shared_builder import build_freshness_checks_for_assets
from .utils import (
    DEFAULT_FRESHNESS_SEVERITY,
    DEFAULT_FRESHNESS_TIMEZONE,
)


@experimental
def build_time_partition_freshness_checks(
    *,
    assets: Sequence[Union[SourceAsset, CoercibleToAssetKey, AssetsDefinition]],
    deadline_cron: str,
    timezone: str = DEFAULT_FRESHNESS_TIMEZONE,
    severity: AssetCheckSeverity = DEFAULT_FRESHNESS_SEVERITY,
) -> AssetChecksDefinition:
    r"""Construct an `AssetChecksDefinition` that checks the freshness of the provided assets.

    This check passes if the asset is considered "fresh" by the time that execution begins. We
    consider an asset to be "fresh" if there exists a record for the most recent partition, once
    the deadline has passed.

    `deadline_cron` is a cron schedule that defines the deadline for when we should expect the most
    recent partition to arrive by. Once a tick of the cron schedule has passed, this check will fail
    if the most recent partition has not been observed/materialized.

    Let's say I have a daily-partitioned asset which runs every day at 8:00 AM UTC, and takes around
    45 minutes to complete. To account for operational delays, I would expect the asset to be done
    materializing every day by 9:00 AM UTC. I would set the `deadline_cron` to "0 9 \* \* \*". This
    means that starting at 9:00 AM, this check will expect a record to exist for the previous day's
    partition. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we'll
    instead be checking for the most recently passed deadline, which is yesterday (meaning the
    partition representing the day before yesterday).

    The timestamp of an observation record is the timestamp indicated by the
    "dagster/last_updated_timestamp" metadata key. The timestamp of a materialization record is the
    timestamp at which that record was created.

    The check will fail at runtime if a non-time-window partitioned asset is passed in.

    The check result will contain the following metadata:

    - "dagster/freshness_params": A dictionary containing the parameters used to construct the
      check.
    - "dagster/last_updated_time": (Only present if the asset has been observed/materialized
      before) The time of the most recent update to the asset.
    - "dagster/overdue_seconds": (Only present if asset is overdue) The number of seconds that the
      asset is overdue by.
    - "dagster/overdue_deadline_timestamp": The timestamp that we are expecting the asset to have
      arrived by. This is the timestamp of the most recent tick of the cron schedule.

    Examples:
        .. code-block:: python

            from dagster import build_time_partition_freshness_checks

            from .somewhere import my_daily_scheduled_assets_def

            # A daily partitioned asset that is expected to be updated every day
            # by 9:00 AM UTC. All arguments are keyword-only.
            checks_def = build_time_partition_freshness_checks(
                assets=[my_daily_scheduled_assets_def],
                deadline_cron="0 9 * * *",
            )

    Args:
        assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]): The assets to
            construct checks for. Each passed-in asset gets a corresponding freshness check.
        deadline_cron (str): The check will pass if the partition time window most recently
            completed by the time of the last cron tick has been observed/materialized.
        timezone (str): The timezone to use when calculating freshness and deadline. If not
            provided, defaults to `DEFAULT_FRESHNESS_TIMEZONE` ("UTC").
        severity (AssetCheckSeverity): The severity to use for the constructed checks; forwarded
            unchanged to the shared freshness-check builder. If not provided, defaults to
            `DEFAULT_FRESHNESS_SEVERITY`.

    Returns:
        AssetChecksDefinition: An `AssetChecksDefinition` object, which can execute a freshness
        check for each provided asset.
    """
    return build_freshness_checks_for_assets(
        assets=assets,
        deadline_cron=deadline_cron,
        timezone=timezone,
        severity=severity,
        # Time-partition freshness only makes sense for time-window partitioned assets; the shared
        # builder is handed this callback to enforce that property on an assets definition.
        asset_property_enforcement_lambda=lambda assets_def: check.invariant(
            isinstance(assets_def.partitions_def, TimeWindowPartitionsDefinition),
            "Asset is not time-window partitioned.",
        ),
    )