New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
304 lines
11 KiB
Python
304 lines
11 KiB
Python
"""
|
|
Monitor telemetry bridge — probes each node's monitor endpoint.
|
|
|
|
Each node CAN expose GET /monitor/status (or /healthz extended).
|
|
This module does a best-effort fan-out: missing/unreachable nodes
|
|
return {"online": false} without crashing the dashboard.
|
|
|
|
Expected monitor/status response shape (node provides):
|
|
{
|
|
"online": true,
|
|
"ts": "ISO",
|
|
"node_id": "NODA1",
|
|
"heartbeat_age_s": 5,
|
|
"router": {"ok": true, "latency_ms": 12},
|
|
"gateway": {"ok": true, "latency_ms": 8},
|
|
"alerts_loop_slo": {"p95_ms": 320, "failed_rate": 0.0},
|
|
"open_incidents": 2,
|
|
"backends": {"alerts": "postgres", "audit": "auto", ...},
|
|
"last_artifacts": {
|
|
"risk_digest": "2026-02-24",
|
|
"platform_digest": "2026-W08",
|
|
"backlog": "2026-02-24"
|
|
}
|
|
}
|
|
|
|
If a node only has /healthz, we synthesise a partial status from it.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict, List, Optional
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
import httpx
|
|
|
|
# Timeout per node probe (seconds)
_PROBE_TIMEOUT = 8.0
# Paths tried in order for monitor status
_MONITOR_PATHS = ["/monitor/status", "/api/monitor/status"]
# Fallback health paths for basic online check
_HEALTH_PATHS = ["/healthz", "/health"]
def _running_in_docker() -> bool:
|
|
return os.path.exists("/.dockerenv")
|
|
|
|
|
|
def _normalize_probe_url(base_url: str) -> str:
    """Remap localhost URLs to host.docker.internal when running in Docker.

    Inside a container, localhost/127.0.0.1 points at the container
    itself, so node probes must target the Docker host instead. Outside
    Docker — or on any parse failure — the URL is returned untouched.
    """
    if not base_url or not _running_in_docker():
        return base_url
    try:
        parts = urlparse(base_url)
        host = parts.hostname
        if host not in ("localhost", "127.0.0.1"):
            return base_url
        remapped_netloc = parts.netloc.replace(host, "host.docker.internal")
        return urlunparse(parts._replace(netloc=remapped_netloc))
    except Exception:
        return base_url
async def _probe_monitor(base_url: str, timeout: float = _PROBE_TIMEOUT) -> Dict[str, Any]:
    """Probe one node's monitor endpoint, best effort.

    Tries the dedicated monitor paths first; if none answers 200 with
    JSON, falls back to a plain health endpoint and synthesises a
    partial status from it; otherwise reports the node unreachable.
    Never raises — every per-path failure just moves to the next path.
    """
    root = base_url.rstrip("/")
    started = time.monotonic()

    def _elapsed_ms() -> int:
        return int((time.monotonic() - started) * 1000)

    async with httpx.AsyncClient(timeout=timeout) as client:
        # Preferred: dedicated monitor endpoint with the full status shape.
        for suffix in _MONITOR_PATHS:
            try:
                resp = await client.get(f"{root}{suffix}")
                if resp.status_code != 200:
                    continue
                status = resp.json()
                status.setdefault("online", True)
                status.setdefault("latency_ms", _elapsed_ms())
                status.setdefault("source", "monitor_endpoint")
                return status
            except Exception:
                continue

        # Fallback: synthesise a partial status from a basic health check.
        for suffix in _HEALTH_PATHS:
            try:
                resp = await client.get(f"{root}{suffix}")
                if resp.status_code != 200:
                    continue
                elapsed = _elapsed_ms()
                try:
                    payload = resp.json()
                except Exception:
                    payload = {}
                return {
                    "online": True,
                    "ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
                    "latency_ms": elapsed,
                    "source": "healthz_fallback",
                    "router": {"ok": payload.get("ok", True), "latency_ms": elapsed},
                    "gateway": None,
                    "alerts_loop_slo": None,
                    "open_incidents": None,
                    "backends": {},
                    "last_artifacts": {},
                }
            except Exception:
                continue

    return {
        "online": False,
        "ts": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        "latency_ms": None,
        "source": "unreachable",
        "error": f"no response from {root}",
    }
async def _probe_router(router_url: str, timeout: float = 5.0) -> Dict[str, Any]:
    """Lightweight router health check: first 200 from /healthz or /health wins."""
    root = router_url.rstrip("/")
    started = time.monotonic()
    async with httpx.AsyncClient(timeout=timeout) as client:
        for suffix in ("/healthz", "/health"):
            try:
                resp = await client.get(f"{root}{suffix}")
                if resp.status_code != 200:
                    continue
                elapsed = int((time.monotonic() - started) * 1000)
                try:
                    payload = resp.json()
                except Exception:
                    payload = {}
                return {"ok": True, "latency_ms": elapsed, "detail": payload.get("status", "ok")}
            except Exception:
                continue
    return {"ok": False, "latency_ms": None}
async def _probe_gateway(gateway_url: str, timeout: float = 5.0) -> Optional[Dict[str, Any]]:
    """Gateway health probe.

    Returns None when no gateway URL is configured. Any non-5xx answer
    produces a result dict; on a healthy (non-4xx) answer it is also
    enriched with build metadata (build_sha, agents_count,
    required_missing, ...) when the health payload provides it.
    """
    if not gateway_url:
        return None
    root = gateway_url.rstrip("/")
    started = time.monotonic()
    async with httpx.AsyncClient(timeout=timeout) as client:
        for suffix in ("/health", "/healthz", "/"):
            try:
                resp = await client.get(f"{root}{suffix}")
                if resp.status_code >= 500:
                    # 5xx counts as "this path failed" — try the next one.
                    continue
                elapsed = int((time.monotonic() - started) * 1000)
                healthy = resp.status_code < 400
                summary: Dict[str, Any] = {"ok": healthy, "latency_ms": elapsed}
                if healthy:
                    try:
                        payload = resp.json()
                        summary["agents_count"] = payload.get("agents_count")
                        summary["build_sha"] = payload.get("build_sha")
                        summary["build_time"] = payload.get("build_time")
                        summary["node_id"] = payload.get("node_id")
                        summary["required_missing"] = payload.get("required_missing", [])
                    except Exception:
                        # Non-JSON body: keep the bare ok/latency summary.
                        pass
                return summary
            except Exception:
                continue
    return {"ok": False, "latency_ms": None}
async def collect_node_telemetry(
    node_id: str,
    cfg: Dict[str, Any],
    router_api_key: str = "",
) -> Dict[str, Any]:
    """
    Gather full telemetry for a single node.

    Fires the monitor, router and gateway probes concurrently and
    merges them into one flat status dict; a failed probe degrades to
    offline/unknown fields instead of raising.

    NOTE(review): router_api_key is accepted for interface parity but
    is not attached to any probe request here — confirm whether the
    probes need auth headers.
    """
    router_url = _normalize_probe_url(cfg.get("router_url", ""))
    gateway_url = _normalize_probe_url(cfg.get("gateway_url", ""))
    # Monitor defaults to the same host as the router when unset.
    monitor_url = _normalize_probe_url(cfg.get("monitor_url") or router_url)

    async def _offline_monitor() -> Dict[str, Any]:
        return {"online": False, "source": "no_url"}

    async def _offline_router() -> Dict[str, Any]:
        return {"ok": False}

    # Parallel fan-out; exceptions are captured, not propagated.
    mon_res, rtr_res, gwy_res = await asyncio.gather(
        _probe_monitor(monitor_url) if monitor_url else _offline_monitor(),
        _probe_router(router_url) if router_url else _offline_router(),
        _probe_gateway(gateway_url),
        return_exceptions=True,
    )

    if isinstance(mon_res, Exception):
        mon: Dict[str, Any] = {"online": False, "error": str(mon_res)[:100]}
    else:
        mon = mon_res
    rtr = {"ok": False} if isinstance(rtr_res, Exception) else rtr_res
    gwy = None if isinstance(gwy_res, Exception) else gwy_res

    # The dedicated router probe wins over monitor-reported router info
    # (monitor.router may be stale).
    router_ok = rtr.get("ok", False)
    # A node counts as online when either the router or the monitor answered.
    online = router_ok or mon.get("online", False)
    gwy_data = gwy or {}

    return {
        "node_id": node_id,
        "label": cfg.get("label", node_id),
        "node_role": cfg.get("node_role", "prod"),
        "router_url": router_url,
        "gateway_url": gateway_url or None,
        "monitor_url": monitor_url or None,
        "ssh_configured": bool(cfg.get("ssh")),
        "online": online,
        "ts": mon.get("ts") or datetime.now(timezone.utc).isoformat(timespec="seconds"),
        # --- router ---
        "router_ok": router_ok,
        "router_latency_ms": rtr.get("latency_ms"),
        # --- gateway ---
        "gateway_ok": gwy_data.get("ok"),
        "gateway_latency_ms": gwy_data.get("latency_ms"),
        "gateway_agents_count": gwy_data.get("agents_count"),
        "gateway_build_sha": gwy_data.get("build_sha"),
        "gateway_build_time": gwy_data.get("build_time"),
        "gateway_required_missing": gwy_data.get("required_missing", []),
        # --- monitor extended (populated only by a real monitor endpoint) ---
        "heartbeat_age_s": mon.get("heartbeat_age_s"),
        "alerts_loop_slo": mon.get("alerts_loop_slo"),
        "open_incidents": mon.get("open_incidents"),
        "backends": mon.get("backends") or {},
        "last_artifacts": mon.get("last_artifacts") or {},
        # --- meta ---
        "monitor_source": mon.get("source", "unknown"),
        "monitor_latency_ms": mon.get("latency_ms"),
    }
async def collect_all_nodes(
    nodes_cfg: Dict[str, Any],
    router_api_key: str = "",
    timeout_per_node: float = 10.0,
) -> List[Dict[str, Any]]:
    """
    Probe every configured node concurrently.

    Each node is capped at ``timeout_per_node`` seconds; timeouts,
    errors and disabled nodes are reported as offline entries rather
    than raising, so one bad node never breaks the whole collection.
    """
    if not nodes_cfg:
        return []

    async def _guarded(node_id: str, cfg: Dict[str, Any]) -> Dict[str, Any]:
        # Explicitly disabled nodes are reported without probing.
        if cfg.get("enabled", True) is False:
            return {
                "node_id": node_id,
                "label": cfg.get("label", node_id),
                "router_url": cfg.get("router_url") or None,
                "gateway_url": cfg.get("gateway_url") or None,
                "monitor_url": cfg.get("monitor_url") or None,
                "online": False,
                "router_ok": False,
                "gateway_ok": None,
                "disabled": True,
                "monitor_source": "disabled",
            }
        try:
            probe = collect_node_telemetry(node_id, cfg, router_api_key)
            return await asyncio.wait_for(probe, timeout=timeout_per_node)
        except asyncio.TimeoutError:
            return {
                "node_id": node_id,
                "label": cfg.get("label", node_id),
                "online": False,
                "router_ok": False,
                "gateway_ok": None,
                "error": f"timeout after {timeout_per_node}s",
            }
        except Exception as exc:
            return {
                "node_id": node_id,
                "label": cfg.get("label", node_id),
                "online": False,
                "router_ok": False,
                "error": str(exc)[:120],
            }

    pending = (_guarded(nid, ncfg) for nid, ncfg in nodes_cfg.items())
    return list(await asyncio.gather(*pending))
|