
Defining and executing asset checks#

After creating some Software-defined Assets, you may want to automate checks on the assets that test for data quality.

In this guide, we'll show you a few approaches to defining asset checks, how to use check results to include helpful information, and how to execute checks.


Defining asset checks#

There are a few ways you can define an asset check:

  • Separately from the assets the checks target - In this approach, asset materialization and asset checks are executed in their own separate ops. If using the multiprocess_executor, this allows you to launch runs that will use separate processes to materialize the asset and execute its check.
  • Together with assets - In this approach, checks execute in the same op that materializes the asset.
  • Using an asset check factory - This approach allows you to define multiple, similar asset checks at once.
  • Loading dbt tests into Dagster - This approach allows you to model your dbt tests as asset checks.

Defining checks separately from assets#

In this example, we'll demonstrate how to define separate functions for an asset and the corresponding check.

The following code defines an asset named orders and an asset check named orders_id_has_no_nulls. When executed, the check verifies that none of the values in the orders asset's primary key column (order_id) are null.

import pandas as pd

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")


@asset_check(asset=orders)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(
    assets=[orders],
    asset_checks=[orders_id_has_no_nulls],
)

The @asset_check decorator wraps the orders_id_has_no_nulls function, which returns an AssetCheckResult object.

The orders_id_has_no_nulls check runs in its own op. That means the check will execute in a separate process from the one that materializes the asset if you launch a run that does all of the following:

  1. Materializes the orders asset,
  2. Executes the orders_id_has_no_nulls check, and
  3. Uses the multiprocess_executor

Defining checks and assets together#

Sometimes, it makes sense for a single function to materialize an asset and execute a check on it.

In this example, we'll demonstrate how to do this by using the check_specs argument. This argument is available when using the @asset or @multi_asset decorators. Each provided AssetCheckSpec declares a check that the decorated function should yield an AssetCheckResult for.

import pandas as pd

from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AssetExecutionContext,
    Definitions,
    Output,
    asset,
)


@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
    orders_df.to_csv("orders.csv")
    yield Output(value=None)

    # check it
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(assets=[orders])

Using a factory#

To define multiple, similar asset checks, use a factory pattern. In the following example, the factory accepts a list of SQL statements and turns them into asset checks:

from typing import Mapping

from unittest.mock import MagicMock

from dagster import (
    AssetCheckResult,
    AssetChecksDefinition,
    Definitions,
    asset,
    asset_check,
)


@asset
def orders(): ...


@asset
def items(): ...


def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition:
    @asset_check(
        name=check_blob["name"],
        asset=check_blob["asset"],
        required_resource_keys={"db_connection"},
    )
    def _check(context):
        rows = context.resources.db_connection.execute(check_blob["sql"])
        return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)})

    return _check


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(
    assets=[orders, items],
    asset_checks=[make_check(check_blob) for check_blob in check_blobs],
    resources={"db_connection": MagicMock()},
)
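
The factory works because each call to make_check binds its own check_blob in a closure. The sketch below shows the same pattern in plain Python, without Dagster, so the mechanics are easier to see. FAKE_RESULTS and make_plain_check are hypothetical names that stand in for the database connection and the Dagster-decorated factory.

```python
from typing import Callable, Dict, List, Mapping

# Hypothetical stand-in for a database: maps a SQL string to the rows it returns.
FAKE_RESULTS: Dict[str, List[dict]] = {
    "select * from orders where order_id is null": [],
    "select * from items where item_id is null": [{"item_id": None}],
}


def make_plain_check(check_blob: Mapping[str, str]) -> Callable[[], dict]:
    # check_blob is a parameter of the factory, so every returned function
    # keeps a reference to the blob it was created with.
    def _check() -> dict:
        rows = FAKE_RESULTS[check_blob["sql"]]
        return {
            "name": check_blob["name"],
            "passed": len(rows) == 0,
            "num_rows": len(rows),
        }

    return _check


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "sql": "select * from items where item_id is null",
    },
]

checks = [make_plain_check(check_blob) for check_blob in check_blobs]
results = [check() for check in checks]
```

Defining the inner function directly inside a loop, without the factory, would make every check read the loop variable at call time and therefore see only the last blob.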

Loading dbt tests as asset checks#

Using the DagsterDbtTranslator, you can model your existing dbt tests as asset checks. Refer to the dbt integration reference for more information.


Using asset check results#

In this section, we'll show you how to use asset check results to customize the Dagster UI and to block downstream assets.

Customizing the Dagster UI#

Using asset check results, you can customize how check-related information displays in the Dagster UI.

Setting severity#

Using AssetCheckSeverity, you can define a severity on check results. The default severity is ERROR.

The severity determines how the check result will display in the UI. For example, if a check fails with ERROR severity, the asset will appear red in the lineage graph in the UI.

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    Definitions,
    asset,
    asset_check,
)


@asset
def my_asset(): ...


@asset_check(asset=my_asset)
def my_check():
    is_serious = ...
    return AssetCheckResult(
        passed=False,
        severity=AssetCheckSeverity.ERROR if is_serious else AssetCheckSeverity.WARN,
    )


defs = Definitions(assets=[my_asset], asset_checks=[my_check])

Adding metadata#

Including details about a check result can provide helpful context to others who view it in the UI. Using the metadata argument on AssetCheckResult, you can include information about why a check passed or failed.

In the following example, we added num_null_order_ids as metadata to the orders_id_has_no_nulls check:

import pandas as pd

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")


@asset_check(asset=orders, description="Check for null order_ids")
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
        metadata={
            "num_null_order_ids": int(num_null_order_ids),
        },
    )


defs = Definitions(
    assets=[orders],
    asset_checks=[orders_id_has_no_nulls],
)

There are a variety of types supported via the MetadataValue class. You can view the metadata on the Checks tab of the Asset details page.

Blocking downstream assets#

To block downstream assets from executing when checks fail, set the blocking argument to True in the @asset_check decorator. In the following example, check_upstream_asset will block downstream_asset from executing.

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def upstream_asset():
    pass


@asset_check(asset=upstream_asset, blocking=True)
def check_upstream_asset():
    return AssetCheckResult(passed=False)


@asset(deps=[upstream_asset])
def downstream_asset():
    pass


defs = Definitions(
    assets=[upstream_asset, downstream_asset], asset_checks=[check_upstream_asset]
)

When blocking is enabled, downstream assets will wait to execute until the check completes:

  • Downstream assets will not execute if the check returns a failing AssetCheckResult or raises an exception
  • Downstream assets will execute if the AssetCheckSeverity is set to WARN instead of ERROR
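
The rules above can be summarized as a small decision function. This is a simplified plain-Python model of the behavior described in this section, not Dagster's implementation; Severity is a stand-in for AssetCheckSeverity, and passed=None is an assumed convention for a check that raised an exception.

```python
from enum import Enum
from typing import Optional


class Severity(Enum):
    # Stand-in for dagster.AssetCheckSeverity in this model.
    WARN = "WARN"
    ERROR = "ERROR"


def downstream_may_run(
    passed: Optional[bool], severity: Severity = Severity.ERROR
) -> bool:
    """Model whether a blocking check lets downstream assets execute.

    passed=None represents a check that raised an exception.
    """
    if passed is None:
        return False  # the check raised, so downstream assets are blocked
    if passed:
        return True  # a passing check never blocks
    # A failing check blocks only when its severity is ERROR.
    return severity is Severity.WARN
```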

This feature has the following limitations:


Executing checks#

Via the UI#

Materializing an asset from the UI will also execute any checks defined for that asset. To execute a check without materializing the asset, use the Checks tab of the Asset's details page.

Via sensors and schedules#

To define jobs that execute sets of assets and checks, you can use define_asset_job and then trigger the jobs via sensors or schedules. By default, checks are included with the assets they check. You can also define jobs that include only checks, or only assets.

from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    asset_check,
    define_asset_job,
)


@asset
def my_asset(): ...


@asset_check(asset=my_asset)
def check_1(): ...


@asset_check(asset=my_asset)
def check_2(): ...


# includes my_asset and both checks
my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))


# includes only my_asset
my_asset_only_job = define_asset_job(
    "my_asset_only_job",
    selection=AssetSelection.assets(my_asset).without_checks(),
)

# includes check_1 and check_2, but not my_asset
checks_only_job = define_asset_job(
    "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset)
)

# includes only check_1
check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))

# schedule my_job to run every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")

defs = Definitions(
    assets=[my_asset],
    asset_checks=[check_1, check_2],
    jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
    schedules=[basic_schedule],
)

Testing checks#

Refer to the Asset checks section of the Testing documentation for more information.


APIs in this guide#

Name                  Description
@asset_check          A decorator used to define asset checks that execute in their own op.
AssetCheckResult      The class returned by asset checks.
AssetCheckSeverity    Defines the severity of a given asset check result.
AssetCheckSpec        A class that's passed to asset decorators to define checks that execute in the same op as the asset.