
Using asset checks to check data freshness#

A freshness check is a type of asset check that allows you to identify Dagster assets that are overdue for a data refresh.

Because freshness checks don't depend on a specific root cause, they're helpful in accounting for unknowns. For example, freshness checks can identify stale assets caused by any of the following:

  • The pipeline hitting an error and failing
  • Runs never being scheduled
  • A backed-up run queue
  • Runs taking longer than expected to complete

By the end of this guide, you'll understand what freshness checks are and how to implement them in your data pipelines.


How it works#

An asset is considered fresh if it's been materialized or observed within a defined time window. Otherwise, it's considered overdue and in need of an update. Freshness checks allow you to identify these assets.

Let's say members of your team review a dashboard every morning as part of their daily standup. To ensure they have the most up-to-date information, the assets powering the dashboard should be materialized no later than 7AM. Using a freshness check, we can define the time by which the asset should be updated. If the check runs at that time and the asset hasn't been updated, the asset will be considered overdue and the check will fail.
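
As a minimal sketch of that scenario (the dashboard_asset name is hypothetical, and the check relies on the deadline_cron parameter of build_last_update_freshness_checks, which is covered later in this guide), the check fails if the asset wasn't updated within the hour leading up to 7AM:

from datetime import timedelta

from dagster import asset, build_last_update_freshness_checks


@asset
def dashboard_asset(): ...


dashboard_freshness_checks = build_last_update_freshness_checks(
    assets=[dashboard_asset],
    # The asset should be updated within the hour before the 7AM deadline
    lower_bound_delta=timedelta(hours=1),
    deadline_cron="0 7 * * *",
)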

Additionally, freshness checks can help communicate SLAs for data freshness. For example, downstream consumers of an asset can look at the checks defined on it to determine when and how often it's expected to be updated.

Freshness checks use an asset's latest update time to determine if it's overdue. How Dagster determines the last update time depends on the type of asset being targeted:

  • For source assets, which are assets whose data feeds a Dagster pipeline, freshness checks depend on asset observations to report the time the asset was last updated. In this guide, we'll demonstrate how to use observable source assets and a schedule to achieve this.

  • For materializable assets, which are assets materialized by Dagster, Dagster infers the asset's last update time from its latest materialization timestamp. In this guide, we'll demonstrate how to use @asset-decorated assets and a sensor to achieve this.


Prerequisites#

Before continuing, you should be familiar with Dagster assets and asset checks.


Defining freshness checks for source assets#

In this section, we'll demonstrate how to implement freshness checks for source assets. Source assets are assets whose data feeds a Dagster pipeline but that aren't materialized by Dagster.

Step 1: Track the asset's last update time#

To run freshness checks on source assets, the checks need to know when the source assets were last updated. Observable source assets can be used to track the update times of these assets.

The following example implements this using the @multi_observable_source_asset decorator. In it, multiple Snowflake tables are backed by a single observation function that queries Snowflake for the most recent time each table was updated. The function yields each table's last update time as metadata, which is stored in the Dagster event log:

from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

from dagster import (
    AssetSpec,
    MetadataValue,
    ObserveResult,
    multi_observable_source_asset,
)

TABLE_SCHEMA = "PUBLIC"
table_names = ["charges", "customers"]
asset_specs = [AssetSpec(table_name) for table_name in table_names]


@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )

Step 2: Schedule the observations#

Next, we'll define a schedule that regularly executes the function in the source_tables observable source asset:

from dagster import AssetSelection, ScheduleDefinition, define_asset_job

source_tables_observation_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_tables_observation_job",
        selection=AssetSelection.assets(source_tables),
    ),
    # Runs every minute. Usually, a much less frequent cadence is necessary,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)

When the code location is loaded and the schedule is turned on, it will automatically kick off runs to observe the source tables.

Step 3: Define the freshness check#

In our example, we expect the source tables to be updated at least once every two hours. We'll use the build_last_update_freshness_checks function to produce a set of asset checks that fail if an asset's dagster/last_updated_timestamp metadata is more than two hours before the current time:

from datetime import timedelta

from dagster import build_last_update_freshness_checks

source_table_freshness_checks = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)

These checks will automatically execute after the observations of the source assets they target, so an additional schedule isn't needed.
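
If you'd rather evaluate the checks on their own cadence, you can schedule them separately. Here's a minimal sketch, assuming a job built from AssetSelection.checks, which accepts the check definitions produced above (the job name and cadence are arbitrary):

from dagster import AssetSelection, ScheduleDefinition, define_asset_job

source_table_freshness_check_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_table_freshness_check_job",
        # Select only the freshness checks, not the observations
        selection=AssetSelection.checks(*source_table_freshness_checks),
    ),
    cron_schedule="*/15 * * * *",  # every 15 minutes
)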

Step 4: Create a Definitions object#

The last step is to include the asset, checks, schedule, and resource in a Definitions object. This enables Dagster tools to load everything we've defined:

from dagster import Definitions, EnvVar

defs = Definitions(
    assets=[source_tables],
    # build_last_update_freshness_checks returns a sequence of asset checks
    asset_checks=source_table_freshness_checks,
    schedules=[source_tables_observation_schedule],
    resources={
        "snowflake": SnowflakeResource(
            user=EnvVar("SNOWFLAKE_USER"),
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)

At this point, the finished code should look like this:

from datetime import timedelta

from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

from dagster import (
    AssetSelection,
    AssetSpec,
    Definitions,
    EnvVar,
    MetadataValue,
    ObserveResult,
    ScheduleDefinition,
    build_last_update_freshness_checks,
    define_asset_job,
    multi_observable_source_asset,
)

TABLE_SCHEMA = "PUBLIC"
table_names = ["charges", "customers"]
asset_specs = [AssetSpec(table_name) for table_name in table_names]


@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )


source_tables_observation_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_tables_observation_job",
        selection=AssetSelection.assets(source_tables),
    ),
    # Runs every minute. Usually, a much less frequent cadence is necessary,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)


source_table_freshness_checks = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)


defs = Definitions(
    assets=[source_tables],
    # build_last_update_freshness_checks returns a sequence of asset checks
    asset_checks=source_table_freshness_checks,
    schedules=[source_tables_observation_schedule],
    resources={
        "snowflake": SnowflakeResource(
            user=EnvVar("SNOWFLAKE_USER"),
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)

From here, you can view and manually execute the checks in the Dagster UI.


Defining freshness checks for materializable assets#

In this section, we'll demonstrate how to implement freshness checks for materializable assets. These are assets that are materialized by a Dagster pipeline.

Step 1: Define the freshness check#

In this example, we'll use the build_last_update_freshness_checks function to produce a freshness check. This check will fail if the asset's latest materialization occurred more than two hours before the current time:

from datetime import timedelta

from dagster import asset, build_last_update_freshness_checks


@asset
def my_asset(): ...


asset1_freshness_checks = build_last_update_freshness_checks(
    assets=[my_asset], lower_bound_delta=timedelta(hours=2)
)

Step 2: Create a sensor to execute the check#

A schedule or sensor is required to ensure the freshness check executes. If the check ran only when the asset was materialized, it wouldn't be able to detect the case where a materialization never happens at all.

In this example, we'll use build_sensor_for_freshness_checks to create a sensor to automatically run the check. Based on the check's parameters and last run time, the sensor will run the check when enough time has elapsed that the asset might fail the check:

from dagster import build_sensor_for_freshness_checks

freshness_checks_sensor = build_sensor_for_freshness_checks(
    freshness_checks=asset1_freshness_checks
)
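
The sensor builder also accepts a few optional arguments. As a sketch, assuming build_sensor_for_freshness_checks accepts name, minimum_interval_seconds, and default_status parameters (check the API docs for your version):

from dagster import DefaultSensorStatus, build_sensor_for_freshness_checks

freshness_checks_sensor = build_sensor_for_freshness_checks(
    freshness_checks=asset1_freshness_checks,
    name="asset1_freshness_sensor",  # displayed in the Dagster UI
    minimum_interval_seconds=120,  # evaluate at most once every two minutes
    default_status=DefaultSensorStatus.RUNNING,  # turn on automatically when deployed
)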

Step 3: Create a Definitions object#

The last step is to include the asset, check, and sensor in a Definitions object. This enables Dagster tools to load everything we've defined:

from dagster import Definitions

defs = Definitions(
    assets=[my_asset],
    asset_checks=asset1_freshness_checks,
    sensors=[freshness_checks_sensor],
)

At this point, the finished code should look like this:

from datetime import timedelta

from dagster import (
    Definitions,
    asset,
    build_last_update_freshness_checks,
    build_sensor_for_freshness_checks,
)


@asset
def my_asset(): ...


asset1_freshness_checks = build_last_update_freshness_checks(
    assets=[my_asset], lower_bound_delta=timedelta(hours=2)
)
freshness_checks_sensor = build_sensor_for_freshness_checks(
    freshness_checks=asset1_freshness_checks
)
defs = Definitions(
    assets=[my_asset],
    asset_checks=asset1_freshness_checks,
    sensors=[freshness_checks_sensor],
)

From here, you can view and manually execute the checks in the Dagster UI.


Freshness checks in the Dagster UI#

To view a freshness check, navigate to the Asset details page for the asset and click the Checks tab.

This page allows you to see metadata about the check and its latest execution. You can also manually execute the check by clicking the Execute button.

The Asset Checks page, opened to a freshness check in the Dagster UI

APIs in this guide#

  • @asset_check: A decorator used to define asset checks that execute in their own op.
  • build_last_update_freshness_checks: A function that constructs a freshness check definition for each provided asset.
  • build_time_partition_freshness_checks: A function that constructs a freshness check definition for each provided time-window partitioned asset.
  • build_sensor_for_freshness_checks: A function that builds a sensor that triggers evaluation of freshness checks.
  • ObserveResult: An object representing a successful observation of an asset. Used with observable source assets to pass metadata.
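
build_time_partition_freshness_checks isn't demonstrated above, so here's a minimal sketch of how it might be applied to a time-window partitioned asset (the asset name, partitions definition, and cron below are hypothetical): each day's partition is expected to be materialized by 9AM the following day.

from dagster import (
    DailyPartitionsDefinition,
    asset,
    build_time_partition_freshness_checks,
)


@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_asset(): ...


daily_freshness_checks = build_time_partition_freshness_checks(
    assets=[daily_asset],
    # Each day's partition should exist by 9AM the next day
    deadline_cron="0 9 * * *",
)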