""" Binance public WebSocket provider. No API key required. Subscribes to: - @trade → TradeEvent - @bookTicker → QuoteEvent Auto-reconnect with exponential backoff via tenacity. """ from __future__ import annotations import asyncio import json import logging from datetime import datetime, timezone from typing import AsyncIterator import websockets from websockets.exceptions import ConnectionClosed from app.config import settings from app.domain.events import ( Event, QuoteEvent, TradeEvent, ) from app.providers import MarketDataProvider logger = logging.getLogger(__name__) def _ms_to_dt(ms: int | float | None) -> datetime | None: """Convert millisecond epoch to UTC datetime.""" if ms is None: return None return datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc) class BinanceProvider(MarketDataProvider): """ Binance public WebSocket streams. Connects to the combined stream endpoint and subscribes to trade + bookTicker channels for each symbol. """ name = "binance" def __init__(self) -> None: self._ws: websockets.WebSocketClientProtocol | None = None self._symbols: list[str] = [] self._connected = False self._reconnect_count = 0 self._base_url = settings.binance_ws_url async def connect(self) -> None: """Establish WebSocket connection.""" logger.info("binance.connecting", extra={"url": self._base_url}) self._ws = await websockets.connect( self._base_url, ping_interval=20, ping_timeout=10, close_timeout=5, ) self._connected = True logger.info("binance.connected") async def subscribe(self, symbols: list[str]) -> None: """Subscribe to trade + bookTicker for each symbol.""" if not self._ws: raise RuntimeError("Not connected. Call connect() first.") self._symbols = [s.lower() for s in symbols] streams = [] for sym in self._symbols: streams.append(f"{sym}@trade") streams.append(f"{sym}@bookTicker") subscribe_msg = { "method": "SUBSCRIBE", "params": streams, "id": 1, } await self._ws.send(json.dumps(subscribe_msg)) logger.info( "binance.subscribed", extra={"symbols": self._symbols, "streams": len(streams)}, ) async def stream(self) -> AsyncIterator[Event]: """ Yield domain events. Handles reconnect automatically. """ backoff = settings.reconnect_base_delay while True: try: if not self._connected or not self._ws: await self._reconnect(backoff) # Set timeout for heartbeat detection try: raw = await asyncio.wait_for( self._ws.recv(), # type: ignore timeout=settings.heartbeat_timeout, ) except asyncio.TimeoutError: logger.warning( "binance.heartbeat_timeout", extra={"timeout": settings.heartbeat_timeout}, ) self._connected = False continue # Reset backoff on successful message backoff = settings.reconnect_base_delay data = json.loads(raw) # Skip subscription confirmations if "result" in data and "id" in data: continue event = self._parse(data) if event: yield event except ConnectionClosed as e: logger.warning( "binance.connection_closed", extra={"code": e.code, "reason": str(e.reason)}, ) self._connected = False backoff = min(backoff * 2, settings.reconnect_max_delay) except Exception as e: logger.error("binance.stream_error", extra={"error": str(e)}) self._connected = False backoff = min(backoff * 2, settings.reconnect_max_delay) async def _reconnect(self, delay: float) -> None: """Reconnect with delay, then resubscribe.""" self._reconnect_count += 1 logger.info( "binance.reconnecting", extra={"delay": delay, "attempt": self._reconnect_count}, ) await asyncio.sleep(delay) try: if self._ws: await self._ws.close() except Exception: pass await self.connect() if self._symbols: await self.subscribe(self._symbols) def _parse(self, data: dict) -> Event | None: """Parse raw Binance JSON into domain events.""" event_type = data.get("e") if event_type == "trade": return self._parse_trade(data) elif event_type == "bookTicker" or ("b" in data and "a" in data and "s" in data and "e" not in data): # bookTicker doesn't always have "e" field in combined stream return self._parse_book_ticker(data) return None def _parse_trade(self, data: dict) -> TradeEvent: """ Binance trade payload: { "e": "trade", "E": 1672515782136, "s": "BNBBTC", "t": 12345, "p": "0.001", "q": "100", "T": 1672515782136, "m": true } """ return TradeEvent( provider=self.name, symbol=data.get("s", "").upper(), price=float(data.get("p", 0)), size=float(data.get("q", 0)), ts_exchange=_ms_to_dt(data.get("T")), side="sell" if data.get("m") else "buy", # m=True → buyer is maker → trade is a sell trade_id=str(data.get("t", "")), ) def _parse_book_ticker(self, data: dict) -> QuoteEvent: """ Binance bookTicker payload: { "u": 400900217, "s": "BNBUSDT", "b": "25.35190000", "B": "31.21000000", "a": "25.36520000", "A": "40.66000000" } """ return QuoteEvent( provider=self.name, symbol=data.get("s", "").upper(), bid=float(data.get("b", 0)), ask=float(data.get("a", 0)), bid_size=float(data.get("B", 0)), ask_size=float(data.get("A", 0)), ts_exchange=_ms_to_dt(data.get("E")), ) async def close(self) -> None: """Close the WebSocket connection.""" self._connected = False if self._ws: try: await self._ws.close() except Exception: pass logger.info( "binance.closed", extra={"reconnect_count": self._reconnect_count}, )