Files

SenpAI Market-Data Consumer

NATS subscriber + feature engine + signal bus for the SenpAI/Gordon trading agent.

Consumes normalised events from market-data-service, computes real-time features, and publishes signals back to NATS.

Architecture

market-data-service                  SenpAI MD Consumer
┌──────────────┐                    ┌────────────────────────────────┐
│ Binance WS   │                    │                                │
│ Bybit WS     │──► NATS ──────────►  NATSConsumer                  │
│ Alpaca WS    │   md.events.>      │    ↓ (bounded queue)           │
└──────────────┘                    │  State Store                   │
                                    │    ├─ LatestState (trade/quote)│
                                    │    └─ RollingWindow (60s deque)│
                                    │    ↓                           │
                                    │  Feature Engine                │
                                    │    ├─ mid, spread, vwap        │
                                    │    ├─ return_10s, vol_60s      │
                                    │    └─ latency p50/p95          │
                                    │    ↓                           │
                                    │  Publisher ──► NATS            │
                                    │    ├─ senpai.features.{symbol} │
                                    │    ├─ senpai.signals.{symbol}  │
                                    │    └─ senpai.alerts            │
                                    │                                │
                                    │  HTTP API (:8892)              │
                                    │    /health /metrics /stats     │
                                    │    /state/latest /features     │
                                    └────────────────────────────────┘

Quick Start

1. Install

cd services/senpai-md-consumer
pip install -r requirements.txt
cp .env.example .env

2. Start NATS (if not running)

docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:2.10-alpine --js -m 8222

3. Start market-data-service (producer)

cd ../market-data-service
python -m app run --provider binance --symbols BTCUSDT,ETHUSDT

4. Start SenpAI MD Consumer

cd ../senpai-md-consumer
python -m senpai.md_consumer

5. Verify

# Health
curl http://localhost:8892/health

# Stats
curl http://localhost:8892/stats

# Latest state
curl "http://localhost:8892/state/latest?symbol=BTCUSDT"

# Computed features
curl "http://localhost:8892/features/latest?symbol=BTCUSDT"

# Prometheus metrics
curl http://localhost:8892/metrics

Docker

Standalone (with NATS)

docker-compose -f docker-compose.senpai.yml up -d

Part of NODE1 stack

docker-compose -f docker-compose.node1.yml up -d market-data-service senpai-md-consumer

NATS Subjects

Consumed (from market-data-service)

Subject Description
md.events.trade.{symbol} Trade events
md.events.quote.{symbol} Quote events
md.events.book_l2.{symbol} L2 book snapshots
md.events.heartbeat.__system__ Provider heartbeats

Published (for SenpAI/other consumers)

Subject Description
senpai.features.{symbol} Feature snapshots (rate-limited to 10Hz/symbol)
senpai.signals.{symbol} Trade signals (long/short)
senpai.alerts System alerts (latency, gaps, backpressure)

Features Computed

Feature Description
mid (bid + ask) / 2
spread_abs ask - bid
spread_bps spread in basis points
trade_vwap_10s VWAP over 10 seconds
trade_vwap_60s VWAP over 60 seconds
trade_count_10s Number of trades in 10s
trade_volume_10s Total volume in 10s
return_10s Price return over 10 seconds
realized_vol_60s Realised volatility (60s log-return std)
latency_ms_p50 p50 exchange-to-receive latency
latency_ms_p95 p95 exchange-to-receive latency

Signal Rules (MVP)

Long signal emitted when ALL conditions met:

  • return_10s > 0.3% (configurable)
  • trade_volume_10s > 1.0 (configurable)
  • spread_bps < 20 (configurable)

Short signal: same but return_10s < -0.3%

Backpressure Policy

  • Queue < 90% → accept all events
  • Queue >= 90% → drop heartbeats, quotes, book snapshots
  • Trades are NEVER dropped

HTTP Endpoints

Endpoint Description
GET /health Service health + tracked symbols
GET /metrics Prometheus metrics
GET /state/latest?symbol= Latest trade + quote
GET /features/latest?symbol= Current computed features
GET /stats Queue fill, drops, events/sec

Prometheus Metrics

Metric Type Description
senpai_events_in_total Counter Events received {type, provider}
senpai_events_dropped_total Counter Dropped events {reason, type}
senpai_queue_fill_ratio Gauge Queue fill 0..1
senpai_processing_latency_ms Histogram Processing latency
senpai_feature_publish_total Counter Feature publishes {symbol}
senpai_signals_emitted_total Counter Signals {symbol, direction}
senpai_nats_connected Gauge NATS connection status

Tests

pytest tests/ -v

41 tests:

  • 11 model parsing tests (tolerant parsing, edge cases)
  • 10 state/rolling window tests (eviction, lookup)
  • 16 feature math tests (VWAP, vol, signals, percentile)
  • 5 rate-limit tests (publish throttling, error handling)

Troubleshooting

NATS connection refused

nats.error: error=could not connect to server

Ensure NATS is running:

docker run -d --name nats -p 4222:4222 nats:2.10-alpine --js

Or check NATS_URL in .env.

No events arriving (queue stays at 0)

  1. Verify market-data-service is running and NATS_ENABLED=true
  2. Check subject match: producer publishes to md.events.trade.BTCUSDT, consumer subscribes to md.events.>
  3. Check NATS monitoring: curl http://localhost:8222/connz — both services should appear

JetStream errors

If USE_JETSTREAM=true but NATS started without --js:

# Restart NATS with JetStream
docker rm -f nats
docker run -d -p 4222:4222 -p 8222:8222 nats:2.10-alpine --js -m 8222

Or set USE_JETSTREAM=false for core NATS (simpler, works for MVP).

Port 8892 already in use

lsof -ti:8892 | xargs kill -9

Features show null for all values

Normal on startup — features populate after first trade+quote arrive. Wait a few seconds for Binance data to flow through.

No signals emitted

Signal rules require ALL conditions simultaneously:

  • return_10s > 0.3% — needs price movement
  • volume_10s > 1.0 — needs trading activity
  • spread_bps < 20 — needs tight spread

In low-volatility markets, signals may be rare. Lower thresholds in .env for testing:

SIGNAL_RETURN_THRESHOLD=0.001
SIGNAL_VOLUME_THRESHOLD=0.1

High memory usage

Rolling windows grow per symbol. With many symbols, reduce window:

ROLLING_WINDOW_SECONDS=30

Configuration (ENV)

See .env.example for all available settings.

Key settings:

  • NATS_URL — NATS server URL
  • FEATURES_PUB_RATE_HZ — max feature publishes per symbol per second
  • SIGNAL_RETURN_THRESHOLD — min return for signal trigger
  • ROLLING_WINDOW_SECONDS — rolling window duration