Asset Checks

Dagster allows you to define and execute checks on your software-defined assets. Each asset check verifies some property of a data asset, e.g. that it has no null values in a particular column.

@dagster.asset_check(*, asset, name=None, description=None, blocking=False, additional_ins=None, additional_deps=None, required_resource_keys=None, resource_defs=None, config_schema=None, compute_kind=None, op_tags=None, retry_policy=None, metadata=None)[source]

Create a definition for how to execute an asset check.

Parameters:
  • asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]) – The asset that the check applies to.

  • name (Optional[str]) – The name of the check. If not specified, the name of the decorated function will be used. Checks for the same asset must have unique names.

  • description (Optional[str]) – The description of the check.

  • blocking (bool) – When enabled, runs that include this check and any downstream assets that depend on asset will wait for this check to complete before starting the downstream assets. If the check fails with severity AssetCheckSeverity.ERROR, then the downstream assets won’t execute.

  • additional_ins (Optional[Mapping[str, AssetIn]]) – A mapping from input name to information about the input. These inputs will apply to the underlying op that executes the check. These should not include the asset parameter, which is always included as a dependency.

  • additional_deps (Optional[Iterable[CoercibleToAssetDep]]) – Assets that are upstream dependencies, but do not correspond to a parameter of the decorated function. These dependencies will apply to the underlying op that executes the check. These should not include the asset parameter, which is always included as a dependency.

  • required_resource_keys (Optional[Set[str]]) – A set of keys for resources that are required by the function that executes the check. These can alternatively be specified by including resource-typed parameters in the function signature.

  • config_schema (Optional[ConfigSchema]) – The configuration schema for the check’s underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that executes the check. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • compute_kind (Optional[str]) – A string to represent the kind of computation that executes the check, e.g. “dbt” or “spark”.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for the op that executes the check.

  • metadata (Optional[Mapping[str, Any]]) – A dictionary of static metadata for the check.

Produces an AssetChecksDefinition object.
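
The round-trip requirement on op_tags values can be tested before the tags are passed to the decorator. The following is a minimal sketch that uses only the standard library; the helper name is_round_trippable is hypothetical and is not part of Dagster.

```python
import json
from typing import Any


def is_round_trippable(value: Any) -> bool:
    """Return True if value is unchanged after json.dumps followed by json.loads."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as set.
        return False


# Lists and string-keyed dicts survive the round trip.
ok_list = is_round_trippable(["a", 1, 2.5])
ok_dict = is_round_trippable({"team": "data", "priority": 1})

# Tuples come back as lists, and integer keys come back as strings.
bad_tuple = is_round_trippable(("a", "b"))
bad_int_key = is_round_trippable({1: "one"})

# Sets cannot be serialized at all.
bad_set = is_round_trippable({"a", "b"})
```

Values such as tuples are worth converting to lists first, since they serialize without error but do not compare equal after decoding.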

Example

from dagster import asset, asset_check, AssetCheckResult

@asset
def my_asset() -> None:
    ...

@asset_check(asset=my_asset, description="Check that my asset has enough rows")
def my_asset_has_enough_rows() -> AssetCheckResult:
    num_rows = ...
    return AssetCheckResult(passed=num_rows > 5, metadata={"num_rows": num_rows})
Example with a DataFrame Output:
from dagster import asset, asset_check, AssetCheckResult
from pandas import DataFrame

@asset
def my_asset() -> DataFrame:
    ...

@asset_check(asset=my_asset, description="Check that my asset has enough rows")
def my_asset_has_enough_rows(my_asset: DataFrame) -> AssetCheckResult:
    num_rows = my_asset.shape[0]
    return AssetCheckResult(passed=num_rows > 5, metadata={"num_rows": num_rows})
class dagster.AssetCheckResult(*, passed, asset_key=None, check_name=None, metadata=None, severity=AssetCheckSeverity.ERROR, description=None)[source]

The result of an asset check.

asset_key

The asset key that was checked.

Type:

Optional[AssetKey]

check_name

The name of the check.

Type:

Optional[str]

passed

The pass/fail result of the check.

Type:

bool

metadata

Arbitrary metadata about the asset. Keys are displayed string labels, and values are one of the following: string, float, int, JSON-serializable dict, JSON-serializable list, and one of the data classes returned by a MetadataValue static method.

Type:

Optional[Dict[str, RawMetadataValue]]

severity

Severity of the check. Defaults to ERROR.

Type:

AssetCheckSeverity

description

A text description of the result of the check evaluation.

Type:

Optional[str]

class dagster.AssetCheckSpec(name, *, asset, description=None, additional_deps=None, blocking=False, metadata=None)[source]

Defines information about an asset check, except how to execute it.

AssetCheckSpec is often used as an argument to decorators that decorate a function that can execute multiple checks, e.g. @asset and @multi_asset. It defines one of the checks that will be executed inside that function.

Parameters:
  • name (str) – Name of the check.

  • asset (Union[AssetKey, Sequence[str], str, AssetsDefinition, SourceAsset]) – The asset that the check applies to.

  • description (Optional[str]) – Description for the check.

  • additional_deps (Optional[Iterable[AssetDep]]) – Additional dependencies for the check. The check relies on these assets in some way, but the result of the check only applies to the asset specified by asset. For example, the check may test that asset has matching data with an asset in additional_deps. This field holds both additional_deps and additional_ins passed to @asset_check.

  • metadata (Optional[Mapping[str, Any]]) – A dict of static metadata for this asset check.

class dagster.AssetCheckSeverity(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Severity level for an AssetCheckResult.

  • WARN: a potential issue with the asset

  • ERROR: a definite issue with the asset

Severity does not impact execution of the asset or downstream assets.

class dagster.AssetCheckKey(asset_key, name)[source]

Check names are expected to be unique per-asset. Thus, this combination of asset key and check name uniquely identifies an asset check within a deployment.

@dagster.multi_asset_check(*, name=None, specs, description=None, can_subset=False, compute_kind=None, op_tags=None, resource_defs=None, required_resource_keys=None, retry_policy=None, config_schema=None)[source]

Defines a set of asset checks that can be executed together with the same op.

Parameters:
  • specs (Sequence[AssetCheckSpec]) – Specs for the asset checks.

  • name (Optional[str]) – The name of the op. If not specified, the name of the decorated function will be used.

  • description (Optional[str]) – Description of the op.

  • required_resource_keys (Optional[Set[str]]) – A set of keys for resources that are required by the function that executes the checks. These can alternatively be specified by including resource-typed parameters in the function signature.

  • config_schema (Optional[ConfigSchema]) – The configuration schema for the asset checks’ underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that executes the checks. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • compute_kind (Optional[str]) – A string to represent the kind of computation that executes the checks, e.g. “dbt” or “spark”.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for the op that executes the checks.

  • can_subset (bool) – Whether the op can emit results for a subset of the asset checks keys, based on the context.selected_asset_check_keys argument. Defaults to False.

Examples

from dagster import AssetCheckResult, AssetCheckSpec, multi_asset_check

@multi_asset_check(
    specs=[
        AssetCheckSpec("enough_rows", asset="asset1"),
        AssetCheckSpec("no_dupes", asset="asset1"),
        AssetCheckSpec("enough_rows", asset="asset2"),
    ],
)
def checks():
    yield AssetCheckResult(passed=True, asset_key="asset1", check_name="enough_rows")
    yield AssetCheckResult(passed=False, asset_key="asset1", check_name="no_dupes")
    yield AssetCheckResult(passed=True, asset_key="asset2", check_name="enough_rows")
dagster.load_asset_checks_from_modules(modules, asset_key_prefix=None)[source]

Constructs a list of asset checks from the given modules. This is most often used in conjunction with a call to load_assets_from_modules.

Parameters:
  • modules (Iterable[ModuleType]) – The Python modules to look for checks inside.

  • asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_modules.

Returns:

A list containing asset checks defined in the given modules.

Return type:

Sequence[AssetChecksDefinition]

dagster.load_asset_checks_from_current_module(asset_key_prefix=None)[source]

Constructs a list of asset checks from the module where this function is called. This is most often used in conjunction with a call to load_assets_from_current_module.

Parameters:

asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_current_module.

Returns:

A list containing asset checks defined in the current module.

Return type:

Sequence[AssetChecksDefinition]

dagster.load_asset_checks_from_package_module(package_module, asset_key_prefix=None)[source]

Constructs a list of asset checks from all sub-modules of the given package module. This is most often used in conjunction with a call to load_assets_from_package_module.

Parameters:
  • package_module (ModuleType) – The Python module to look for checks inside.

  • asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_package_module.

Returns:

A list containing asset checks defined in the package.

Return type:

Sequence[AssetChecksDefinition]

dagster.load_asset_checks_from_package_name(package_name, asset_key_prefix=None)[source]

Constructs a list of asset checks from all sub-modules of the given package. This is most often used in conjunction with a call to load_assets_from_package_name.

Parameters:
  • package_name (str) – The name of the Python package to look for checks inside.

  • asset_key_prefix (Optional[Union[str, Sequence[str]]]) – The prefix for the asset keys targeted by the loaded checks. This should match the key_prefix argument to load_assets_from_package_name.

Returns:

A list containing asset checks defined in the package.

Return type:

Sequence[AssetChecksDefinition]

class dagster.AssetChecksDefinition(*, keys_by_input_name, keys_by_output_name, node_def, partitions_def=None, partition_mappings=None, asset_deps=None, selected_asset_keys=None, can_subset=False, resource_defs=None, group_names_by_key=None, metadata_by_key=None, tags_by_key=None, freshness_policies_by_key=None, auto_materialize_policies_by_key=None, backfill_policy=None, descriptions_by_key=None, check_specs_by_output_name=None, selected_asset_check_keys=None, is_subset=False, owners_by_key=None)[source]

Defines a set of checks that are produced by the same op or op graph.

AssetChecksDefinition should not be instantiated directly, but rather produced using the @asset_check decorator or AssetChecksDefinition.create method.

dagster.build_last_update_freshness_checks(*, assets, lower_bound_delta, deadline_cron=None, timezone='UTC', severity=AssetCheckSeverity.WARN)[source]

experimental This API may break in future versions, even between dot releases.

Constructs an AssetChecksDefinition that checks the freshness of the provided assets.

This check passes if the asset is found to be “fresh”, and fails if the asset is found to be “overdue”. An asset is considered fresh if a record (i.e. a materialization or observation) exists with a timestamp greater than the “lower bound” derived from the parameters of this function.

deadline_cron is a cron schedule that defines the deadline by which we expect the asset to arrive; if not provided, we consider the deadline to be the execution time of the check. lower_bound_delta is a timedelta that defines the lower bound for when a record could have arrived by. If the most recent record’s timestamp is earlier than deadline - lower_bound_delta, the asset is considered overdue.

Let’s use two examples, one with a deadline_cron set and one without.

Let’s say I have an asset which runs on a schedule every day at 8:00 AM UTC, and usually takes around 45 minutes to complete. To account for operational delays, I would expect the asset to be done materializing every day by 9:00 AM UTC. I would set the deadline_cron to “0 9 * * *”, and the lower_bound_delta to “45 minutes”. This would mean that starting at 9:00 AM, this check will expect a materialization record to have been created no earlier than 8:15 AM. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we’ll instead be checking for the most recently passed deadline, which is yesterday.

Let’s say I have an observable source asset on a data source which I expect should never be more than 3 hours out of date. In this case, there’s no fixed schedule for when the data should be updated, so I would not provide a deadline_cron. Instead, I would set the lower_bound_delta parameter to “3 hours”. This would mean that the check will expect the most recent observation record to indicate data no older than 3 hours, relative to the current time, regardless of when it runs.
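
The arithmetic in these two scenarios can be sketched with the standard library alone. This is an illustration of the rule as described here, not Dagster's implementation: the helper names latest_daily_deadline and is_fresh are hypothetical, a fixed daily time stands in for a general cron schedule, and a record exactly on the lower bound is treated as fresh.

```python
from datetime import datetime, time, timedelta, timezone


def latest_daily_deadline(now: datetime, deadline: time) -> datetime:
    """Most recent occurrence of a daily UTC deadline at or before now."""
    candidate = datetime.combine(now.date(), deadline, tzinfo=timezone.utc)
    return candidate if candidate <= now else candidate - timedelta(days=1)


def is_fresh(last_record: datetime, deadline: datetime, lower_bound_delta: timedelta) -> bool:
    """Overdue means the record is earlier than deadline - lower_bound_delta."""
    return last_record >= deadline - lower_bound_delta


nine_am = time(9, 0)
delta = timedelta(minutes=45)

# Check runs at 9:30 AM: today's 9:00 AM deadline applies, lower bound is 8:15 AM.
now_after = datetime(2024, 1, 2, 9, 30, tzinfo=timezone.utc)
deadline_after = latest_daily_deadline(now_after, nine_am)
fresh_after = is_fresh(datetime(2024, 1, 2, 8, 40, tzinfo=timezone.utc), deadline_after, delta)

# Check runs at 8:59 AM: today's deadline has not passed, so yesterday's applies.
now_before = datetime(2024, 1, 2, 8, 59, tzinfo=timezone.utc)
deadline_before = latest_daily_deadline(now_before, nine_am)

# No deadline_cron: the deadline is the check's execution time.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
fresh_no_cron = is_fresh(now - timedelta(hours=4), now, timedelta(hours=3))
```

In the last case the record is 4 hours old against a 3 hour bound, so the asset would be considered overdue.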

The check result will contain the following metadata:
  • “dagster/freshness_params”: A dictionary containing the parameters used to construct the check.

  • “dagster/last_updated_time”: The time of the most recent update to the asset.

  • “dagster/overdue_seconds”: (Only present if asset is overdue) The number of seconds that the asset is overdue by.

  • “dagster/overdue_deadline_timestamp”: The timestamp that we are expecting the asset to have arrived by. In the case of a provided deadline_cron, this is the timestamp of the most recent tick of the cron schedule. In the case of no deadline_cron, this is the current time.

Examples

# Example 1: Assets that are expected to be updated every day within 45 minutes of
# 9:00 AM UTC
import datetime

from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_daily_scheduled_assets_def

# All parameters are keyword-only, so the assets are passed by name.
checks_def = build_last_update_freshness_checks(
    assets=[my_daily_scheduled_assets_def, AssetKey("my_other_daily_asset_key")],
    lower_bound_delta=datetime.timedelta(minutes=45),
    deadline_cron="0 9 * * *",
)

# Example 2: Assets that are expected to be updated within 3 hours of the current time
import datetime

from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_observable_source_asset

checks_def = build_last_update_freshness_checks(
    assets=[my_observable_source_asset, AssetKey("my_other_observable_asset_key")],
    lower_bound_delta=datetime.timedelta(hours=3),
)
Parameters:
  • assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]) – The assets to construct checks for. All checks are incorporated into the same AssetChecksDefinition, which can be subsetted to run checks for specific assets.

  • lower_bound_delta (datetime.timedelta) – The check will pass if the asset was updated within lower_bound_delta of the current_time (no cron) or the most recent tick of the cron (cron provided).

  • deadline_cron (Optional[str]) – Defines the deadline for when we should start checking that the asset arrived. If not provided, the deadline is the execution time of the check.

  • timezone (Optional[str]) – The timezone to use when calculating freshness and deadline. If not provided, defaults to “UTC”.

Returns:

An AssetChecksDefinition object, which can execute a freshness check for all provided assets.

Return type:

AssetChecksDefinition

dagster.build_time_partition_freshness_checks(*, assets, deadline_cron, timezone='UTC', severity=AssetCheckSeverity.WARN)[source]

experimental This API may break in future versions, even between dot releases.

Construct an AssetChecksDefinition that checks the freshness of the provided assets.

This check passes if the asset is considered “fresh” by the time that execution begins. We consider an asset to be “fresh” if there exists a record for the most recent partition, once the deadline has passed.

deadline_cron is a cron schedule that defines the deadline for when we should expect the most recent partition to arrive by. Once a tick of the cron schedule has passed, this check will fail if the most recent partition has not been observed/materialized.

Let’s say I have a daily-partitioned asset which runs every day at 8:00 AM UTC, and takes around 45 minutes to complete. To account for operational delays, I would expect the asset to be done materializing every day by 9:00 AM UTC. I would set the deadline_cron to “0 9 * * *”. This means that starting at 9:00 AM, this check will expect a record to exist for the previous day’s partition. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we’ll instead be checking for the most recently passed deadline, which is yesterday (meaning the partition representing the day before yesterday).
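
The partition that the check expects in this scenario can be worked out with plain date arithmetic. The sketch below only illustrates the rule described above and is not Dagster's implementation: the helper name expected_daily_partition is hypothetical, partitions are identified by their start date, and a fixed daily time stands in for a general cron schedule.

```python
from datetime import date, datetime, time, timedelta, timezone


def expected_daily_partition(now: datetime, deadline: time) -> date:
    """Start date of the daily partition expected as of the latest passed deadline."""
    candidate = datetime.combine(now.date(), deadline, tzinfo=timezone.utc)
    last_deadline = candidate if candidate <= now else candidate - timedelta(days=1)
    # At a given deadline, the most recently completed daily partition is the previous day.
    return last_deadline.date() - timedelta(days=1)


nine_am = time(9, 0)

# At 9:00 AM on January 3 the deadline has passed: expect the January 2 partition.
at_deadline = expected_daily_partition(datetime(2024, 1, 3, 9, 0, tzinfo=timezone.utc), nine_am)

# At 8:59 AM on January 3 yesterday's deadline applies: expect the January 1 partition.
before_deadline = expected_daily_partition(datetime(2024, 1, 3, 8, 59, tzinfo=timezone.utc), nine_am)
```
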

The timestamp of an observation record is the timestamp indicated by the “dagster/last_updated_timestamp” metadata key. The timestamp of a materialization record is the timestamp at which that record was created.

The check will fail at runtime if a non-time-window partitioned asset is passed in.

The check result will contain the following metadata:
  • “dagster/freshness_params”: A dictionary containing the parameters used to construct the check.

  • “dagster/last_updated_time”: (Only present if the asset has been observed/materialized before) The time of the most recent update to the asset.

  • “dagster/overdue_seconds”: (Only present if asset is overdue) The number of seconds that the asset is overdue by.

  • “dagster/overdue_deadline_timestamp”: The timestamp that we are expecting the asset to have arrived by. This is the timestamp of the most recent tick of the cron schedule.

Examples

from dagster import build_time_partition_freshness_checks, AssetKey
# A daily partitioned asset that is expected to be updated every day within 45 minutes
# of 9:00 AM UTC
from .somewhere import my_daily_scheduled_assets_def

# All parameters are keyword-only, so the assets are passed by name.
checks_def = build_time_partition_freshness_checks(
    assets=[my_daily_scheduled_assets_def],
    deadline_cron="0 9 * * *",
)
Parameters:
  • assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]) – The assets to construct checks for. For each passed in asset, there will be a corresponding constructed AssetChecksDefinition.

  • deadline_cron (str) – The check will pass if the partition time window most recently completed by the time of the last cron tick has been observed/materialized.

  • timezone (Optional[str]) – The timezone to use when calculating freshness and deadline. If not provided, defaults to “UTC”.

Returns:

An AssetChecksDefinition object, which can execute a freshness check for each provided asset.

Return type:

AssetChecksDefinition

dagster.build_sensor_for_freshness_checks(*, freshness_checks, minimum_interval_seconds=None, name='freshness_checks_sensor', default_status=DefaultSensorStatus.STOPPED)[source]

experimental This API may break in future versions, even between dot releases.

Builds a sensor which kicks off evaluation of freshness checks.

The sensor inspects the parameters of the passed-in freshness checks to determine how often to run them. For example, if the freshness check is based on a cron schedule, the sensor will request one run of the check per tick of the cron. If the freshness check is based on a maximum lag, the sensor will request one run of the check per interval specified by the maximum lag.

Parameters:
  • freshness_checks (Sequence[AssetChecksDefinition]) – The freshness checks to evaluate.

  • minimum_interval_seconds (Optional[int]) – The duration in seconds between evaluations of the sensor.

  • name (Optional[str]) – The name of the sensor. Defaults to “freshness_checks_sensor”, but a name may need to be provided in case of multiple calls of this function.

  • default_status (Optional[DefaultSensorStatus]) – The default status of the sensor. Defaults to stopped.

Returns:

The sensor that evaluates the freshness of the assets.

Return type:

SensorDefinition

dagster.build_column_schema_change_checks(*, assets, severity=AssetCheckSeverity.WARN)[source]

experimental This API may break in future versions, even between dot releases.

Returns asset checks that pass if the column schema of the asset’s latest materialization is the same as the column schema of the asset’s previous materialization.

Parameters:
Returns:

Sequence[AssetChecksDefinition]