diff --git a/docker-compose.node1.yml b/docker-compose.node1.yml index 1ab1602b..d31d4f55 100644 --- a/docker-compose.node1.yml +++ b/docker-compose.node1.yml @@ -534,6 +534,10 @@ services: - NODE_DEFAULT_VISION=qwen3-vl:8b - NODE_WORKER_MAX_CONCURRENCY=2 - NCS_REPORT_URL=http://node-capabilities:8099 + - STT_PROVIDER=none + - TTS_PROVIDER=none + - OCR_PROVIDER=vision_prompted + - IMAGE_PROVIDER=none depends_on: - nats networks: diff --git a/docker-compose.node2-sofiia.yml b/docker-compose.node2-sofiia.yml index a0a8a84f..2f2ac7ab 100644 --- a/docker-compose.node2-sofiia.yml +++ b/docker-compose.node2-sofiia.yml @@ -120,6 +120,9 @@ services: - "127.0.0.1:8099:8099" extra_hosts: - "host.docker.internal:host-gateway" + volumes: + - ~/.ollama/models:/host_models/ollama:ro + - ~/.cache/huggingface/hub:/host_models/hf_cache:ro environment: - NODE_ID=NODA2 - OLLAMA_BASE_URL=http://host.docker.internal:11434 @@ -129,6 +132,7 @@ services: - ENABLE_NATS_CAPS=true - NATS_URL=nats://dagi-nats:4222 - NODE_WORKER_URL=http://node-worker:8109 + - DISK_SCAN_PATHS=/host_models/ollama,/host_models/hf_cache depends_on: - swapper-service - dagi-nats @@ -153,6 +157,11 @@ services: - NODE_DEFAULT_VISION=llava:13b - NODE_WORKER_MAX_CONCURRENCY=2 - NCS_REPORT_URL=http://node-capabilities:8099 + # Capability providers (none = not available on this node) + - STT_PROVIDER=none + - TTS_PROVIDER=none + - OCR_PROVIDER=vision_prompted + - IMAGE_PROVIDER=none depends_on: - dagi-nats networks: diff --git a/docs/fabric_contract.md b/docs/fabric_contract.md new file mode 100644 index 00000000..b03b67fb --- /dev/null +++ b/docs/fabric_contract.md @@ -0,0 +1,159 @@ +# Dev Contract: Preflight-first, Node-specific, Zero Assumptions (v0.1) + +## 0. Заборона припущень + +Будь-яка дія/пропозиція щодо моделей, провайдерів, портів, compose або routing **заборонена без preflight snapshot** з цільової ноди. + +Без snapshot — ні коміт, ні деплой, ні рекомендація. + +## 1. 
Обов'язковий Preflight Snapshot + +### 1.1 Збір + +Перед кожною зміною запустити `ops/fabric_snapshot.py` на цільовій ноді: + +```bash +# NODA2 (локально) +python3 ops/fabric_snapshot.py --node-id NODA2 + +# NODA1 (через SSH tunnel) +ssh -L 18099:127.0.0.1:8099 -L 18109:127.0.0.1:8109 \ + -L 19102:127.0.0.1:9102 -L 21434:127.0.0.1:11434 -fN root@144.76.224.179 +python3 ops/fabric_snapshot.py --node-id noda1 \ + --ncs http://127.0.0.1:18099 --worker http://127.0.0.1:18109 \ + --router http://127.0.0.1:19102 --ollama http://127.0.0.1:21434 \ + --ssh root@144.76.224.179 +``` + +Snapshot зберігається в `ops/preflight_snapshots/<node_id>_<YYYY-MM-DD_HHMMSS>.json` (node_id у нижньому регістрі). + +### 1.2 Endpoints (всі обов'язкові) + +| Endpoint | Що перевіряє | +|---|---| +| `NCS /capabilities` | served_models, capabilities, node_load, runtimes | +| `NCS /capabilities/caps` | capability flags (stt/tts/ocr/image) | +| `NCS /capabilities/installed` | installed_artifacts (disk scan) | +| `node-worker /caps` | provider flags (STT_PROVIDER, TTS_PROVIDER...) | +| `node-worker /healthz` | NATS connection, concurrency | +| `Router /v1/capabilities` | global view (всі ноди, capabilities_by_node) | +| `Router /v1/models` | глобальний пул моделей | +| `Ollama /api/tags` | реальні Ollama моделі на ноді | +| `docker ps` | список контейнерів | + +### 1.3 Quality gate (must-pass) + +Snapshot валідний тільки якщо: + +- Router healthy +- NCS healthy +- Node-worker healthy +- `capabilities_by_node` містить цільову ноду +- `served_models` не порожній (або є пояснення "compute-less node") + +## 2. Served ≠ Installed + +### 2.1 Два шари правди + +| Шар | Джерело | Використання | +|---|---|---| +| **served_models** | NCS /capabilities → runtimes (Ollama/llama-server/...) | Routing, model selection, offload | +| **installed_artifacts** | NCS /capabilities/installed → disk scan | Інвентаризація, recommendations, cleanup | + +Модель на диску — це **candidate**, не "доступна". 
+ +### 2.2 Заборона hardcode + +- Заборонено комітити `models:` списки в swapper/router configs +- Дозволено: policy-only `prefer` (тип/клас моделі), але не імена, крім критичних cloud SKU + +## 3. Capability-first routing + +### 3.1 Routing rules + +Router обирає **ноду**, не "модель": + +1. `find_nodes_with_capability(cap)` — тільки ноди де cap=true +2. `require_fresh_caps(ttl=30)` — preflight guard +3. Circuit breaker — виключити ноди з відкритим breaker +4. Load scoring — `inflight * 10 + (100 if mem_pressure=high)` +5. Offload через NATS `node.{id}.{cap}.request` + +### 3.2 Fail fast + +Якщо capability відсутня на всіх нодах — **fail fast** з явним повідомленням: + +```json +{"error": "No node with capability 'stt' available", "capabilities_by_node": {...}} +``` + +Заборонено: тихий fallback на cloud без WARNING log. + +### 3.3 Нодозалежність + +STT/TTS/OCR/Image **можуть бути різними** на різних нодах: + +- NODA2: `STT_PROVIDER=mlx_whisper`, `TTS_PROVIDER=mlx_kokoro` +- NODA1: `STT_PROVIDER=none`, `OCR_PROVIDER=vision_prompted` + +Вмикання capability = тільки через env flags в node-worker → `/caps`. + +## 4. Безпечний контроль змін + +### 4.1 План змін (обов'язково) + +Перед зміною відповісти на: + +1. **Що** міняємо +2. **Чому** +3. **Що може зламатися** +4. **Як перевіряємо** (postflight) +5. **Rollback** (точна команда) + +### 4.2 Rollback + +Кожна зміна має мати: + +- git commit hash / tag +- одну команду rollback (`docker compose up -d --force-recreate <service>`, `git checkout`) + +## 5. Postflight + +Після кожної зміни — повторний snapshot і порівняння: + +- served_models count (не зменшився без причини) +- capabilities map (нові = очікувані) +- container count +- error rate (prom_metrics) + +## 6. Жодних прихованих fallback + +- Невідомий профіль або відсутній API key → WARNING + deterministic fallback на `agent.fallback_llm` +- Заборонено: "мовчки пішли в DeepSeek" без логу + +## 7. 
Канонічні артефакти + +| Артефакт | Призначення | +|---|---| +| `ops/fabric_snapshot.py` | Збір повного snapshot | +| `ops/fabric_preflight.sh` | Quick check + snapshot save | +| `ops/preflight_snapshots/` | Зберігання snapshots | +| `docs/fabric_contract.md` | Цей контракт | + +## Реальний стан (snapshot 2026-02-27) + +### NODA1 (production) + +- **49 контейнерів** (gateway, router, memory, qdrant, neo4j, redis, postgres, minio, rag, swapper, farmos, brand-*, oneok-*, plant-vision, crawl4ai, grafana, prometheus...) +- **5 Ollama моделей**: qwen3-vl:8b (vision), qwen3:8b, qwen3.5:27b-q4_K_M, smollm2:135m, deepseek-v3.1:671b-cloud +- **14 Telegram агентів** active +- **NCS P3.5 не задеплоєний** — capabilities flags відсутні, installed_artifacts = 0 +- `swapper=disabled`, worker NATS connected + +### NODA2 (development) + +- **14 контейнерів** (router, node-worker, node-capabilities, nats, gateway, memory, qdrant, postgres, neo4j, redis, open-webui, sofiia-console, swapper) +- **13 served моделей** (Ollama: 12 + llama_server: 1) +- **29 installed artifacts** на диску (150.3GB LLM + 0.3GB TTS kokoro-v1_0) +- **capabilities**: llm=Y, vision=Y, ocr=Y, stt=N, tts=N, image=N +- `OCR_PROVIDER=vision_prompted` diff --git a/ops/fabric_preflight.sh b/ops/fabric_preflight.sh new file mode 100755 index 00000000..90dd0f58 --- /dev/null +++ b/ops/fabric_preflight.sh @@ -0,0 +1,186 @@ +#!/usr/bin/env bash +# Fabric Preflight — verify all nodes before changes/deploys. +# Saves snapshot, compares with previous, fails hard on critical issues. 
#
# Usage:
#   bash ops/fabric_preflight.sh [NCS_URL] [ROUTER_URL]
#   bash ops/fabric_preflight.sh   # defaults: 127.0.0.1:8099, 127.0.0.1:9102
set -euo pipefail

# Positional arguments: $1 = NCS base URL, $2 = router base URL.
NODA_NCS="${1:-http://127.0.0.1:8099}"
ROUTER_URL="${2:-http://127.0.0.1:9102}"

RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
CYAN='\033[0;36m'
NC='\033[0m'

# Reporting helpers. Only fail() counts towards the exit status.
pass() { echo -e " ${GREEN}PASS${NC} $1"; }
warn() { echo -e " ${YELLOW}WARN${NC} $1"; }
fail() { echo -e " ${RED}FAIL${NC} $1"; ERRORS=$((ERRORS+1)); }
info() { echo -e " ${CYAN}INFO${NC} $1"; }

ERRORS=0
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SNAPSHOT_DIR="${SCRIPT_DIR}/preflight_snapshots"
mkdir -p "$SNAPSHOT_DIR"

# ── NCS check ─────────────────────────────────────────────────────────────────

# check_ncs LABEL URL
#   Fetches URL/capabilities and reports on the served pool, swapper status,
#   capability flags, node load and vision models.
#   Sets the globals NCS_RAW (raw JSON payload) and NCS_NODE_ID on success.
#   Always returns 0; problems are recorded through fail()/warn().
check_ncs() {
  local label="$1" url="$2"
  echo "── $label ($url) ──"

  local raw
  raw=$(curl -sf "$url/capabilities" 2>/dev/null) || { fail "NCS unreachable at $url"; return 0; }

  # Parse the payload exactly once. The fields are joined with the ASCII unit
  # separator (0x1f): it is not whitespace, so `read` keeps empty fields in
  # place instead of collapsing them. A parse failure is reported as a FAIL
  # rather than silently aborting the whole script under `set -e`.
  local parsed
  parsed=$(printf '%s' "$raw" | python3 -c '
import json, sys
d = json.load(sys.stdin)
load = d.get("node_load", {})
caps = d.get("capabilities", {})
flags = [f"{k}={v}" for k, v in caps.items() if k != "providers"]
vision = sum(1 for m in d.get("served_models", []) if m.get("type") == "vision")
fields = [
    d.get("node_id", "?"),
    d.get("served_count", 0),
    d.get("installed_count", 0),
    d.get("runtimes", {}).get("swapper", {}).get("status", "?"),
    " ".join(flags),
    load.get("mem_pressure", "?"),
    load.get("inflight_jobs", 0),
    vision,
]
print("\x1f".join(str(f) for f in fields))
' 2>/dev/null) || { fail "NCS at $url returned an unparseable /capabilities payload"; return 0; }

  local node_id served installed swapper_status caps mem_p inflight vision_count
  IFS=$'\x1f' read -r node_id served installed swapper_status caps mem_p inflight vision_count <<<"$parsed"

  # An empty served pool is a hard failure: nothing can be routed to this node.
  if [ "$served" -gt 0 ] 2>/dev/null; then
    pass "node=$node_id served=$served installed=$installed"
  else
    fail "node=$node_id served=$served (empty pool!)"
  fi

  if [ "$swapper_status" = "disabled" ]; then
    pass "swapper=disabled"
  else
    warn "swapper=$swapper_status"
  fi

  # No capability flags at all means the NCS predates the capabilities map.
  if [ -z "$caps" ]; then
    warn "capabilities: (none — P3.5 not deployed?)"
  else
    pass "capabilities: $caps"
  fi

  if [ "$mem_p" = "high" ]; then
    warn "mem_pressure=$mem_p inflight=$inflight"
  else
    pass "mem_pressure=$mem_p inflight=$inflight"
  fi

  if [ "$vision_count" -gt 0 ] 2>/dev/null; then
    pass "vision models: $vision_count"
  else
    warn "no vision models served"
  fi

  NCS_RAW="$raw"
  NCS_NODE_ID="$node_id"
}
# ── Router check ──────────────────────────────────────────────────────────────

# check_router LABEL URL
#   Verifies router health, the global model pool and the per-node capability
#   view. Sets the globals ROUTER_MODELS (/v1/models payload) and ROUTER_CAPS
#   (/v1/capabilities payload) for save_and_diff.
#   Always returns 0; problems are recorded through fail()/warn().
check_router() {
  local label="$1" url="$2"
  echo "── $label ($url) ──"

  local health status
  health=$(curl -sf "$url/health" 2>/dev/null) || { fail "Router unreachable at $url"; return 0; }
  status=$(printf '%s' "$health" \
    | python3 -c 'import json,sys;print(json.load(sys.stdin).get("status","?"))' 2>/dev/null) || status="?"
  if [ "$status" = "ok" ]; then pass "health=$status"; else fail "health=$status"; fi

  # Fetch each payload once and reuse it for both the check and the snapshot.
  ROUTER_MODELS=$(curl -sf "$url/v1/models" 2>/dev/null) || ROUTER_MODELS='{}'
  local models_total
  models_total=$(printf '%s' "$ROUTER_MODELS" \
    | python3 -c 'import json,sys;print(json.load(sys.stdin).get("total",0))' 2>/dev/null) || models_total=0
  if [ "$models_total" -gt 0 ] 2>/dev/null; then
    pass "global pool: $models_total models"
  else
    fail "global pool empty"
  fi

  # The `||` fallbacks matter: under `set -e` + `pipefail` a failed curl or a
  # bad payload would otherwise terminate the script without any report.
  ROUTER_CAPS=$(curl -sf "$url/v1/capabilities" 2>/dev/null) || ROUTER_CAPS='{}'
  local caps_nodes
  caps_nodes=$(printf '%s' "$ROUTER_CAPS" | python3 -c '
import json, sys
nodes = list(json.load(sys.stdin).get("capabilities_by_node", {}).keys())
print(f"{len(nodes)} node(s): " + " ".join(nodes) if nodes else "(none)")
' 2>/dev/null) || caps_nodes="(none)"
  if [ "$caps_nodes" = "(none)" ]; then
    warn "capabilities_by_node: $caps_nodes"
  else
    pass "capabilities_by_node: $caps_nodes"
  fi
}

# ── Snapshot + diff ───────────────────────────────────────────────────────────

NCS_RAW="{}"
NCS_NODE_ID="unknown"
ROUTER_MODELS="{}"
ROUTER_CAPS="{}"

# save_and_diff
#   Writes a compact snapshot to $SNAPSHOT_DIR/<node_id>_<timestamp>.json and
#   prints what changed relative to the most recent earlier snapshot of the
#   same node. The node id is lower-cased in the file name so that it matches
#   the lower-cased prefix used to look up earlier snapshots.
save_and_diff() {
  local ts node_lc snap_file
  ts=$(date +%Y-%m-%d_%H%M%S)
  node_lc=$(printf '%s' "$NCS_NODE_ID" | tr '[:upper:]' '[:lower:]')
  snap_file="${SNAPSHOT_DIR}/${node_lc}_${ts}.json"

  # Payloads travel through the environment instead of being spliced into the
  # Python source, so quotes or backslashes in the JSON cannot corrupt them.
  NCS_RAW="$NCS_RAW" ROUTER_MODELS="$ROUTER_MODELS" ROUTER_CAPS="$ROUTER_CAPS" \
  NCS_NODE_ID="$NCS_NODE_ID" PREFLIGHT_ERRORS="$ERRORS" \
  SNAP_FILE="$snap_file" SNAPSHOT_DIR="$SNAPSHOT_DIR" SNAP_PREFIX="${node_lc}_" \
  python3 - <<'PY' || warn "(snapshot diff failed)"
import glob
import json
import os
from datetime import datetime, timezone


def load(name):
    """Parse a JSON object from the environment; anything else becomes {}."""
    try:
        value = json.loads(os.environ.get(name) or "{}")
    except ValueError:
        return {}
    return value if isinstance(value, dict) else {}


ncs = load("NCS_RAW")
router = load("ROUTER_MODELS")
router_caps = load("ROUTER_CAPS")
errors = int(os.environ.get("PREFLIGHT_ERRORS") or 0)
snap_file = os.environ["SNAP_FILE"]
caps = ncs.get("capabilities") or {}

snapshot = {
    "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    "node_id": ncs.get("node_id", os.environ.get("NCS_NODE_ID", "unknown")),
    "errors": errors,
    "passed": errors == 0,
    "served_count": ncs.get("served_count", 0),
    "installed_count": ncs.get("installed_count", 0),
    "capabilities": {k: v for k, v in caps.items() if k != "providers"},
    "providers": caps.get("providers", {}),
    "node_load": ncs.get("node_load", {}),
    "router_models_total": router.get("total", 0),
    # Comes from /v1/capabilities; the /v1/models payload does not carry it.
    "capabilities_by_node": router_caps.get("capabilities_by_node", {}),
}

with open(snap_file, "w", encoding="utf-8") as f:
    json.dump(snapshot, f, indent=2, ensure_ascii=False)

# Previous snapshot = newest file with the same prefix, excluding the one just
# written. The timestamped names sort chronologically.
pattern = os.path.join(os.environ["SNAPSHOT_DIR"], os.environ["SNAP_PREFIX"] + "*.json")
current = os.path.abspath(snap_file)
older = sorted(p for p in glob.glob(pattern) if os.path.abspath(p) != current)
prev = None
if older:
    try:
        with open(older[-1], encoding="utf-8") as f:
            prev = json.load(f)
    except (OSError, ValueError):
        prev = None

if prev:
    diffs = []
    for key in ("served_count", "installed_count", "router_models_total"):
        ov, nv = prev.get(key, "?"), snapshot.get(key, "?")
        if ov != nv:
            diffs.append(f" {key}: {ov} → {nv}")
    old_caps = prev.get("capabilities") or {}
    new_caps = snapshot.get("capabilities") or {}
    for k in sorted(set(old_caps) | set(new_caps)):
        if k == "providers":
            continue
        ov, nv = old_caps.get(k, "?"), new_caps.get(k, "?")
        if ov != nv:
            diffs.append(f" caps.{k}: {ov} → {nv}")
    if diffs:
        print("Changes vs previous:")
        for d in diffs:
            print(d)
    else:
        print("(no changes vs previous snapshot)")
else:
    print("(first snapshot for this node)")
PY

  info "Snapshot: $snap_file"
}

# ── Main ──────────────────────────────────────────────────────────────────────

echo "╔══════════════════════════════════════╗"
echo "║ Fabric Preflight Check ║"
echo "╚══════════════════════════════════════╝"
echo ""

check_ncs "NCS" "$NODA_NCS"
echo ""
check_router "Router" "$ROUTER_URL"
echo ""
save_and_diff
echo ""

if [ "$ERRORS" -gt 0 ]; then
  echo -e "${RED}Preflight FAILED: $ERRORS error(s)${NC}"
  echo -e "${RED}BLOCKED: no changes allowed until all errors resolved${NC}"
  exit 1
else
  echo -e "${GREEN}Preflight PASSED — changes allowed${NC}"
fi
SCRIPT_DIR = Path(__file__).parent
SNAPSHOT_DIR = SCRIPT_DIR / "preflight_snapshots"


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


async def _get(url: str, timeout: float = 5.0) -> Dict[str, Any]:
    """GET *url* and return a small status record instead of raising.

    Returns a dict with:
      * ``status`` — ``"ok"`` (HTTP 200 with a JSON body), ``"non_json"``
        (HTTP 200 whose body is not JSON), ``"http_<code>"`` or
        ``"error:<ExceptionName>"``.
      * ``data`` — the decoded JSON for ``"ok"``, otherwise ``None``.
      * ``text`` — only for ``"non_json"``: the body, truncated to 10000 chars.
      * ``error`` — only for ``"error:*"``: the message, truncated to 200 chars.
    """
    try:
        async with httpx.AsyncClient(timeout=timeout) as c:
            r = await c.get(url)
    except Exception as e:  # best-effort probe: never let one endpoint abort the snapshot
        return {"status": f"error:{type(e).__name__}", "data": None, "error": str(e)[:200]}

    if r.status_code != 200:
        return {"status": f"http_{r.status_code}", "data": None}
    try:
        return {"status": "ok", "data": r.json()}
    except ValueError:
        # A 200 with a non-JSON body (e.g. a Prometheus text exposition) is not
        # a transport error; keep the body so callers can still record it.
        return {"status": "non_json", "data": None, "text": r.text[:10000]}


def _run_cmd(cmd: List[str], timeout: int = 10) -> Dict[str, Any]:
    """Run *cmd* (argv list, no shell) and return a status record.

    ``status`` is ``"ok"``, ``"exit_<code>"``, ``"not_found"``, ``"timeout"``
    or ``"error:<message>"``. ``stdout``/``stderr`` are present only when the
    process actually ran, truncated to 10000 and 2000 characters.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except FileNotFoundError:
        return {"status": "not_found"}
    except subprocess.TimeoutExpired:
        return {"status": "timeout"}
    except Exception as e:
        return {"status": f"error:{e}"}
    status = "ok" if result.returncode == 0 else f"exit_{result.returncode}"
    return {"status": status, "stdout": result.stdout[:10000], "stderr": result.stderr[:2000]}


def _ssh_cmd(ssh_target: str, remote_cmd: str, timeout: int = 15) -> Dict[str, Any]:
    """Run *remote_cmd* on *ssh_target* via ``ssh``; same result shape as `_run_cmd`."""
    full = ["ssh", "-o", "StrictHostKeyChecking=accept-new",
            "-o", "ConnectTimeout=5", ssh_target, remote_cmd]
    return _run_cmd(full, timeout=timeout)


async def collect_snapshot(
    node_id: str,
    ncs_url: str,
    worker_url: str,
    router_url: str,
    ollama_url: str,
    ssh_target: str = "",
) -> Dict[str, Any]:
    """Query NCS, node-worker, router, Ollama and docker, and build one snapshot.

    Args:
        node_id: Label stored in the snapshot and used for the file name.
        ncs_url, worker_url, router_url, ollama_url: Base URLs of the services.
        ssh_target: When non-empty, ``docker ps`` runs on that host over SSH
            instead of locally.

    Returns:
        The snapshot dict. ``snap["errors"]`` lists the failed mandatory probes
        (NCS /capabilities, worker /healthz, router /health, Ollama /api/tags)
        and ``snap["summary"]["passed"]`` is true only when that list is empty.
    """
    snap: Dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "node_id": node_id,
        "collector": "fabric_snapshot.py",
        "errors": [],
    }

    # --- NCS ---
    ncs_caps = await _get(f"{ncs_url}/capabilities")
    ncs_caps_only = await _get(f"{ncs_url}/capabilities/caps")
    ncs_installed = await _get(f"{ncs_url}/capabilities/installed")

    snap["ncs"] = {
        "url": ncs_url,
        "capabilities_full": ncs_caps.get("data"),
        "capabilities_flags": ncs_caps_only.get("data"),
        "installed": ncs_installed.get("data"),
    }
    if ncs_caps["status"] != "ok":
        snap["errors"].append(f"NCS /capabilities: {ncs_caps['status']}")

    # Guard against a JSON body that is not an object (list, string, ...).
    ncs_data = _as_dict(ncs_caps.get("data"))
    installed_data = _as_dict(ncs_installed.get("data"))
    snap["served_models"] = ncs_data.get("served_models", [])
    snap["served_count"] = ncs_data.get("served_count", 0)
    snap["installed_artifacts"] = installed_data.get("installed_artifacts", [])
    snap["installed_count"] = installed_data.get("installed_count", 0)
    snap["capabilities"] = _as_dict(ncs_data.get("capabilities", {}))
    snap["node_load"] = ncs_data.get("node_load", {})
    snap["runtimes"] = ncs_data.get("runtimes", {})

    # --- Node Worker ---
    worker_caps = await _get(f"{worker_url}/caps")
    worker_health = await _get(f"{worker_url}/healthz")
    worker_metrics = await _get(f"{worker_url}/metrics")
    snap["worker"] = {
        "url": worker_url,
        "caps": worker_caps.get("data"),
        "health": worker_health.get("data"),
        # /metrics may answer with plain text rather than JSON; keep whichever we got.
        "metrics": worker_metrics.get("data") or worker_metrics.get("text"),
    }
    if worker_health["status"] != "ok":
        snap["errors"].append(f"Worker /healthz: {worker_health['status']}")

    # --- Router ---
    router_health = await _get(f"{router_url}/health")
    router_models = await _get(f"{router_url}/v1/models")
    router_caps = await _get(f"{router_url}/v1/capabilities")
    snap["router"] = {
        "url": router_url,
        "health": router_health.get("data"),
        "models": router_models.get("data"),
        "capabilities": router_caps.get("data"),
    }
    if router_health["status"] != "ok":
        snap["errors"].append(f"Router /health: {router_health['status']}")

    # --- Ollama ---
    ollama_tags = await _get(f"{ollama_url}/api/tags")
    ollama_ps = await _get(f"{ollama_url}/api/ps")
    snap["ollama"] = {
        "url": ollama_url,
        "tags": ollama_tags.get("data"),
        "ps": ollama_ps.get("data"),
    }
    if ollama_tags["status"] != "ok":
        snap["errors"].append(f"Ollama /api/tags: {ollama_tags['status']}")

    # --- Docker ---
    if ssh_target:
        docker_ps = _ssh_cmd(ssh_target, "docker ps --format '{{.Names}}\\t{{.Status}}\\t{{.Ports}}'")
    else:
        docker_ps = _run_cmd(["docker", "ps", "--format", "{{.Names}}\t{{.Status}}\t{{.Ports}}"])
    containers = []
    if docker_ps["status"] == "ok":
        for line in docker_ps.get("stdout", "").strip().split("\n"):
            parts = line.split("\t")
            if len(parts) >= 2:
                containers.append({
                    "name": parts[0],
                    "status": parts[1],
                    "ports": parts[2] if len(parts) > 2 else "",
                })
    # Record the probe status so "docker unavailable" is distinguishable from
    # "zero containers running" when snapshots are compared.
    snap["docker"] = {
        "status": docker_ps["status"],
        "containers": containers,
        "container_count": len(containers),
    }

    # --- Summary ---
    snap["summary"] = {
        "ncs_ok": ncs_caps["status"] == "ok",
        "worker_ok": worker_health["status"] == "ok",
        "router_ok": router_health["status"] == "ok",
        "ollama_ok": ollama_tags["status"] == "ok",
        "served_count": snap["served_count"],
        "installed_count": snap["installed_count"],
        "capabilities": {k: v for k, v in snap["capabilities"].items() if k != "providers"},
        "container_count": len(containers),
        "error_count": len(snap["errors"]),
        "passed": len(snap["errors"]) == 0,
    }

    return snap


def save_snapshot(snap: Dict[str, Any], out_dir: Path = SNAPSHOT_DIR) -> Path:
    """Write *snap* to ``<out_dir>/<node_id lower-cased>_<local timestamp>.json``.

    The directory is created when missing. The file is always UTF-8 because the
    JSON is written with ``ensure_ascii=False`` and must not depend on the
    locale's default encoding. Returns the path written.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    node = str(snap.get("node_id") or "unknown").lower()
    ts = datetime.now().strftime("%Y-%m-%d_%H%M%S")
    path = out_dir / f"{node}_{ts}.json"
    with open(path, "w", encoding="utf-8") as f:
        json.dump(snap, f, indent=2, ensure_ascii=False, default=str)
    return path


def find_previous_snapshot(node_id: str, out_dir: Path = SNAPSHOT_DIR) -> Optional[Dict]:
    """Load the second-newest snapshot (by mtime) for *node_id*, if there is one.

    The newest file is taken to be the snapshot that was just saved, so the one
    before it is the comparison baseline. Returns ``None`` when fewer than two
    snapshots exist for the node.
    """
    prefix = node_id.lower() + "_"
    files = sorted(
        out_dir.glob(f"{prefix}*.json"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    if len(files) < 2:
        return None
    with open(files[1], encoding="utf-8") as f:
        return json.load(f)
def print_diff(current: Dict, previous: Optional[Dict]):
    """Print what changed between *previous* and *current* snapshots.

    Compares served/installed counts, capability flags (ignoring the
    ``providers`` entry) and the docker container count. Values are read from
    ``summary`` when present and from the top level otherwise, so both snapshot
    layouts found in the snapshot directory can be compared.
    """
    if not previous:
        print(" (no previous snapshot to compare)")
        return

    prev_summary = previous.get("summary", {})
    cur_summary = current.get("summary", {})
    diffs = []

    for key in ("served_count", "installed_count"):
        old = prev_summary.get(key, previous.get(key, "?"))
        new = cur_summary.get(key, current.get(key, "?"))
        if old != new:
            diffs.append(f" {key}: {old} → {new}")

    old_caps = prev_summary.get("capabilities", previous.get("capabilities", {}))
    new_caps = cur_summary.get("capabilities", current.get("capabilities", {}))
    for k in sorted(set(old_caps) | set(new_caps)):
        if k == "providers":
            continue
        ov, nv = old_caps.get(k, "?"), new_caps.get(k, "?")
        if ov != nv:
            diffs.append(f" caps.{k}: {ov} → {nv}")

    old_ct = previous.get("docker", {}).get("container_count", "?")
    new_ct = current.get("docker", {}).get("container_count", "?")
    if old_ct != new_ct:
        diffs.append(f" containers: {old_ct} → {new_ct}")

    if diffs:
        print(" Changes vs previous snapshot:")
        for d in diffs:
            print(d)
    else:
        print(" (no changes vs previous snapshot)")


def print_summary(snap: Dict):
    """Print the human-readable summary of *snap*, ending with PASSED/FAILED."""
    s = snap.get("summary", {})
    print(f" node_id: {snap.get('node_id')}")
    print(f" served: {s.get('served_count')}")
    print(f" installed: {s.get('installed_count')}")
    print(f" containers: {s.get('container_count')}")
    caps = s.get("capabilities", {})
    cap_str = " ".join(f"{k}={'Y' if v else 'N'}" for k, v in caps.items())
    print(f" capabilities: {cap_str}")
    print(f" errors: {s.get('error_count')}")
    for e in snap.get("errors") or []:
        print(f" ✗ {e}")
    passed = s.get("passed", False)
    status = "\033[32mPASSED\033[0m" if passed else "\033[31mFAILED\033[0m"
    print(f" result: {status}")


def main():
    """CLI entry point: collect a snapshot, then print it or save and diff it.

    Exits with status 1 when the snapshot did not pass. This now also applies
    to ``--json-only``, so callers gating on the exit code see the same result
    in both output modes.
    """
    parser = argparse.ArgumentParser(description="Fabric Snapshot Collector")
    parser.add_argument("--node-id", default="NODA2")
    parser.add_argument("--ncs", default="http://127.0.0.1:8099")
    parser.add_argument("--worker", default="http://127.0.0.1:8109")
    parser.add_argument("--router", default="http://127.0.0.1:9102")
    parser.add_argument("--ollama", default="http://127.0.0.1:11434")
    parser.add_argument("--ssh", default="", help="SSH target for remote docker ps (e.g. root@1.2.3.4)")
    parser.add_argument("--out-dir", default=str(SNAPSHOT_DIR))
    parser.add_argument("--json-only", action="store_true", help="Print JSON to stdout, no save")
    args = parser.parse_args()

    snap = asyncio.run(collect_snapshot(
        node_id=args.node_id,
        ncs_url=args.ncs,
        worker_url=args.worker,
        router_url=args.router,
        ollama_url=args.ollama,
        ssh_target=args.ssh,
    ))
    passed = bool(snap.get("summary", {}).get("passed"))

    if args.json_only:
        # stdout carries nothing but the JSON document; failure is signalled
        # through the exit code below.
        print(json.dumps(snap, indent=2, ensure_ascii=False, default=str))
    else:
        out_dir = Path(args.out_dir)
        path = save_snapshot(snap, out_dir)
        print("╔══════════════════════════════════════╗")
        print(f"║ Fabric Snapshot: {args.node_id:<18s} ║")
        print("╚══════════════════════════════════════╝")
        print()
        print_summary(snap)
        print()
        # Looked up after saving: the newest file is the one just written.
        prev = find_previous_snapshot(args.node_id, out_dir)
        print_diff(snap, prev)
        print()
        print(f"Saved: {path}")

    if not passed:
        sys.exit(1)


if __name__ == "__main__":
    main()
async def _collect_worker_caps() -> Dict[str, Any]:
    """Fetch capability flags from the local Node Worker (``GET <NODE_WORKER_URL>/caps``).

    Best effort: any failure (worker unreachable, non-200 status, body that is
    not a JSON object) is logged at debug level and an empty
    capabilities/providers/defaults structure is returned, so callers can
    always use ``.get()`` on the result.
    """
    default = {"capabilities": {}, "providers": {}, "defaults": {}}
    try:
        async with httpx.AsyncClient(timeout=3) as c:
            r = await c.get(f"{NODE_WORKER_URL}/caps")
        if r.status_code != 200:
            logger.debug(f"Worker caps unavailable: HTTP {r.status_code}")
            return default
        payload = r.json()
    except Exception as e:  # deliberate best-effort probe; the worker is optional
        logger.debug(f"Worker caps unavailable: {e}")
        return default
    if not isinstance(payload, dict):
        # A list/string body would break every downstream ``.get()`` call.
        logger.debug(f"Worker caps unavailable: unexpected payload type {type(payload).__name__}")
        return default
    return payload
def _derive_capabilities(served: List[Dict], worker_caps: Dict) -> Dict[str, Any]:
    """Merge served model types + worker provider flags into capability map.

    Args:
        served: Served model records; a record without ``type`` counts as "llm".
        worker_caps: Node Worker ``/caps`` payload. Missing or null
            ``capabilities`` / ``providers`` sections are treated as empty.

    Returns:
        A dict with boolean ``llm``, ``vision``, ``stt``, ``tts``, ``ocr`` and
        ``image`` flags plus the worker's ``providers`` mapping. ``ocr`` is
        true only when vision is available and an OCR provider other than
        "none" is configured.
    """
    served_types = {m.get("type", "llm") for m in served}
    # ``or {}`` also covers sections that are present but null.
    wc = worker_caps.get("capabilities") or {}
    wp = worker_caps.get("providers") or {}

    has_vision = "vision" in served_types or bool(wc.get("vision"))
    # A null/empty provider means "not configured", same as "none".
    ocr_provider = wp.get("ocr") or "none"
    return {
        "llm": "llm" in served_types or "code" in served_types,
        "vision": has_vision,
        "stt": bool(wc.get("stt")),
        "tts": bool(wc.get("tts")),
        "ocr": has_vision and ocr_provider != "none",
        "image": bool(wc.get("image")),
        "providers": wp,
    }
@app.get("/caps")
async def caps():
    """Capability flags for NCS to aggregate.

    Returns this worker's node id, one boolean per capability, the configured
    provider name per capability, the default LLM/vision model names and the
    concurrency limit. Every value is read from ``config``; nothing is probed
    at request time.
    """
    return {
        "node_id": config.NODE_ID,
        "capabilities": {
            # NOTE(review): llm and vision are reported unconditionally here,
            # independent of which models are actually served — confirm that
            # consumers combine this with the served-model list.
            "llm": True,
            "vision": True,
            # A capability counts as enabled when its provider is anything
            # other than the literal string "none".
            "stt": config.STT_PROVIDER != "none",
            "tts": config.TTS_PROVIDER != "none",
            "ocr": config.OCR_PROVIDER != "none",
            "image": config.IMAGE_PROVIDER != "none",
        },
        # Raw provider names, so consumers can see which backend sits behind a flag.
        "providers": {
            "stt": config.STT_PROVIDER,
            "tts": config.TTS_PROVIDER,
            "ocr": config.OCR_PROVIDER,
            "image": config.IMAGE_PROVIDER,
        },
        "defaults": {
            "llm": config.DEFAULT_LLM,
            "vision": config.DEFAULT_VISION,
        },
        "concurrency": config.MAX_CONCURRENCY,
    }
+""" +import base64 +import logging +import os +import tempfile +from typing import Any, Dict, Optional + +import httpx + +logger = logging.getLogger("provider.stt_mlx_whisper") + +MLX_WHISPER_URL = os.getenv("MLX_WHISPER_URL", "") +MLX_WHISPER_MODEL = os.getenv("MLX_WHISPER_MODEL", "mlx-community/whisper-large-v3-turbo") +MAX_AUDIO_BYTES = int(os.getenv("STT_MAX_AUDIO_BYTES", str(25 * 1024 * 1024))) # 25MB + +_local_model = None +_local_lock = None + + +def _lazy_init_local(): + """Lazy-load mlx_whisper for native (non-Docker) execution.""" + global _local_model, _local_lock + if _local_lock is not None: + return + import asyncio + _local_lock = asyncio.Lock() + + +async def _transcribe_local(audio_path: str, language: Optional[str]) -> Dict[str, Any]: + """Transcribe using local mlx_whisper (Apple Silicon only).""" + _lazy_init_local() + async with _local_lock: + import mlx_whisper + kwargs: Dict[str, Any] = {"path_or_hf_repo": MLX_WHISPER_MODEL} + if language: + kwargs["language"] = language + result = mlx_whisper.transcribe(audio_path, **kwargs) + segments = [] + for seg in result.get("segments", []): + segments.append({ + "start": seg.get("start", 0), + "end": seg.get("end", 0), + "text": seg.get("text", ""), + }) + return { + "text": result.get("text", ""), + "segments": segments, + "language": result.get("language", ""), + } + + +async def _transcribe_remote(audio_b64: str, language: Optional[str]) -> Dict[str, Any]: + """Transcribe via MLX Whisper HTTP service on host.""" + payload: Dict[str, Any] = {"audio_b64": audio_b64} + if language: + payload["language"] = language + + async with httpx.AsyncClient(timeout=120) as c: + resp = await c.post(f"{MLX_WHISPER_URL}/transcribe", json=payload) + resp.raise_for_status() + return resp.json() + + +async def _resolve_audio(payload: Dict[str, Any]) -> tuple: + """Return (audio_bytes, audio_b64) from payload.""" + audio_b64 = payload.get("audio_b64", "") + audio_url = payload.get("audio_url", "") + + if audio_b64: + 
raw = base64.b64decode(audio_b64) + if len(raw) > MAX_AUDIO_BYTES: + raise ValueError(f"Audio exceeds {MAX_AUDIO_BYTES} bytes") + return raw, audio_b64 + + if audio_url: + if audio_url.startswith(("file://", "/")): + path = audio_url.replace("file://", "") + with open(path, "rb") as f: + raw = f.read() + if len(raw) > MAX_AUDIO_BYTES: + raise ValueError(f"Audio exceeds {MAX_AUDIO_BYTES} bytes") + return raw, base64.b64encode(raw).decode() + + async with httpx.AsyncClient(timeout=30) as c: + resp = await c.get(audio_url) + resp.raise_for_status() + raw = resp.content + if len(raw) > MAX_AUDIO_BYTES: + raise ValueError(f"Audio exceeds {MAX_AUDIO_BYTES} bytes") + return raw, base64.b64encode(raw).decode() + + raise ValueError("Either audio_b64 or audio_url is required") + + +async def transcribe(payload: Dict[str, Any]) -> Dict[str, Any]: + """Canonical STT entry point. + + Payload: + audio_url: str (http/file) — OR — + audio_b64: str (base64 encoded) + language: str (optional, e.g. "uk", "en") + format: "text" | "segments" | "json" + """ + language = payload.get("language") + fmt = payload.get("format", "json") + + audio_bytes, audio_b64 = await _resolve_audio(payload) + + if MLX_WHISPER_URL: + result = await _transcribe_remote(audio_b64, language) + else: + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: + tmp.write(audio_bytes) + tmp_path = tmp.name + try: + result = await _transcribe_local(tmp_path, language) + finally: + os.unlink(tmp_path) + + meta = { + "model": MLX_WHISPER_MODEL, + "provider": "mlx_whisper", + "device": "apple_silicon", + } + + if fmt == "text": + return {"text": result.get("text", ""), "meta": meta, "provider": "mlx_whisper", "model": MLX_WHISPER_MODEL} + if fmt == "segments": + return {"text": result.get("text", ""), "segments": result.get("segments", []), "meta": meta, "provider": "mlx_whisper", "model": MLX_WHISPER_MODEL} + return {**result, "meta": meta, "provider": "mlx_whisper", "model": MLX_WHISPER_MODEL} diff 
--git a/services/node-worker/providers/tts_mlx_kokoro.py b/services/node-worker/providers/tts_mlx_kokoro.py new file mode 100644 index 00000000..14b2f5d4 --- /dev/null +++ b/services/node-worker/providers/tts_mlx_kokoro.py @@ -0,0 +1,123 @@ +"""MLX Kokoro TTS provider — generates speech via kokoro on host. + +Runs inside Docker; delegates to Kokoro HTTP API on the host. +Falls back to local kokoro-onnx if running natively on Apple Silicon. +""" +import base64 +import logging +import os +import tempfile +from typing import Any, Dict + +import httpx + +logger = logging.getLogger("provider.tts_mlx_kokoro") + +MLX_KOKORO_URL = os.getenv("MLX_KOKORO_URL", "") +MLX_KOKORO_MODEL = os.getenv("MLX_KOKORO_MODEL", "kokoro-v1.0") +DEFAULT_VOICE = os.getenv("TTS_DEFAULT_VOICE", "af_heart") +MAX_TEXT_CHARS = int(os.getenv("TTS_MAX_TEXT_CHARS", "5000")) +DEFAULT_SAMPLE_RATE = int(os.getenv("TTS_SAMPLE_RATE", "24000")) + +_local_pipeline = None +_local_lock = None + + +def _lazy_init_local(): + global _local_lock + if _local_lock is not None: + return + import asyncio + _local_lock = asyncio.Lock() + + +async def _synthesize_local(text: str, voice: str, sample_rate: int) -> bytes: + """Synthesize via local kokoro (Apple Silicon).""" + _lazy_init_local() + global _local_pipeline + async with _local_lock: + if _local_pipeline is None: + from kokoro import KPipeline + _local_pipeline = KPipeline(lang_code="a") + logger.info(f"Kokoro pipeline initialized: voice={voice}") + + import soundfile as sf + import io + + generator = _local_pipeline(text, voice=voice) + all_audio = [] + for _, _, audio in generator: + all_audio.append(audio) + + if not all_audio: + raise RuntimeError("Kokoro produced no audio") + + import numpy as np + combined = np.concatenate(all_audio) + + buf = io.BytesIO() + sf.write(buf, combined, sample_rate, format="WAV") + return buf.getvalue() + + +async def _synthesize_remote(text: str, voice: str, fmt: str, sample_rate: int) -> Dict[str, Any]: + """Synthesize via 
Kokoro HTTP service on host.""" + payload = { + "text": text, + "voice": voice, + "format": fmt, + "sample_rate": sample_rate, + } + async with httpx.AsyncClient(timeout=120) as c: + resp = await c.post(f"{MLX_KOKORO_URL}/synthesize", json=payload) + resp.raise_for_status() + return resp.json() + + +async def synthesize(payload: Dict[str, Any]) -> Dict[str, Any]: + """Canonical TTS entry point. + + Payload: + text: str (required) + voice: str (optional, default "af_heart") + format: "wav" | "mp3" (default "wav") + sample_rate: int (default 24000) + """ + text = payload.get("text", "") + if not text: + raise ValueError("text is required") + if len(text) > MAX_TEXT_CHARS: + raise ValueError(f"Text exceeds {MAX_TEXT_CHARS} chars") + + voice = payload.get("voice", DEFAULT_VOICE) + fmt = payload.get("format", "wav") + sample_rate = payload.get("sample_rate", DEFAULT_SAMPLE_RATE) + + meta = { + "model": MLX_KOKORO_MODEL, + "provider": "mlx_kokoro", + "voice": voice, + "device": "apple_silicon", + } + + if MLX_KOKORO_URL: + result = await _synthesize_remote(text, voice, fmt, sample_rate) + return { + "audio_b64": result.get("audio_b64", ""), + "audio_url": result.get("audio_url", ""), + "format": fmt, + "meta": meta, + "provider": "mlx_kokoro", + "model": MLX_KOKORO_MODEL, + } + + wav_bytes = await _synthesize_local(text, voice, sample_rate) + audio_b64 = base64.b64encode(wav_bytes).decode() + + return { + "audio_b64": audio_b64, + "format": "wav", + "meta": meta, + "provider": "mlx_kokoro", + "model": MLX_KOKORO_MODEL, + } diff --git a/services/node-worker/worker.py b/services/node-worker/worker.py index 13697fb3..b9d68f95 100644 --- a/services/node-worker/worker.py +++ b/services/node-worker/worker.py @@ -10,6 +10,7 @@ import config from models import JobRequest, JobResponse, JobError from idempotency import IdempotencyStore from providers import ollama, ollama_vision +from providers import stt_mlx_whisper, tts_mlx_kokoro import fabric_metrics as fm logger = 
logging.getLogger("node-worker") @@ -34,6 +35,7 @@ async def start(nats_client): f"node.{nid}.stt.request", f"node.{nid}.tts.request", f"node.{nid}.image.request", + f"node.{nid}.ocr.request", ] for subj in subjects: await nats_client.subscribe(subj, cb=_handle_request) @@ -175,14 +177,52 @@ async def _execute(job: JobRequest, remaining_ms: int) -> JobResponse: ), timeout=timeout_s, ) - elif job.required_type in ("stt", "tts", "image"): + elif job.required_type == "stt": + if config.STT_PROVIDER == "none": + return JobResponse( + job_id=job.job_id, trace_id=job.trace_id, node_id=config.NODE_ID, + status="error", + error=JobError(code="NOT_AVAILABLE", message="STT not configured on this node"), + ) + result = await asyncio.wait_for( + stt_mlx_whisper.transcribe(payload), timeout=timeout_s, + ) + elif job.required_type == "tts": + if config.TTS_PROVIDER == "none": + return JobResponse( + job_id=job.job_id, trace_id=job.trace_id, node_id=config.NODE_ID, + status="error", + error=JobError(code="NOT_AVAILABLE", message="TTS not configured on this node"), + ) + result = await asyncio.wait_for( + tts_mlx_kokoro.synthesize(payload), timeout=timeout_s, + ) + elif job.required_type == "ocr": + if config.OCR_PROVIDER == "none": + return JobResponse( + job_id=job.job_id, trace_id=job.trace_id, node_id=config.NODE_ID, + status="error", + error=JobError(code="NOT_AVAILABLE", message="OCR not configured on this node"), + ) + ocr_prompt = payload.get("prompt", "Extract all text from this image. Return JSON: {\"text\": \"...\", \"language\": \"...\"}") + result = await asyncio.wait_for( + ollama_vision.infer( + images=payload.get("images"), + prompt=ocr_prompt, + model=model or config.DEFAULT_VISION, + system="You are an OCR engine. Extract text precisely. 
Return valid JSON only.", + max_tokens=hints.get("max_tokens", 4096), + temperature=0.05, + timeout_s=timeout_s, + ), + timeout=timeout_s, + ) + result["provider"] = "vision_prompted_ocr" + elif job.required_type == "image": return JobResponse( job_id=job.job_id, trace_id=job.trace_id, node_id=config.NODE_ID, status="error", - error=JobError( - code="NOT_YET_IMPLEMENTED", - message=f"{job.required_type} adapter coming soon; use direct runtime API for now", - ), + error=JobError(code="NOT_YET_IMPLEMENTED", message="Image adapter pending P3.7"), ) else: return JobResponse( diff --git a/services/router/global_capabilities_client.py b/services/router/global_capabilities_client.py index 10281d95..3708421e 100644 --- a/services/router/global_capabilities_client.py +++ b/services/router/global_capabilities_client.py @@ -100,8 +100,8 @@ async def _discover_remote_nodes() -> List[Dict[str, Any]]: sub = await _nats_client.subscribe(inbox) try: - await _nats_client.publish_request( - "node.*.capabilities.get", inbox, b"" + await _nats_client.publish( + CAPS_DISCOVERY_SUBJECT, b"", reply=inbox, ) await _nats_client.flush() @@ -183,6 +183,7 @@ async def get_global_capabilities(force: bool = False) -> Dict[str, Any]: def _build_global_view() -> Dict[str, Any]: """Build a unified view from all cached node capabilities.""" all_served: List[Dict[str, Any]] = [] + global_caps: Dict[str, Dict[str, Any]] = {} for node_id, caps in _node_cache.items(): is_local = (node_id.lower() == LOCAL_NODE_ID.lower()) @@ -194,16 +195,27 @@ def _build_global_view() -> Dict[str, Any]: "local": is_local, "node_age_s": round(age, 1), }) + node_caps = caps.get("capabilities", {}) + if node_caps: + global_caps[node_id] = { + k: v for k, v in node_caps.items() if k != "providers" + } all_served.sort(key=lambda m: (0 if m.get("local") else 1, m.get("name", ""))) return { "local_node": LOCAL_NODE_ID, - "nodes": {nid: {"node_id": nid, "served_count": len(c.get("served_models", [])), - "age_s": 
round(time.time() - _node_timestamps.get(nid, 0), 1)} - for nid, c in _node_cache.items()}, + "nodes": {nid: { + "node_id": nid, + "served_count": len(c.get("served_models", [])), + "installed_count": c.get("installed_count", 0), + "capabilities": c.get("capabilities", {}), + "node_load": c.get("node_load", {}), + "age_s": round(time.time() - _node_timestamps.get(nid, 0), 1), + } for nid, c in _node_cache.items()}, "served_models": all_served, "served_count": len(all_served), + "capabilities_by_node": global_caps, "node_count": len(_node_cache), "updated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), } @@ -214,6 +226,44 @@ def get_cached_global() -> Dict[str, Any]: return _build_global_view() +async def require_fresh_caps(ttl: int = 30) -> Optional[Dict[str, Any]]: + """Preflight: return global caps only if fresh enough. + + Returns None if NCS data is stale beyond ttl — caller should use + safe fallback instead of making routing decisions on outdated info. + """ + if not _node_timestamps: + gcaps = await get_global_capabilities(force=True) + if not _node_timestamps: + return None + return gcaps + oldest = min(_node_timestamps.values()) + if (time.time() - oldest) > ttl: + gcaps = await get_global_capabilities(force=True) + oldest = min(_node_timestamps.values()) if _node_timestamps else 0 + if (time.time() - oldest) > ttl: + logger.warning("[preflight] caps stale after refresh, age=%ds", int(time.time() - oldest)) + return None + return gcaps + return _build_global_view() + + +def find_nodes_with_capability(cap: str) -> List[str]: + """Return node IDs that have a given capability enabled.""" + result = [] + for nid, caps in _node_cache.items(): + node_caps = caps.get("capabilities", {}) + if node_caps.get(cap, False): + result.append(nid) + return result + + +def get_node_load(node_id: str) -> Dict[str, Any]: + """Get cached node_load for a specific node.""" + caps = _node_cache.get(node_id, {}) + return caps.get("node_load", {}) + + async def 
send_offload_request( node_id: str, request_type: str, diff --git a/services/router/main.py b/services/router/main.py index c131c8bd..b617e90a 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -1,5 +1,5 @@ from fastapi import FastAPI, HTTPException, Request -from fastapi.responses import Response +from fastapi.responses import JSONResponse, Response from pydantic import BaseModel, ConfigDict from typing import Literal, Optional, Dict, Any, List import asyncio @@ -3542,6 +3542,7 @@ async def documents_versions(doc_id: str, agent_id: str, limit: int = 20): async def list_available_models(): """List all available models from NCS (global capabilities pool).""" models = [] + caps_by_node = {} try: from global_capabilities_client import get_global_capabilities @@ -3555,6 +3556,7 @@ async def list_available_models(): "size_gb": m.get("size_gb"), "status": "served", }) + caps_by_node = pool.get("capabilities_by_node", {}) except Exception as e: logger.warning(f"Cannot get NCS global models: {e}") @@ -3572,7 +3574,110 @@ async def list_available_models(): except Exception as e: logger.warning(f"Cannot get Ollama models: {e}") - return {"models": models, "total": len(models)} + return { + "models": models, + "total": len(models), + "capabilities_by_node": caps_by_node, + } + + +# ── Capability-based offload routing ──────────────────────────────────────── + +@app.post("/v1/capability/{cap_type}") +async def capability_offload(cap_type: str, request: Request): + """Route a capability request (stt/tts/ocr/image) to the best node. + + Router selects the node based on capabilities_by_node, circuit breaker, + and node_load — no static assumptions about which node has what. + """ + valid_types = {"stt", "tts", "ocr", "image"} + if cap_type not in valid_types: + return JSONResponse(status_code=400, content={ + "error": f"Invalid capability type: {cap_type}. 
Valid: {sorted(valid_types)}", + }) + + if not NCS_AVAILABLE or not global_capabilities_client: + return JSONResponse(status_code=503, content={ + "error": "NCS not available — cannot route capability requests", + }) + + gcaps = await global_capabilities_client.require_fresh_caps(ttl=30) + if gcaps is None: + return JSONResponse(status_code=503, content={ + "error": "NCS caps stale — preflight failed, refusing to route", + }) + + eligible_nodes = global_capabilities_client.find_nodes_with_capability(cap_type) + if not eligible_nodes: + return JSONResponse(status_code=404, content={ + "error": f"No node with capability '{cap_type}' available", + "capabilities_by_node": gcaps.get("capabilities_by_node", {}), + }) + + unavailable = offload_client.get_unavailable_nodes(cap_type) if offload_client else set() + available = [n for n in eligible_nodes if n.lower() not in {u.lower() for u in unavailable}] + if not available: + return JSONResponse(status_code=503, content={ + "error": f"All nodes with '{cap_type}' are circuit-broken", + "eligible": eligible_nodes, + "unavailable": list(unavailable), + }) + + best_node = available[0] + if len(available) > 1: + loads = [] + for nid in available: + nl = global_capabilities_client.get_node_load(nid) + score = nl.get("inflight", 0) * 10 + if nl.get("mem_pressure") == "high": + score += 100 + loads.append((score, nid)) + loads.sort() + best_node = loads[0][1] + + payload = await request.json() + logger.info(f"[cap.offload] type={cap_type} → node={best_node} (of {available})") + + nats_ok = nc is not None and nats_available + if nats_ok and offload_client: + import uuid as _uuid + job = { + "job_id": str(_uuid.uuid4()), + "required_type": cap_type, + "payload": payload, + "deadline_ts": int(time.time() * 1000) + 60000, + "hints": payload.pop("hints", {}), + } + result = await offload_client.offload_infer( + nats_client=nc, node_id=best_node, required_type=cap_type, + job_payload=job, timeout_ms=60000, + ) + if result and 
result.get("status") == "ok": + return JSONResponse(content=result.get("result", result)) + error = result.get("error", {}) if result else {} + return JSONResponse(status_code=502, content={ + "error": error.get("message", f"Offload to {best_node} failed"), + "code": error.get("code", "OFFLOAD_FAILED"), + "node": best_node, + }) + + return JSONResponse(status_code=503, content={ + "error": "NATS not connected — cannot offload", + }) + + +@app.get("/v1/capabilities") +async def list_global_capabilities(): + """Return full capabilities view across all nodes.""" + if not NCS_AVAILABLE or not global_capabilities_client: + return JSONResponse(status_code=503, content={"error": "NCS not available"}) + gcaps = await global_capabilities_client.get_global_capabilities() + return JSONResponse(content={ + "node_count": gcaps.get("node_count", 0), + "nodes": gcaps.get("nodes", {}), + "capabilities_by_node": gcaps.get("capabilities_by_node", {}), + "served_count": gcaps.get("served_count", 0), + }) @app.get("/v1/agromatrix/shared-memory/pending") diff --git a/services/router/offload_client.py b/services/router/offload_client.py index 906ae2e2..78f9da6c 100644 --- a/services/router/offload_client.py +++ b/services/router/offload_client.py @@ -81,7 +81,7 @@ def get_unavailable_nodes(req_type: str) -> Set[str]: async def offload_infer( nats_client, node_id: str, - required_type: Literal["llm", "vision", "stt", "tts"], + required_type: Literal["llm", "vision", "stt", "tts", "ocr", "image"], job_payload: Dict[str, Any], timeout_ms: int = 25000, ) -> Optional[Dict[str, Any]]: diff --git a/tests/test_fabric_contract.py b/tests/test_fabric_contract.py new file mode 100644 index 00000000..9ffec8bb --- /dev/null +++ b/tests/test_fabric_contract.py @@ -0,0 +1,137 @@ +"""CI checks enforcing Fabric Dev Contract v0.1. + +No hardcoded models in swapper configs. +No silent cloud fallbacks without WARNING. +No hardcoded Docker bridge IPs (unless explicitly allowed). 
+""" +import os +import re +from pathlib import Path + +REPO_ROOT = Path(__file__).parent.parent + +ALLOWED_BRIDGE_IPS = { + "docker-compose.node1.yml", # NODA1 Ollama uses 172.18.0.1 + "docker-compose.staging.yml", # legacy staging config +} + +ALLOWED_BRIDGE_ROUTER_CONFIGS = { + "services/router/router-config.yml", # NODA1 config, Ollama on bridge + "services/router/router-config.node2.yml", # NODA2 config, Ollama on bridge + "router-config.yml", # root-level copy +} + +SWAPPER_CONFIG_GLOBS = [ + "services/swapper-service/config/*.yaml", + "services/swapper-service/config/*.yml", +] + +ROUTER_CONFIG_GLOBS = [ + "services/router/router-config*.yml", + "router-config*.yml", +] + + +def _read(path: Path) -> str: + try: + return path.read_text(encoding="utf-8", errors="replace") + except Exception: + return "" + + +def test_no_hardcoded_models_in_swapper(): + """Swapper configs must not contain models: blocks (served_models come from NCS).""" + for pattern in SWAPPER_CONFIG_GLOBS: + for f in REPO_ROOT.glob(pattern): + content = _read(f) + models_block = re.search(r"^models:\s*$", content, re.MULTILINE) + assert models_block is None, ( + f"BLOCKED: {f.relative_to(REPO_ROOT)} contains 'models:' block. " + f"Served models must come from NCS, not static config." + ) + + +def test_no_hardcoded_bridge_ips_in_router_config(): + """Router configs should not contain 172.17.0.1 or 172.18.0.1 Docker bridge IPs.""" + bridge_pattern = re.compile(r"172\.(17|18)\.0\.1") + for pattern in ROUTER_CONFIG_GLOBS: + for f in REPO_ROOT.glob(pattern): + rel = str(f.relative_to(REPO_ROOT)) + if rel in ALLOWED_BRIDGE_ROUTER_CONFIGS: + continue + content = _read(f) + matches = bridge_pattern.findall(content) + if matches: + assert False, ( + f"BLOCKED: {rel} contains Docker bridge IP(s). " + f"Use service names or host.docker.internal instead." 
+ ) + + +def test_no_bridge_ips_in_compose_unless_allowed(): + """Docker compose files should not contain bridge IPs except in allowed files.""" + bridge_pattern = re.compile(r"172\.(17|18)\.0\.1") + for f in REPO_ROOT.glob("docker-compose*.yml"): + if f.name in ALLOWED_BRIDGE_IPS: + continue + content = _read(f) + matches = bridge_pattern.findall(content) + if matches: + assert False, ( + f"WARNING: {f.name} contains Docker bridge IP(s). " + f"Add to ALLOWED_BRIDGE_IPS if intentional, or use service names." + ) + + +def test_no_silent_cloud_fallback(): + """Router main.py must log WARNING before any cloud fallback. + + Exemptions: cloud_providers declarations (explicit, not hidden) + and config/mapping dictionaries. + """ + router_main = REPO_ROOT / "services" / "router" / "main.py" + if not router_main.exists(): + return + content = _read(router_main) + + cloud_keywords = ["deepseek-chat", "deepseek-coder", "gpt-4", "gpt-3.5", "claude"] + exempt_contexts = [ + "cloud_providers", # explicit cloud provider list + "CLOUD_MODELS", "cloud_model_map", "model_config", + "llm_profile.get", # default model from profile config + "profile.get", + ] + for kw in cloud_keywords: + positions = [m.start() for m in re.finditer(re.escape(kw), content)] + for pos in positions: + context_start = max(0, pos - 500) + context = content[context_start:pos + len(kw) + 200] + is_exempt = any(ex in context for ex in exempt_contexts) + if is_exempt: + continue + has_warning = any(w in context.lower() for w in [ + "logger.warning", "logger.warn", "log.warning", "log.warn", + "WARNING", "fallback", + ]) + if not has_warning: + line_no = content[:pos].count("\n") + 1 + assert False, ( + f"BLOCKED: Router main.py line ~{line_no} references '{kw}' " + f"without WARNING/fallback log nearby. " + f"Silent cloud fallback violates Fabric Contract §6." 
+ ) + + +def test_preflight_snapshots_dir_exists(): + """ops/preflight_snapshots/ must exist.""" + snap_dir = REPO_ROOT / "ops" / "preflight_snapshots" + assert snap_dir.exists(), ( + "ops/preflight_snapshots/ directory missing. " + "Create it: mkdir -p ops/preflight_snapshots" + ) + + +def test_fabric_contract_doc_exists(): + """docs/fabric_contract.md must exist.""" + contract = REPO_ROOT / "docs" / "fabric_contract.md" + assert contract.exists(), "docs/fabric_contract.md missing — Fabric Contract not documented"