Asset Checks
Dagster allows you to define and execute checks on your software-defined assets. Each asset check verifies some property of a data asset, e.g. that it has no null values in a particular column.
- @dagster.asset_check
Create a definition for how to execute an asset check.
- asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]) – The asset that the check applies to.
- name (Optional[str]) – The name of the check. If not specified, the name of the decorated function will be used. Checks for the same asset must have unique names.
- description (Optional[str]) – The description of the check.
- blocking (bool) – When enabled, runs that include this check and any downstream assets that depend on asset will wait for this check to complete before starting the downstream assets. If the check fails with severity AssetCheckSeverity.ERROR, then the downstream assets won’t execute.
- additional_ins (Optional[Mapping[str, AssetIn]]) – A mapping from input name to information about the input. These inputs will apply to the underlying op that executes the check. These should not include the asset parameter, which is always included as a dependency.
- additional_deps (Optional[Iterable[CoercibleToAssetDep]]) – Assets that are upstream dependencies, but do not correspond to a parameter of the decorated function. These dependencies will apply to the underlying op that executes the check. These should not include the asset parameter, which is always included as a dependency.
- required_resource_keys (Optional[Set[str]]) – A set of keys for resources that are required by the function that executes the check. These can alternatively be specified by including resource-typed parameters in the function signature.
- config_schema (Optional[ConfigSchema]) – The configuration schema for the check’s underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.
- op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that executes the check. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON-encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
- compute_kind (Optional[str]) – A string to represent the kind of computation that executes the check, e.g. “dbt” or “spark”.
- retry_policy (Optional[RetryPolicy]) – The retry policy for the op that executes the check.
- metadata (Optional[Mapping[str, Any]]) – A dictionary of static metadata for the check.
- automation_condition (Optional[AutomationCondition]) – An AutomationCondition which determines when this check should be executed.
Produces an AssetChecksDefinition object.
Example:
from dagster import asset, asset_check, AssetCheckResult

@asset
def my_asset() -> None:
    ...

@asset_check(asset=my_asset, description="Check that my asset has enough rows")
def my_asset_has_enough_rows() -> AssetCheckResult:
    num_rows = ...  # compute the row count here
    return AssetCheckResult(passed=num_rows > 5, metadata={"num_rows": num_rows})

Example with a DataFrame Output:
from dagster import asset, asset_check, AssetCheckResult
from pandas import DataFrame

@asset
def my_asset() -> DataFrame:
    ...

# The parameter named after the asset receives the asset's DataFrame output.
@asset_check(asset=my_asset, description="Check that my asset has enough rows")
def my_asset_has_enough_rows(my_asset: DataFrame) -> AssetCheckResult:
    num_rows = my_asset.shape[0]
    return AssetCheckResult(passed=num_rows > 5, metadata={"num_rows": num_rows})
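The op_tags requirement above, that non-string values satisfy json.loads(json.dumps(value)) == value, can be tested before the tags are passed to the decorator. The following is a minimal sketch in plain Python; the helper name survives_json_round_trip is hypothetical and is not part of Dagster.

```python
import json
from typing import Any


def survives_json_round_trip(value: Any) -> bool:
    """Return True if `value` is unchanged after JSON encoding and decoding."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as set.
        return False


# Hypothetical tag values, chosen to show typical failure modes.
candidate_tags = {
    "owner": {"team": "data", "tier": 1},  # plain dict: round-trips unchanged
    "shards": (1, 2, 3),                   # a tuple is decoded as a list
    "ids": {1: "a"},                       # int keys are decoded as strings
}
for key, value in candidate_tags.items():
    print(key, survives_json_round_trip(value))
```

Tuples and non-string dictionary keys are the most common values that encode without error but still fail the equality test.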
- dagster.AssetCheckResult
The result of an asset check.
- asset_key (Optional[AssetKey]) – The asset key that was checked.
- check_name (Optional[str]) – The name of the check.
- passed (bool) – The pass/fail result of the check.
- metadata (Optional[Dict[str, RawMetadataValue]]) – Arbitrary metadata about the asset. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.
- severity (AssetCheckSeverity) – Severity of the check. Defaults to ERROR.
- description (Optional[str]) – A text description of the result of the check evaluation.
- dagster.AssetCheckSpec
Defines information about an asset check, except how to execute it.
AssetCheckSpec is often used as an argument to decorators that decorate a function that can execute multiple checks, e.g. @asset and @multi_asset. It defines one of the checks that will be executed inside that function.
- name (str) – Name of the check.
- asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]) – The asset that the check applies to.
- description (Optional[str]) – Description for the check.
- additional_deps (Optional[Iterable[AssetDep]]) – Additional dependencies for the check. The check relies on these assets in some way, but the result of the check only applies to the asset specified by asset. For example, the check may test that asset has matching data with an asset in additional_deps. This field holds both additional_deps and additional_ins passed to @asset_check.
- metadata (Optional[Mapping[str, Any]]) – A dict of static metadata for this asset check.
- dagster.AssetCheckSeverity
Severity level for an AssetCheckResult.
- WARN: a potential issue with the asset
- ERROR: a definite issue with the asset
Severity does not impact execution of the asset or downstream assets.
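Read together with the blocking parameter of @asset_check described earlier, the rule on this page is that downstream assets are held back only when a check is blocking, fails, and reports severity ERROR. The sketch below expresses that decision in plain Python. The Severity enum and the downstream_should_run function are illustrative stand-ins, not Dagster's API or implementation.

```python
from enum import Enum


class Severity(Enum):
    """Stand-in for AssetCheckSeverity (illustrative only)."""

    WARN = "WARN"
    ERROR = "ERROR"


def downstream_should_run(passed: bool, severity: Severity, blocking: bool) -> bool:
    """Decide whether downstream assets run after a check result.

    A non-blocking check never stops downstream assets. A blocking check
    stops them only when it fails with severity ERROR.
    """
    if not blocking:
        return True
    return passed or severity is not Severity.ERROR


for blocking in (False, True):
    for severity in Severity:
        result = downstream_should_run(False, severity, blocking)
        print(f"failed, {severity.name}, blocking={blocking}: run downstream={result}")
```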
- dagster.AssetCheckKey
Check names are expected to be unique per-asset. Thus, this combination of asset key and check name uniquely identifies an asset check within a deployment.
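Because a check is identified by the pair of asset key and check name, the same check name can be reused on different assets but not twice on one asset. A small sketch of that uniqueness rule in plain Python follows; CheckKey and find_duplicate_keys are hypothetical names that only mirror the idea of AssetCheckKey.

```python
from collections import Counter
from typing import List, NamedTuple, Tuple


class CheckKey(NamedTuple):
    """Illustrative stand-in: (asset key path, check name)."""

    asset_key: Tuple[str, ...]
    name: str


def find_duplicate_keys(keys: List[CheckKey]) -> List[CheckKey]:
    """Return every key that appears more than once, in first-seen order."""
    counts = Counter(keys)
    return [key for key, count in counts.items() if count > 1]


keys = [
    CheckKey(("asset1",), "enough_rows"),
    CheckKey(("asset2",), "enough_rows"),  # same name, different asset: allowed
    CheckKey(("asset1",), "enough_rows"),  # same asset and name: a duplicate
]
print(find_duplicate_keys(keys))
```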
- @dagster.multi_asset_check
Defines a set of asset checks that can be executed together with the same op.
- specs (Sequence[AssetCheckSpec]) – Specs for the asset checks.
- name (Optional[str]) – The name of the op. If not specified, the name of the decorated function will be used.
- description (Optional[str]) – Description of the op.
- required_resource_keys (Optional[Set[str]]) – A set of keys for resources that are required by the function that executes the checks. These can alternatively be specified by including resource-typed parameters in the function signature.
- config_schema (Optional[ConfigSchema]) – The configuration schema for the asset checks’ underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.
- op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that executes the checks. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON-encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
- compute_kind (Optional[str]) – A string to represent the kind of computation that executes the checks, e.g. “dbt” or “spark”.
- retry_policy (Optional[RetryPolicy]) – The retry policy for the op that executes the checks.
- can_subset (bool) – Whether the op can emit results for a subset of the asset checks keys, based on the context.selected_asset_check_keys argument. Defaults to False.
- ins (Optional[Mapping[str, Union[AssetKey, AssetIn]]]) – A mapping from input name to AssetIn depended upon by a given asset check. If an AssetKey is provided, it will be converted to an AssetIn with the same key.
from dagster import AssetCheckResult, AssetCheckSpec, multi_asset_check

@multi_asset_check(
    specs=[
        AssetCheckSpec("enough_rows", asset="asset1"),
        AssetCheckSpec("no_dupes", asset="asset1"),
        AssetCheckSpec("enough_rows", asset="asset2"),
    ],
)
def checks():
    # Yield one result per spec, identified by asset key and check name.
    yield AssetCheckResult(passed=True, asset_key="asset1", check_name="enough_rows")
    yield AssetCheckResult(passed=False, asset_key="asset1", check_name="no_dupes")
    yield AssetCheckResult(passed=True, asset_key="asset2", check_name="enough_rows")
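When can_subset is True, the decorated function is expected to emit results only for the checks selected for the current run, which Dagster exposes as context.selected_asset_check_keys. The sketch below shows that filtering pattern in plain Python so it can run without Dagster. The tuple-based keys, the ALL_CHECKS table, and run_selected_checks are hypothetical stand-ins; dicts take the place of AssetCheckResult.

```python
from typing import Callable, Dict, Iterator, Set, Tuple

CheckKey = Tuple[str, str]  # (asset key, check name), illustrative only

# One evaluation function per check; the lambdas stand in for real logic.
ALL_CHECKS: Dict[CheckKey, Callable[[], bool]] = {
    ("asset1", "enough_rows"): lambda: True,
    ("asset1", "no_dupes"): lambda: False,
    ("asset2", "enough_rows"): lambda: True,
}


def run_selected_checks(selected: Set[CheckKey]) -> Iterator[dict]:
    """Evaluate and yield results only for the selected check keys."""
    for key, evaluate in ALL_CHECKS.items():
        if key not in selected:
            continue  # skip checks that were not requested for this run
        asset_key, check_name = key
        yield {"asset_key": asset_key, "check_name": check_name, "passed": evaluate()}


for result in run_selected_checks({("asset1", "no_dupes"), ("asset2", "enough_rows")}):
    print(result)
```

Skipping unselected checks before evaluating them avoids doing work whose result would not be recorded.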
- dagster.load_asset_checks_from_modules
Constructs a list of asset checks from the given modules. This is most often used in conjunction with a call to load_assets_from_modules.
- modules (Iterable[ModuleType]) – The Python modules to look for checks inside.
- asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_modules.
Returns: A list containing asset checks defined in the given modules.
Return type: Sequence[AssetChecksDefinition]
- dagster.load_asset_checks_from_current_module
Constructs a list of asset checks from the module where this function is called. This is most often used in conjunction with a call to load_assets_from_current_module.
Parameters: asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_current_module.
Returns: A list containing asset checks defined in the current module.
Return type: Sequence[AssetChecksDefinition]
- dagster.load_asset_checks_from_package_module
Constructs a list of asset checks from all sub-modules of the given package module. This is most often used in conjunction with a call to load_assets_from_package_module.
- package_module (ModuleType) – The Python module to look for checks inside.
- asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_package_module.
Returns: A list containing asset checks defined in the package.
Return type: Sequence[AssetChecksDefinition]
- dagster.load_asset_checks_from_package_name
Constructs a list of asset checks from all sub-modules of the given package. This is most often used in conjunction with a call to load_assets_from_package_name.
- package_name (str) – The name of the Python package to look for checks inside.
- asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_package_name.
Returns: A list containing asset checks defined in the package.
Return type: Sequence[AssetChecksDefinition]
- dagster.AssetChecksDefinition
Defines a set of checks that are produced by the same op or op graph.
AssetChecksDefinition should not be instantiated directly, but rather produced using the @asset_check decorator or AssetChecksDefinition.create method.
- dagster.build_last_update_freshness_checks
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Constructs an AssetChecksDefinition that checks the freshness of the provided assets.
This check passes if the asset is found to be “fresh”, and fails if the asset is found to be “overdue”. An asset is considered fresh if a record (i.e. a materialization or observation) exists with a timestamp greater than the “lower bound” derived from the parameters of this function.
deadline_cron is a cron schedule that defines the deadline by which we expect the asset to arrive; if not provided, we consider the deadline to be the execution time of the check. lower_bound_delta is a timedelta that defines the lower bound for when a record could have arrived by. If the most recent record’s timestamp is earlier than deadline - lower_bound_delta, the asset is considered overdue.
Let’s use two examples, one with a deadline_cron set and one without.
- With a deadline_cron: Let’s say I have an asset which runs on a schedule every day at 8:00 AM UTC, and usually takes around 45 minutes to complete. To account for operational delays, I would expect the asset to be done materializing every day by 9:00 AM UTC. I would set the deadline_cron to “0 9 * * *”, and the lower_bound_delta to “45 minutes”. This would mean that starting at 9:00 AM, this check will expect a materialization record to have been created no earlier than 8:15 AM. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we’ll instead be checking for the most recently passed deadline, which is yesterday.
- Without a deadline_cron: Let’s say I have an observable source asset on a data source which I expect should never be more than 3 hours out of date. In this case, there’s no fixed schedule for when the data should be updated, so I would not provide a deadline_cron. Instead, I would set the lower_bound_delta parameter to “3 hours”. This would mean that the check will expect the most recent observation record to indicate data no older than 3 hours, relative to the current time, regardless of when it runs.
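The arithmetic in these two scenarios can be traced with a short plain-Python sketch. It is not Dagster's implementation: it assumes a deadline that recurs daily at a whole hour in place of a general cron expression, it treats a record exactly at the lower bound as fresh, and the names latest_daily_deadline and is_fresh are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def latest_daily_deadline(now: datetime, deadline_hour: int) -> datetime:
    """Return the most recent daily deadline (deadline_hour:00) not after `now`."""
    candidate = now.replace(hour=deadline_hour, minute=0, second=0, microsecond=0)
    return candidate if candidate <= now else candidate - timedelta(days=1)


def is_fresh(
    last_update: Optional[datetime],
    now: datetime,
    lower_bound_delta: timedelta,
    deadline_hour: Optional[int] = None,
) -> bool:
    """Overdue if the latest record is earlier than deadline - lower_bound_delta."""
    # Without a deadline, the deadline is the time at which the check runs.
    deadline = now if deadline_hour is None else latest_daily_deadline(now, deadline_hour)
    lower_bound = deadline - lower_bound_delta
    return last_update is not None and last_update >= lower_bound


utc = timezone.utc
now = datetime(2024, 1, 2, 9, 30, tzinfo=utc)
# Scenario 1: deadline 9:00 AM, delta 45 minutes, so the lower bound is 8:15 AM.
print(is_fresh(datetime(2024, 1, 2, 8, 20, tzinfo=utc), now, timedelta(minutes=45), 9))
# Scenario 2: no deadline, delta 3 hours, measured back from the current time.
print(is_fresh(datetime(2024, 1, 2, 5, 0, tzinfo=utc), now, timedelta(hours=3)))
```

A check evaluated at 8:59 AM falls back to the previous day's 9:00 AM deadline, which is what latest_daily_deadline returns in that case.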
The check result will contain the following metadata:
- “dagster/freshness_params”: A dictionary containing the parameters used to construct the check.
- “dagster/last_updated_time”: The time of the most recent update to the asset.
- “dagster/overdue_seconds”: (Only present if asset is overdue) The number of seconds that the asset is overdue by.
- “dagster/overdue_deadline_timestamp”: The timestamp that we are expecting the asset to have arrived by. In the case of a provided deadline_cron, this is the timestamp of the most recent tick of the cron schedule. In the case of no deadline_cron, this is the current time.
# Example 1: Assets that are expected to be updated every day within 45 minutes of
# 9:00 AM UTC
import datetime

from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_daily_scheduled_assets_def

checks_def = build_last_update_freshness_checks(
    [my_daily_scheduled_assets_def, AssetKey("my_other_daily_asset_key")],
    lower_bound_delta=datetime.timedelta(minutes=45),
    deadline_cron="0 9 * * *",
)

# Example 2: Assets that are expected to be updated within 3 hours of the current time
import datetime

from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_observable_source_asset

checks_def = build_last_update_freshness_checks(
    [my_observable_source_asset, AssetKey("my_other_observable_asset_key")],
    lower_bound_delta=datetime.timedelta(hours=3),
)
- assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]) – The assets to construct checks for. All checks are incorporated into the same AssetChecksDefinition, which can be subsetted to run checks for specific assets.
- lower_bound_delta (datetime.timedelta) – The check will pass if the asset was updated within lower_bound_delta of the current_time (no cron) or the most recent tick of the cron (cron provided).
- deadline_cron (Optional[str]) – Defines the deadline for when we should start checking that the asset arrived. If not provided, the deadline is the execution time of the check.
- timezone (Optional[str]) – The timezone to use when calculating freshness and deadline. If not provided, defaults to “UTC”.
Returns: AssetChecksDefinition objects which execute freshness checks for the provided assets.
Return type: Sequence[AssetChecksDefinition]
- dagster.build_time_partition_freshness_checks
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Construct an AssetChecksDefinition that checks the freshness of the provided assets.
This check passes if the asset is considered “fresh” by the time that execution begins. We consider an asset to be “fresh” if there exists a record for the most recent partition, once the deadline has passed.
deadline_cron is a cron schedule that defines the deadline for when we should expect the most recent partition to arrive by. Once a tick of the cron schedule has passed, this check will fail if the most recent partition has not been observed/materialized.
Let’s say I have a daily-partitioned asset which runs every day at 8:00 AM UTC, and takes around 45 minutes to complete. To account for operational delays, I would expect the asset to be done materializing every day by 9:00 AM UTC. I would set the deadline_cron to “0 9 * * *”. This means that starting at 9:00 AM, this check will expect a record to exist for the previous day’s partition. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we’ll instead be checking for the most recently passed deadline, which is yesterday (meaning the partition representing the day before yesterday).
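The partition that the check looks for can be worked out from the evaluation time. The sketch below reproduces the scenario above in plain Python. It assumes daily partitions that close at midnight UTC and a deadline that recurs daily at a whole hour rather than a general cron expression; expected_daily_partition is a hypothetical name, not a Dagster function.

```python
from datetime import date, datetime, timedelta, timezone


def expected_daily_partition(now: datetime, deadline_hour: int) -> date:
    """Return the daily partition that must exist as of the latest passed deadline."""
    deadline = now.replace(hour=deadline_hour, minute=0, second=0, microsecond=0)
    if deadline > now:
        # Today's deadline has not passed yet, so fall back to yesterday's.
        deadline -= timedelta(days=1)
    # The most recently completed daily window at the deadline is the day before it.
    return deadline.date() - timedelta(days=1)


utc = timezone.utc
at_deadline = datetime(2024, 1, 10, 9, 0, tzinfo=utc)
just_before = datetime(2024, 1, 10, 8, 59, tzinfo=utc)
print(expected_daily_partition(at_deadline, 9))  # the previous day's partition
print(expected_daily_partition(just_before, 9))  # the partition of the day before yesterday
```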
The timestamp of an observation record is the timestamp indicated by the “dagster/last_updated_timestamp” metadata key. The timestamp of a materialization record is the timestamp at which that record was created.
The check will fail at runtime if a non-time-window partitioned asset is passed in.
The check result will contain the following metadata:
- “dagster/freshness_params”: A dictionary containing the parameters used to construct the check.
- “dagster/last_updated_time”: (Only present if the asset has been observed/materialized before) The time of the most recent update to the asset.
- “dagster/overdue_seconds”: (Only present if asset is overdue) The number of seconds that the asset is overdue by.
- “dagster/overdue_deadline_timestamp”: The timestamp that we are expecting the asset to have arrived by. This is the timestamp of the most recent tick of the cron schedule.
from dagster import build_time_partition_freshness_checks, AssetKey
# A daily partitioned asset that is expected to be updated every day within 45 minutes
# of 9:00 AM UTC
from .somewhere import my_daily_scheduled_assets_def

checks_def = build_time_partition_freshness_checks(
    [my_daily_scheduled_assets_def],
    deadline_cron="0 9 * * *",
)
- assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]) – The assets to construct checks for. For each passed-in asset, there will be a corresponding constructed AssetChecksDefinition.
- deadline_cron (str) – The check will pass if the partition time window most recently completed by the time of the last cron tick has been observed/materialized.
- timezone (Optional[str]) – The timezone to use when calculating freshness and deadline. If not provided, defaults to “UTC”.
Returns: AssetChecksDefinition objects which execute freshness checks for the provided assets.
Return type: Sequence[AssetChecksDefinition]
- dagster.build_sensor_for_freshness_checks
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Builds a sensor which kicks off evaluation of freshness checks.
This sensor will kick off an execution of a check in the following cases:
- The check has never been executed before.
- The check has been executed before, and the previous result was a success, but it is again possible for the check to be overdue based on the dagster/fresh_until_timestamp metadata on the check result.
Note that we will not execute if:
- The freshness check has been executed before, and the previous result was a failure. This is because whichever run materializes/observes the asset to bring the check back to a passing state will end up also running the check anyway, so until that run occurs, there’s no point in evaluating the check.
- The freshness check has been executed before, and the previous result was a success, but it is not possible for the check to be overdue based on the dagster/fresh_until_timestamp metadata on the check result. Since the check cannot be overdue, we know the check result would not change with an additional execution.
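The four cases above reduce to a small decision function. The sketch below is a plain-Python reading of those rules, not the sensor's actual code. It assumes that "possible to be overdue" means the current time has reached the dagster/fresh_until_timestamp value recorded on the last result; PreviousResult and should_launch_check are hypothetical names.

```python
from typing import NamedTuple, Optional


class PreviousResult(NamedTuple):
    """Illustrative summary of the last recorded check result."""

    passed: bool
    fresh_until_timestamp: float  # seconds since the epoch


def should_launch_check(previous: Optional[PreviousResult], now: float) -> bool:
    """Return True when the sensor would kick off the freshness check."""
    if previous is None:
        return True  # the check has never been executed
    if not previous.passed:
        return False  # a failing check is re-run by the run that refreshes the asset
    # A passing check is re-evaluated only once it can be overdue again.
    return now >= previous.fresh_until_timestamp


print(should_launch_check(None, now=1_000.0))
print(should_launch_check(PreviousResult(False, 500.0), now=1_000.0))
print(should_launch_check(PreviousResult(True, 2_000.0), now=1_000.0))
print(should_launch_check(PreviousResult(True, 900.0), now=1_000.0))
```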
- freshness_checks (Sequence[AssetChecksDefinition]) – The freshness checks to evaluate.
- minimum_interval_seconds (Optional[int]) – The duration in seconds between evaluations of the sensor.
- name (Optional[str]) – The name of the sensor. Defaults to “freshness_check_sensor”, but a name may need to be provided in case of multiple calls of this function.
- default_status (Optional[DefaultSensorStatus]) – The default status of the sensor. Defaults to stopped.
- tags (Optional[Dict[str, Any]]) – A dictionary of tags (string key-value pairs) to attach to the launched run.
Returns: The sensor that kicks off freshness evaluations.
Return type: SensorDefinition
- dagster.build_column_schema_change_checks
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Returns asset checks that pass if the column schema of the asset’s latest materialization is the same as the column schema of the asset’s previous materialization.
The underlying materializations are expected to have a metadata entry with key dagster/column_schema and type TableSchema. To learn more about how to add column schema metadata and other forms of tabular metadata to assets, see the Dagster documentation on table metadata.
The resulting checks will fail if any changes are detected in the column schema between materializations, including:
- Added columns
- Removed columns
- Changes to column types
The check failure message will detail exactly what changed in the schema.
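The three kinds of change listed above can be computed with simple set operations. The following sketch compares two schemas represented as plain dictionaries that map column name to type name. It illustrates the comparison only and is not the code behind build_column_schema_change_checks; diff_column_schema and its output keys are hypothetical.

```python
from typing import Dict, List


def diff_column_schema(
    previous: Dict[str, str], latest: Dict[str, str]
) -> Dict[str, List[str]]:
    """Report added, removed, and retyped columns between two schemas."""
    shared = set(previous) & set(latest)
    return {
        "added": sorted(set(latest) - set(previous)),
        "removed": sorted(set(previous) - set(latest)),
        "type_changed": sorted(n for n in shared if previous[n] != latest[n]),
    }


previous_schema = {"id": "int", "name": "string", "age": "int"}
latest_schema = {"id": "int", "name": "string", "age": "float", "email": "string"}

changes = diff_column_schema(previous_schema, latest_schema)
passed = not any(changes.values())  # the check passes only if nothing changed
print(passed, changes)
```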
- assets (Sequence[Union[AssetKey, str, AssetsDefinition, SourceAsset]]) – The assets to create asset checks for.
- severity (AssetCheckSeverity) – The severity if the check fails. Defaults to WARN.
Returns: Sequence[AssetChecksDefinition]
Examples:
First, define an asset with column schema metadata. You can attach schema metadata either as definition metadata (when schema is known at definition time) or as materialization metadata (when schema is only known at runtime):
import dagster as dg

# Attach the column schema as materialization metadata (schema known only at runtime)
@dg.asset
def people_table():
    column_names = ...
    column_types = ...
    columns = [
        dg.TableColumn(name, column_type)
        for name, column_type in zip(column_names, column_types)
    ]
    yield dg.MaterializeResult(
        metadata={"dagster/column_schema": dg.TableSchema(columns=columns)}
    )

Once you have assets with column schema metadata, you can create schema change checks to monitor for changes in the schema between materializations:
# Create schema change checks for one or more assets
schema_checks = dg.build_column_schema_change_checks(
    assets=[people_table],
)
- dagster.build_metadata_bounds_checks
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
Returns asset checks that pass if the metadata value of the asset’s latest materialization is within the specified range.
- assets (Sequence[Union[AssetKey, str, AssetsDefinition, SourceAsset]]) – The assets to create asset checks for.
- severity (AssetCheckSeverity) – The severity if the check fails. Defaults to WARN.
- metadata_key (str) – The metadata key to check.
- min_value (Optional[Union[int, float]]) – The minimum value to check for. If None, no minimum value check is performed.
- max_value (Optional[Union[int, float]]) – The maximum value to check for. If None, no maximum value check is performed.
- exclusive_min (bool) – If True, the check will fail if the metadata value is equal to min_value. Defaults to False.
- exclusive_max (bool) – If True, the check will fail if the metadata value is equal to max_value. Defaults to False.
Returns: Sequence[AssetChecksDefinition]
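The interaction of min_value, max_value, exclusive_min, and exclusive_max can be summarized in a few lines of plain Python. This is a sketch of the comparison as the parameter descriptions state it, not the library's implementation; the function name within_bounds is hypothetical.

```python
from typing import Optional, Union

Number = Union[int, float]


def within_bounds(
    value: Number,
    min_value: Optional[Number] = None,
    max_value: Optional[Number] = None,
    exclusive_min: bool = False,
    exclusive_max: bool = False,
) -> bool:
    """Return True if `value` satisfies the configured bounds."""
    if min_value is not None:
        # With exclusive_min, a value equal to min_value fails.
        if value < min_value or (exclusive_min and value == min_value):
            return False
    if max_value is not None:
        # With exclusive_max, a value equal to max_value fails.
        if value > max_value or (exclusive_max and value == max_value):
            return False
    return True


print(within_bounds(0, min_value=0))                      # inclusive minimum
print(within_bounds(0, min_value=0, exclusive_min=True))  # exclusive minimum
print(within_bounds(7, max_value=5))                      # above the maximum
```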