Files
Apple 09dee24342 feat: MD pipeline — market-data-service hardening + SenpAI NATS consumer
Producer (market-data-service):
- Backpressure: smart drop policy (heartbeats→quotes→trades preserved)
- Heartbeat monitor: synthetic HeartbeatEvent on provider silence
- Graceful shutdown: WS→bus→storage→DB engine cleanup sequence
- Bybit V5 public WS provider (backup for Binance, no API key needed)
- FailoverManager: health-based provider switching with recovery
- NATS output adapter: md.events.{type}.{symbol} for SenpAI
- /bus-stats endpoint for backpressure monitoring
- Dockerfile + docker-compose.node1.yml integration
- 36 tests (parsing + bus + failover), requirements.lock

Consumer (senpai-md-consumer):
- NATSConsumer: subscribe md.events.>, queue group senpai-md, backpressure
- State store: LatestState + RollingWindow (deque, 60s)
- Feature engine: 11 features (mid, spread, VWAP, return, vol, latency)
- Rule-based signals: long/short on return+volume+spread conditions
- Publisher: rate-limited features + signals + alerts to NATS
- HTTP API: /health, /metrics, /state/latest, /features/latest, /stats
- 10 Prometheus metrics
- Dockerfile + docker-compose.senpai.yml
- 41 tests (parsing + state + features + rate-limit), requirements.lock

CI: ruff + pytest + smoke import for both services
Tests: 77 total passed, lint clean
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 11:46:15 -08:00

213 lines
5.9 KiB
Python

"""
Test feature computations — deterministic scenarios.
"""
import pytest
from senpai.md_consumer.features import (
_percentile,
_realized_vol,
_vwap,
check_signal,
compute_features,
)
from senpai.md_consumer.models import QuoteEvent, TradeEvent
from senpai.md_consumer.state import LatestState, TradeRecord
# ── VWAP ───────────────────────────────────────────────────────────────
def test_vwap_basic():
trades = [
TradeRecord(price=100.0, size=10.0, ts=0),
TradeRecord(price=200.0, size=10.0, ts=0),
]
# VWAP = (100*10 + 200*10) / (10+10) = 150
assert _vwap(trades) == 150.0
def test_vwap_weighted():
trades = [
TradeRecord(price=100.0, size=90.0, ts=0),
TradeRecord(price=200.0, size=10.0, ts=0),
]
# VWAP = (100*90 + 200*10) / 100 = 110
assert _vwap(trades) == 110.0
def test_vwap_empty():
assert _vwap([]) is None
def test_vwap_zero_volume():
trades = [TradeRecord(price=100.0, size=0.0, ts=0)]
assert _vwap(trades) is None
# ── Realized volatility ───────────────────────────────────────────────
def test_realized_vol_constant_price():
"""Constant price → 0 volatility."""
trades = [TradeRecord(price=100.0, size=1.0, ts=0) for _ in range(10)]
vol = _realized_vol(trades)
assert vol is not None
assert vol == 0.0
def test_realized_vol_two_prices():
"""Not enough data points → None."""
trades = [
TradeRecord(price=100.0, size=1.0, ts=0),
TradeRecord(price=101.0, size=1.0, ts=0),
]
assert _realized_vol(trades) is None # needs at least 3
def test_realized_vol_positive():
"""Variable prices should give positive volatility."""
trades = [
TradeRecord(price=100.0, size=1.0, ts=0),
TradeRecord(price=102.0, size=1.0, ts=0),
TradeRecord(price=99.0, size=1.0, ts=0),
TradeRecord(price=103.0, size=1.0, ts=0),
]
vol = _realized_vol(trades)
assert vol is not None
assert vol > 0
# ── Percentile ─────────────────────────────────────────────────────────
def test_percentile_basic():
data = [1.0, 2.0, 3.0, 4.0, 5.0]
assert _percentile(data, 50) == 3.0
assert _percentile(data, 0) == 1.0
assert _percentile(data, 100) == 5.0
def test_percentile_p95():
data = list(range(1, 101)) # 1..100
data_float = [float(x) for x in data]
p95 = _percentile(data_float, 95)
assert 95 <= p95 <= 96
# ── Full feature computation ──────────────────────────────────────────
def test_compute_features_with_state():
state = LatestState(window_seconds=60.0)
# Add quote
state.update_quote(QuoteEvent(
provider="binance",
symbol="BTCUSDT",
bid=70000.0,
ask=70002.0,
bid_size=5.0,
ask_size=3.0,
))
# Add some trades
for i in range(5):
state.update_trade(TradeEvent(
provider="binance",
symbol="BTCUSDT",
price=70000.0 + i * 10,
size=1.0,
))
features = compute_features(state, "BTCUSDT")
# Mid
assert features["mid"] == pytest.approx(70001.0)
# Spread
assert features["spread_abs"] == pytest.approx(2.0)
assert features["spread_bps"] is not None
assert features["spread_bps"] > 0
# Trade count
assert features["trade_count_10s"] == 5.0
# Volume
assert features["trade_volume_10s"] == 5.0
# VWAP should be defined
assert features["trade_vwap_10s"] is not None
assert features["trade_vwap_60s"] is not None
def test_compute_features_no_data():
state = LatestState(window_seconds=60.0)
features = compute_features(state, "BTCUSDT")
# All should be None
assert features["mid"] is None
assert features["spread_abs"] is None
assert features["trade_vwap_10s"] is None
# ── Signal detection ──────────────────────────────────────────────────
def test_check_signal_long():
"""Strong positive return + volume + tight spread → long signal."""
features = {
"return_10s": 0.005, # 0.5% (> 0.3% threshold)
"trade_volume_10s": 5.0, # > 1.0 threshold
"spread_bps": 3.0, # < 20 bps threshold
}
signal = check_signal(features, "BTCUSDT")
assert signal is not None
assert signal.direction == "long"
assert signal.confidence > 0
def test_check_signal_short():
"""Strong negative return → short signal."""
features = {
"return_10s": -0.005,
"trade_volume_10s": 5.0,
"spread_bps": 3.0,
}
signal = check_signal(features, "BTCUSDT")
assert signal is not None
assert signal.direction == "short"
def test_check_signal_no_trigger():
"""Small return → no signal."""
features = {
"return_10s": 0.0001,
"trade_volume_10s": 5.0,
"spread_bps": 3.0,
}
signal = check_signal(features, "BTCUSDT")
assert signal is None
def test_check_signal_wide_spread():
"""Wide spread → no signal (even with strong return)."""
features = {
"return_10s": 0.01,
"trade_volume_10s": 5.0,
"spread_bps": 50.0, # > 20 bps
}
signal = check_signal(features, "BTCUSDT")
assert signal is None
def test_check_signal_low_volume():
"""Low volume → no signal."""
features = {
"return_10s": 0.01,
"trade_volume_10s": 0.1, # < 1.0
"spread_bps": 3.0,
}
signal = check_signal(features, "BTCUSDT")
assert signal is None