Ask AI

Source code for dagster._core.definitions.freshness_checks.last_update

import datetime
from typing import Optional, Sequence, Union

from dagster._annotations import experimental
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity

from ..asset_checks import AssetChecksDefinition
from ..assets import AssetsDefinition, SourceAsset
from ..events import CoercibleToAssetKey
from .shared_builder import build_freshness_checks_for_assets
from .utils import (
    DEFAULT_FRESHNESS_SEVERITY,
    DEFAULT_FRESHNESS_TIMEZONE,
)


@experimental
def build_last_update_freshness_checks(
    *,
    assets: Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]],
    lower_bound_delta: datetime.timedelta,
    deadline_cron: Optional[str] = None,
    timezone: str = DEFAULT_FRESHNESS_TIMEZONE,
    severity: AssetCheckSeverity = DEFAULT_FRESHNESS_SEVERITY,
) -> AssetChecksDefinition:
    r"""Constructs an `AssetChecksDefinition` that checks the freshness of the provided assets.

    This check passes if the asset is found to be "fresh", and fails if the asset is found to be
    "overdue". An asset is considered fresh if a record (i.e. a materialization or observation)
    exists with a timestamp greater than the "lower bound" derived from the parameters of this
    function.

    `deadline_cron` is a cron schedule that defines the deadline for when we should expect the
    asset to arrive by; if not provided, we consider the deadline to be the execution time of the
    check. `lower_bound_delta` is a timedelta that defines the lower bound for when a record could
    have arrived by. If the most recent record's timestamp is earlier than
    ``deadline - lower_bound_delta``, the asset is considered overdue.

    Let's use two examples, one with a deadline_cron set and one without.

    Let's say I have an asset which runs on a schedule every day at 8:00 AM UTC, and usually takes
    around 45 minutes to complete. To account for operational delays, I would expect the asset to be
    done materializing every day by 9:00 AM UTC. I would set the `deadline_cron` to "0 9 \* \* \*",
    and the `lower_bound_delta` to "45 minutes". This would mean that starting at 9:00 AM, this
    check will expect a materialization record to have been created no earlier than 8:15 AM. Note
    that if the check runs at 8:59 AM, the deadline has not yet passed, and we'll instead be
    checking for the most recently passed deadline, which is yesterday.

    Let's say I have an observable source asset on a data source which I expect should never be more
    than 3 hours out of date. In this case, there's no fixed schedule for when the data should be
    updated, so I would not provide a `deadline_cron`. Instead, I would set the `lower_bound_delta`
    parameter to "3 hours". This would mean that the check will expect the most recent observation
    record to indicate data no older than 3 hours, relative to the current time, regardless of when
    it runs.

    The check result will contain the following metadata:

    "dagster/freshness_params": A dictionary containing the parameters used to construct the
    check.

    "dagster/last_updated_time": The time of the most recent update to the asset.

    "dagster/overdue_seconds": (Only present if asset is overdue) The number of seconds that the
    asset is overdue by.

    "dagster/overdue_deadline_timestamp": The timestamp that we are expecting the asset to have
    arrived by. In the case of a provided deadline_cron, this is the timestamp of the most recent
    tick of the cron schedule. In the case of no deadline_cron, this is the current time.

    Examples:
        .. code-block:: python

            # Example 1: Assets that are expected to be updated every day within 45 minutes of
            # 9:00 AM UTC
            import datetime

            from dagster import build_last_update_freshness_checks, AssetKey
            from .somewhere import my_daily_scheduled_assets_def

            checks_def = build_last_update_freshness_checks(
                # All parameters are keyword-only, so `assets` must be passed by name.
                assets=[my_daily_scheduled_assets_def, AssetKey("my_other_daily_asset_key")],
                lower_bound_delta=datetime.timedelta(minutes=45),
                deadline_cron="0 9 * * *",
            )

            # Example 2: Assets that are expected to be updated within 3 hours of the current time
            import datetime

            from dagster import build_last_update_freshness_checks, AssetKey
            from .somewhere import my_observable_source_asset

            checks_def = build_last_update_freshness_checks(
                assets=[my_observable_source_asset, AssetKey("my_other_observable_asset_key")],
                lower_bound_delta=datetime.timedelta(hours=3),
            )

    Args:
        assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]): The assets to
            construct checks for. All checks are incorporated into the same `AssetChecksDefinition`,
            which can be subsetted to run checks for specific assets.
        lower_bound_delta (datetime.timedelta): The check will pass if the asset was updated within
            lower_bound_delta of the current_time (no cron) or the most recent tick of the cron
            (cron provided).
        deadline_cron (Optional[str]): Defines the deadline for when we should start checking that
            the asset arrived. If not provided, the deadline is the execution time of the check.
        timezone (str): The timezone to use when calculating freshness and deadline. If not
            provided, defaults to ``DEFAULT_FRESHNESS_TIMEZONE`` ("UTC").
        severity (AssetCheckSeverity): The severity attached to the check result when the check
            fails. If not provided, defaults to ``DEFAULT_FRESHNESS_SEVERITY``.

    Returns:
        AssetChecksDefinition: An `AssetChecksDefinition` object, which can execute a freshness check
        for all provided assets.
    """
    # Thin wrapper: all validation and check construction is delegated to the shared builder,
    # with every argument forwarded by keyword.
    return build_freshness_checks_for_assets(
        assets=assets,
        deadline_cron=deadline_cron,
        timezone=timezone,
        severity=severity,
        lower_bound_delta=lower_bound_delta,
    )