New service: real-time market data collection with unified event model. Architecture: - Domain events: TradeEvent, QuoteEvent, BookL2Event, HeartbeatEvent - Provider interface: MarketDataProvider ABC with connect/subscribe/stream/close - Async EventBus with fan-out to multiple consumers Providers: - BinanceProvider: public WebSocket (trades + bookTicker), no API key needed, auto-reconnect with exponential backoff, heartbeat timeout detection - AlpacaProvider: IEX real-time data + paper trading auth, dry-run mode when no keys configured (heartbeats only) Consumers: - StorageConsumer: SQLite (via SQLAlchemy async) + JSONL append-only log - MetricsConsumer: Prometheus counters, latency histograms, events/sec gauge - PrintConsumer: sampled structured logging (1/100 events) CLI: python -m app run --provider binance --symbols BTCUSDT,ETHUSDT HTTP: /health, /metrics (Prometheus), /latest?symbol=XXX Tests: 19/19 passed (Binance parse, Alpaca parse, bus smoke tests) Config: pydantic-settings + .env, all secrets via environment variables. Co-authored-by: Cursor <cursoragent@cursor.com>
99 lines
3.0 KiB
Python
99 lines
3.0 KiB
Python
"""
|
||
MetricsConsumer: Prometheus counters + latency histograms.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import time
|
||
|
||
from prometheus_client import Counter, Gauge, Histogram, Summary
|
||
|
||
from app.domain.events import Event, EventType
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── Prometheus metrics ─────────────────────────────────────────────────
# Collectors are created once, at import time, and shared by every
# MetricsConsumer instance in the process.

# One increment per handled event, split by provider, event type and symbol.
EVENTS_TOTAL = Counter(
    name="market_events_total",
    documentation="Total market data events received",
    labelnames=("provider", "event_type", "symbol"),
)

# Most recently computed event rate for each provider.
EVENTS_PER_SECOND = Gauge(
    name="market_events_per_second",
    documentation="Approximate events per second",
    labelnames=("provider",),
)

# Exchange-timestamp → receive-timestamp delay; bucket bounds are milliseconds.
EXCHANGE_LATENCY = Histogram(
    name="market_exchange_latency_ms",
    documentation="Latency from exchange timestamp to receive (ms)",
    labelnames=("provider",),
    buckets=(1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000),
)

# Receive-side latency samples in nanoseconds.
RECV_LATENCY = Summary(
    name="market_recv_latency_ns",
    documentation="Internal receive latency (nanoseconds, monotonic)",
    labelnames=("provider",),
)

# Count of gap signals per provider.
GAPS = Counter(
    name="market_gaps_total",
    documentation="Number of detected message gaps (heartbeat timeouts)",
    labelnames=("provider",),
)
|
||
|
||
|
||
class MetricsConsumer:
    """
    Computes and exposes Prometheus metrics from the event stream.

    For every event passed to ``handle`` the consumer:

    * increments ``EVENTS_TOTAL`` for the (provider, event type, symbol) triple;
    * records exchange-to-receive latency when the event has ``ts_exchange``;
    * records the time elapsed since the event's monotonic receive timestamp;
    * refreshes the per-provider events/sec gauge roughly once per second;
    * increments ``GAPS`` when the event is a heartbeat.
    """

    # Exchange latency samples at or above this many milliseconds are discarded.
    _MAX_EXCHANGE_LATENCY_MS = 60_000
    # Minimum window length, in seconds, before the events/sec gauge is refreshed.
    _RATE_WINDOW_SECONDS = 1.0

    def __init__(self) -> None:
        # Kept for backward compatibility; nothing in this class reads it.
        self._last_ts: dict[str, float] = {}  # provider → last time.time()
        # provider → number of events seen in the current rate window
        self._window_counts: dict[str, int] = {}
        # provider → start of the current rate window (time.monotonic() seconds)
        self._window_start: dict[str, float] = {}

    async def handle(self, event: Event) -> None:
        """Update every metric for a single event.

        Args:
            event: The event to account for. It must provide ``provider``,
                ``event_type``, ``ts_recv`` and ``ts_recv_mono_ns``;
                ``symbol`` and ``ts_exchange`` are optional.
        """
        provider = event.provider

        EVENTS_TOTAL.labels(
            provider=provider,
            event_type=event.event_type.value,
            # Events without a ``symbol`` attribute share a placeholder label.
            symbol=getattr(event, "symbol", "__heartbeat__"),
        ).inc()

        self._record_latency(event, provider)
        self._update_rate(provider)

        # Gap detection: every heartbeat event is counted as a potential gap.
        # NOTE(review): whether providers emit heartbeats only after a timeout
        # is not visible from this module — confirm against the providers.
        if event.event_type == EventType.HEARTBEAT:
            GAPS.labels(provider=provider).inc()

    def _record_latency(self, event: Event, provider: str) -> None:
        """Observe exchange latency (ms) and receive-to-now latency (ns)."""
        ts_exchange = getattr(event, "ts_exchange", None)
        if ts_exchange is not None:
            latency_ms = (event.ts_recv.timestamp() - ts_exchange.timestamp()) * 1000
            # Sanity filter: drop non-positive samples and anything >= 60 s.
            if 0 < latency_ms < self._MAX_EXCHANGE_LATENCY_MS:
                EXCHANGE_LATENCY.labels(provider=provider).observe(latency_ms)

        # A monotonic reading has an undefined reference point, so only the
        # difference between two readings is a meaningful latency; observing
        # the raw timestamp would just record the clock value itself.
        # NOTE(review): assumes ts_recv_mono_ns was taken with
        # time.monotonic_ns() in this process — confirm against the providers.
        elapsed_ns = time.monotonic_ns() - event.ts_recv_mono_ns
        if elapsed_ns >= 0:
            RECV_LATENCY.labels(provider=provider).observe(elapsed_ns)

    def _update_rate(self, provider: str) -> None:
        """Count one event and refresh the events/sec gauge once per window."""
        # Monotonic clock: wall-clock adjustments must not distort or stall
        # the elapsed-time measurement.
        now = time.monotonic()
        if provider not in self._window_start:
            self._window_start[provider] = now
            self._window_counts[provider] = 0

        self._window_counts[provider] += 1
        elapsed = now - self._window_start[provider]

        # The gauge is only refreshed when an event arrives after the window
        # has elapsed; it keeps its last value while no events are handled.
        if elapsed >= self._RATE_WINDOW_SECONDS:
            rate = self._window_counts[provider] / elapsed
            EVENTS_PER_SECOND.labels(provider=provider).set(rate)
            self._window_start[provider] = now
            self._window_counts[provider] = 0
|