Asset health monitoring

In this example, we'll explore different approaches to monitoring critical (Tier-0) assets in Dagster. When you have assets that power downstream business processes, you need to ensure they are successfully materialized, passing data quality checks, and fresh according to defined policies.

Problem: Monitoring critical data assets

Imagine you have a set of critical data assets that require monitoring. Without a monitoring strategy, you'd have to check each asset individually in the UI, manually verify freshness across multiple assets, and piece together health status from scattered logs.

To monitor critical data assets, you can use freshness policies, asset checks, or health monitoring.

| Strategy | Use when |
| --- | --- |
| Freshness policies | You need automatic staleness detection; assets have time-based freshness requirements; you want built-in UI indicators for data currency |
| Asset checks | You need to validate data quality after materialization; business rules must be enforced on asset outputs; you want pass/fail validation with severity levels |
| Health monitoring asset | You need aggregated health status across multiple assets; scheduled health reports are required at specific times (for example, at the start or end of the day); you want historical tracking of health over time |
Alerting

To get notified when assets become stale or fail checks, use alert policies (Dagster+) or sensors to react to run or asset status.

Strategy 1: Freshness policies for staleness detection

Freshness policies define acceptable staleness thresholds for your assets. Dagster automatically tracks whether assets are fresh and displays status in the UI.

Benefits:

  • Dagster automatically monitors time since asset materialization.
  • Assets have built-in freshness status badges in the UI.

Drawbacks:

  • No built-in history of freshness status.
Asset with freshness policy and asset check
import random
from datetime import datetime, timedelta

import dagster as dg


@dg.asset(
    group_name="risk",
    tags={"tier": "tier-0", "criticality": "high"},
    description="Market risk calculations and exposure metrics",
    freshness_policy=dg.FreshnessPolicy.time_window(
        fail_window=timedelta(hours=1),
        warn_window=timedelta(minutes=45),
    ),
)
def market_risk_data(context: dg.AssetExecutionContext) -> dict:
    data = {
        "timestamp": datetime.now().isoformat(),
        "var_95": random.uniform(1_000_000, 5_000_000),
        "portfolio_value": random.uniform(100_000_000, 500_000_000),
    }
    context.log.info(f"Market risk VaR: ${data['var_95']:,.2f}")
    return data


@dg.asset_check(asset=market_risk_data, description="Validates market risk data quality")
def market_risk_data_quality(context: dg.AssetExecutionContext) -> dg.AssetCheckResult:
    # Simulated validation; a real check would inspect the materialized data
    passed = random.random() > 0.1
    if passed:
        return dg.AssetCheckResult(
            passed=True,
            metadata={
                "validation_time": datetime.now().isoformat(),
                "checks_passed": "VaR within limits, positions validated",
            },
        )
    else:
        return dg.AssetCheckResult(
            passed=False,
            severity=dg.AssetCheckSeverity.ERROR,
            metadata={
                "validation_time": datetime.now().isoformat(),
                "error": "VaR exceeds risk limits",
            },
        )

Strategy 2: Asset checks for data quality

Asset checks provide the following:

  • Validation scope: Per-asset data quality rules
  • Severity levels: ERROR (blocking) or WARN (non-blocking)
  • Execution: Runs after materialization or on-demand
  • Visibility: Check results appear per asset in the UI; there is no built-in aggregated view

Asset checks validate data quality after materialization, providing pass/fail results with configurable severity levels. You can define multiple assets with different freshness requirements and paired checks.

Multiple assets with checks
@dg.asset(
    group_name="security_master",
    tags={"tier": "tier-0", "criticality": "high"},
    description="Security master data - pricing and reference data for all instruments",
    freshness_policy=dg.FreshnessPolicy.time_window(
        fail_window=timedelta(minutes=30),
        warn_window=timedelta(minutes=20),
    ),
)
def security_master_data(context: dg.AssetExecutionContext) -> dict:
    data = {
        "timestamp": datetime.now().isoformat(),
        "total_securities": random.randint(10_000, 50_000),
        "updated_prices": random.randint(8_000, 45_000),
    }
    context.log.info(f"Security master updated: {data['total_securities']} securities")
    return data


@dg.asset_check(asset=security_master_data, description="Validates security master completeness")
def security_master_completeness(context: dg.AssetExecutionContext) -> dg.AssetCheckResult:
    passed = random.random() > 0.15
    if passed:
        return dg.AssetCheckResult(
            passed=True,
            metadata={
                "validation_time": datetime.now().isoformat(),
                "coverage": "99.8%",
            },
        )
    else:
        return dg.AssetCheckResult(
            passed=False,
            severity=dg.AssetCheckSeverity.ERROR,
            metadata={
                "validation_time": datetime.now().isoformat(),
                "error": "Missing critical security prices",
                "coverage": "94.2%",
            },
        )


@dg.asset(
    group_name="risk",
    tags={"tier": "tier-0", "criticality": "high"},
    description="Credit risk metrics and counterparty exposures",
    freshness_policy=dg.FreshnessPolicy.time_window(
        fail_window=timedelta(minutes=90),
        warn_window=timedelta(hours=1),
    ),
)
def credit_risk_data(context: dg.AssetExecutionContext) -> dict:
    data = {
        "timestamp": datetime.now().isoformat(),
        "total_exposure": random.uniform(50_000_000, 200_000_000),
        "high_risk_counterparties": random.randint(0, 5),
    }
    context.log.info(f"Credit exposure: ${data['total_exposure']:,.2f}")
    return data


@dg.asset_check(asset=credit_risk_data, description="Validates credit risk calculations")
def credit_risk_limits(context: dg.AssetExecutionContext) -> dg.AssetCheckResult:
    passed = random.random() > 0.12
    if passed:
        return dg.AssetCheckResult(
            passed=True,
            metadata={
                "validation_time": datetime.now().isoformat(),
                "exposure_within_limits": True,
            },
        )
    else:
        return dg.AssetCheckResult(
            passed=False,
            severity=dg.AssetCheckSeverity.ERROR,
            metadata={
                "validation_time": datetime.now().isoformat(),
                "error": "Counterparty exposure exceeds limits",
            },
        )

Strategy 3: Aggregated health monitoring asset

For centralized monitoring with scheduled execution, create a dedicated asset that queries the Dagster instance and aggregates health across all critical assets. This approach combines the benefits of freshness policies and asset checks into a single, scheduled health report.

Step 1: Define a health check function

First, define a function that examines three dimensions of health for each asset:

Health check function
from datetime import datetime
from typing import Any

import dagster as dg
from dagster import DagsterEventType


def get_asset_health(
    context: dg.AssetExecutionContext, asset_key: dg.AssetKey
) -> tuple[str, dict[str, Any]]:
    instance = context.instance
    details: dict[str, Any] = {
        "materialization_status": "unknown",
        "asset_checks": [],
        "freshness_status": "unknown",
        "last_materialized": None,
    }
    has_errors = False
    has_warnings = False
    # Initialized here because the freshness check below also reads it,
    # even if the materialization lookup raises
    latest_mat = None

    try:
        latest_mat = instance.get_latest_materialization_event(asset_key)
        if latest_mat:
            details["last_materialized"] = datetime.fromtimestamp(latest_mat.timestamp).isoformat()
            details["materialization_status"] = "success"
        else:
            details["materialization_status"] = "never_materialized"
            has_warnings = True
    except Exception as e:
        details["materialization_status"] = f"error: {e!s}"
        has_errors = True

    try:
        from dagster._core.storage.event_log.base import EventRecordsFilter

        records = instance.get_event_records(
            event_records_filter=EventRecordsFilter(
                event_type=DagsterEventType.ASSET_CHECK_EVALUATION,
                asset_key=asset_key,
            ),
            limit=100,
            ascending=False,
        )
        for record in records:
            if not record.event_log_entry or not record.event_log_entry.dagster_event:
                continue
            event = record.event_log_entry.dagster_event
            check_data = event.asset_check_evaluation_data
            # Keep only the most recent evaluation per check name
            if any(c["check_name"] == check_data.check_name for c in details["asset_checks"]):
                continue
            check_info = {
                "check_name": check_data.check_name,
                "passed": check_data.passed,
                "severity": str(getattr(check_data, "severity", "ERROR")),
            }
            details["asset_checks"].append(check_info)
            if not check_data.passed:
                if check_info["severity"] == "AssetCheckSeverity.WARN":
                    has_warnings = True
                else:
                    has_errors = True
    except Exception as e:
        details["asset_checks"] = [{"error": str(e)}]
        context.log.warning(f"Error checking asset checks for {asset_key}: {e}")

    try:
        asset_graph = context.repository_def.asset_graph
        if asset_graph.has(asset_key):
            node = asset_graph.get(asset_key)
            if hasattr(node, "freshness_policy") and node.freshness_policy:
                policy = node.freshness_policy
                if latest_mat and hasattr(policy, "fail_window") and policy.fail_window:
                    lag = datetime.now().timestamp() - latest_mat.timestamp
                    fail_secs = policy.fail_window.total_seconds()
                    warn_secs = policy.warn_window.total_seconds() if policy.warn_window else None
                    if lag > fail_secs:
                        details["freshness_status"] = (
                            f"stale (lag: {lag / 60:.1f}m > fail: {fail_secs / 60:.1f}m)"
                        )
                        has_errors = True
                    elif warn_secs and lag > warn_secs:
                        details["freshness_status"] = f"warning (lag: {lag / 60:.1f}m)"
                        has_warnings = True
                    else:
                        details["freshness_status"] = f"fresh (lag: {lag / 60:.1f}m)"
                elif not latest_mat:
                    details["freshness_status"] = "no_materialization"
                    has_errors = True
                else:
                    details["freshness_status"] = "policy_present"
            else:
                details["freshness_status"] = "no_policy"
    except Exception as e:
        details["freshness_status"] = f"error: {e!s}"
        context.log.warning(f"Error checking freshness for {asset_key}: {e}")

    status = "UNHEALTHY" if has_errors else ("WARNING" if has_warnings else "HEALTHY")
    return status, details

| Check | How it works | Status impact |
| --- | --- | --- |
| Materialization status | Calls instance.get_latest_materialization_event() to verify the asset has been successfully materialized | WARNING if never materialized |
| Asset check evaluation | Queries the event log for ASSET_CHECK_EVALUATION events and aggregates pass/fail status | UNHEALTHY for errors, WARNING for warnings |
| Freshness calculation | Compares the lag since last materialization against the fail_window and warn_window thresholds | UNHEALTHY if stale, WARNING if approaching |

Step 2: Create a health monitoring asset

Create an asset that uses the health check function from step 1 to aggregate health across all Tier-0 assets. The health monitoring asset iterates through the TIER0_ASSETS list, calls get_asset_health() for each, and produces a structured output with overall_status that downstream processes can consume.

Health monitoring asset
TIER0_ASSETS = [
    dg.AssetKey("market_risk_data"),
    dg.AssetKey("security_master_data"),
    dg.AssetKey("credit_risk_data"),
]


@dg.asset(
    description="Aggregated health status of all Tier-0 critical assets",
    group_name="monitoring",
)
def tier0_health_status(context: dg.AssetExecutionContext) -> dict[str, Any]:
    context.log.info("=" * 60)
    context.log.info("TIER-0 ASSET HEALTH REPORT")
    context.log.info("=" * 60)

    health_results: dict[str, str] = {}
    unhealthy_assets: list[str] = []
    warning_assets: list[str] = []
    asset_details: dict[str, dict[str, Any]] = {}

    for asset_key in TIER0_ASSETS:
        name = asset_key.to_user_string()
        status, details = get_asset_health(context, asset_key)

        health_results[name] = status
        asset_details[name] = details

        if status == "UNHEALTHY":
            unhealthy_assets.append(name)
            context.log.error(f"❌ {name}: {status}")
            if details["materialization_status"] != "success":
                context.log.error(f" └─ Materialization: {details['materialization_status']}")
            for check in details["asset_checks"]:
                if not check.get("passed", True):
                    context.log.error(f" └─ Check '{check['check_name']}': FAILED")
        elif status == "WARNING":
            warning_assets.append(name)
            context.log.warning(f"⚠️ {name}: {status}")
        else:
            context.log.info(f"✅ {name}: {status}")
            if details["last_materialized"]:
                context.log.info(f" └─ Last materialized: {details['last_materialized']}")

    context.log.info("=" * 60)
    context.log.info(
        f"Healthy: {len(health_results) - len(unhealthy_assets) - len(warning_assets)}"
    )
    context.log.info(f"Warnings: {len(warning_assets)}")
    context.log.info(f"Unhealthy: {len(unhealthy_assets)}")

    if unhealthy_assets:
        context.log.error("Action Required: Review asset health in Dagster UI")

    return {
        "timestamp": datetime.now().isoformat(),
        "total_assets": len(TIER0_ASSETS),
        "healthy": len(health_results) - len(unhealthy_assets) - len(warning_assets),
        "warnings": len(warning_assets),
        "unhealthy": len(unhealthy_assets),
        "unhealthy_assets": unhealthy_assets,
        "warning_assets": warning_assets,
        "health_results": health_results,
        "asset_details": asset_details,
        "overall_status": "DEGRADED"
        if unhealthy_assets
        else ("WARNING" if warning_assets else "HEALTHY"),
    }

Step 3: Schedule health checks

Finally, schedule asset health monitoring for predictable check times:

Health check schedules
tier0_health_check_job = dg.define_asset_job(
    name="tier0_health_check_job",
    selection=dg.AssetSelection.assets(tier0_health_status),
    description="Checks the health of all Tier-0 critical assets",
)

start_of_day_health_check = dg.ScheduleDefinition(
    name="start_of_day_health_check",
    job=tier0_health_check_job,
    cron_schedule="0 8 * * *",
    description="Start-of-day health check for all Tier-0 assets",
)

end_of_day_health_check = dg.ScheduleDefinition(
    name="end_of_day_health_check",
    job=tier0_health_check_job,
    cron_schedule="0 18 * * *",
    description="End-of-day health check for all Tier-0 assets",
)

hourly_health_check = dg.ScheduleDefinition(
    name="hourly_health_check",
    job=tier0_health_check_job,
    cron_schedule="0 * * * *",
    description="Hourly health check for all Tier-0 assets",
)
Optional: Add alerting

You can trigger alerts when the health monitoring asset's overall_status indicates an issue by using alert policies (Dagster+) or sensors that react to the asset's materialization result.