Files
microdao-daarion/services/senpai-md-consumer/senpai/md_consumer/api.py

177 lines
5.4 KiB
Python

"""
Minimal HTTP API — lightweight asyncio server (no framework dependency).
Endpoints:
GET /health → service health
GET /metrics → Prometheus metrics
GET /state/latest → latest trade/quote per symbol (?symbol=BTCUSDT)
GET /features/latest → latest computed features (?symbol=BTCUSDT)
GET /stats → queue fill, drops, events/sec
"""
from __future__ import annotations

import asyncio
import json
import logging
from urllib.parse import unquote_plus

from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

from senpai.md_consumer.config import settings
from senpai.md_consumer.features import compute_features
from senpai.md_consumer.state import LatestState
logger = logging.getLogger(__name__)

# Module-level wiring, populated by main.py at startup through set_state(),
# set_stats_fn() and cache_features(). Routes that need a value which has not
# been provided yet answer with a "not initialized" error payload.
_state: LatestState | None = None  # shared state object queried by the routes
_stats_fn = None  # zero-argument callable whose result /stats serialises as JSON
_features_cache: dict[str, dict] = {}  # symbol → last computed features
def set_state(state: LatestState) -> None:
    """Register the state object that the HTTP routes read from.

    Rebinds the module-level ``_state``; calling it again replaces the
    previously registered object.
    """
    global _state
    _state = state
def set_stats_fn(fn) -> None:
    """Register the callable behind ``GET /stats``.

    ``fn`` is invoked with no arguments on every ``/stats`` request and its
    return value is serialised with ``json.dumps``.
    """
    global _stats_fn
    _stats_fn = fn
def cache_features(symbol: str, features: dict) -> None:
    """Remember the latest feature dict for ``symbol``.

    The entry is stored under ``symbol`` exactly as given and overwrites any
    earlier entry; ``/features/latest`` serves it without recomputing.
    """
    _features_cache[symbol] = features
async def _handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
"""Minimal HTTP request handler."""
try:
request_line = await asyncio.wait_for(reader.readline(), timeout=5.0)
request_str = request_line.decode("utf-8", errors="replace").strip()
parts = request_str.split()
if len(parts) < 2:
writer.close()
return
path = parts[1]
# Consume headers
while True:
line = await reader.readline()
if line in (b"\r\n", b"\n", b""):
break
# Parse query params
query_params: dict[str, str] = {}
if "?" in path:
base_path, query = path.split("?", 1)
for param in query.split("&"):
if "=" in param:
k, v = param.split("=", 1)
query_params[k] = v
else:
base_path = path
body, content_type, status = await _route(base_path, query_params)
response = (
f"HTTP/1.1 {status}\r\n"
f"Content-Type: {content_type}\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n"
f"\r\n"
)
writer.write(response.encode() + body)
await writer.drain()
except Exception:
pass
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
def _json_response(payload, status: str = "200 OK") -> tuple[bytes, str, str]:
    """Encode ``payload`` as a UTF-8 JSON ``(body, content_type, status)`` triple."""
    body = json.dumps(payload, ensure_ascii=False).encode()
    return body, "application/json", status


async def _route(
    path: str, params: dict[str, str]
) -> tuple[bytes, str, str]:
    """Dispatch a request path to its endpoint.

    Args:
        path: Request path without the query string.
        params: Parsed query parameters.

    Returns:
        ``(body, content_type, status)`` where ``status`` is the full HTTP
        status line suffix, e.g. ``"200 OK"``.

    Status codes:
        400 when a required ``symbol`` parameter is missing, 503 when the
        data source for the endpoint has not been registered yet, 404 for
        unknown paths.
    """
    # "Not registered" is tested with ``is None`` so that a registered but
    # falsy object (e.g. an empty container-like state) still counts.
    if path == "/health":
        return _json_response({
            "status": "ok",
            "service": "senpai-md-consumer",
            "symbols": _state.symbols if _state is not None else [],
        })

    if path == "/metrics":
        return generate_latest(), CONTENT_TYPE_LATEST, "200 OK"

    if path == "/state/latest":
        # NOTE(review): unlike /features/latest the symbol is not upper-cased
        # here — confirm whether LatestState normalises case itself.
        symbol = params.get("symbol", "")
        if not symbol:
            return _json_response({"error": "missing ?symbol=XXX"}, "400 Bad Request")
        if _state is None:
            return _json_response(
                {"error": "not initialized"}, "503 Service Unavailable"
            )
        return _json_response(_state.to_dict(symbol))

    if path == "/features/latest":
        symbol = params.get("symbol", "").upper()
        if not symbol:
            return _json_response({"error": "missing ?symbol=XXX"}, "400 Bad Request")
        cached = _features_cache.get(symbol)
        if cached:
            features = cached
        elif _state is not None:
            # No (non-empty) cached entry: fall back to live compute (slower).
            features = compute_features(_state, symbol)
        else:
            return _json_response(
                {"error": "not initialized"}, "503 Service Unavailable"
            )
        return _json_response({"symbol": symbol, "features": features})

    if path == "/stats":
        if _stats_fn is None:
            # Same status as the other endpoints when their source is missing,
            # instead of an error payload under 200 OK.
            return _json_response(
                {"error": "not initialized"}, "503 Service Unavailable"
            )
        return _json_response(_stats_fn())

    return _json_response({"error": "not found"}, "404 Not Found")
async def start_api() -> asyncio.Server:
    """Open the HTTP listener and log where it is bound.

    Binds ``_handler`` to ``settings.http_host``/``settings.http_port`` via
    ``asyncio.start_server`` and emits an ``api.started`` log record listing
    the available endpoints.

    Returns:
        The server object returned by ``asyncio.start_server``.
    """
    host = settings.http_host
    port = settings.http_port
    server = await asyncio.start_server(_handler, host, port)

    endpoints = [
        "/health",
        "/metrics",
        "/state/latest?symbol=",
        "/features/latest?symbol=",
        "/stats",
    ]
    logger.info(
        "api.started",
        extra={"host": host, "port": port, "endpoints": endpoints},
    )
    return server