Producer (market-data-service):
- Backpressure: smart drop policy (heartbeats→quotes→trades preserved)
- Heartbeat monitor: synthetic HeartbeatEvent on provider silence
- Graceful shutdown: WS→bus→storage→DB engine cleanup sequence
- Bybit V5 public WS provider (backup for Binance, no API key needed)
- FailoverManager: health-based provider switching with recovery
- NATS output adapter: md.events.{type}.{symbol} for SenpAI
- /bus-stats endpoint for backpressure monitoring
- Dockerfile + docker-compose.node1.yml integration
- 36 tests (parsing + bus + failover), requirements.lock
Consumer (senpai-md-consumer):
- NATSConsumer: subscribe md.events.>, queue group senpai-md, backpressure
- State store: LatestState + RollingWindow (deque, 60s)
- Feature engine: 11 features (mid, spread, VWAP, return, vol, latency)
- Rule-based signals: long/short on return+volume+spread conditions
- Publisher: rate-limited features + signals + alerts to NATS
- HTTP API: /health, /metrics, /state/latest, /features/latest, /stats
- 10 Prometheus metrics
- Dockerfile + docker-compose.senpai.yml
- 41 tests (parsing + state + features + rate-limit), requirements.lock
CI: ruff + pytest + smoke import for both services
Tests: 77 total (36 producer + 41 consumer), all passing; lint clean
Co-authored-by: Cursor <cursoragent@cursor.com>
139 lines
3.9 KiB
Python
139 lines
3.9 KiB
Python
"""
|
|
Test state management — LatestState and RollingWindow.
|
|
"""
|
|
import time
|
|
|
|
|
|
from senpai.md_consumer.state import (
|
|
LatestState,
|
|
RollingWindow,
|
|
TradeRecord,
|
|
)
|
|
from senpai.md_consumer.models import TradeEvent, QuoteEvent
|
|
|
|
|
|
# ── RollingWindow ──────────────────────────────────────────────────────
|
|
|
|
|
|
def test_rolling_window_add_trade():
    """A single trade added to the window is stored and readable back."""
    window = RollingWindow(window_seconds=60.0)
    record = TradeRecord(price=100.0, size=1.0, ts=time.monotonic())

    window.add_trade(record)

    assert len(window.trades) == 1
    first = window.trades[0]
    assert first.price == 100.0
|
|
|
|
|
|
def test_rolling_window_eviction():
    """A record older than the window must not remain among the trades."""
    window = RollingWindow(window_seconds=1.0)  # keep only the last second

    # Stamp the first record 2 s in the past, i.e. outside the 1 s window.
    stale_ts = time.monotonic() - 2.0
    stale = TradeRecord(price=100.0, size=1.0, ts=stale_ts)
    window.add_trade(stale)

    fresh = TradeRecord(price=200.0, size=2.0, ts=time.monotonic())
    window.add_trade(fresh)

    # Only the fresh record is expected to survive.
    remaining = [*window.trades]
    assert len(remaining) == 1
    assert remaining[0].price == 200.0
|
|
|
|
|
|
def test_rolling_window_trades_since():
    """``trades_since(10.0)`` yields only the trades stamped within 10 s."""
    window = RollingWindow(window_seconds=60.0)
    now = time.monotonic()

    # (age in seconds, price, size) for each trade, oldest first.
    samples = (
        (30, 100.0, 1.0),
        (5, 200.0, 2.0),
        (0, 300.0, 3.0),
    )
    for age, price, size in samples:
        window.add_trade(TradeRecord(price=price, size=size, ts=now - age))

    recent = window.trades_since(10.0)
    assert len(recent) == 2  # the 5 s-old trade and the current one
    assert recent[0].price == 200.0
|
|
|
|
|
|
def test_rolling_window_empty():
    """A freshly built window holds no trades and no quotes."""
    window = RollingWindow(window_seconds=60.0)

    assert len(window.trades) == 0
    assert len(window.quotes) == 0

    # Querying an empty window gives back an empty list.
    recent = window.trades_since(10.0)
    assert recent == []
|
|
|
|
|
|
# ── LatestState ────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_latest_state_update_trade():
    """An ingested trade becomes the latest trade and bumps the event count."""
    store = LatestState(window_seconds=60.0)

    trade = TradeEvent(
        provider="binance",
        symbol="BTCUSDT",
        price=70500.0,
        size=1.5,
        side="buy",
    )
    store.update_trade(trade)

    seen = store.get_latest_trade("BTCUSDT")
    assert seen is not None
    assert (seen.price, seen.side) == (70500.0, "buy")
    assert store.event_count == 1
|
|
|
|
|
|
def test_latest_state_update_quote():
    """An ingested quote is returned as the latest quote for its symbol."""
    store = LatestState(window_seconds=60.0)

    quote = QuoteEvent(
        provider="binance",
        symbol="BTCUSDT",
        bid=70000.0,
        ask=70001.0,
        bid_size=5.0,
        ask_size=3.0,
    )
    store.update_quote(quote)

    seen = store.get_latest_quote("BTCUSDT")
    assert seen is not None
    assert (seen.bid, seen.ask) == (70000.0, 70001.0)
|
|
|
|
|
|
def test_latest_state_symbols():
    """Symbols fed in through a trade or a quote both show up in ``symbols``."""
    store = LatestState(window_seconds=60.0)

    btc_trade = TradeEvent(
        provider="binance",
        symbol="BTCUSDT",
        price=100.0,
        size=1.0,
    )
    store.update_trade(btc_trade)

    eth_quote = QuoteEvent(
        provider="binance",
        symbol="ETHUSDT",
        bid=2000.0,
        ask=2001.0,
        bid_size=1.0,
        ask_size=1.0,
    )
    store.update_quote(eth_quote)

    for expected in ("BTCUSDT", "ETHUSDT"):
        assert expected in store.symbols
|
|
|
|
|
|
def test_latest_state_to_dict():
    """``to_dict`` reports the symbol plus its latest trade and quote entries."""
    store = LatestState(window_seconds=60.0)

    trade = TradeEvent(
        provider="binance",
        symbol="BTCUSDT",
        price=70500.0,
        size=1.0,
    )
    store.update_trade(trade)

    quote = QuoteEvent(
        provider="binance",
        symbol="BTCUSDT",
        bid=70000.0,
        ask=70001.0,
        bid_size=1.0,
        ask_size=1.0,
    )
    store.update_quote(quote)

    snapshot = store.to_dict("BTCUSDT")
    assert snapshot["symbol"] == "BTCUSDT"
    for key in ("latest_trade", "latest_quote"):
        assert key in snapshot
    assert snapshot["latest_trade"]["price"] == 70500.0
|
|
|
|
|
|
def test_latest_state_missing_symbol():
    """Lookups for a symbol that was never fed in return ``None``."""
    store = LatestState(window_seconds=60.0)
    unknown = "NOPE"

    assert store.get_latest_trade(unknown) is None
    assert store.get_latest_quote(unknown) is None
|