"""
Monitor telemetry bridge — probes each node's monitor endpoint.

Each node CAN expose GET /monitor/status (or /healthz extended).
This module does a best-effort fan-out: missing/unreachable nodes return
{"online": false} without crashing the dashboard.

Expected monitor/status response shape (node provides):
    {
      "online": true,
      "ts": "ISO",
      "node_id": "NODA1",
      "heartbeat_age_s": 5,
      "router": {"ok": true, "latency_ms": 12},
      "gateway": {"ok": true, "latency_ms": 8},
      "alerts_loop_slo": {"p95_ms": 320, "failed_rate": 0.0},
      "open_incidents": 2,
      "backends": {"alerts": "postgres", "audit": "auto", ...},
      "last_artifacts": {
        "risk_digest": "2026-02-24",
        "platform_digest": "2026-W08",
        "backlog": "2026-02-24"
      }
    }

If a node only has /healthz, we synthesise a partial status from it.
"""
from __future__ import annotations

import asyncio
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse, urlunparse

import httpx

# Timeout per node probe (seconds)
_PROBE_TIMEOUT = 8.0

# Paths tried in order for monitor status
_MONITOR_PATHS = ["/monitor/status", "/api/monitor/status"]

# Fallback health paths for basic online check
_HEALTH_PATHS = ["/healthz", "/health"]

# Paths tried in order for the gateway probe
_GATEWAY_PATHS = ("/health", "/healthz", "/")

# Hostnames that refer to the container itself when running inside Docker
_LOOPBACK_HOSTS = ("localhost", "127.0.0.1")


def _running_in_docker() -> bool:
    """Return True when the /.dockerenv marker file exists."""
    return os.path.exists("/.dockerenv")


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _elapsed_ms(started: float) -> int:
    """Return whole milliseconds elapsed since the ``time.monotonic()`` value *started*."""
    return int((time.monotonic() - started) * 1000)


def _json_object(response: Any) -> Optional[Dict[str, Any]]:
    """
    Decode *response* as JSON and return it only if it is a JSON object.

    Returns None when the body is not valid JSON or decodes to something
    other than a dict (e.g. ``true``, ``"ok"``, ``[]``), so callers can use
    ``.get`` on the result without risking AttributeError.
    """
    try:
        payload = response.json()
    except (ValueError, RecursionError):
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses;
        # RecursionError guards against pathologically nested bodies.
        return None
    return payload if isinstance(payload, dict) else None


def _normalize_probe_url(base_url: str) -> str:
    """
    Inside Docker, localhost points to the container itself.
    Remap localhost/127.0.0.1 to host.docker.internal for node probes.

    Only the host component is rewritten; userinfo and port are kept as-is.
    Falsy input, non-Docker environments and unparsable URLs are returned
    unchanged.
    """
    if not base_url or not _running_in_docker():
        return base_url
    try:
        parsed = urlparse(base_url)
        host = parsed.hostname
        if host not in _LOOPBACK_HOSTS:
            return base_url
        # ``hostname`` is lower-cased by urlparse while ``netloc`` keeps the
        # original spelling, so slice the host out by position rather than
        # with str.replace (which misses "LOCALHOST" and could also touch a
        # matching substring in the userinfo part).
        userinfo, at, hostport = parsed.netloc.rpartition("@")
        netloc = f"{userinfo}{at}host.docker.internal{hostport[len(host):]}"
        return urlunparse(parsed._replace(netloc=netloc))
    except (ValueError, TypeError, AttributeError):
        return base_url


async def _probe_monitor(base_url: str, timeout: float = _PROBE_TIMEOUT) -> Dict[str, Any]:
    """
    Probe a node's monitor endpoint.

    Tries each path in _MONITOR_PATHS and returns the first 200 response
    whose body is a JSON object, filling in "online", "latency_ms" and
    "source" when the node did not provide them. Otherwise falls back to
    _HEALTH_PATHS and synthesises a partial status from the first 200
    response. If nothing answers, returns an offline status with
    source "unreachable".

    "latency_ms" is the round-trip time of the request that succeeded, not
    the cumulative time of all attempts.
    """
    base = base_url.rstrip("/")
    async with httpx.AsyncClient(timeout=timeout) as client:
        # Try dedicated /monitor/status first
        for path in _MONITOR_PATHS:
            started = time.monotonic()
            try:
                r = await client.get(f"{base}{path}")
            except Exception:
                # Best-effort probe: any transport/URL error means "try next path".
                continue
            if r.status_code != 200:
                continue
            status = _json_object(r)
            if status is None:
                # A 200 without a JSON object is not a usable monitor status.
                continue
            status.setdefault("online", True)
            status.setdefault("latency_ms", _elapsed_ms(started))
            status.setdefault("source", "monitor_endpoint")
            return status

        # Fallback: synthesise from /healthz
        for path in _HEALTH_PATHS:
            started = time.monotonic()
            try:
                r = await client.get(f"{base}{path}")
            except Exception:
                continue
            if r.status_code != 200:
                continue
            latency = _elapsed_ms(started)
            # A non-JSON or non-object body still counts as "online".
            hdata = _json_object(r) or {}
            return {
                "online": True,
                "ts": _utc_now_iso(),
                "latency_ms": latency,
                "source": "healthz_fallback",
                "router": {"ok": hdata.get("ok", True), "latency_ms": latency},
                "gateway": None,
                "alerts_loop_slo": None,
                "open_incidents": None,
                "backends": {},
                "last_artifacts": {},
            }

    return {
        "online": False,
        "ts": _utc_now_iso(),
        "latency_ms": None,
        "source": "unreachable",
        "error": f"no response from {base}",
    }


async def _probe_router(router_url: str, timeout: float = 5.0) -> Dict[str, Any]:
    """
    Quick router health probe.

    Returns {"ok": True, "latency_ms": ..., "detail": ...} for the first of
    /healthz, /health that answers 200, where "detail" is the body's
    "status" field (default "ok"). Returns {"ok": False, "latency_ms": None}
    when neither path answers 200.
    """
    base = router_url.rstrip("/")
    async with httpx.AsyncClient(timeout=timeout) as client:
        for path in ("/healthz", "/health"):
            started = time.monotonic()
            try:
                r = await client.get(f"{base}{path}")
            except Exception:
                # Best-effort probe: try the next path.
                continue
            if r.status_code != 200:
                continue
            latency = _elapsed_ms(started)
            # A 200 with a non-object body (e.g. `true`) is still a healthy router.
            d = _json_object(r) or {}
            return {"ok": True, "latency_ms": latency, "detail": d.get("status", "ok")}
    return {"ok": False, "latency_ms": None}


async def _probe_gateway(gateway_url: str, timeout: float = 5.0) -> Optional[Dict[str, Any]]:
    """
    Gateway health probe — also extracts build_sha, agents_count,
    required_missing from /health response when available.

    Returns None when no gateway URL is configured. Otherwise tries
    /health, /healthz and / in order:

    * status < 400: returns {"ok": True, "latency_ms": ...} plus the extra
      fields when the body is a JSON object;
    * status 404: the path does not exist on this gateway, so the next path
      is tried;
    * any other 4xx: returns {"ok": False, "latency_ms": ...};
    * 5xx or a transport error: the next path is tried.

    If no path yields a better answer, the first 404 result is returned,
    or {"ok": False, "latency_ms": None} when nothing responded below 500.
    """
    if not gateway_url:
        return None
    base = gateway_url.rstrip("/")
    first_not_found: Optional[Dict[str, Any]] = None
    async with httpx.AsyncClient(timeout=timeout) as client:
        for path in _GATEWAY_PATHS:
            started = time.monotonic()
            try:
                r = await client.get(f"{base}{path}")
            except Exception:
                # Best-effort probe: try the next path.
                continue
            latency = _elapsed_ms(started)
            if r.status_code >= 500:
                continue
            ok = r.status_code < 400
            result: Dict[str, Any] = {"ok": ok, "latency_ms": latency}
            if ok:
                d = _json_object(r)
                if d is not None:
                    result["agents_count"] = d.get("agents_count")
                    result["build_sha"] = d.get("build_sha")
                    result["build_time"] = d.get("build_time")
                    result["node_id"] = d.get("node_id")
                    result["required_missing"] = d.get("required_missing", [])
                return result
            if r.status_code == 404:
                # Keep the earliest 404 so the reported latency matches the
                # first path tried if every path turns out to be missing.
                if first_not_found is None:
                    first_not_found = result
                continue
            return result
    if first_not_found is not None:
        return first_not_found
    return {"ok": False, "latency_ms": None}


async def collect_node_telemetry(
    node_id: str,
    cfg: Dict[str, Any],
    router_api_key: str = "",
) -> Dict[str, Any]:
    """
    Full telemetry for one node.

    Runs monitor probe, router probe, gateway probe in parallel.
    Returns merged/normalised result.

    Args:
        node_id: Identifier copied into the result.
        cfg: Node config; reads "router_url", "gateway_url", "monitor_url",
            "label", "node_role" and "ssh".
        router_api_key: Accepted for interface compatibility; not used by
            the probes in this module.

    Returns:
        A flat dict with node metadata, "online", router_*, gateway_* and
        the extended monitor fields. The node counts as online when either
        the router probe succeeded or the monitor status says so.
    """
    router_url = _normalize_probe_url(cfg.get("router_url", ""))
    gateway_url = _normalize_probe_url(cfg.get("gateway_url", ""))
    # default: same host as router
    monitor_url = _normalize_probe_url(cfg.get("monitor_url") or router_url)

    async def _no_monitor() -> Dict[str, Any]:
        return {"online": False, "source": "no_url"}

    async def _no_router() -> Dict[str, Any]:
        return {"ok": False}

    # Fan-out parallel probes
    mon_res, rtr_res, gwy_res = await asyncio.gather(
        _probe_monitor(monitor_url) if monitor_url else _no_monitor(),
        _probe_router(router_url) if router_url else _no_router(),
        _probe_gateway(gateway_url),
        return_exceptions=True,
    )

    # BaseException (not just Exception) so a CancelledError handed back by
    # gather is treated as a failed probe instead of being used as a dict.
    if isinstance(mon_res, BaseException):
        mon: Dict[str, Any] = {"online": False, "error": str(mon_res)[:100]}
    else:
        mon = mon_res
    rtr: Dict[str, Any] = {"ok": False} if isinstance(rtr_res, BaseException) else rtr_res
    gwy: Optional[Dict[str, Any]] = None if isinstance(gwy_res, BaseException) else gwy_res

    # Router status comes from the dedicated probe rather than monitor.router
    # (dedicated probe is more accurate; monitor.router may be stale)
    router_ok = rtr.get("ok", False)
    router_latency_ms = rtr.get("latency_ms")

    gwy_data = gwy or {}  # gateway probe result may be None

    # Determine overall online status; bool() because the monitor payload
    # comes from the node and may carry a non-boolean "online" value.
    online = bool(router_ok or mon.get("online", False))

    return {
        "node_id": node_id,
        "label": cfg.get("label", node_id),
        "node_role": cfg.get("node_role", "prod"),
        "router_url": router_url,
        "gateway_url": gateway_url or None,
        "monitor_url": monitor_url or None,
        "ssh_configured": bool(cfg.get("ssh")),
        "online": online,
        "ts": mon.get("ts") or _utc_now_iso(),
        # --- router ---
        "router_ok": router_ok,
        "router_latency_ms": router_latency_ms,
        # --- gateway ---
        "gateway_ok": gwy_data.get("ok"),
        "gateway_latency_ms": gwy_data.get("latency_ms"),
        "gateway_agents_count": gwy_data.get("agents_count"),
        "gateway_build_sha": gwy_data.get("build_sha"),
        "gateway_build_time": gwy_data.get("build_time"),
        "gateway_required_missing": gwy_data.get("required_missing", []),
        # --- monitor extended (present only if monitor endpoint exists) ---
        "heartbeat_age_s": mon.get("heartbeat_age_s"),
        "alerts_loop_slo": mon.get("alerts_loop_slo"),
        "open_incidents": mon.get("open_incidents"),
        "backends": mon.get("backends") or {},
        "last_artifacts": mon.get("last_artifacts") or {},
        # --- meta ---
        "monitor_source": mon.get("source", "unknown"),
        "monitor_latency_ms": mon.get("latency_ms"),
    }


async def collect_all_nodes(
    nodes_cfg: Dict[str, Any],
    router_api_key: str = "",
    timeout_per_node: float = 10.0,
) -> List[Dict[str, Any]]:
    """
    Parallel fan-out for all nodes. Each node gets up to timeout_per_node seconds.

    Args:
        nodes_cfg: Mapping of node id to node config dict. A config with
            "enabled": False is reported as disabled without being probed.
            A config that is not a dict (e.g. None) is treated as empty.
        router_api_key: Forwarded to collect_node_telemetry.
        timeout_per_node: Upper bound in seconds for one node's probes.

    Returns:
        One result dict per node, in the iteration order of nodes_cfg.
        A node that times out or fails is reported with "online": False
        and an "error" message rather than raising.
    """
    if not nodes_cfg:
        return []

    async def _safe(node_id: str, cfg: Dict[str, Any]) -> Dict[str, Any]:
        if not isinstance(cfg, dict):
            # A malformed entry must not take the whole dashboard down.
            cfg = {}
        label = cfg.get("label", node_id)

        if cfg.get("enabled", True) is False:
            return {
                "node_id": node_id,
                "label": label,
                "router_url": cfg.get("router_url") or None,
                "gateway_url": cfg.get("gateway_url") or None,
                "monitor_url": cfg.get("monitor_url") or None,
                "online": False,
                "router_ok": False,
                "gateway_ok": None,
                "disabled": True,
                "monitor_source": "disabled",
            }

        try:
            return await asyncio.wait_for(
                collect_node_telemetry(node_id, cfg, router_api_key),
                timeout=timeout_per_node,
            )
        except asyncio.TimeoutError:
            return {
                "node_id": node_id,
                "label": label,
                "online": False,
                "router_ok": False,
                "gateway_ok": None,
                "error": f"timeout after {timeout_per_node}s",
            }
        except Exception as e:
            # Top-level boundary for one node: report the failure in-band.
            return {
                "node_id": node_id,
                "label": label,
                "online": False,
                "router_ok": False,
                "gateway_ok": None,
                "error": str(e)[:120],
            }

    tasks = [_safe(nid, ncfg) for nid, ncfg in nodes_cfg.items()]
    return list(await asyncio.gather(*tasks))