Data contracts with asset checks
Data contracts define agreements about the structure, format, and quality of data, ensuring consistency and reliability across your data pipeline. Without formal contracts, schema changes such as renamed columns, type changes, or missing fields can cause failures in downstream systems. These issues are often discovered too late, leading to broken pipelines and unreliable data.
In Dagster, you can implement data contracts using asset checks that validate the actual schema against a predefined contract specification.
Getting started
To implement data contracts using asset checks, follow these general steps:
- Define your assets with schema metadata: Configure assets to automatically extract and attach column schema metadata
- Create a contract specification: Define the expected schema, types, and validation rules in a YAML file
- Define an asset check: Create an asset check that validates the actual schema against the contract
- Pass the asset check to the Definitions object: Asset checks must be added to Definitions for Dagster to recognize them
- View validation results in the UI: Contract validation results will appear in the UI when the check runs
Defining assets with schema metadata
import pandas as pd
from dagster_pandas.data_frame import create_table_schema_metadata_from_dataframe

import dagster as dg


@dg.asset
def shipments(context: dg.AssetExecutionContext) -> pd.DataFrame:
    """Example shipments asset with column schema metadata."""
    df = pd.DataFrame(
        {
            "shipment_id": [1, 2, 3],
            "customer_name": pd.Series(
                [
                    "Alice",
                    "Bob",
                    "Charlie",
                ],
                dtype="string",
            ),
            "amount": [100.50, 200.75, 150.25],
            "ship_date": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
        }
    )

    # Attach the DataFrame's column names and types as schema metadata
    context.add_output_metadata(
        {"dagster/column_schema": create_table_schema_metadata_from_dataframe(df)}
    )
    return df
The shipments asset uses create_table_schema_metadata_from_dataframe from dagster-pandas to automatically extract and attach column schema metadata. This metadata becomes available for validation by asset checks without requiring manual schema specification.
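To see which type names end up in the schema, it can help to print the string form of each pandas dtype. The sketch below assumes the helper records str(dtype) for every column, which is worth confirming against the dagster-pandas version in use. dtype_names is a hypothetical helper for illustration, not part of Dagster.

```python
import pandas as pd


def dtype_names(df: pd.DataFrame) -> dict[str, str]:
    """Map each column name to the string form of its pandas dtype."""
    return {str(name): str(dtype) for name, dtype in df.dtypes.items()}


# Dtypes are set explicitly so the result does not depend on platform defaults.
df = pd.DataFrame(
    {
        "shipment_id": pd.Series([1, 2, 3], dtype="int64"),
        "amount": pd.Series([100.50, 200.75, 150.25], dtype="float64"),
    }
)
print(dtype_names(df))  # {'shipment_id': 'int64', 'amount': 'float64'}
```

Type names for other dtypes may differ between pandas versions, so printing them once in the target environment is safer than assuming them when writing a contract.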
Defining the contract specification
The data contract is defined in a separate YAML file, making it easy to version control and collaborate across teams:
# data_contracts/shipments_contract.yaml
name: shipments_contract
version: "1.0"
description: "Data contract for shipments table"
schema:
  columns:
    shipment_id:
      type: "int64"
      description: "Unique identifier for shipment"
      required: true
    customer_name:
      type: "string"
      description: "Name of the customer"
      required: true
    amount:
      type: "float64"
      description: "Shipment amount"
      required: true
    ship_date:
      type: "datetime64[ns]"
      description: "Date when shipment was made"
      required: true
The contract defines the expected column names and types, field descriptions, and required columns, and it includes a version number for tracking changes over time.
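Because the contract is hand-edited, a typo in the file itself (for example typ instead of type) can be mistaken for a schema violation in the asset. One option is to sanity-check the parsed contract before using it. The sketch below is an assumption about how that might look, not part of Dagster: contract_problems and ALLOWED_COLUMN_KEYS are hypothetical names, and the contract is written as the dict that yaml.safe_load would return so the example needs no extra packages.

```python
ALLOWED_COLUMN_KEYS = {"type", "description", "required"}


def contract_problems(contract: dict) -> list[str]:
    """Return human-readable problems found in a parsed contract dict."""
    problems = []
    for key in ("name", "version"):
        if key not in contract:
            problems.append(f"missing top-level key: {key}")

    columns = (contract.get("schema") or {}).get("columns") or {}
    if not columns:
        problems.append("schema.columns is missing or empty")

    for col_name, col_info in columns.items():
        col_info = col_info or {}  # a column with no body parses as None
        if "type" not in col_info:
            problems.append(f"{col_name}: missing type")
        unknown = set(col_info) - ALLOWED_COLUMN_KEYS
        if unknown:
            problems.append(f"{col_name}: unknown keys {sorted(unknown)}")
    return problems


# Same shape as the YAML contract, with one deliberate typo in "amount".
contract = {
    "name": "shipments_contract",
    "version": "1.0",
    "schema": {
        "columns": {
            "shipment_id": {"type": "int64", "required": True},
            "amount": {"typ": "float64", "required": True},
        }
    },
}
print(contract_problems(contract))
# ["amount: missing type", "amount: unknown keys ['typ']"]
```

An empty list means the contract has the expected shape. It says nothing about whether the declared types are correct.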
Implementing contract validation
An asset check validates the actual asset schema against the data contract:
import pandas as pd
import yaml

import dagster as dg


@dg.asset_check(
    asset=shipments, description="Validate shipments schema against data contract"
)
def validate_shipments_data_contract(
    context: dg.AssetCheckExecutionContext, shipments: pd.DataFrame
) -> dg.AssetCheckResult:
    """Check that the shipments asset matches the data contract schema."""
    # Read the data contract YAML file
    try:
        with open(dg.file_relative_path(__file__, "shipments_contract.yaml")) as file:
            contract = yaml.safe_load(file)
    except FileNotFoundError:
        return dg.AssetCheckResult(
            passed=False, metadata={"error": "Data contract YAML file not found"}
        )

    # Get the current asset's column schema from metadata
    latest_materialization = context.instance.get_latest_materialization_event(
        asset_key=dg.AssetKey(["shipments"])
    )
    if (
        not latest_materialization
        or not latest_materialization.dagster_event.event_specific_data.materialization.metadata
    ):
        return dg.AssetCheckResult(
            passed=False, metadata={"error": "No schema metadata found for asset"}
        )

    # Extract the TableSchema from metadata
    schema_metadata = latest_materialization.dagster_event.event_specific_data.materialization.metadata.get(
        "dagster/column_schema"
    )
    if not schema_metadata:
        return dg.AssetCheckResult(
            passed=False, metadata={"error": "No column schema metadata found"}
        )

    # Convert TableSchema to dict for comparison
    actual_schema = {col.name: col.type for col in schema_metadata.value.columns}

    # Extract expected schema from contract
    expected_schema = {}
    if "schema" in contract and "columns" in contract["schema"]:
        for col_name, col_info in contract["schema"]["columns"].items():
            expected_schema[col_name] = col_info.get("type", "unknown")

    # Compare schemas
    mismatches = []
    missing_columns = []
    extra_columns = []

    # Check for missing columns in actual schema
    for col_name in expected_schema:
        if col_name not in actual_schema:
            missing_columns.append(col_name)

    # Check for extra columns in actual schema
    for col_name in actual_schema:
        if col_name not in expected_schema:
            extra_columns.append(col_name)

    # Check for type mismatches
    for col_name in expected_schema:
        if col_name in actual_schema:
            if actual_schema[col_name] != expected_schema[col_name]:
                mismatches.append(
                    {
                        "column": col_name,
                        "expected": expected_schema[col_name],
                        "actual": actual_schema[col_name],
                    }
                )

    # Determine if check passed
    passed = (
        len(mismatches) == 0 and len(missing_columns) == 0 and len(extra_columns) == 0
    )

    # Prepare metadata for the result
    result_metadata = {
        "expected_schema": expected_schema,
        "actual_schema": actual_schema,
        "mismatches": mismatches,
        "missing_columns": missing_columns,
        "extra_columns": extra_columns,
    }

    return dg.AssetCheckResult(
        passed=passed,
        metadata=result_metadata,
        severity=dg.AssetCheckSeverity.ERROR
        if not passed
        else dg.AssetCheckSeverity.WARN,
    )
The validation process loads the data contract from the YAML file, retrieves the actual schema from the metadata of the asset's latest materialization, and compares the two to identify type mismatches, missing columns, and unexpected extra columns. The check as written treats every column listed in the contract as required and does not consult the required field. When violations are detected, the check fails and returns metadata describing each issue.
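The comparison itself does not depend on Dagster, so it can be pulled out into a plain function and unit tested on its own. The following is a sketch of that idea under the same rules as the check above (every contract column is treated as required). compare_schemas is a hypothetical helper name, and unlike the loops in the check it sorts its output for stable test results.

```python
def compare_schemas(expected: dict[str, str], actual: dict[str, str]) -> dict:
    """Compare two column-name -> type mappings and report the differences."""
    missing = sorted(expected.keys() - actual.keys())
    extra = sorted(actual.keys() - expected.keys())
    mismatches = [
        {"column": col, "expected": expected[col], "actual": actual[col]}
        for col in sorted(expected.keys() & actual.keys())
        if expected[col] != actual[col]
    ]
    return {
        "passed": not (missing or extra or mismatches),
        "missing_columns": missing,
        "extra_columns": extra,
        "mismatches": mismatches,
    }


expected = {"shipment_id": "int64", "amount": "float64", "ship_date": "datetime64[ns]"}
actual = {"shipment_id": "int64", "amount": "object", "region": "string"}
result = compare_schemas(expected, actual)
print(result["passed"])           # False
print(result["missing_columns"])  # ['ship_date']
print(result["extra_columns"])    # ['region']
print(result["mismatches"])
# [{'column': 'amount', 'expected': 'float64', 'actual': 'object'}]
```

The asset check could then call such a function and pass its result into AssetCheckResult, which keeps the Dagster-specific code limited to loading the contract and reading the metadata.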
This approach provides early detection of schema violations, catching issues immediately when assets are materialized rather than waiting for downstream failures. The external YAML contracts serve as explicit documentation that can be version controlled alongside code, facilitating team collaboration and providing a clear communication tool between data producers and consumers.
Viewing contract validation results
When the data contract asset check runs, the results appear in the Dagster UI. Successful validations indicate that the asset schema matches the contract, while failures provide detailed information about specific violations.
Next steps
- Learn more about asset checks
- Explore unit testing assets and ops