Producer (market-data-service):
- Backpressure: smart drop policy (heartbeats→quotes→trades preserved)
- Heartbeat monitor: synthetic HeartbeatEvent on provider silence
- Graceful shutdown: WS→bus→storage→DB engine cleanup sequence
- Bybit V5 public WS provider (backup for Binance, no API key needed)
- FailoverManager: health-based provider switching with recovery
- NATS output adapter: md.events.{type}.{symbol} for SenpAI
- /bus-stats endpoint for backpressure monitoring
- Dockerfile + docker-compose.node1.yml integration
- 36 tests (parsing + bus + failover), requirements.lock
Consumer (senpai-md-consumer):
- NATSConsumer: subscribe md.events.>, queue group senpai-md, backpressure
- State store: LatestState + RollingWindow (deque, 60s)
- Feature engine: 11 features (mid, spread, VWAP, return, vol, latency)
- Rule-based signals: long/short on return+volume+spread conditions
- Publisher: rate-limited features + signals + alerts to NATS
- HTTP API: /health, /metrics, /state/latest, /features/latest, /stats
- 10 Prometheus metrics
- Dockerfile + docker-compose.senpai.yml
- 41 tests (parsing + state + features + rate-limit), requirements.lock
CI: ruff + pytest + smoke import for both services
Tests: 77 total passed, lint clean
Co-authored-by: Cursor <cursoragent@cursor.com>
213 lines
5.9 KiB
Python
213 lines
5.9 KiB
Python
"""
Test feature computations — deterministic scenarios.
"""
|
|
|
|
import pytest
|
|
|
|
from senpai.md_consumer.features import (
|
|
_percentile,
|
|
_realized_vol,
|
|
_vwap,
|
|
check_signal,
|
|
compute_features,
|
|
)
|
|
from senpai.md_consumer.models import QuoteEvent, TradeEvent
|
|
from senpai.md_consumer.state import LatestState, TradeRecord
|
|
|
|
|
|
# ── VWAP ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_vwap_basic():
    """Two trades of equal size: VWAP is the midpoint of their prices."""
    trades = [
        TradeRecord(price=100.0, size=10.0, ts=0),
        TradeRecord(price=200.0, size=10.0, ts=0),
    ]
    # VWAP = (100*10 + 200*10) / (10 + 10) = 150
    # Compare with a tolerance: the value comes out of a float division, and
    # the other float checks in this module already use pytest.approx.
    assert _vwap(trades) == pytest.approx(150.0)
|
|
|
|
|
|
def test_vwap_weighted():
    """Unequal sizes: the larger trade pulls VWAP toward its own price."""
    trades = [
        TradeRecord(price=100.0, size=90.0, ts=0),
        TradeRecord(price=200.0, size=10.0, ts=0),
    ]
    # VWAP = (100*90 + 200*10) / (90 + 10) = 110
    # Tolerance-based comparison keeps the test independent of how the
    # implementation orders its float arithmetic.
    assert _vwap(trades) == pytest.approx(110.0)
|
|
|
|
|
|
def test_vwap_empty():
    """An empty trade list has no VWAP: the helper returns None."""
    result = _vwap([])
    assert result is None
|
|
|
|
|
|
def test_vwap_zero_volume():
    """A single zero-size trade yields None rather than a price."""
    zero_size_trade = TradeRecord(price=100.0, size=0.0, ts=0)
    assert _vwap([zero_size_trade]) is None
|
|
|
|
|
|
# ── Realized volatility ───────────────────────────────────────────────
|
|
|
|
|
|
def test_realized_vol_constant_price():
    """Ten trades at one unchanging price → volatility of exactly 0."""
    flat_series = [
        TradeRecord(price=100.0, size=1.0, ts=0)
        for _ in range(10)
    ]

    realized = _realized_vol(flat_series)

    assert realized is not None
    assert realized == 0.0
|
|
|
|
|
|
def test_realized_vol_two_prices():
    """Two trades are too few data points → None."""
    pair = [
        TradeRecord(price=px, size=1.0, ts=0)
        for px in (100.0, 101.0)
    ]
    # needs at least 3
    assert _realized_vol(pair) is None
|
|
|
|
|
|
def test_realized_vol_positive():
    """A price path that moves up and down gives volatility > 0."""
    price_path = (100.0, 102.0, 99.0, 103.0)
    trades = [TradeRecord(price=px, size=1.0, ts=0) for px in price_path]

    realized = _realized_vol(trades)

    assert realized is not None
    assert realized > 0
|
|
|
|
|
|
# ── Percentile ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def test_percentile_basic():
    """Median and both extremes of a five-element sample."""
    sample = [1.0, 2.0, 3.0, 4.0, 5.0]
    # Percentile → expected value; checked in this order.
    expectations = ((50, 3.0), (0, 1.0), (100, 5.0))
    for pct, expected in expectations:
        assert _percentile(sample, pct) == expected
|
|
|
|
|
|
def test_percentile_p95():
    """p95 of the values 1..100 lands between 95 and 96 inclusive."""
    values = [float(v) for v in range(1, 101)]  # 1.0 .. 100.0
    result = _percentile(values, 95)
    assert 95 <= result <= 96
|
|
|
|
|
|
# ── Full feature computation ──────────────────────────────────────────
|
|
|
|
|
|
def test_compute_features_with_state():
    """One quote plus five trades populate the quote and trade features."""
    symbol = "BTCUSDT"
    state = LatestState(window_seconds=60.0)

    # Seed the book with a single quote: bid 70000 / ask 70002.
    quote = QuoteEvent(
        provider="binance",
        symbol=symbol,
        bid=70000.0,
        ask=70002.0,
        bid_size=5.0,
        ask_size=3.0,
    )
    state.update_quote(quote)

    # Five unit-size trades, prices stepping up by 10 from 70000.
    for step in range(5):
        trade = TradeEvent(
            provider="binance",
            symbol=symbol,
            price=70000.0 + step * 10,
            size=1.0,
        )
        state.update_trade(trade)

    features = compute_features(state, symbol)

    # Mid
    assert features["mid"] == pytest.approx(70001.0)

    # Spread: absolute value is checked exactly, bps only for sign.
    assert features["spread_abs"] == pytest.approx(2.0)
    spread_bps = features["spread_bps"]
    assert spread_bps is not None
    assert spread_bps > 0

    # Trade count and volume over the 10s window.
    assert features["trade_count_10s"] == 5.0
    assert features["trade_volume_10s"] == 5.0

    # VWAP should be defined for both windows.
    assert features["trade_vwap_10s"] is not None
    assert features["trade_vwap_60s"] is not None
|
|
|
|
|
|
def test_compute_features_no_data():
    """With a freshly created state, the checked features are all None."""
    empty_state = LatestState(window_seconds=60.0)
    features = compute_features(empty_state, "BTCUSDT")

    # All should be None
    for key in ("mid", "spread_abs", "trade_vwap_10s"):
        assert features[key] is None
|
|
|
|
|
|
# ── Signal detection ──────────────────────────────────────────────────
|
|
|
|
|
|
def test_check_signal_long():
    """Positive return with enough volume and a tight spread → long."""
    inputs = {
        "return_10s": 0.005,      # 0.5% (> 0.3% threshold)
        "trade_volume_10s": 5.0,  # > 1.0 threshold
        "spread_bps": 3.0,        # < 20 bps threshold
    }

    result = check_signal(inputs, "BTCUSDT")

    assert result is not None
    assert result.direction == "long"
    assert result.confidence > 0
|
|
|
|
|
|
def test_check_signal_short():
    """Same volume and spread as the long case, negative return → short."""
    inputs = {
        "return_10s": -0.005,
        "trade_volume_10s": 5.0,
        "spread_bps": 3.0,
    }

    result = check_signal(inputs, "BTCUSDT")

    assert result is not None
    assert result.direction == "short"
|
|
|
|
|
|
def test_check_signal_no_trigger():
    """A tiny return produces no signal."""
    inputs = {
        "return_10s": 0.0001,
        "trade_volume_10s": 5.0,
        "spread_bps": 3.0,
    }
    assert check_signal(inputs, "BTCUSDT") is None
|
|
|
|
|
|
def test_check_signal_wide_spread():
    """A wide spread suppresses the signal even when the return is strong."""
    inputs = {
        "return_10s": 0.01,
        "trade_volume_10s": 5.0,
        "spread_bps": 50.0,  # > 20 bps
    }
    assert check_signal(inputs, "BTCUSDT") is None
|
|
|
|
|
|
def test_check_signal_low_volume():
    """Too little traded volume suppresses the signal."""
    inputs = {
        "return_10s": 0.01,
        "trade_volume_10s": 0.1,  # < 1.0
        "spread_bps": 3.0,
    }
    assert check_signal(inputs, "BTCUSDT") is None
|