""" Binance Bot Monitor — monitors Binance Bot Marketplace + own sub-account bots. Exposes REST API for SenpAI tool use. """ from __future__ import annotations import asyncio, hashlib, hmac, json, logging, os, time from typing import Any, Dict, List, Optional from urllib.parse import urlencode import httpx import redis.asyncio as aioredis from fastapi import FastAPI from fastapi.responses import JSONResponse logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") CACHE_TTL = int(os.getenv("BINANCE_CACHE_TTL", "3600")) REFRESH_INTERVAL = int(os.getenv("BINANCE_REFRESH_INTERVAL", "1800")) CRAWL4AI_URL = os.getenv("CRAWL4AI_URL", "http://crawl4ai:11235") SWAPPER_URL = os.getenv("SWAPPER_URL", "http://swapper-service:8890") BINANCE_API_KEY = os.getenv("BINANCE_API_KEY", "") BINANCE_SECRET = os.getenv("BINANCE_SECRET_KEY", "") BINANCE_API_BASE = "https://api.binance.com" CACHE_KEY_SPOT = "binance:bots:spot_grid" CACHE_KEY_FUTURES = "binance:bots:futures_grid" CACHE_KEY_ACCOUNT = "binance:account:bots" app = FastAPI(title="Binance Bot Monitor", version="2.0.0") _redis: Optional[aioredis.Redis] = None async def get_redis() -> aioredis.Redis: global _redis if _redis is None: _redis = aioredis.from_url(REDIS_URL, decode_responses=True) return _redis def _sign(params: str) -> str: return hmac.new(BINANCE_SECRET.encode(), params.encode(), hashlib.sha256).hexdigest() async def _binance_signed_get(client: httpx.AsyncClient, path: str, extra_params: str = "") -> Optional[Dict]: """Authenticated signed request to api.binance.com.""" if not BINANCE_API_KEY or not BINANCE_SECRET: return None ts = int(time.time() * 1000) params = f"{extra_params}×tamp={ts}" if extra_params else f"timestamp={ts}" sig = _sign(params) url = f"{BINANCE_API_BASE}{path}?{params}&signature={sig}" try: resp = await client.get(url, headers={"X-MBX-APIKEY": BINANCE_API_KEY}, timeout=10.0) if resp.status_code == 200: return resp.json() logger.warning(f"Binance signed GET {path} → {resp.status_code}: {resp.text[:100]}") except Exception as e: logger.warning(f"Binance signed GET {path} failed: {e}") return None async def fetch_account_bots() -> Dict[str, Any]: """Fetch this sub-account's own grid bots via signed API.""" result: Dict[str, Any] = { "source": "binance_api", "account_type": None, "can_trade": None, "balances": [], "open_algo_orders": [], "historical_algo_orders": [], "error": None, "cached_at": time.time(), } if not BINANCE_API_KEY: result["error"] = "No API key configured" return result async with httpx.AsyncClient(timeout=12.0) as client: # Account info acct = await _binance_signed_get(client, "/api/v3/account") if acct: result["account_type"] = acct.get("accountType") result["can_trade"] = acct.get("canTrade") result["permissions"] = acct.get("permissions", []) result["balances"] = [ b for b in acct.get("balances", []) if float(b.get("free", 0)) > 0 or float(b.get("locked", 0)) > 0 ] # Open algo/grid orders open_orders = await _binance_signed_get(client, "/sapi/v1/algo/spot/openOrders") if open_orders: result["open_algo_orders"] = open_orders.get("orders", []) # Historical algo orders (last 10) hist = await _binance_signed_get(client, "/sapi/v1/algo/spot/historicalOrders", "pageSize=10") if hist: result["historical_algo_orders"] = hist.get("orders", []) # Futures algo grid orders (if futures enabled) open_fut = await _binance_signed_get(client, "/sapi/v1/algo/futures/openOrders") if open_fut: result["open_futures_algo"] = open_fut.get("orders", []) # Cache try: r = await get_redis() await r.set(CACHE_KEY_ACCOUNT, json.dumps(result, ensure_ascii=False), ex=CACHE_TTL) except Exception: pass return result # --- Marketplace scraping (unchanged from v1) --- BROWSER_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36", "Accept": "application/json", "Referer": "https://www.binance.com/en/trading-bots/marketplace", } async def _try_web_search(grid_type: str = "SPOT") -> Optional[List[Dict]]: query = f"binance {grid_type.lower()} grid bot marketplace top ROI PNL ranking 2026" try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post(f"{SWAPPER_URL}/web/search", json={"query": query, "max_results": 5}) if resp.status_code == 200: results = resp.json().get("results", []) return [{"source": "web_search", "title": r.get("title"), "url": r.get("url"), "snippet": r.get("snippet")} for r in results[:5]] except Exception as e: logger.warning(f"web_search failed: {e}") return None async def fetch_and_cache_marketplace(grid_type: str = "SPOT") -> Dict[str, Any]: cache_key = CACHE_KEY_SPOT if grid_type == "SPOT" else CACHE_KEY_FUTURES result = {"grid_type": grid_type, "source": "unknown", "bots": [], "cached_at": None, "error": None} bots = await _try_web_search(grid_type) if bots: result["source"] = "web_search" result["bots"] = bots else: result["error"] = "All methods failed" result["cached_at"] = time.time() try: r = await get_redis() await r.set(cache_key, json.dumps(result, ensure_ascii=False), ex=CACHE_TTL) except Exception: pass return result async def _background_refresh(): logger.info("Background refresh worker started") while True: try: await asyncio.sleep(REFRESH_INTERVAL) await fetch_and_cache_marketplace("SPOT") await asyncio.sleep(60) await fetch_and_cache_marketplace("FUTURES") await asyncio.sleep(60) await fetch_account_bots() except asyncio.CancelledError: break except Exception as e: logger.error(f"Background refresh error: {e}") await asyncio.sleep(300) @app.on_event("startup") async def startup(): asyncio.create_task(_background_refresh()) asyncio.create_task(fetch_and_cache_marketplace("SPOT")) asyncio.create_task(fetch_account_bots()) @app.get("/health") async def health(): has_key = bool(BINANCE_API_KEY) return {"status": "ok", "service": "binance-bot-monitor", "has_api_key": has_key} @app.get("/top-bots") async def top_bots(grid_type: str = "SPOT", limit: int = 10, force_refresh: bool = False): """Marketplace top bots (web_search fallback).""" grid_type = grid_type.upper() cache_key = CACHE_KEY_SPOT if grid_type == "SPOT" else CACHE_KEY_FUTURES if not force_refresh: try: r = await get_redis() cached = await r.get(cache_key) if cached: data = json.loads(cached) age_min = int((time.time() - data.get("cached_at", 0)) / 60) data["cache_age_minutes"] = age_min if isinstance(data.get("bots"), list): data["bots"] = data["bots"][:limit] return JSONResponse(data) except Exception: pass data = await fetch_and_cache_marketplace(grid_type) if isinstance(data.get("bots"), list): data["bots"] = data["bots"][:limit] return JSONResponse(data) @app.get("/account-bots") async def account_bots(force_refresh: bool = False): """This sub-account's own bots via signed Binance API.""" if not force_refresh: try: r = await get_redis() cached = await r.get(CACHE_KEY_ACCOUNT) if cached: data = json.loads(cached) age_min = int((time.time() - data.get("cached_at", 0)) / 60) data["cache_age_minutes"] = age_min return JSONResponse(data) except Exception: pass return JSONResponse(await fetch_account_bots()) @app.post("/refresh") async def trigger_refresh(grid_type: str = "SPOT", mode: str = "marketplace"): if mode == "account": asyncio.create_task(fetch_account_bots()) else: asyncio.create_task(fetch_and_cache_marketplace(grid_type.upper())) return {"status": "refresh_triggered", "mode": mode} # ─── CCXT Multi-Symbol Price Endpoint ───────────────────────────────────────── # Added: 2026-02-28 — Senpai expansion to 23 pairs + XAU/XAG via Kraken BINANCE_SPOT_SYMBOLS = [ 'BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT', 'ADAUSDT', 'DOGEUSDT', 'AVAXUSDT', 'DOTUSDT', 'LINKUSDT', 'POLUSDT', 'SHIBUSDT', 'TRXUSDT', 'UNIUSDT', 'LTCUSDT', 'ATOMUSDT', 'NEARUSDT', 'ICPUSDT', 'FILUSDT', 'APTUSDT', 'PAXGUSDT', ] CACHE_KEY_PRICES = 'senpai:prices:all' PRICE_CACHE_TTL = 30 async def fetch_binance_prices(symbols: list) -> dict: result = {} syms_param = '%5B' + '%2C'.join(('%22' + s + '%22') for s in symbols) + '%5D' url = 'https://api.binance.com/api/v3/ticker/24hr?symbols=' + syms_param try: async with httpx.AsyncClient(timeout=8.0) as client: resp = await client.get(url) if resp.status_code == 200: for item in resp.json(): sym = item.get('symbol', '') result[sym] = { 'symbol': sym, 'price': float(item.get('lastPrice') or 0), 'bid': float(item.get('bidPrice') or 0), 'ask': float(item.get('askPrice') or 0), 'volume_24h': float(item.get('volume') or 0), 'quote_volume_24h': float(item.get('quoteVolume') or 0), 'price_change_pct_24h': float(item.get('priceChangePercent') or 0), 'high_24h': float(item.get('highPrice') or 0), 'low_24h': float(item.get('lowPrice') or 0), 'exchange': 'binance', 'type': 'spot', } except Exception as e: logger.warning(f'binance prices fetch error: {e}') return result async def fetch_binance_futures_metals() -> dict: """XAU/USDT and XAG/USDT from Binance USDM Futures - no API key needed.""" result = {} for symbol in ("XAUUSDT", "XAGUSDT"): try: async with httpx.AsyncClient(timeout=6.0) as client: resp = await client.get( "https://fapi.binance.com/fapi/v1/ticker/24hr", params={"symbol": symbol} ) if resp.status_code == 200: d = resp.json() result[symbol] = { "symbol": symbol, "price": float(d.get("lastPrice") or 0), "bid": None, "ask": None, "volume_24h": float(d.get("volume") or 0), "quote_volume_24h": float(d.get("quoteVolume") or 0), "price_change_pct_24h": float(d.get("priceChangePercent") or 0), "high_24h": float(d.get("highPrice") or 0), "low_24h": float(d.get("lowPrice") or 0), "exchange": "binance_futures", "type": "futures_usdm", "note": "Gold perp USDM" if symbol == "XAUUSDT" else "Silver perp USDM", } else: result[symbol] = {"symbol": symbol, "price": None, "error": f"HTTP {resp.status_code}"} except Exception as e: logger.warning(f"binance futures {symbol} error: {e}") result[symbol] = {"symbol": symbol, "price": None, "error": str(e)} return result fetch_kraken_gold_silver = fetch_binance_futures_metals @app.get('/prices') async def get_all_prices(force_refresh: bool = False): if not force_refresh: try: r = await get_redis() cached = await r.get(CACHE_KEY_PRICES) if cached: data = json.loads(cached) data['cache_age_seconds'] = int(time.time() - data.get('cached_at', 0)) return JSONResponse(data) except Exception: pass binance_prices, kraken_prices = await asyncio.gather( fetch_binance_prices(BINANCE_SPOT_SYMBOLS), fetch_kraken_gold_silver(), ) all_prices = {**binance_prices, **kraken_prices} result = { 'prices': all_prices, 'total': len(all_prices), 'symbols': list(all_prices.keys()), 'cached_at': time.time(), 'sources': {'binance_spot': len(binance_prices), 'kraken': len(kraken_prices)}, } try: r = await get_redis() await r.set(CACHE_KEY_PRICES, json.dumps(result, ensure_ascii=False), ex=PRICE_CACHE_TTL) except Exception: pass return JSONResponse(result) @app.get('/price') async def get_single_price(symbol: str): symbol = symbol.upper() try: r = await get_redis() cached = await r.get(CACHE_KEY_PRICES) if cached: data = json.loads(cached) prices = data.get('prices', {}) if symbol in prices: return JSONResponse({'symbol': symbol, **prices[symbol], 'from_cache': True}) except Exception: pass if symbol in ('XAUUSDT', 'XAGUSDT'): prices = await fetch_kraken_gold_silver() else: prices = await fetch_binance_prices([symbol]) if symbol in prices: return JSONResponse({'symbol': symbol, **prices[symbol], 'from_cache': False}) return JSONResponse({'symbol': symbol, 'error': 'Not found'}, status_code=404)