Asset health monitoring
In this example, we'll explore different approaches to monitoring critical (Tier-0) assets in Dagster. When you have assets that power downstream business processes, you need to ensure they materialize successfully, pass data quality checks, and stay fresh according to defined policies.
Problem: Monitoring critical data assets
Imagine you have a set of critical data assets that require monitoring. Without a monitoring strategy, you'd have to check each asset individually in the UI, manually verify freshness across multiple assets, and piece together health status from scattered logs.
To monitor critical data assets, you can use freshness policies, asset checks, or a dedicated health monitoring asset.
| Strategy | Use when |
|---|---|
| Freshness policies | You want automatic staleness detection, with freshness status surfaced in the UI. |
| Asset checks | You want per-asset data quality validation with pass/fail results. |
| Health monitoring asset | You want a centralized, scheduled report that aggregates health across all critical assets. |
To get notified when assets become stale or fail checks, use alert policies (Dagster+) or sensors to react to run or asset status.
Strategy 1: Freshness policies for staleness detection
Freshness policies define acceptable staleness thresholds for your assets. Dagster automatically tracks whether assets are fresh and displays status in the UI.
Benefits:
- Dagster automatically monitors time since asset materialization.
- Assets have built-in freshness status badges in the UI.
Drawbacks:
- No built-in history of freshness status.
For example, the following asset pairs a freshness policy with a data quality check:

import random
from datetime import datetime, timedelta

import dagster as dg


@dg.asset(
group_name="risk",
tags={"tier": "tier-0", "criticality": "high"},
description="Market risk calculations and exposure metrics",
freshness_policy=dg.FreshnessPolicy.time_window(
fail_window=timedelta(hours=1),
warn_window=timedelta(minutes=45),
),
)
def market_risk_data(context: dg.AssetExecutionContext) -> dict:
data = {
"timestamp": datetime.now().isoformat(),
"var_95": random.uniform(1_000_000, 5_000_000),
"portfolio_value": random.uniform(100_000_000, 500_000_000),
}
context.log.info(f"Market risk VaR: ${data['var_95']:,.2f}")
return data
@dg.asset_check(asset=market_risk_data, description="Validates market risk data quality")
def market_risk_data_quality(context: dg.AssetExecutionContext) -> dg.AssetCheckResult:
    # Simulate a data quality check that fails ~10% of the time.
    passed = random.random() > 0.1
if passed:
return dg.AssetCheckResult(
passed=True,
metadata={
"validation_time": datetime.now().isoformat(),
"checks_passed": "VaR within limits, positions validated",
},
)
else:
return dg.AssetCheckResult(
passed=False,
severity=dg.AssetCheckSeverity.ERROR,
metadata={
"validation_time": datetime.now().isoformat(),
"error": "VaR exceeds risk limits",
},
)
Strategy 2: Asset checks for data quality
Asset checks provide the following:
- Validation scope: Per-asset data quality rules
- Severity levels: ERROR or WARN; severity controls how a failure is surfaced, and a failing check blocks downstream execution only when it is defined with blocking=True
- Execution: Runs after materialization or on demand
- Visibility: Results are shown per asset in the UI, not aggregated across assets
Asset checks validate data quality after materialization and report pass/fail results with configurable severity. The examples below define two more Tier-0 assets with different freshness requirements, each paired with a check:
@dg.asset(
group_name="security_master",
tags={"tier": "tier-0", "criticality": "high"},
description="Security master data - pricing and reference data for all instruments",
freshness_policy=dg.FreshnessPolicy.time_window(
fail_window=timedelta(minutes=30),
warn_window=timedelta(minutes=20),
),
)
def security_master_data(context: dg.AssetExecutionContext) -> dict:
data = {
"timestamp": datetime.now().isoformat(),
"total_securities": random.randint(10_000, 50_000),
"updated_prices": random.randint(8_000, 45_000),
}
context.log.info(f"Security master updated: {data['total_securities']} securities")
return data
@dg.asset_check(asset=security_master_data, description="Validates security master completeness")
def security_master_completeness(context: dg.AssetExecutionContext) -> dg.AssetCheckResult:
    # Simulate a completeness check that fails ~15% of the time.
    passed = random.random() > 0.15
if passed:
return dg.AssetCheckResult(
passed=True,
metadata={
"validation_time": datetime.now().isoformat(),
"coverage": "99.8%",
},
)
else:
return dg.AssetCheckResult(
passed=False,
severity=dg.AssetCheckSeverity.ERROR,
metadata={
"validation_time": datetime.now().isoformat(),
"error": "Missing critical security prices",
"coverage": "94.2%",
},
)
@dg.asset(
group_name="risk",
tags={"tier": "tier-0", "criticality": "high"},
description="Credit risk metrics and counterparty exposures",
freshness_policy=dg.FreshnessPolicy.time_window(
fail_window=timedelta(minutes=90),
warn_window=timedelta(hours=1),
),
)
def credit_risk_data(context: dg.AssetExecutionContext) -> dict:
data = {
"timestamp": datetime.now().isoformat(),
"total_exposure": random.uniform(50_000_000, 200_000_000),
"high_risk_counterparties": random.randint(0, 5),
}
context.log.info(f"Credit exposure: ${data['total_exposure']:,.2f}")
return data
@dg.asset_check(asset=credit_risk_data, description="Validates credit risk calculations")
def credit_risk_limits(context: dg.AssetExecutionContext) -> dg.AssetCheckResult:
    # Simulate a limits check that fails ~12% of the time.
    passed = random.random() > 0.12
if passed:
return dg.AssetCheckResult(
passed=True,
metadata={
"validation_time": datetime.now().isoformat(),
"exposure_within_limits": True,
},
)
else:
return dg.AssetCheckResult(
passed=False,
severity=dg.AssetCheckSeverity.ERROR,
metadata={
"validation_time": datetime.now().isoformat(),
"error": "Counterparty exposure exceeds limits",
},
)
Strategy 3: Aggregated health monitoring asset
For centralized monitoring with scheduled execution, create a dedicated asset that queries the Dagster instance and aggregates health across all critical assets. This approach combines the benefits of freshness policies and asset checks into a single, scheduled health report.
Step 1: Define a health check function
First, define a function that examines three dimensions of health for each asset:
from typing import Any


def get_asset_health(
context: dg.AssetExecutionContext, asset_key: dg.AssetKey
) -> tuple[str, dict[str, Any]]:
instance = context.instance
details: dict[str, Any] = {
"materialization_status": "unknown",
"asset_checks": [],
"freshness_status": "unknown",
"last_materialized": None,
}
has_errors = False
has_warnings = False
    latest_mat = None  # ensure it's defined for the freshness check even if the lookup fails
    try:
latest_mat = instance.get_latest_materialization_event(asset_key)
if latest_mat:
details["last_materialized"] = datetime.fromtimestamp(latest_mat.timestamp).isoformat()
details["materialization_status"] = "success"
else:
details["materialization_status"] = "never_materialized"
has_warnings = True
except Exception as e:
details["materialization_status"] = f"error: {e!s}"
has_errors = True
try:
        # Use the public EventRecordsFilter API rather than the private import path.
        records = instance.get_event_records(
            event_records_filter=dg.EventRecordsFilter(
                event_type=dg.DagsterEventType.ASSET_CHECK_EVALUATION,
                asset_key=asset_key,
            ),
limit=100,
ascending=False,
)
for record in records:
if not record.event_log_entry or not record.event_log_entry.dagster_event:
continue
event = record.event_log_entry.dagster_event
check_data = event.asset_check_evaluation_data
if any(c["check_name"] == check_data.check_name for c in details["asset_checks"]):
continue
check_info = {
"check_name": check_data.check_name,
"passed": check_data.passed,
"severity": str(getattr(check_data, "severity", "ERROR")),
}
details["asset_checks"].append(check_info)
if not check_data.passed:
if check_info["severity"] == "AssetCheckSeverity.WARN":
has_warnings = True
else:
has_errors = True
except Exception as e:
details["asset_checks"] = [{"error": str(e)}]
context.log.warning(f"Error checking asset checks for {asset_key}: {e}")
try:
asset_graph = context.repository_def.asset_graph
if asset_graph.has(asset_key):
node = asset_graph.get(asset_key)
if hasattr(node, "freshness_policy") and node.freshness_policy:
policy = node.freshness_policy
if latest_mat and hasattr(policy, "fail_window") and policy.fail_window:
lag = datetime.now().timestamp() - latest_mat.timestamp
fail_secs = policy.fail_window.total_seconds()
warn_secs = policy.warn_window.total_seconds() if policy.warn_window else None
if lag > fail_secs:
details["freshness_status"] = (
f"stale (lag: {lag / 60:.1f}m > fail: {fail_secs / 60:.1f}m)"
)
has_errors = True
elif warn_secs and lag > warn_secs:
details["freshness_status"] = f"warning (lag: {lag / 60:.1f}m)"
has_warnings = True
else:
details["freshness_status"] = f"fresh (lag: {lag / 60:.1f}m)"
elif not latest_mat:
details["freshness_status"] = "no_materialization"
has_errors = True
else:
details["freshness_status"] = "policy_present"
else:
details["freshness_status"] = "no_policy"
except Exception as e:
details["freshness_status"] = f"error: {e!s}"
context.log.warning(f"Error checking freshness for {asset_key}: {e}")
status = "UNHEALTHY" if has_errors else ("WARNING" if has_warnings else "HEALTHY")
return status, details
| Check | How it works | Status impact |
|---|---|---|
| Materialization status | Calls instance.get_latest_materialization_event() to verify the asset has been successfully materialized | WARNING if never materialized |
| Asset check evaluation | Queries the event log for ASSET_CHECK_EVALUATION events and aggregates pass/fail status | UNHEALTHY for errors, WARNING for warnings |
| Freshness calculation | Compares the lag since last materialization against the fail_window and warn_window thresholds | UNHEALTHY if stale, WARNING if approaching |
Step 2: Create a health monitoring asset
Create an asset that uses the health check function from step 1 to aggregate health across all Tier-0 assets. The health monitoring asset iterates through the TIER0_ASSETS list, calls get_asset_health() for each, and produces a structured output with overall_status that downstream processes can consume.
TIER0_ASSETS = [
dg.AssetKey("market_risk_data"),
dg.AssetKey("security_master_data"),
dg.AssetKey("credit_risk_data"),
]
@dg.asset(
description="Aggregated health status of all Tier-0 critical assets",
group_name="monitoring",
)
def tier0_health_status(context: dg.AssetExecutionContext) -> dict[str, Any]:
context.log.info("=" * 60)
context.log.info("TIER-0 ASSET HEALTH REPORT")
context.log.info("=" * 60)
health_results: dict[str, str] = {}
unhealthy_assets: list[str] = []
warning_assets: list[str] = []
asset_details: dict[str, dict[str, Any]] = {}
for asset_key in TIER0_ASSETS:
name = asset_key.to_user_string()
status, details = get_asset_health(context, asset_key)
health_results[name] = status
asset_details[name] = details
if status == "UNHEALTHY":
unhealthy_assets.append(name)
context.log.error(f"❌ {name}: {status}")
if details["materialization_status"] != "success":
context.log.error(f" └─ Materialization: {details['materialization_status']}")
for check in details["asset_checks"]:
if not check.get("passed", True):
context.log.error(f" └─ Check '{check['check_name']}': FAILED")
elif status == "WARNING":
warning_assets.append(name)
context.log.warning(f"⚠️ {name}: {status}")
else:
context.log.info(f"✅ {name}: {status}")
if details["last_materialized"]:
context.log.info(f" └─ Last materialized: {details['last_materialized']}")
context.log.info("=" * 60)
context.log.info(
f"Healthy: {len(health_results) - len(unhealthy_assets) - len(warning_assets)}"
)
context.log.info(f"Warnings: {len(warning_assets)}")
context.log.info(f"Unhealthy: {len(unhealthy_assets)}")
if unhealthy_assets:
context.log.error("Action Required: Review asset health in Dagster UI")
return {
"timestamp": datetime.now().isoformat(),
"total_assets": len(TIER0_ASSETS),
"healthy": len(health_results) - len(unhealthy_assets) - len(warning_assets),
"warnings": len(warning_assets),
"unhealthy": len(unhealthy_assets),
"unhealthy_assets": unhealthy_assets,
"warning_assets": warning_assets,
"health_results": health_results,
"asset_details": asset_details,
"overall_status": "DEGRADED"
if unhealthy_assets
else ("WARNING" if warning_assets else "HEALTHY"),
}
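If you want sensors or alert policies to read the result from the event log without loading the asset's value, you could also attach the status as materialization metadata. A minimal sketch, replacing the overall_status computation at the end of tier0_health_status (context.add_output_metadata is the standard Dagster API for this):

    # Compute the status once, then attach it as materialization metadata so it
    # is queryable from the event log (e.g., by the sensor sketched at the end
    # of this page) in addition to being returned in the asset's value.
    overall_status = (
        "DEGRADED" if unhealthy_assets else ("WARNING" if warning_assets else "HEALTHY")
    )
    context.add_output_metadata({"overall_status": overall_status})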
Step 3: Schedule health checks
Finally, schedule the health monitoring asset so checks run at predictable times:
tier0_health_check_job = dg.define_asset_job(
name="tier0_health_check_job",
selection=dg.AssetSelection.assets(tier0_health_status),
description="Checks the health of all Tier-0 critical assets",
)
start_of_day_health_check = dg.ScheduleDefinition(
name="start_of_day_health_check",
job=tier0_health_check_job,
cron_schedule="0 8 * * *",
description="Start-of-day health check for all Tier-0 assets",
)
end_of_day_health_check = dg.ScheduleDefinition(
name="end_of_day_health_check",
job=tier0_health_check_job,
cron_schedule="0 18 * * *",
description="End-of-day health check for all Tier-0 assets",
)
hourly_health_check = dg.ScheduleDefinition(
name="hourly_health_check",
job=tier0_health_check_job,
cron_schedule="0 * * * *",
description="Hourly health check for all Tier-0 assets",
)
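To load these examples, register the assets, checks, job, and schedules together in a Definitions object. A minimal sketch using the names defined above:

defs = dg.Definitions(
    # Tier-0 assets plus the monitoring asset itself
    assets=[
        market_risk_data,
        security_master_data,
        credit_risk_data,
        tier0_health_status,
    ],
    asset_checks=[
        market_risk_data_quality,
        security_master_completeness,
        credit_risk_limits,
    ],
    jobs=[tier0_health_check_job],
    schedules=[
        start_of_day_health_check,
        end_of_day_health_check,
        hourly_health_check,
    ],
)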
You can trigger alerts when the health monitoring asset's overall_status indicates an issue by using alert policies (Dagster+) or sensors that react to the asset's materialization result, as sketched below.
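For example, a sensor can watch tier0_health_status and launch a notification job when the status degrades. A minimal sketch, assuming a hypothetical alert_job and that the asset records overall_status as materialization metadata (as in the variant shown in step 2):

@dg.asset_sensor(asset_key=dg.AssetKey("tier0_health_status"), job=alert_job)
def tier0_degraded_sensor(context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry):
    # Read the overall_status metadata recorded by the health monitoring asset.
    materialization = asset_event.asset_materialization
    status = materialization.metadata.get("overall_status") if materialization else None
    if status is not None and status.value != "HEALTHY":
        # alert_job is a placeholder for your notification job.
        return dg.RunRequest(run_key=asset_event.run_id)
    return dg.SkipReason("Tier-0 assets are healthy")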