Observing streaming processes

In this example, we'll explore how to observe external streaming processes (like Kafka or Azure Event Hub) in Dagster. When you have existing streaming infrastructure managed outside of Dagster, you can still incorporate it into your data lineage, monitor its health, and trigger downstream processing based on stream freshness—all without migrating the stream itself into Dagster.

Problem: Integrating external streaming infrastructure

Imagine your organization has existing Kafka topics or Azure Event Hub streams that are managed by a dedicated streaming team. These streams feed data into your batch processing pipelines, but they live outside Dagster's control. Without proper integration, you face several challenges:

  • Invisible dependencies: Downstream assets depend on stream data, but this relationship isn't visible in your asset graph.
  • Manual monitoring: Stream health (consumer lag, throughput) must be checked separately from your data platform.
  • Schedule-based processing: Batch jobs run on fixed schedules rather than when fresh data is actually available.
  • Inconsistent metadata: Each team documents streams differently, making discovery difficult.

You can incorporate external streams into Dagster's asset graph for visibility, monitoring, and data-driven orchestration in different ways depending on the scenario:

Scenario | Pattern
Show stream in asset graph | External asset
Consistently manage multiple streams | Reusable component
Track stream health over time | Observation sensor
Trigger batch processing when data is available | Freshness trigger

Pattern 1: External asset

Use AssetSpec to define an external asset representing a Kafka topic or Event Hub stream. External assets appear in the asset graph but cannot be materialized by Dagster, since they represent infrastructure you don't own. You can include metadata, such as connection details and ownership information, and add kind tags (such as kafka, streaming, and so on) that will appear in the UI.

src/project_mini/defs/observing_streaming_processes/external_kafka_asset.py
import dagster as dg

streaming_insurance_claims = dg.AssetSpec(
    key="streaming_insurance_claims",
    description="External Kafka stream of insurance claims managed by infrastructure team",
    group_name="streaming",
    kinds={"kafka", "streaming"},
    metadata={
        "source": "External Kafka",
        "topic": "insurance-claims-v1",
        "broker": "kafka.internal.company.com:9092",
        "managed_by": "Infrastructure Team",
    },
)

Pattern 2: Reusable component

When you have multiple Kafka topics or Event Hub streams to observe, you can create a component to provide a reusable, YAML-configurable pattern. This allows teams to declare new external streams without writing Python code and ensures that every external stream follows the same pattern with standardized metadata. The component also validates inputs and provides IDE support.

src/project_mini/defs/observing_streaming_processes/external_kafka_component.py
import dagster as dg


class ExternalKafkaAsset(dg.Component, dg.Model, dg.Resolvable):
    """Reusable component for external Kafka/Event Hub streams."""

    asset_key: str
    topic: str
    broker: str | None = None
    event_hub_namespace: str | None = None
    description: str | None = None
    group_name: str = "default"

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        external_asset = dg.AssetSpec(
            key=self.asset_key,
            description=self.description or f"External stream: {self.topic}",
            group_name=self.group_name,
            kinds={"kafka", "streaming"},
            metadata={
                "source": "External Kafka/Azure Event Hub",
                "topic": self.topic,
                "broker": self.broker or "N/A",
                "event_hub_namespace": self.event_hub_namespace or "N/A",
                "managed_by": "External Infrastructure Team",
            },
        )
        return dg.Definitions(assets=[external_asset])

With this component defined and registered, teams can declare external streams in YAML:

# component.yaml
type: external_kafka_asset
attributes:
  asset_key: claims_stream
  topic: insurance-claims-v1
  broker: kafka.internal.company.com:9092
  group_name: streaming

Pattern 3: Observation sensor

Create a sensor that monitors the external stream and emits observations. Observations create a time-series record of stream health without materializing the asset.

src/project_mini/defs/observing_streaming_processes/observation_sensor.py
import random

import dagster as dg


@dg.sensor(
    name="kafka_observation_sensor",
    minimum_interval_seconds=60,
    description="Monitors external Kafka stream and emits health observations",
)
def kafka_observation_sensor(context: dg.SensorEvaluationContext):
    messages_per_second = 450 + random.randint(-50, 100)
    consumer_lag_seconds = random.randint(5, 30)
    partition_count = 8
    messages_available = random.randint(5000, 15000)
    stream_health = "healthy" if consumer_lag_seconds < 60 else "degraded"

    context.log.info(
        f"Stream health: {messages_per_second} msg/s, "
        f"{consumer_lag_seconds}s lag, {messages_available} msgs available"
    )

    return dg.SensorResult(
        asset_events=[
            dg.AssetObservation(
                asset_key=dg.AssetKey("streaming_insurance_claims"),
                metadata={
                    "messages_per_second": messages_per_second,
                    "consumer_lag_seconds": consumer_lag_seconds,
                    "partition_count": partition_count,
                    "messages_available": messages_available,
                    "stream_health": stream_health,
                    "kafka_topic": "insurance-claims-v1",
                },
            )
        ]
    )

The sensor:

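  • Checks stream health: The example simulates metrics with random values; a production sensor would query Kafka for consumer lag, throughput, and partition status.
  • Emits AssetObservation: Records metrics against the external asset without triggering materialization.
  • Runs periodically: The 60-second minimum_interval_seconds keeps the sensor from overwhelming the Kafka admin API.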

In production, you would query actual Kafka metrics using a library like confluent-kafka:

from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'kafka.internal.company.com:9092'})
# Query consumer groups, offsets, and topic metadata
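
Once offsets have been fetched, consumer lag is commonly derived by comparing each partition's committed offset with its high watermark. The following is a minimal sketch of that arithmetic only, not a definitive implementation. The helper name partition_lag and the sample offsets are hypothetical, and the function takes plain dictionaries keyed by partition number that you would populate from your Kafka client yourself. Note that this yields lag in messages, whereas the simulated sensor above reports lag in seconds.

```python
def partition_lag(
    committed: dict[int, int], high_watermarks: dict[int, int]
) -> dict[int, int]:
    """Return per-partition lag in messages (hypothetical helper)."""
    lag = {}
    for partition, high in high_watermarks.items():
        # Assumption: a partition with no committed offset counts as unread from offset 0.
        consumed = committed.get(partition, 0)
        # Clamp at zero in case the committed offset is ahead of a stale watermark.
        lag[partition] = max(high - consumed, 0)
    return lag


# Illustrative offsets only; real values would come from the Kafka client.
committed_offsets = {0: 1_200, 1: 950}
high_watermark_offsets = {0: 1_250, 1: 950, 2: 300}

lag_by_partition = partition_lag(committed_offsets, high_watermark_offsets)
total_lag = sum(lag_by_partition.values())
print(lag_by_partition, total_lag)
```

The per-partition values and the total could then be attached as metadata on the AssetObservation in place of the random numbers.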

Pattern 4: Freshness trigger

Create a sensor that triggers downstream processing only when fresh data is available. This enables data-driven orchestration instead of fixed schedules.

src/project_mini/defs/observing_streaming_processes/freshness_trigger_sensor.py
import random

import dagster as dg


@dg.asset(group_name="streaming")
def processed_claims(context: dg.AssetExecutionContext):
    """Downstream asset that processes claims when fresh streaming data is available."""
    context.log.info("Processing fresh claims from stream...")
    return {"records_processed": random.randint(1000, 2000)}


@dg.sensor(
    name="streaming_freshness_sensor",
    minimum_interval_seconds=120,
    description="Triggers downstream processing when fresh streaming data is available",
)
def streaming_freshness_sensor(context: dg.SensorEvaluationContext):
    messages_available = random.randint(500, 2000)
    consumer_lag = random.randint(5, 45)

    context.log.info(f"Checking stream freshness: {messages_available} msgs, {consumer_lag}s lag")

    should_trigger = messages_available > 1000 and consumer_lag < 30

    if should_trigger:
        context.log.info("Fresh data available, triggering downstream processing")
        return dg.SensorResult(
            run_requests=[
                dg.RunRequest(
                    asset_selection=[dg.AssetKey("processed_claims")],
                    tags={
                        "trigger": "fresh_streaming_data",
                        "messages_available": str(messages_available),
                        "consumer_lag_seconds": str(consumer_lag),
                    },
                )
            ]
        )
    else:
        return dg.SensorResult(
            run_requests=[],
            skip_reason=f"Insufficient fresh data: {messages_available} messages, {consumer_lag}s lag",
        )

The sensor triggers runs only when message count and consumer lag meet your thresholds, so processing is driven by actual data availability rather than a fixed schedule. When conditions aren't met, it emits clear skip reasons that make it easier to debug why a run was skipped.
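
If the trigger condition grows beyond a single boolean expression, it may help to pull it out of the sensor into a plain function that can be unit tested without Dagster. The sketch below is one possible shape, assuming the same thresholds as the example (more than 1000 messages, less than 30 seconds of lag). The names FreshnessThresholds and evaluate_freshness are hypothetical and not part of Dagster.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FreshnessThresholds:
    """Hypothetical container for the trigger thresholds used in the example."""

    min_messages: int = 1000    # trigger only when more than this many messages wait
    max_lag_seconds: int = 30   # trigger only when lag is strictly below this value


def evaluate_freshness(
    messages_available: int,
    consumer_lag_seconds: int,
    thresholds: FreshnessThresholds = FreshnessThresholds(),
) -> tuple[bool, str]:
    """Return (should_trigger, reason) for the given stream metrics."""
    if messages_available <= thresholds.min_messages:
        return False, (
            f"Only {messages_available} messages available "
            f"(need more than {thresholds.min_messages})"
        )
    if consumer_lag_seconds >= thresholds.max_lag_seconds:
        return False, (
            f"Consumer lag {consumer_lag_seconds}s is not below "
            f"{thresholds.max_lag_seconds}s"
        )
    return True, "Fresh data available"


should_trigger, reason = evaluate_freshness(1500, 10)
print(should_trigger, reason)
```

Inside the sensor, the returned reason could be passed as the skip_reason when should_trigger is false, which keeps the skip message specific about which threshold was missed.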

Key considerations

  • Observation vs Materialization: Observations record metadata about external state; they don't create data. Use them for monitoring, not processing.
  • Sensor intervals: Balance between freshness and API load. 60-120 seconds is often appropriate for Kafka monitoring.
  • Stream health thresholds: Customize lag and throughput thresholds based on your SLAs.
  • External asset benefits: Even without materialization, external assets provide lineage visibility, documentation, and enable freshness-based automation.
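
As an illustration of customizing stream health thresholds, the health label computed in the observation sensor could take its limits as parameters instead of hard-coding them. This is a sketch under stated assumptions: the function name classify_stream_health is hypothetical, the 60-second lag limit mirrors the observation sensor example, and the throughput floor of 100 messages per second is an invented placeholder that you would replace with a value from your own SLA.

```python
def classify_stream_health(
    consumer_lag_seconds: int,
    messages_per_second: int,
    max_lag_seconds: int = 60,           # same limit as the observation sensor example
    min_messages_per_second: int = 100,  # placeholder value, not from the original example
) -> str:
    """Label a stream as "healthy" or "degraded" (hypothetical helper)."""
    if consumer_lag_seconds >= max_lag_seconds:
        return "degraded"
    if messages_per_second < min_messages_per_second:
        return "degraded"
    return "healthy"


# A stricter SLA for a latency-sensitive stream (illustrative numbers).
print(classify_stream_health(45, 400))
print(classify_stream_health(45, 400, max_lag_seconds=30))
```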