Source code for dagster_dbt.freshness_builder
import datetime
from typing import TYPE_CHECKING, Any, Dict, Mapping, Sequence
from dagster import (
AssetsDefinition,
_check as check,
)
from dagster._annotations import experimental
from dagster._core.definitions.asset_check_factories.freshness_checks.last_update import (
build_last_update_freshness_checks,
)
from dagster._core.definitions.asset_check_factories.freshness_checks.time_partition import (
build_time_partition_freshness_checks,
)
from dagster._core.definitions.asset_check_factories.utils import (
DEFAULT_FRESHNESS_SEVERITY,
asset_to_keys_iterable,
assets_to_keys,
ensure_no_duplicate_assets,
)
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.errors import DagsterInvariantViolationError
if TYPE_CHECKING:
from dagster import AssetKey
from dagster_dbt.asset_utils import (
get_asset_keys_to_resource_props,
get_manifest_and_translator_from_dbt_assets,
)
[docs]
@experimental
def build_freshness_checks_from_dbt_assets(
    *,
    dbt_assets: Sequence[AssetsDefinition],
) -> Sequence[AssetChecksDefinition]:
    """Returns a sequence of freshness checks constructed from the provided dbt assets.

    Freshness checks can be configured on a per-model basis in the model schema configuration.
    Models without a ``freshness_check`` entry under ``meta.dagster`` get no check.

    For assets which are not partitioned based on time, the freshness check configuration mirrors
    that of the :py:func:`build_last_update_freshness_checks` function. The lower bound delta is
    provided in seconds via the ``lower_bound_delta_seconds`` key (required), and
    ``deadline_cron`` is optional.

    For time-partitioned assets, the freshness check configuration mirrors that of the
    :py:func:`build_time_partition_freshness_checks` function.

    Below is an example of configuring a non-time-partitioned dbt asset with a freshness check.
    This code would be placed in the schema.yml file for the dbt model.

    .. code-block:: yaml

        models:
          - name: customers
            ...
            meta:
              dagster:
                freshness_check:
                  lower_bound_delta_seconds: 86400 # 1 day
                  deadline_cron: "0 0 * * *" # Optional
                  severity: "WARN" # Optional, defaults to "WARN"

    Below is an example of configuring a time-partitioned dbt asset with a freshness check.
    This code would be placed in the schema.yml file for the dbt model.

    .. code-block:: yaml

        models:
          - name: customers
            ...
            meta:
              dagster:
                freshness_check:
                  deadline_cron: "0 0 * * *"
                  severity: "WARN" # Optional, defaults to "WARN"

    Args:
        dbt_assets (Sequence[AssetsDefinition]): A sequence of dbt assets to construct freshness
            checks from.

    Returns:
        Sequence[AssetChecksDefinition]: A sequence of asset checks definitions representing the
            freshness checks for the provided dbt assets.

    Raises:
        DagsterInvariantViolationError: If an asset key of a provided assets definition has no
            matching dbt resource properties in that definition's manifest, or if a
            non-time-partitioned asset's ``freshness_check`` config lacks
            ``lower_bound_delta_seconds`` or it cannot be parsed as an integer.
    """
    dbt_assets = check.sequence_param(dbt_assets, "dbt_assets", AssetsDefinition)
    ensure_no_duplicate_assets(dbt_assets)

    # For every asset key, record the AssetsDefinition that owns it and its dbt resource
    # properties. Each definition is resolved with its *own* manifest and translator, so
    # definitions built from different projects or translators stay independent.
    asset_key_to_assets_def: Dict["AssetKey", AssetsDefinition] = {}
    asset_key_to_resource_props: Dict["AssetKey", Mapping[str, Any]] = {}
    for assets_def in dbt_assets:
        manifest, translator = get_manifest_and_translator_from_dbt_assets([assets_def])
        resource_props_by_key = get_asset_keys_to_resource_props(manifest, translator)
        for asset_key in asset_to_keys_iterable(assets_def):
            if asset_key not in resource_props_by_key:
                raise DagsterInvariantViolationError(
                    f"Could not find dbt resource properties for asset key {asset_key.to_user_string()}."
                )
            asset_key_to_assets_def[asset_key] = assets_def
            asset_key_to_resource_props[asset_key] = resource_props_by_key[asset_key]

    freshness_checks = []
    for asset_key in assets_to_keys(dbt_assets):
        dbt_resource_props = asset_key_to_resource_props[asset_key]
        assets_def = asset_key_to_assets_def[asset_key]

        # `or {}` tolerates explicit nulls (e.g. `meta: null`) in addition to missing keys.
        dagster_metadata = (dbt_resource_props.get("meta") or {}).get("dagster") or {}
        freshness_check_config = dagster_metadata.get("freshness_check", {})
        if not freshness_check_config:
            continue

        severity_str = freshness_check_config.get("severity")
        severity = AssetCheckSeverity(severity_str) if severity_str else DEFAULT_FRESHNESS_SEVERITY
        deadline_cron = freshness_check_config.get("deadline_cron")

        # isinstance() is False for a missing (None) partitions_def, so no separate guard is needed.
        if isinstance(assets_def.partitions_def, TimeWindowPartitionsDefinition):
            freshness_checks.extend(
                build_time_partition_freshness_checks(
                    assets=[asset_key],
                    deadline_cron=deadline_cron,
                    severity=severity,
                )
            )
            continue

        try:
            lower_bound_seconds = int(freshness_check_config["lower_bound_delta_seconds"])
        except (KeyError, TypeError, ValueError, OverflowError) as err:
            raise DagsterInvariantViolationError(
                "lower_bound_delta_seconds must be provided, and parseable as an integer "
                "in the freshness_check config for a non-time-partitioned asset."
            ) from err

        freshness_checks.extend(
            build_last_update_freshness_checks(
                # `asset_key` was validated against this asset's own translator above; do not
                # re-derive it from a translator belonging to another AssetsDefinition.
                assets=[asset_key],
                deadline_cron=deadline_cron,
                lower_bound_delta=datetime.timedelta(seconds=lower_bound_seconds),
                severity=severity,
            )
        )
    return freshness_checks