""" MetricsConsumer: Prometheus counters + latency histograms. """ from __future__ import annotations import logging import time from prometheus_client import Counter, Gauge, Histogram, Summary from app.domain.events import Event, EventType logger = logging.getLogger(__name__) # ── Prometheus metrics ───────────────────────────────────────────────── EVENTS_TOTAL = Counter( "market_events_total", "Total market data events received", ["provider", "event_type", "symbol"], ) EVENTS_PER_SECOND = Gauge( "market_events_per_second", "Approximate events per second", ["provider"], ) EXCHANGE_LATENCY = Histogram( "market_exchange_latency_ms", "Latency from exchange timestamp to receive (ms)", ["provider"], buckets=[1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000], ) RECV_LATENCY = Summary( "market_recv_latency_ns", "Internal receive latency (nanoseconds, monotonic)", ["provider"], ) GAPS = Counter( "market_gaps_total", "Number of detected message gaps (heartbeat timeouts)", ["provider"], ) class MetricsConsumer: """ Computes and exposes Prometheus metrics from the event stream. """ def __init__(self) -> None: self._last_ts: dict[str, float] = {} # provider → last time.time() self._window_counts: dict[str, int] = {} self._window_start: dict[str, float] = {} async def handle(self, event: Event) -> None: provider = event.provider event_type = event.event_type.value symbol = getattr(event, "symbol", "__heartbeat__") # Count EVENTS_TOTAL.labels( provider=provider, event_type=event_type, symbol=symbol, ).inc() # Exchange latency (if ts_exchange available) ts_exchange = getattr(event, "ts_exchange", None) if ts_exchange is not None: latency_ms = (event.ts_recv.timestamp() - ts_exchange.timestamp()) * 1000 if 0 < latency_ms < 60_000: # sanity: 0–60s EXCHANGE_LATENCY.labels(provider=provider).observe(latency_ms) # Internal receive latency RECV_LATENCY.labels(provider=provider).observe(event.ts_recv_mono_ns) # Events/sec approximation (1-second window) now = time.time() if provider not in self._window_start: self._window_start[provider] = now self._window_counts[provider] = 0 self._window_counts[provider] += 1 elapsed = now - self._window_start[provider] if elapsed >= 1.0: eps = self._window_counts[provider] / elapsed EVENTS_PER_SECOND.labels(provider=provider).set(eps) self._window_start[provider] = now self._window_counts[provider] = 0 # Gap detection: heartbeat events signal a potential gap if event.event_type == EventType.HEARTBEAT: GAPS.labels(provider=provider).inc()