A freshness check is a type of asset check that allows you to identify Dagster assets that are overdue for a data refresh.
Freshness checks don't depend on a specific root cause, which makes them helpful in accounting for unknowns. For example, you can use freshness checks to identify stale assets caused by:
The pipeline hitting an error and failing
Runs never being scheduled
A backed-up run queue
Runs taking longer than expected to complete
For example, let's say your team uses a dashboard as part of their daily stand-up. You determine that the assets powering the dashboard should be materialized no later than 7 AM to ensure the data is up-to-date. If a freshness check runs at that time and the assets haven't been updated, the check will fail. The freshness check ensures you know when data doesn't arrive as expected, without needing to know the specific cause.
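The dashboard scenario can be sketched in plain Python. This is a simplified illustration of a deadline-style check, not Dagster's implementation: the passes_deadline_check function and the seven-hour lookback window are assumptions made for this example, since the scenario only specifies the 7 AM deadline.

```python
from datetime import datetime, timedelta
from typing import Optional


def passes_deadline_check(
    last_update: Optional[datetime],
    deadline: datetime,
    lookback: timedelta,
) -> bool:
    """Pass only if the asset was updated within `lookback` before the deadline."""
    if last_update is None:
        # An asset that has never been updated cannot be fresh.
        return False
    return last_update >= deadline - lookback


# Hypothetical values: a 7 AM deadline and a window that opens at midnight.
deadline = datetime(2024, 1, 2, 7, 0)
lookback = timedelta(hours=7)

refreshed_this_morning = datetime(2024, 1, 2, 6, 30)
refreshed_yesterday = datetime(2024, 1, 1, 6, 30)

print(passes_deadline_check(refreshed_this_morning, deadline, lookback))  # True
print(passes_deadline_check(refreshed_yesterday, deadline, lookback))     # False
```

The check fails for the asset last refreshed the previous day regardless of why the morning refresh didn't happen, which is the point of the scenario.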
By the end of this guide, you'll understand what freshness checks are, how to implement them in your data pipelines, and how to be alerted when your assets are overdue.
An asset is considered fresh if it's been materialized or observed within a defined time window. Otherwise, assets are considered overdue and in need of an update. To identify these assets, you can use freshness checks.
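As a rough illustration of this definition, the following sketch classifies an asset using a rolling time window. The freshness_status function is hypothetical and exists only for this example; Dagster's own checks handle more cases than this.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(last_update: datetime, now: datetime, window: timedelta) -> str:
    """Classify an asset as "fresh" or "overdue" for a rolling time window."""
    return "fresh" if now - last_update <= window else "overdue"


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
two_hours = timedelta(hours=2)

# Updated 90 minutes ago: inside the two-hour window.
print(freshness_status(now - timedelta(minutes=90), now, two_hours))  # fresh
# Updated three hours ago: outside the window.
print(freshness_status(now - timedelta(hours=3), now, two_hours))     # overdue
```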
Freshness checks can also communicate SLAs for the freshness of the assets they target. For example, downstream asset consumers can look at the checks that are defined on those assets to determine when and how often they’re expected to be updated.
To determine if an asset is overdue, a freshness check will use the asset's latest update time. How Dagster determines the last update time depends on the type of asset being targeted:
For source assets, which are assets whose data feeds a Dagster pipeline, freshness checks depend on asset observations to report the time the asset was last updated. In this guide, we'll demonstrate how to use observable source assets and a schedule to achieve this.
For materializable assets, which are assets materialized by Dagster, Dagster infers the asset's last update time using its latest materialization timestamp. In this guide, we'll demonstrate how to use @asset-decorated assets and a sensor to achieve this.
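The difference between the two asset types can be sketched as follows. The AssetEvent class and last_update_time function are hypothetical stand-ins for Dagster's event log records, written only to show where the update time comes from in each case.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AssetEvent:
    """Hypothetical, simplified stand-in for an event log record."""

    kind: str  # "observation" or "materialization"
    timestamp: float  # when the event was recorded (Unix seconds)
    metadata: dict = field(default_factory=dict)


def last_update_time(event: AssetEvent) -> Optional[float]:
    """Return the update time a freshness check would compare against."""
    if event.kind == "observation":
        # Source assets: the observation reports when the data last changed.
        return event.metadata.get("dagster/last_updated_timestamp")
    if event.kind == "materialization":
        # Materializable assets: the materialization itself is the update.
        return event.timestamp
    return None


observed = AssetEvent(
    kind="observation",
    timestamp=500.0,
    metadata={"dagster/last_updated_timestamp": 100.0},
)
materialized = AssetEvent(kind="materialization", timestamp=200.0)

print(last_update_time(observed))      # 100.0
print(last_update_time(materialized))  # 200.0
```

Note that for the observation, the time the event was recorded (500.0) isn't the update time. What matters is the timestamp the observation reports for the underlying data.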
In this section, we'll demonstrate how to implement freshness checks for source assets. Source assets are assets whose data feeds a Dagster pipeline, but aren't materialized by Dagster.
To run freshness checks on source assets, the checks need to know when the source assets were last updated. Observable source assets can be used to track the update times of these assets.
The following example implements this using the @multi_observable_source_asset decorator. In it, multiple Snowflake tables are backed by an observation function that queries Snowflake to find the most recent time the tables were updated. The function yields the update time as metadata to be stored in the Dagster event log:
from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

from dagster import (
    AssetSpec,
    MetadataValue,
    ObserveResult,
    multi_observable_source_asset,
)

TABLE_SCHEMA = "PUBLIC"
table_names = ["charges", "customers"]
asset_specs = [AssetSpec(table_name) for table_name in table_names]


@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )
Next, we'll define a schedule that regularly executes the function in the source_tables observable source asset:
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

source_tables_observation_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_tables_observation_job",
        selection=AssetSelection.assets(source_tables),
    ),
    # Runs every minute. Usually, a much less frequent cadence is necessary,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)
When the code location is loaded and the schedule is turned on, it will automatically kick off runs to observe the asset.
In our example, we expect the source tables to be updated at least every two hours. We'll use the build_last_update_freshness_checks function to produce a set of asset checks that fail if an asset’s last_updated_timestamp is more than two hours before the current time:
from datetime import timedelta

from dagster import build_last_update_freshness_checks

source_table_freshness_checks = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)
These checks will automatically execute after the observations of the source assets they target, so an additional schedule isn't needed.
A Dagster+ Pro plan is required to use this feature.
Setting custom freshness policies on a large number of assets can be time-consuming. Dagster+ Pro users can take advantage of a time series anomaly detection model instead of applying policies on an asset-by-asset basis. Freshness checks that use this approach function the same way checks with hardcoded parameters do.
This model uses data from past materializations/observations to determine if data is arriving later than expected. Note: If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies.
In the following example, we'll use build_anomaly_detection_freshness_checks to accomplish this:
from dagster_cloud.anomaly_detection import build_anomaly_detection_freshness_checks

freshness_checks = build_anomaly_detection_freshness_checks(
    assets=[source_tables], params=None
)
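To build intuition for how a check based on past updates behaves, here is a toy sketch. It is not the Dagster+ model, whose internals aren't described in this guide. The check_arrival function, the MIN_UPDATES threshold, and the mean-plus-deviation rule are all assumptions made for illustration.

```python
from datetime import datetime, timedelta
from statistics import mean, stdev

MIN_UPDATES = 5  # Hypothetical minimum history size, not a Dagster+ setting.


def check_arrival(update_times, now, tolerance=3.0):
    """Return (passed, message) for a toy late-arrival check."""
    if len(update_times) < MIN_UPDATES:
        # Too little history: pass, but say that more data is needed.
        return True, "Not enough past updates to detect anomalies yet."
    ordered = sorted(update_times)
    gaps = [
        (later - earlier).total_seconds()
        for earlier, later in zip(ordered, ordered[1:])
    ]
    # Treat anything beyond the typical gap plus some slack as late.
    longest_expected_gap = mean(gaps) + tolerance * stdev(gaps)
    waited = (now - ordered[-1]).total_seconds()
    if waited > longest_expected_gap:
        return False, "Data is arriving later than expected."
    return True, "Data arrived within the expected interval."


start = datetime(2024, 1, 1, 0, 0)
hourly_updates = [start + timedelta(hours=i) for i in range(6)]

# Only two past updates: passes with a "more data needed" message.
print(check_arrival(hourly_updates[:2], start + timedelta(hours=9)))
# Thirty minutes after the last hourly update: passes.
print(check_arrival(hourly_updates, start + timedelta(hours=5, minutes=30)))
# Four hours after the last hourly update: fails.
print(check_arrival(hourly_updates, start + timedelta(hours=9)))
```

As with the real checks, no per-asset threshold is hardcoded here. The expected interval is derived from the asset's own update history.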
The last step is to include the asset, checks, schedule, and resource in a Definitions object. This enables Dagster tools to load everything we've defined.
At this point, the finished code should look like this:
from datetime import timedelta

from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

from dagster import (
    AssetSelection,
    AssetSpec,
    Definitions,
    EnvVar,
    MetadataValue,
    ObserveResult,
    ScheduleDefinition,
    build_last_update_freshness_checks,
    define_asset_job,
    multi_observable_source_asset,
)

TABLE_SCHEMA = "PUBLIC"
table_names = ["charges", "customers"]
asset_specs = [AssetSpec(table_name) for table_name in table_names]


@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )


source_tables_observation_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_tables_observation_job",
        selection=AssetSelection.assets(source_tables),
    ),
    # Runs every minute. Usually, a much less frequent cadence is necessary,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)

source_table_freshness_checks = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)

defs = Definitions(
    assets=[source_tables],
    asset_checks=source_table_freshness_checks,
    schedules=[source_tables_observation_schedule],
    resources={
        "snowflake": SnowflakeResource(
            user=EnvVar("SNOWFLAKE_USER"),
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)
Defining freshness checks for materializable assets
In this section, we'll demonstrate how to implement freshness checks for materializable assets. These are assets that are materialized by a Dagster pipeline.
In this example, we'll use the build_last_update_freshness_checks function to produce an asset check. This check will fail if an asset’s latest materialization is more than two hours before the current time:
from datetime import timedelta

from dagster import asset, build_last_update_freshness_checks


@asset
def my_asset(): ...


asset1_freshness_checks = build_last_update_freshness_checks(
    assets=[my_asset], lower_bound_delta=timedelta(hours=2)
)
A schedule or sensor is required to ensure the freshness check executes. If the check only ran after the asset was materialized, it wouldn't be able to detect the cases where a materialization fails or never starts.
In this example, we'll use build_sensor_for_freshness_checks to create a sensor to automatically run the check. Based on the check's parameters and last run time, the sensor will run the check when enough time has elapsed that the asset might fail the check:
from dagster import build_sensor_for_freshness_checks

freshness_checks_sensor = build_sensor_for_freshness_checks(
    freshness_checks=asset1_freshness_checks
)
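The timing idea behind the sensor can be sketched in plain Python. This is a simplified illustration under our own assumptions, not the sensor's actual logic, and the should_run_check function is hypothetical. It evaluates the check only once the asset could have become overdue, and skips the evaluation if the check has already run since then.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_run_check(
    last_update: datetime,
    last_check_run: Optional[datetime],
    now: datetime,
    lower_bound_delta: timedelta,
) -> bool:
    """Decide whether evaluating the freshness check now could change its result."""
    overdue_at = last_update + lower_bound_delta
    if now < overdue_at:
        # The asset can't be overdue yet, so there is nothing new to detect.
        return False
    # Run only if the check hasn't been evaluated since the asset became overdue.
    return last_check_run is None or last_check_run < overdue_at


last_update = datetime(2024, 1, 1, 10, 0)
two_hours = timedelta(hours=2)

# 11:00: still inside the two-hour window, so no run is needed.
print(should_run_check(last_update, None, datetime(2024, 1, 1, 11, 0), two_hours))
# 12:05: the window has elapsed and the last check ran at 10:01, so run it.
print(
    should_run_check(
        last_update,
        datetime(2024, 1, 1, 10, 1),
        datetime(2024, 1, 1, 12, 5),
        two_hours,
    )
)
```

Because the decision depends on elapsed time rather than on a materialization event, this approach still catches the case where the asset's run failed or was never launched.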
In Dagster+, you can set up alerts to notify you when assets are overdue for an update. By default, freshness checks will fail with a severity of WARN, but you can override this to fail with ERROR.
To alert on overdue assets, create an alert policy with the following settings:
Alert policy type - Asset alert
Target - The asset keys or groups you've defined checks for. You can also target all assets.
Events - Under Asset checks, check the box for the severity you've defined for failed checks: Failed (WARN) or Failed (ERROR)