Files
microdao-daarion/ops/fabric_snapshot.py
Apple 9a36020316 P3.5-P3.7: 2-layer inventory, capability routing, STT/TTS adapters, Dev Contract
NCS:
- _collect_worker_caps() fetches capability flags from node-worker /caps
- _derive_capabilities() merges served model types + worker provider flags
- installed_artifacts replaces inventory_only (disk scan with DISK_SCAN_PATHS env)
- New endpoints: /capabilities/caps, /capabilities/installed

Node Worker:
- STT_PROVIDER, TTS_PROVIDER, OCR_PROVIDER, IMAGE_PROVIDER env flags
- /caps endpoint returns capabilities + providers for NCS aggregation
- STT adapter (providers/stt_mlx_whisper.py) — remote + local mode
- TTS adapter (providers/tts_mlx_kokoro.py) — remote + local mode
- OCR handler via vision_prompted (ollama_vision with OCR prompt)
- NATS subjects: node.{id}.stt/tts/ocr/image.request

Router:
- POST /v1/capability/{stt,tts,ocr,image} — capability-based offload routing
- GET /v1/capabilities — global view with capabilities_by_node
- require_fresh_caps(ttl) preflight guard
- find_nodes_with_capability(cap) + load-based node selection

Ops:
- ops/fabric_snapshot.py — full runtime snapshot collector
- ops/fabric_preflight.sh — quick check + snapshot save + diff
- docs/fabric_contract.md — Dev Contract v0.1 (preflight-first)
- tests/test_fabric_contract.py — CI enforcement (6 tests)

Made-with: Cursor
2026-02-27 05:24:09 -08:00

290 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""Fabric Snapshot — collect full runtime truth from a node.
Queries every endpoint (NCS, node-worker, router, Ollama, docker)
and saves a single JSON artifact for preflight/postflight comparison.
Usage:
python3 ops/fabric_snapshot.py [--node-id NODA2] [--ncs URL] [--router URL] ...
python3 ops/fabric_snapshot.py --ssh root@144.76.224.179 # remote node
"""
import argparse
import asyncio
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# Snapshots are stored next to this script under preflight_snapshots/ so that
# pre-change and post-change runs of the same node can be diffed.
SCRIPT_DIR = Path(__file__).parent
SNAPSHOT_DIR = SCRIPT_DIR / "preflight_snapshots"
# httpx is the only hard third-party dependency; fail fast with an install
# hint instead of a raw ImportError traceback at first use.
try:
    import httpx
except ImportError:
    print("httpx not installed; pip install httpx", file=sys.stderr)
    sys.exit(1)
async def _get(url: str, timeout: float = 5.0) -> Dict[str, Any]:
    """GET *url* and normalize the outcome into a status dict — never raises.

    Returns ``{"status": "ok", "data": <json>}`` on HTTP 200,
    ``{"status": "http_<code>", "data": None}`` for other status codes, and
    ``{"status": "error:<ExcType>", "data": None, "error": <first 200 chars>}``
    when the request (or JSON decoding) fails.
    """
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.get(url)
            if resp.status_code != 200:
                return {"status": f"http_{resp.status_code}", "data": None}
            # json() stays inside the try: a non-JSON body is reported as an
            # error dict, not an exception.
            return {"status": "ok", "data": resp.json()}
    except Exception as exc:
        return {
            "status": f"error:{type(exc).__name__}",
            "data": None,
            "error": str(exc)[:200],
        }
def _run_cmd(cmd: List[str], timeout: int = 10) -> Dict[str, Any]:
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
return {"status": "ok" if result.returncode == 0 else f"exit_{result.returncode}",
"stdout": result.stdout[:10000], "stderr": result.stderr[:2000]}
except FileNotFoundError:
return {"status": "not_found"}
except subprocess.TimeoutExpired:
return {"status": "timeout"}
except Exception as e:
return {"status": f"error:{e}"}
def _ssh_cmd(ssh_target: str, remote_cmd: str, timeout: int = 15) -> Dict[str, Any]:
    """Run *remote_cmd* on *ssh_target* over ssh; returns `_run_cmd`'s status dict.

    Uses the accept-new host-key policy and a 5 s connect timeout so that an
    unreachable host cannot stall the snapshot for long.
    """
    argv = [
        "ssh",
        "-o", "StrictHostKeyChecking=accept-new",
        "-o", "ConnectTimeout=5",
        ssh_target,
        remote_cmd,
    ]
    return _run_cmd(argv, timeout=timeout)
async def collect_snapshot(
    node_id: str,
    ncs_url: str,
    worker_url: str,
    router_url: str,
    ollama_url: str,
    ssh_target: str = "",
) -> Dict[str, Any]:
    """Query NCS, node-worker, router, Ollama and docker; return one snapshot dict.

    Args:
        node_id: logical node name recorded in the snapshot.
        ncs_url, worker_url, router_url, ollama_url: base URLs of the services.
        ssh_target: if set (e.g. "root@1.2.3.4"), ``docker ps`` runs remotely
            via ssh instead of locally.

    Returns:
        A dict with per-service sections ("ncs", "worker", "router", "ollama",
        "docker"), flattened convenience fields (served_models, capabilities,
        node_load, ...), an "errors" list, and a "summary" block whose
        "passed" flag is True iff no errors were recorded.
    """
    snap: Dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "node_id": node_id,
        "collector": "fabric_snapshot.py",
        "errors": [],
    }
    # All HTTP probes are independent and _get never raises, so issue them
    # concurrently: wall time becomes the max of the timeouts, not the sum.
    (
        ncs_caps, ncs_caps_only, ncs_installed,
        worker_caps, worker_health, worker_metrics,
        router_health, router_models, router_caps,
        ollama_tags, ollama_ps,
    ) = await asyncio.gather(
        _get(f"{ncs_url}/capabilities"),
        _get(f"{ncs_url}/capabilities/caps"),
        _get(f"{ncs_url}/capabilities/installed"),
        _get(f"{worker_url}/caps"),
        _get(f"{worker_url}/healthz"),
        # NOTE(review): assumes /metrics serves JSON — a Prometheus text
        # endpoint would show up as an error status here; confirm with worker.
        _get(f"{worker_url}/metrics"),
        _get(f"{router_url}/health"),
        _get(f"{router_url}/v1/models"),
        _get(f"{router_url}/v1/capabilities"),
        _get(f"{ollama_url}/api/tags"),
        _get(f"{ollama_url}/api/ps"),
    )
    # --- NCS (capability aggregation service) ---
    snap["ncs"] = {
        "url": ncs_url,
        "capabilities_full": ncs_caps.get("data"),
        "capabilities_flags": ncs_caps_only.get("data"),
        "installed": ncs_installed.get("data"),
    }
    if ncs_caps["status"] != "ok":
        snap["errors"].append(f"NCS /capabilities: {ncs_caps['status']}")
    # Flatten the most frequently consumed NCS fields to the snapshot top level.
    ncs_data = ncs_caps.get("data") or {}
    installed_data = ncs_installed.get("data") or {}
    snap["served_models"] = ncs_data.get("served_models", [])
    snap["served_count"] = ncs_data.get("served_count", 0)
    snap["installed_artifacts"] = installed_data.get("installed_artifacts", [])
    snap["installed_count"] = installed_data.get("installed_count", 0)
    snap["capabilities"] = ncs_data.get("capabilities", {})
    snap["node_load"] = ncs_data.get("node_load", {})
    snap["runtimes"] = ncs_data.get("runtimes", {})
    # --- Node worker ---
    snap["worker"] = {
        "url": worker_url,
        "caps": worker_caps.get("data"),
        "health": worker_health.get("data"),
        "metrics": worker_metrics.get("data"),
    }
    if worker_health["status"] != "ok":
        snap["errors"].append(f"Worker /healthz: {worker_health['status']}")
    # --- Router ---
    snap["router"] = {
        "url": router_url,
        "health": router_health.get("data"),
        "models": router_models.get("data"),
        "capabilities": router_caps.get("data"),
    }
    if router_health["status"] != "ok":
        snap["errors"].append(f"Router /health: {router_health['status']}")
    # --- Ollama ---
    snap["ollama"] = {
        "url": ollama_url,
        "tags": ollama_tags.get("data"),
        "ps": ollama_ps.get("data"),
    }
    if ollama_tags["status"] != "ok":
        snap["errors"].append(f"Ollama /api/tags: {ollama_tags['status']}")
    # --- Docker: container list, locally or over ssh ---
    # (These subprocess calls are blocking; acceptable for a one-shot CLI tool.)
    if ssh_target:
        docker_ps = _ssh_cmd(ssh_target, "docker ps --format '{{.Names}}\\t{{.Status}}\\t{{.Ports}}'")
    else:
        docker_ps = _run_cmd(["docker", "ps", "--format", "{{.Names}}\t{{.Status}}\t{{.Ports}}"])
    containers = []
    if docker_ps["status"] == "ok":
        for line in docker_ps.get("stdout", "").strip().split("\n"):
            parts = line.split("\t")
            if len(parts) >= 2:  # skip blank or malformed lines
                containers.append({
                    "name": parts[0],
                    "status": parts[1],
                    "ports": parts[2] if len(parts) > 2 else "",
                })
    snap["docker"] = {"containers": containers, "container_count": len(containers)}
    # --- Summary: one glanceable pass/fail block ---
    snap["summary"] = {
        "ncs_ok": ncs_caps["status"] == "ok",
        "worker_ok": worker_health["status"] == "ok",
        "router_ok": router_health["status"] == "ok",
        "ollama_ok": ollama_tags["status"] == "ok",
        "served_count": snap["served_count"],
        "installed_count": snap["installed_count"],
        # provider detail is verbose; the summary keeps the boolean flags only
        "capabilities": {k: v for k, v in snap["capabilities"].items() if k != "providers"},
        "container_count": len(containers),
        "error_count": len(snap["errors"]),
        "passed": len(snap["errors"]) == 0,
    }
    return snap
def save_snapshot(snap: Dict[str, Any], out_dir: Path = SNAPSHOT_DIR) -> Path:
    """Write *snap* as pretty-printed JSON into *out_dir* and return the path.

    The filename is "<node_id lowercased>_<local YYYY-MM-DD_HHMMSS>.json";
    ``default=str`` keeps any non-JSON-serializable values from aborting the dump.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
    node = snap.get("node_id", "unknown").lower()
    target = out_dir / f"{node}_{stamp}.json"
    with open(target, "w") as fh:
        json.dump(snap, fh, indent=2, ensure_ascii=False, default=str)
    return target
def find_previous_snapshot(node_id: str, out_dir: Path = SNAPSHOT_DIR) -> Optional[Dict]:
    """Load the second-newest snapshot for *node_id*, or None if there is none.

    Assumes the current run's snapshot has already been saved, so the newest
    file (by mtime) is "current" and the one after it is the previous run.

    Args:
        node_id: node name; matched case-insensitively against filename prefixes.
        out_dir: directory containing "<node>_<timestamp>.json" snapshots.

    Returns:
        The parsed previous snapshot dict, or None when fewer than two
        snapshots exist for this node.
    """
    prefix = node_id.lower() + "_"
    # glob() already yields an iterable — no need to wrap it in a list comp.
    files = sorted(
        out_dir.glob(f"{prefix}*.json"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    if len(files) < 2:
        return None
    with open(files[1]) as f:
        return json.load(f)
def print_diff(current: Dict, previous: Optional[Dict]):
    """Print changes in counts, capability flags and container count versus the
    previous snapshot (or a placeholder line when there is no previous one).

    Fix: old and new values were concatenated with no separator
    ("served_count: 35"), which is unreadable — they are now joined with " → ".

    Args:
        current: the snapshot just collected.
        previous: the prior snapshot for the same node, or None/empty.
    """
    if not previous:
        print(" (no previous snapshot to compare)")
        return

    def _field(snap: Dict, key: str, default):
        # Prefer summary.<key>; fall back to top-level <key> for older layouts.
        return snap.get("summary", {}).get(key, snap.get(key, default))

    diffs = []
    for key in ("served_count", "installed_count"):
        old, new = _field(previous, key, "?"), _field(current, key, "?")
        if old != new:
            diffs.append(f" {key}: {old} → {new}")
    old_caps = _field(previous, "capabilities", {})
    new_caps = _field(current, "capabilities", {})
    for k in sorted(set(old_caps) | set(new_caps)):
        if k == "providers":  # provider detail is too noisy for the diff
            continue
        ov, nv = old_caps.get(k, "?"), new_caps.get(k, "?")
        if ov != nv:
            diffs.append(f" caps.{k}: {ov} → {nv}")
    old_ct = previous.get("docker", {}).get("container_count", "?")
    new_ct = current.get("docker", {}).get("container_count", "?")
    if old_ct != new_ct:
        diffs.append(f" containers: {old_ct} → {new_ct}")
    if diffs:
        print(" Changes vs previous snapshot:")
        for d in diffs:
            print(d)
    else:
        print(" (no changes vs previous snapshot)")
def print_summary(snap: Dict):
    """Pretty-print the snapshot's summary section, ending with a colored
    PASSED/FAILED verdict (green/red ANSI escapes)."""
    summary = snap.get("summary", {})
    print(f" node_id: {snap.get('node_id')}")
    print(f" served: {summary.get('served_count')}")
    print(f" installed: {summary.get('installed_count')}")
    print(f" containers: {summary.get('container_count')}")
    flags = summary.get("capabilities", {})
    rendered = " ".join(f"{name}={'Y' if enabled else 'N'}" for name, enabled in flags.items())
    print(f" capabilities: {rendered}")
    print(f" errors: {summary.get('error_count')}")
    # Echo each recorded error line so failures are visible without opening the JSON.
    for err in snap.get("errors") or []:
        print(f"{err}")
    verdict = "\033[32mPASSED\033[0m" if summary.get("passed", False) else "\033[31mFAILED\033[0m"
    print(f" result: {verdict}")
def main():
    """CLI entry point: collect a snapshot, save it, print summary and diff.

    Exits with status 1 when the snapshot recorded any errors
    (summary.passed is False), so shell scripts can gate on the result.
    """
    parser = argparse.ArgumentParser(description="Fabric Snapshot Collector")
    parser.add_argument("--node-id", default="NODA2")
    parser.add_argument("--ncs", default="http://127.0.0.1:8099")
    parser.add_argument("--worker", default="http://127.0.0.1:8109")
    parser.add_argument("--router", default="http://127.0.0.1:9102")
    parser.add_argument("--ollama", default="http://127.0.0.1:11434")
    parser.add_argument("--ssh", default="", help="SSH target for remote docker ps (e.g. root@1.2.3.4)")
    parser.add_argument("--out-dir", default=str(SNAPSHOT_DIR))
    parser.add_argument("--json-only", action="store_true", help="Print JSON to stdout, no save")
    args = parser.parse_args()

    snapshot = asyncio.run(
        collect_snapshot(
            node_id=args.node_id,
            ncs_url=args.ncs,
            worker_url=args.worker,
            router_url=args.router,
            ollama_url=args.ollama,
            ssh_target=args.ssh,
        )
    )

    if args.json_only:
        # Machine-readable mode: dump to stdout and write nothing to disk.
        print(json.dumps(snapshot, indent=2, ensure_ascii=False, default=str))
        return

    out_dir = Path(args.out_dir)
    saved_path = save_snapshot(snapshot, out_dir)
    print("╔══════════════════════════════════════╗")
    print(f"║ Fabric Snapshot: {args.node_id:<18s}")
    print("╚══════════════════════════════════════╝")
    print()
    print_summary(snapshot)
    print()
    print_diff(snapshot, find_previous_snapshot(args.node_id, out_dir))
    print()
    print(f"Saved: {saved_path}")
    if not snapshot.get("summary", {}).get("passed"):
        sys.exit(1)


if __name__ == "__main__":
    main()