Monitor Dynamic Table freshness
Because the Dynamic Tables are virtual — Dagster never executes them — their metadata timeline in the asset catalog never updates on its own. Two complementary tools close this gap: a sensor that continuously records refresh state as observations, and asset checks that give a structured pass/fail signal when a table's scheduling state is unhealthy.
This same sensor does double duty: the refresh state it reads here is also what drives downstream automation on the next page. Monitoring refresh state and triggering on refresh completion are the same problem, so they live in one sensor.
The freshness sensor
The sensor is evaluated at most once every 60 seconds (minimum_interval_seconds=60), queries information_schema.dynamic_tables, and emits one AssetObservation per table:
import dagster as dg
from dagster_snowflake import SnowflakeResource

# _TABLES is defined elsewhere in this example as
# (dagster_asset_key, SNOWFLAKE_TABLE_NAME) pairs.


@dg.sensor(
    name="dynamic_table_freshness_sensor",
    minimum_interval_seconds=60,
    asset_selection=dg.AssetSelection.assets("executive_dashboard_report"),
)
def dynamic_table_freshness_sensor(
    context: dg.SensorEvaluationContext,
    snowflake: SnowflakeResource,
) -> dg.SensorResult | dg.SkipReason:
    table_names_sql = ", ".join(f"'{sf_name}'" for _, sf_name in _TABLES)

    try:
        with snowflake.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""
                SELECT
                    name,
                    scheduling_state,
                    last_completed_refresh,
                    TIMESTAMPDIFF('second', last_completed_refresh, CURRENT_TIMESTAMP())
                        AS seconds_since_refresh
                FROM information_schema.dynamic_tables
                WHERE UPPER(name) IN ({table_names_sql})
                  AND schema_name = 'ANALYTICS'
                """
            )
            rows = cursor.fetchall()
    except Exception as exc:
        # Network/IO failure is the one place LBYL doesn't apply: skip this tick.
        return dg.SkipReason(f"Snowflake query failed: {exc}")

    if not rows:
        return dg.SkipReason("No dynamic tables found in information_schema")

    name_to_key = {sf_name: dagster_key for dagster_key, sf_name in _TABLES}

    # Always record refresh state as observations on the virtual assets.
    observations = [
        dg.AssetObservation(
            asset_key=dg.AssetKey(name_to_key[row[0].upper()]),
            metadata={
                "scheduling_state": dg.MetadataValue.text(str(row[1])),
                "last_completed_refresh": dg.MetadataValue.text(str(row[2])),
                "seconds_since_refresh": dg.MetadataValue.int(int(row[3] or 0)),
            },
        )
        for row in rows
        if row[0].upper() in name_to_key
    ]

    # Snowflake owns the refresh; the dashboard must run only AFTER a refresh lands.
    # Trigger off the actual `last_completed_refresh` timestamps, never off source change.
    name_to_refresh = {row[0].upper(): row[2] for row in rows if row[0].upper() in name_to_key}
    clv_refresh = name_to_refresh.get("CUSTOMER_LIFETIME_VALUE")
    rollup_refresh = name_to_refresh.get("DAILY_REVENUE_ROLLUP")

    # Cold-start gate (LBYL): a NULL last_completed_refresh means the table has no
    # committed data yet. Firing now would read an empty table and go green on nothing.
    if clv_refresh is None or rollup_refresh is None:
        return dg.SensorResult(
            asset_events=observations,
            skip_reason="Waiting for both dynamic tables to complete a first refresh.",
            cursor=context.cursor,
        )

    # Composite run key: fire on EITHER table advancing, dedupe identical combined state.
    refresh_state = f"{clv_refresh}-{rollup_refresh}"
    if refresh_state == context.cursor:
        return dg.SensorResult(
            asset_events=observations,
            skip_reason="No dynamic table refresh since last tick.",
            cursor=context.cursor,
        )

    return dg.SensorResult(
        run_requests=[dg.RunRequest(run_key=refresh_state)],
        asset_events=observations,
        cursor=refresh_state,
    )
When the query succeeds and returns rows, the sensor returns SensorResult(asset_events=[...], ...). The asset_events carry the AssetObservation objects that populate each virtual asset's metadata timeline in the Dagster UI: scheduling_state, last_completed_refresh, and seconds_since_refresh. On ticks where no downstream run is requested, the skip_reason records why (cold start, or no new refresh) in the sensor tick history. If the query fails or returns no rows, the sensor returns a bare SkipReason and records no observations for that tick.
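The gating logic in the sensor can be separated from Dagster and Snowflake and exercised as a pure function. The sketch below is an illustration only, not code from the example project: `decide_tick` and `TickDecision` are hypothetical names, and the timestamps are plain strings standing in for whatever values Snowflake returns.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TickDecision:
    """Outcome of one sensor tick (hypothetical helper type for illustration)."""

    fire: bool              # True when a downstream run should be requested
    cursor: Optional[str]   # cursor value to persist after this tick
    reason: str             # explanation that would appear in the tick history


def decide_tick(
    clv_refresh: Optional[str],
    rollup_refresh: Optional[str],
    cursor: Optional[str],
) -> TickDecision:
    """Mirror the sensor's gating logic without any Dagster or Snowflake calls."""
    # Cold-start gate: a missing timestamp means a table has never refreshed.
    if clv_refresh is None or rollup_refresh is None:
        return TickDecision(False, cursor, "waiting for first refresh")

    # The composite state changes when either table's timestamp advances.
    refresh_state = f"{clv_refresh}-{rollup_refresh}"
    if refresh_state == cursor:
        return TickDecision(False, cursor, "no refresh since last tick")

    return TickDecision(True, refresh_state, "refresh landed")


# Walk through four consecutive ticks, carrying the cursor forward.
cursor = None
history = []
for clv, rollup in [
    (None, "2024-01-01 00:00:00"),                    # CLV not yet refreshed
    ("2024-01-01 00:05:00", "2024-01-01 00:00:00"),   # both present: fire
    ("2024-01-01 00:05:00", "2024-01-01 00:00:00"),   # unchanged: skip
    ("2024-01-01 00:05:00", "2024-01-01 01:00:00"),   # rollup advanced: fire
]:
    decision = decide_tick(clv, rollup, cursor)
    cursor = decision.cursor
    history.append(decision.fire)

print(history)  # [False, True, False, True]
```

Keeping the decision in a function like this makes the cold-start and dedupe branches easy to unit test without a Snowflake connection; the sensor body would then only translate the decision into a SensorResult.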
Freshness checks on the virtual assets
Observations record state — they don't pass or fail. For a structured health signal, asset checks co-located with the virtual specs query scheduling_state and return a boolean result:
@dg.asset_check(asset="customer_lifetime_value")
def customer_lifetime_value_is_fresh(snowflake: SnowflakeResource) -> dg.AssetCheckResult:
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT scheduling_state, last_completed_refresh
            FROM information_schema.dynamic_tables
            WHERE UPPER(name) = 'CUSTOMER_LIFETIME_VALUE' AND schema_name = 'ANALYTICS'
        """)
        row = cursor.fetchone()

    if not row:
        return dg.AssetCheckResult(passed=False, metadata={"error": "table not found"})

    state, last_refresh = row
    return dg.AssetCheckResult(
        passed=state in ("RUNNING", "SUSPENDED"),
        metadata={
            "scheduling_state": dg.MetadataValue.text(str(state)),
            "last_completed_refresh": dg.MetadataValue.text(str(last_refresh)),
        },
    )


@dg.asset_check(asset="daily_revenue_rollup")
def daily_revenue_rollup_is_fresh(snowflake: SnowflakeResource) -> dg.AssetCheckResult:
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT scheduling_state, last_completed_refresh
            FROM information_schema.dynamic_tables
            WHERE UPPER(name) = 'DAILY_REVENUE_ROLLUP' AND schema_name = 'ANALYTICS'
        """)
        row = cursor.fetchone()

    if not row:
        return dg.AssetCheckResult(passed=False, metadata={"error": "table not found"})

    state, last_refresh = row
    return dg.AssetCheckResult(
        passed=state in ("RUNNING", "SUSPENDED"),
        metadata={
            "scheduling_state": dg.MetadataValue.text(str(state)),
            "last_completed_refresh": dg.MetadataValue.text(str(last_refresh)),
        },
    )
The check passes when scheduling_state is RUNNING or SUSPENDED — both are valid healthy states for a Snowflake Dynamic Table. Any other state (e.g. FAILED) causes the check to fail, making the problem visible in the Dagster asset catalog as a failed check rather than requiring manual inspection of Snowflake.
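Both checks apply the same rule, so the pass/fail decision can be written once as a pure function. The following is a sketch rather than the example project's code: `evaluate_scheduling_state` and `HEALTHY_STATES` are hypothetical names, and `row` is assumed to be the `(scheduling_state, last_completed_refresh)` tuple returned by `cursor.fetchone()`, or `None` when the table is missing.

```python
from typing import Any, Optional, Tuple

# States the checks above treat as healthy (copied from the check bodies).
HEALTHY_STATES = ("RUNNING", "SUSPENDED")


def evaluate_scheduling_state(
    row: Optional[Tuple[Any, Any]],
) -> Tuple[bool, dict]:
    """Return (passed, metadata) for one dynamic table row."""
    # No row means the table was not found by the metadata query.
    if not row:
        return False, {"error": "table not found"}

    state, last_refresh = row
    metadata = {
        "scheduling_state": str(state),
        "last_completed_refresh": str(last_refresh),
    }
    return state in HEALTHY_STATES, metadata


print(evaluate_scheduling_state(("RUNNING", "2024-01-01 00:05:00"))[0])  # True
print(evaluate_scheduling_state(("FAILED", None))[0])                    # False
print(evaluate_scheduling_state(None))  # (False, {'error': 'table not found'})
```

Under this assumption, each check body would reduce to running its query and passing the returned pair to `dg.AssetCheckResult(passed=..., metadata=...)`, which keeps the healthy-state list in one place.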
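This pairing matters for automation, too: a FAILED table's last_completed_refresh simply stops advancing, so that table stops contributing new state to the trigger logic on the next page, while the check turns red to surface the failure. Because the run key is a composite of both timestamps, a refresh of the other table can still request a run, and the failed check is what signals that one input is stale. The two mechanisms compose without any special-case handling.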
Next steps
Continue this example by automating downstream assets.