Asset checks (Experimental)#

Dagster allows you to define and execute data quality checks on your Software-defined Assets. Each asset check verifies some property of a data asset, e.g. that there are no null values in a particular column.

When viewing an asset in Dagster’s UI, you can see all of its checks, and whether they’ve passed, failed, or haven’t run.


Relevant APIs#

Name               Description
@asset_check       A decorator used to define asset checks that execute in their own op.
AssetCheckResult   The class returned by asset checks.
AssetCheckSpec     A class that's passed to asset decorators to define checks that execute in the same op as the asset.

Defining asset checks#

Asset check that executes in its own op#

The following code defines an asset named orders and an asset check named orders_id_has_no_nulls. When executed, the check verifies a property of the orders asset: that none of the values in its order_id column are null.

import pandas as pd

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")


@asset_check(asset=orders)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(
    assets=[orders],
    asset_checks=[orders_id_has_no_nulls],
)

@asset_check decorates the orders_id_has_no_nulls function, which returns an AssetCheckResult object.

The orders_id_has_no_nulls check runs in its own op. As a result, if a single run both materializes the orders asset and executes the orders_id_has_no_nulls check, and you’re using the multiprocess_executor, the check executes in a separate process from the one that materializes the asset.

Checks that execute in the same op that materializes the asset#

Sometimes, it makes more sense for a single function to both materialize an asset and execute a check on it.

When defining an asset using the @asset or @multi_asset decorators, you can set the check_specs argument. Each provided AssetCheckSpec declares a check that the decorated function should yield an AssetCheckResult for.

import pandas as pd

from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AssetExecutionContext,
    Definitions,
    Output,
    asset,
)


@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
    orders_df.to_csv("orders.csv")
    yield Output(value=None)

    # check it
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(assets=[orders])

Severity#

You can optionally set AssetCheckSeverity on check results. The default severity is ERROR. Severity determines how the check result appears in the UI. If a check fails with ERROR severity, the asset will appear red in the lineage graph.

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    Definitions,
    asset,
    asset_check,
)


@asset
def my_asset():
    ...


@asset_check(asset=my_asset)
def my_check():
    is_serious = ...
    return AssetCheckResult(
        passed=False,
        severity=AssetCheckSeverity.ERROR if is_serious else AssetCheckSeverity.WARN,
    )


defs = Definitions(assets=[my_asset], asset_checks=[my_check])

Adding metadata to check results#

You can add information about why a check passed or failed using the metadata argument on AssetCheckResult. We'll add num_null_order_ids as metadata to the orders_id_has_no_nulls example:

import pandas as pd

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")


@asset_check(asset=orders, description="Check for null order_ids")
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
        metadata={
            "num_null_order_ids": int(num_null_order_ids),
        },
    )


defs = Definitions(
    assets=[orders],
    asset_checks=[orders_id_has_no_nulls],
)

There are a variety of types supported via the MetadataValue class. You can view the metadata on the Checks tab of the Asset details page.

Asset check factories#

If you want to define many similar checks, you can use the factory pattern. Here's an example factory that accepts a list of SQL statements and turns them into asset checks.

from typing import Mapping
from unittest.mock import MagicMock

from dagster import (
    AssetCheckResult,
    AssetChecksDefinition,
    Definitions,
    asset,
    asset_check,
)


@asset
def orders():
    ...


@asset
def items():
    ...


def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition:
    @asset_check(
        name=check_blob["name"],
        asset=check_blob["asset"],
        required_resource_keys={"db_connection"},
    )
    def _check(context):
        rows = context.resources.db_connection.execute(check_blob["sql"])
        return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)})

    return _check


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(
    assets=[orders, items],
    asset_checks=[make_check(check_blob) for check_blob in check_blobs],
    resources={"db_connection": MagicMock()},
)
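
The MagicMock above only stands in for a real connection. The factory needs just one thing from db_connection: an execute method that takes a SQL string and returns something with a length. A minimal sketch of such an object follows, assuming SQLite from the standard library. The class name SqliteConnection and the sample table are hypothetical:

```python
import sqlite3
from typing import List, Tuple


class SqliteConnection:
    """Hypothetical db_connection stand-in that returns query results as a list."""

    def __init__(self, path: str = ":memory:"):
        self._conn = sqlite3.connect(path)

    def execute(self, sql: str) -> List[Tuple]:
        # fetchall() returns a list, so len(rows) works as the factory expects.
        return self._conn.execute(sql).fetchall()


db = SqliteConnection()
db.execute("create table orders (order_id integer, item_id integer)")
db.execute("insert into orders values (1, 432), (NULL, 878)")

# Same query as the orders_id_has_no_nulls entry in check_blobs.
rows = db.execute("select * from orders where order_id is null")
print(len(rows))  # 1, so the check would report passed=False
```

An in-memory database lives in a single process only, so a file path or a database server would be needed if checks run in separate processes.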

Executing checks#

Via the UI#

Materializing an asset from the UI will also execute any checks that are defined for that asset. You can also execute checks without materializing the asset from the Checks tab of the asset’s detail page.

Via sensors and schedules#

You can use define_asset_job to define jobs that execute sets of both assets and checks, and then trigger those jobs via sensors or schedules. By default, checks are included with the assets they check. You can also define jobs that include only checks, or only assets.

from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    asset_check,
    define_asset_job,
)


@asset
def my_asset():
    ...


@asset_check(asset=my_asset)
def check_1():
    ...


@asset_check(asset=my_asset)
def check_2():
    ...


# includes my_asset and both checks
my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))


# includes only my_asset
my_asset_only_job = define_asset_job(
    "my_asset_only_job",
    selection=AssetSelection.assets(my_asset).without_checks(),
)

# includes check_1 and check_2, but not my_asset
checks_only_job = define_asset_job(
    "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset)
)

# includes only check_1
check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))

# schedule my_job to run every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")

defs = Definitions(
    assets=[my_asset],
    asset_checks=[check_1, check_2],
    jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
    schedules=[basic_schedule],
)

Alerting (Dagster Cloud)#

You can configure alerts on asset checks using asset-based alert policies.


Limitations#

  • Dagster's UI is tested with a maximum of 1000 checks per asset. It's designed with the expectation that most assets will have fewer than 50 checks. If you have a use case that doesn't fit these limits, you can reach out to discuss.

  • Checks are currently only supported per-asset, not per-partition. See this issue for updates.