#!/usr/bin/env python3
"""
verify_sofiia_stack.py — Sofiia stack parity verifier (NODA1 / NODA2).

DAARION.city | deterministic PASS/FAIL/WARN, no LLM.

Checks (per node):
  - Router /healthz (or /health)
  - /v1/tools/execute dry-run: risk_engine_tool.service,
    architecture_pressure_tool.service, backlog_tool.dashboard
  - BFF /api/status/full → reachable, router+memory reachable,
    alerts backend != memory
  - BFF /api/health → service=sofiia-console
  - Cron: jobs present (via status/full or local file)
  - Optional: supervisor health if SUPERVISOR_URL set

Parity (--compare-with):
  - Compare BFF version between two nodes (WARN if different, not FAIL)
  - Compare router/memory reachable on both

Usage:
  python3 ops/scripts/verify_sofiia_stack.py
  python3 ops/scripts/verify_sofiia_stack.py --node NODA2 --bff-url http://localhost:8002
  python3 ops/scripts/verify_sofiia_stack.py \\
    --node NODA2 --bff-url http://noda2:8002 \\
    --compare-with http://noda1:8002

Exit: 0 if all critical checks PASS, 1 otherwise.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import urllib.error
import urllib.request
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent.parent
CRON_FILE = REPO_ROOT / "ops" / "cron" / "jobs.cron"

TOOLS_TIMEOUT = 25

CRON_JOBS_EXPECTED = [
    "hourly_risk_snapshot",
    "daily_risk_digest",
    "risk_history_cleanup",
    "weekly_platform_priority_digest",
    "weekly_backlog_generate",
    "daily_backlog_cleanup",
]


# ── HTTP helpers ──────────────────────────────────────────────────────────────

def _parse_json_object(raw: bytes) -> dict:
    """Decode *raw* as a JSON object; return ``{}`` for anything else.

    Bodies that are not valid UTF-8, not valid JSON, or valid JSON that is not
    an object (list, string, number, null) all yield ``{}`` so that callers can
    use ``.get()`` on the result unconditionally.
    """
    try:
        parsed = json.loads(raw.decode())
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _send(target, timeout: int) -> tuple[int, dict]:
    """Open *target* (URL string or ``urllib.request.Request``).

    Returns ``(status, body)`` where *status* is the HTTP status code, or ``0``
    when no HTTP response was obtained at all. The body is parsed separately
    from the transport, so a 200 with a plain-text body keeps its 200 status
    instead of being reported as unreachable.
    """
    try:
        with urllib.request.urlopen(target, timeout=timeout) as resp:
            return resp.getcode(), _parse_json_object(resp.read())
    except urllib.error.HTTPError as err:
        try:
            raw = err.read()
        except Exception:
            # Best effort: the status code is still meaningful without a body.
            raw = b""
        return err.code, _parse_json_object(raw)
    except Exception:
        # Deliberately broad: a verifier must report FAIL, never crash, on
        # DNS errors, refused connections, timeouts or malformed URLs.
        return 0, {}


def _get(url: str, timeout: int = 8) -> tuple[int, dict]:
    """HTTP GET *url*; return ``(status, json_object)`` (``0, {}`` if unreachable)."""
    return _send(url, timeout)


def _post_json(url: str, body: dict, api_key: str = "", timeout: int = 30) -> tuple[int, dict]:
    """HTTP POST *body* as JSON to *url*; return ``(status, json_object)``.

    When *api_key* is non-empty it is sent as ``Authorization: Bearer <key>``.
    Returns ``0, {}`` if the request cannot be built or no response arrives.
    """
    try:
        req = urllib.request.Request(
            url,
            data=json.dumps(body).encode(),
            method="POST",
            headers={"Content-Type": "application/json"},
        )
    except Exception:
        # Unserialisable body or malformed URL: treated like "unreachable".
        return 0, {}
    if api_key:
        req.add_header("Authorization", f"Bearer {api_key}")
    return _send(req, timeout)


# ── Individual checks ─────────────────────────────────────────────────────────

def check_router_health(base_url: str) -> dict:
    """CRITICAL: router must respond 200 on /healthz or /health."""
    root = base_url.rstrip("/")
    for path in ("/healthz", "/health"):
        code, _ = _get(f"{root}{path}", timeout=5)
        if code == 200:
            return {"name": "router_health", "pass": True, "level": "critical",
                    "detail": f"GET {path} 200"}
    return {"name": "router_health", "pass": False, "level": "critical",
            "detail": "router unreachable (no 200 from /healthz or /health)"}


def check_tool(base_url: str, tool: str, action: str, params: dict, api_key: str) -> dict:
    """CRITICAL: tool execute must reach router (400/422 = reached, schema error = ok).

    *params* is merged into the request body next to ``tool``, ``action`` and
    the fixed ``agent_id`` of ``"sofiia"``.
    """
    url = f"{base_url.rstrip('/')}/v1/tools/execute"
    body = {"tool": tool, "action": action, "agent_id": "sofiia", **params}
    code, data = _post_json(url, body, api_key=api_key, timeout=TOOLS_TIMEOUT)
    # 200 = success, 400/422 = reached but bad params (tool not loaded) — still PASS
    reached = code in (200, 400, 422)
    succeeded = code == 200 and (
        data.get("status") == "succeeded" or data.get("data") is not None
    )
    return {
        "name": f"tool_{tool}_{action}",
        "pass": reached,
        "level": "critical",
        "detail": (
            f"HTTP {code} status={data.get('status', '—')}"
            + (" [data returned]" if succeeded else "")
        ),
    }


def check_bff_health(bff_url: str) -> dict:
    """CRITICAL: BFF must identify as sofiia-console on /api/health."""
    code, data = _get(f"{bff_url.rstrip('/')}/api/health", timeout=6)
    if code == 200 and data.get("service") == "sofiia-console":
        return {"name": "bff_health", "pass": True, "level": "critical",
                "detail": f"version={data.get('version')} env={data.get('env')} uptime={data.get('uptime_s')}s",
                "version": data.get("version", ""),
                "build": data.get("build", "")}
    return {"name": "bff_health", "pass": False, "level": "critical",
            "detail": f"HTTP {code} — expected service=sofiia-console, got: {str(data)[:120]}",
            "version": "", "build": ""}


def check_status_full(bff_url: str, env: str = "dev") -> dict:
    """CRITICAL: /api/status/full must show router+memory reachable + alerts backend.

    Router or memory being unreachable is always an issue. An in-memory alerts
    backend is an issue in prod/staging and only a warning elsewhere. Cron
    findings are warnings and only raised in prod/staging.
    """
    code, data = _get(f"{bff_url.rstrip('/')}/api/status/full", timeout=12)
    if code != 200:
        return {"name": "bff_status_full", "pass": False, "level": "critical",
                "detail": f"HTTP {code} — /api/status/full unreachable",
                "data": {}}

    issues: list[str] = []
    warns: list[str] = []
    strict_env = env in ("prod", "staging")

    router_ok = (data.get("router") or {}).get("reachable", False)
    mem_ok = (data.get("memory") or {}).get("reachable", False)
    ollama_ok = (data.get("ollama") or {}).get("reachable", False)
    backends = data.get("backends") or {}
    cron = data.get("cron") or {}

    if not router_ok:
        issues.append("router.reachable=false")
    if not mem_ok:
        issues.append("memory.reachable=false")

    # Alerts backend must not be 'memory' in prod/staging
    alerts_be = backends.get("alerts", "unknown")
    if alerts_be == "memory":
        if strict_env:
            issues.append(f"alerts backend=memory (must be postgres in {env})")
        else:
            warns.append("alerts backend=memory (ok in dev, not prod)")

    cron_installed = cron.get("installed", False)
    if cron_installed is False and strict_env:
        warns.append("cron.installed=false")

    # `or []` also covers an explicit JSON null, which would otherwise make
    # the membership test below raise TypeError.
    cron_jobs = cron.get("jobs_present") or []
    missing_jobs = [j for j in CRON_JOBS_EXPECTED if j not in cron_jobs]
    if missing_jobs and strict_env:
        warns.append(f"cron missing jobs: {missing_jobs}")

    detail_parts = [
        f"router={'ok' if router_ok else 'FAIL'}",
        f"memory={'ok' if mem_ok else 'FAIL'}",
        f"ollama={'ok' if ollama_ok else 'offline'}",
        f"alerts_be={alerts_be}",
        f"cron={cron_installed}",
    ]
    if issues:
        detail_parts.append(f"issues={issues}")
    if warns:
        detail_parts.append(f"warns={warns}")

    return {
        "name": "bff_status_full",
        "pass": not issues,
        "level": "critical",
        "detail": " | ".join(detail_parts),
        "warns": warns,
        "data": {
            "router_ok": router_ok,
            "memory_ok": mem_ok,
            "ollama_ok": ollama_ok,
            "alerts_backend": alerts_be,
            "cron_installed": cron_installed,
            "cron_jobs_present": cron_jobs,
        },
    }


def check_alerts_backend_not_memory(bff_url: str, env: str) -> dict:
    """CRITICAL in prod/staging: alerts must not use in-memory store.

    Returns a passing WARN-level "skipped" result when /api/status/full does
    not answer 200.
    """
    code, data = _get(f"{bff_url.rstrip('/')}/api/status/full", timeout=10)
    if code != 200:
        return {"name": "alerts_backend", "pass": True, "level": "warn",
                "detail": "skipped (status/full unreachable)"}
    backend = (data.get("backends") or {}).get("alerts", "unknown")
    if env in ("prod", "staging") and backend == "memory":
        return {"name": "alerts_backend", "pass": False, "level": "critical",
                "detail": f"alerts backend=memory in {env} — must be postgres"}
    return {"name": "alerts_backend", "pass": True, "level": "critical",
            "detail": f"alerts backend={backend}"}


def check_cron_entries() -> dict:
    """WARN: local cron file should have all governance entries.

    Each name in CRON_JOBS_EXPECTED must occur as a substring of CRON_FILE.
    """
    if not CRON_FILE.exists():
        return {"name": "cron_local_file", "pass": False, "level": "warn",
                "detail": f"not found: {CRON_FILE.relative_to(REPO_ROOT)}"}
    try:
        text = CRON_FILE.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # An unreadable file is a finding to report, not a reason to abort.
        return {"name": "cron_local_file", "pass": False, "level": "warn",
                "detail": f"unreadable: {exc}"}
    missing = [job for job in CRON_JOBS_EXPECTED if job not in text]
    if missing:
        return {"name": "cron_local_file", "pass": False, "level": "warn",
                "detail": f"missing entries: {missing}"}
    return {"name": "cron_local_file", "pass": True, "level": "warn",
            "detail": "all governance entries present"}


def check_supervisor(supervisor_url: str) -> dict:
    """WARN: supervisor /health should answer 200; skipped (INFO) without a URL."""
    if not supervisor_url:
        return {"name": "supervisor_health", "pass": True, "level": "info",
                "detail": "skipped (no SUPERVISOR_URL)"}
    code, _ = _get(f"{supervisor_url.rstrip('/')}/health", timeout=5)
    return {"name": "supervisor_health", "pass": code == 200, "level": "warn",
            "detail": f"GET /health → {code}" if code else "unreachable"}


# ── Parity comparison ─────────────────────────────────────────────────────────

def compare_nodes(bff_a: str, bff_b: str, node_a: str = "A", node_b: str = "B") -> list[dict]:
    """Compare two BFF nodes. Returns list of parity check results.

    Emits one version check (WARN on mismatch), one reachability check each for
    router (CRITICAL) and memory (WARN) that passes only when both nodes report
    the service reachable, and one alerts-backend equality check (WARN).
    """
    def _full(url: str) -> dict:
        _, payload = _get(f"{url.rstrip('/')}/api/status/full", timeout=10)
        return payload

    def _health(url: str) -> dict:
        _, payload = _get(f"{url.rstrip('/')}/api/health", timeout=6)
        return payload

    checks: list[dict] = []

    ha, hb = _health(bff_a), _health(bff_b)
    ver_a, ver_b = ha.get("version", "?"), hb.get("version", "?")
    version_match = ver_a == ver_b
    checks.append({
        "name": f"parity_version_{node_a}_vs_{node_b}",
        "pass": version_match,
        "level": "warn",  # mismatch is WARN, not FAIL
        "detail": f"{node_a}={ver_a} {node_b}={ver_b}"
                  + ("" if version_match else " [MISMATCH — consider deploying same version]"),
    })

    fa, fb = _full(bff_a), _full(bff_b)
    for key in ("router", "memory"):
        ok_a = (fa.get(key) or {}).get("reachable", False)
        ok_b = (fb.get(key) or {}).get("reachable", False)
        checks.append({
            "name": f"parity_{key}_reachable_{node_a}_vs_{node_b}",
            "pass": ok_a and ok_b,  # FAIL if either node missing critical service
            "level": "critical" if key == "router" else "warn",
            "detail": f"{node_a}.{key}={'ok' if ok_a else 'FAIL'} {node_b}.{key}={'ok' if ok_b else 'FAIL'}",
        })

    be_a = (fa.get("backends") or {}).get("alerts", "?")
    be_b = (fb.get("backends") or {}).get("alerts", "?")
    checks.append({
        "name": f"parity_alerts_backend_{node_a}_vs_{node_b}",
        "pass": be_a == be_b,
        "level": "warn",
        "detail": f"{node_a}.alerts={be_a} {node_b}.alerts={be_b}"
                  + ("" if be_a == be_b else " [backends differ]"),
    })

    return checks


# ── Main ──────────────────────────────────────────────────────────────────────

def _build_parser() -> argparse.ArgumentParser:
    """Create the CLI parser; defaults fall back to environment variables."""
    ap = argparse.ArgumentParser(description="Verify Sofiia stack (NODA1/NODA2)")
    ap.add_argument("--node", default="NODA2", help="Node label (for display)")
    ap.add_argument("--router-url", default=os.getenv("ROUTER_URL", "http://localhost:8000"),
                    help="Router URL for this node")
    ap.add_argument("--bff-url", default=os.getenv("BFF_URL", "http://localhost:8002"),
                    help="sofiia-console BFF URL for this node")
    ap.add_argument("--compare-with", default=os.getenv("COMPARE_WITH_BFF", ""),
                    help="Second BFF URL for parity comparison (optional)")
    ap.add_argument("--compare-node", default="NODA1",
                    help="Label for the comparison node (default: NODA1)")
    ap.add_argument("--supervisor-url", default=os.getenv("SUPERVISOR_URL", ""))
    ap.add_argument("--api-key", default=os.getenv("SUPERVISOR_API_KEY", ""))
    ap.add_argument("--env", default=os.getenv("ENV", "dev"),
                    help="Environment (dev|staging|prod) — affects alert backend strictness")
    ap.add_argument("--json", dest="json_out", action="store_true",
                    help="JSON output only")
    return ap


def _run_node_checks(args: argparse.Namespace, env: str, api_key: str) -> list[dict]:
    """Run every per-node check, in report order."""
    router = args.router_url
    return [
        # ── Router checks ──
        check_router_health(router),
        check_tool(router, "risk_engine_tool", "service",
                   {"env": "prod", "service": "gateway"}, api_key),
        check_tool(router, "architecture_pressure_tool", "service",
                   {"env": "prod", "service": "gateway"}, api_key),
        check_tool(router, "backlog_tool", "dashboard", {"env": "prod"}, api_key),
        # ── BFF checks ──
        check_bff_health(args.bff_url),
        check_status_full(args.bff_url, env=env),
        # ── Cron (local file) ──
        check_cron_entries(),
        # ── Supervisor (optional) ──
        check_supervisor(args.supervisor_url),
    ]


def _build_summary(args: argparse.Namespace, env: str,
                   node_results: list[dict], parity_results: list[dict]) -> dict:
    """Fold check results into the summary dict that is printed or dumped as JSON."""
    results = node_results + parity_results
    critical_fail = [r for r in results if not r["pass"] and r.get("level") == "critical"]
    warn_fail = [r for r in results if not r["pass"] and r.get("level") == "warn"]
    all_pass = not critical_fail

    # Collect all inline warns from status_full
    inline_warns: list[str] = []
    for r in results:
        if isinstance(r.get("warns"), list):
            inline_warns.extend(r["warns"])

    # Recommendations are only produced for a failing run.
    recommendations: list[str] = []
    if not all_pass:
        recommendations.append("Fix critical failures listed above.")
        if any("alerts backend=memory" in r.get("detail", "") for r in critical_fail):
            recommendations.append(f"alerts_backend must be postgres (not memory) in {env}")
        if any("cron" in r["name"] for r in warn_fail):
            recommendations.append("Ensure cron jobs are deployed on this node")

    return {
        "node": args.node,
        "env": env,
        "bff_url": args.bff_url,
        "router_url": args.router_url,
        "pass": all_pass,
        "critical_failures": [r["name"] for r in critical_fail],
        "warnings": [r["name"] for r in warn_fail] + inline_warns,
        "checks": results,
        "parity_checks": parity_results,
        "recommendations": recommendations,
    }


def _print_check(result: dict, name_width: int) -> None:
    """Print one result row: ✓ pass, ⚠ failed warn-level, ✗ any other failure."""
    if result["pass"]:
        icon = "✓"
    elif result.get("level") == "warn":
        icon = "⚠"
    else:
        icon = "✗"
    lvl = f"[{result.get('level','?').upper():<8}]"
    print(f" {icon} {lvl} {result['name']:<{name_width}} {result.get('detail','')}")


def _print_report(args: argparse.Namespace, env: str, summary: dict,
                  node_results: list[dict], parity_results: list[dict]) -> None:
    """Print the human-readable report to stdout."""
    print(f"\n{'='*60}")
    print(f" Sofiia Stack Verifier — {args.node} ({env.upper()})")
    print(f" BFF: {args.bff_url}")
    print(f" Router: {args.router_url}")
    if args.compare_with:
        print(f" Parity: comparing with {args.compare_node} @ {args.compare_with}")
    print(f"{'='*60}\n")

    if parity_results:
        print("Node checks:")
    for r in node_results:
        _print_check(r, 45)
        for w in r.get("warns") or []:
            print(f" ⚠ {w}")

    if parity_results:
        print("\nParity checks:")
        for r in parity_results:
            _print_check(r, 55)

    print()
    if summary["pass"]:
        print(f" OVERALL: ✓ PASS (warnings: {len(summary['warnings'])})")
    else:
        print(" OVERALL: ✗ FAIL")
        print(f" Critical failures: {summary['critical_failures']}")
    if summary["warnings"]:
        print(f" Warnings: {summary['warnings']}")
    if summary["recommendations"]:
        print("\n Recommendations:")
        for rec in summary["recommendations"]:
            print(f" → {rec}")
    print()


def main(argv: list[str] | None = None) -> int:
    """Run all checks and report.

    Args:
        argv: command-line arguments without the program name; ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Returns:
        0 when no critical check failed, 1 otherwise.
    """
    args = _build_parser().parse_args(argv)
    api_key = args.api_key.strip()
    env = args.env.strip().lower()

    node_results = _run_node_checks(args, env, api_key)

    # ── Parity (optional) ──
    parity_results: list[dict] = []
    if args.compare_with:
        parity_results = compare_nodes(
            args.bff_url, args.compare_with,
            node_a=args.node, node_b=args.compare_node,
        )

    # ── Evaluate ──
    summary = _build_summary(args, env, node_results, parity_results)

    if args.json_out:
        print(json.dumps(summary, indent=2))
    else:
        _print_report(args, env, summary, node_results, parity_results)

    return 0 if summary["pass"] else 1


if __name__ == "__main__":
    sys.exit(main())