diff --git a/config/nats-server.conf b/config/nats-server.conf
new file mode 100644
index 00000000..600b2e18
--- /dev/null
+++ b/config/nats-server.conf
@@ -0,0 +1,121 @@
+# NATS Server config — Fabric v0.3 with accounts
+# Hub node (NODA1). Leafnodes connect to this.
+#
+# Passwords are environment-variable references and must stay UNQUOTED:
+# a quoted "$VAR" is taken as a literal string and is never resolved.
+# NOTE(review): router/ncs/node_worker share one FABRIC secret, so per-user
+# permissions do not contain a leaked password — consider one secret per user.
+
+listen: 0.0.0.0:4222
+jetstream {
+  store_dir: /data/jetstream
+  max_mem: 256MB
+  max_file: 2GB
+}
+
+http_port: 8222
+
+# ── Accounts ────────────────────────────────────────────────────────────────
+
+accounts {
+  SYS {
+    users: [
+      { user: sys, password: $SYS_NATS_PASS }
+    ]
+  }
+
+  FABRIC {
+    users: [
+      # Router — publishes capability queries + offload requests
+      {
+        user: router
+        password: $FABRIC_NATS_PASS
+        permissions: {
+          publish: {
+            allow: [
+              "node.*.capabilities.get",
+              "node.*.llm.request",
+              "node.*.vision.request",
+              "node.*.stt.request",
+              "node.*.tts.request",
+              "_INBOX.>"
+            ]
+          }
+          subscribe: {
+            allow: ["_INBOX.>"]
+          }
+        }
+      }
+      # NCS — responds to capability queries
+      {
+        user: ncs
+        password: $FABRIC_NATS_PASS
+        permissions: {
+          publish: {
+            allow: ["_INBOX.>"]
+          }
+          subscribe: {
+            allow: [
+              "node.*.capabilities.get",
+              "node.*.capabilities.report"
+            ]
+          }
+        }
+      }
+      # Node Worker — responds to inference requests
+      {
+        user: node_worker
+        password: $FABRIC_NATS_PASS
+        permissions: {
+          publish: {
+            allow: [
+              "_INBOX.>",
+              "node.*.capabilities.report"
+            ]
+          }
+          subscribe: {
+            allow: [
+              "node.*.llm.request",
+              "node.*.vision.request",
+              "node.*.stt.request",
+              "node.*.tts.request"
+            ]
+          }
+        }
+      }
+    ]
+    exports: [
+      { stream: ">" }
+    ]
+  }
+
+  APP {
+    users: [
+      # Gateway, other services
+      {
+        user: app
+        password: $APP_NATS_PASS
+        permissions: {
+          publish: { allow: [">"] }
+          subscribe: { allow: [">"] }
+        }
+      }
+    ]
+    imports: [
+      { stream: { account: FABRIC, subject: ">" } }
+    ]
+  }
+}
+
+system_account: SYS
+
+# ── Leafnode listener ───────────────────────────────────────────────────────
+
+leafnodes {
+  listen: 0.0.0.0:7422
+  authorization {
+    user: leaf
+    password: $LEAF_NATS_PASS
+    account: FABRIC
+  }
+}
diff --git a/docker-compose.node1.yml b/docker-compose.node1.yml
index 6e83c44e..2a3a1b19 100644
--- a/docker-compose.node1.yml
+++ b/docker-compose.node1.yml
@@ -13,7 +13,7 @@ services:
       - NATS_URL=nats://nats:4222
       - ROUTER_CONFIG_PATH=/app/router_config.yaml
       - LOG_LEVEL=info
-      - NODE_ID=node-1-hetzner-gex44
+      - NODE_ID=noda1
       - MEMORY_SERVICE_URL=http://memory-service:8000
       # Timeout policy: Gateway (180s) > Router (60s) > LLM (30s)
       - ROUTER_TIMEOUT=180
@@ -503,6 +503,7 @@ services:
       - CACHE_TTL_SEC=15
       - ENABLE_NATS_CAPS=true
       - NATS_URL=nats://nats:4222
+      - NODE_WORKER_URL=http://node-worker:8109
     extra_hosts:
       - "host.docker.internal:host-gateway"
     depends_on:
@@ -513,6 +514,32 @@ services:
       - node-capabilities
     restart: unless-stopped
 
+  # Node Worker — NATS offload executor
+  node-worker:
+    build:
+      context: ./services/node-worker
+      dockerfile: Dockerfile
+    container_name: node-worker-node1
+    ports:
+      - "127.0.0.1:8109:8109"
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    environment:
+      - NODE_ID=noda1
+      - NATS_URL=nats://nats:4222
+      - OLLAMA_BASE_URL=http://host.docker.internal:11434
+      - SWAPPER_URL=http://swapper-service:8890
+      - NODE_DEFAULT_LLM=qwen3.5:27b
+      - NODE_DEFAULT_VISION=qwen3-vl-8b
+      - NODE_WORKER_MAX_CONCURRENCY=2
+      - NCS_REPORT_URL=http://node-capabilities:8099
+    depends_on:
+      - nats
+      - swapper-service
+    networks:
+      - dagi-network
+    restart: unless-stopped
+
   # NATS (JetStream)
   nats:
     image: nats:2.10-alpine
diff --git a/ops/CHANGELOG_FABRIC.md b/ops/CHANGELOG_FABRIC.md
index 1265be93..75ecaa65 100644
--- a/ops/CHANGELOG_FABRIC.md
+++ b/ops/CHANGELOG_FABRIC.md
@@ -1,5 +1,37 @@
 # Agent Fabric Layer — Changelog
 
+## v0.4 — P3.2/P3.3/P3.4 Multi-node Deploy + Auth + Prometheus (2026-02-27)
+
+### P3.2 — NCS + Node Worker on NODA1
+- Added `node-worker` service to `docker-compose.node1.yml` (NODE_ID=noda1)
+- NCS on NODA1 now has `NODE_WORKER_URL` for metrics collection
+- Fixed NODE_ID consistency: router on NODA1 now uses `noda1` (was `node-1-hetzner-gex44`)
+- Global pool will show 2 nodes after NODA1 deployment
+
+### P3.3 — NATS Accounts/Auth Config
+- Created `config/nats-server.conf` with 3 accounts: SYS, FABRIC, APP
+- FABRIC account: per-user permissions for router, ncs, node_worker
+- Leafnode listener on :7422 with auth
+- Opt-in: not yet active (requires credential setup + client changes)
+
+### P3.4 — Prometheus Counters
+- **Router** (`/fabric_metrics`):
+  - `fabric_caps_refresh_total{status}`, `fabric_caps_stale_total`
+  - `fabric_model_select_total{chosen_node,chosen_runtime,type}`
+  - `fabric_offload_total{status,node,type}`
+  - `fabric_breaker_state{node,type}` (gauge)
+  - `fabric_score_ms` (histogram: 100-10000ms buckets)
+- **Node Worker** (`/prom_metrics`):
+  - `node_worker_jobs_total{type,status}`
+  - `node_worker_inflight` (gauge)
+  - `node_worker_latency_ms{type,model}` (histogram)
+- **NCS** (`/prom_metrics`):
+  - `ncs_runtime_health{runtime}` (gauge)
+  - `ncs_runtime_p50_ms{runtime}`, `ncs_runtime_p95_ms{runtime}`
+  - `ncs_node_wait_ms`
+
+---
+
 ## v0.3 — P3.1 GPU/Queue-aware Routing (2026-02-27)
 
 ### NCS (Node Capabilities Service)
diff --git a/services/node-capabilities/main.py b/services/node-capabilities/main.py
index 340c9138..c2562640 100644
--- a/services/node-capabilities/main.py
+++ b/services/node-capabilities/main.py
@@ -11,6 +11,7 @@ import httpx
 from metrics import (
     build_node_load, build_runtime_load, record_latency,
 )
+import prom_metrics
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger("node-capabilities")
@@ -231,6 +232,7 @@ async def healthz():
 @app.get("/capabilities")
 async def capabilities():
     data = await _build_capabilities()
+    prom_metrics.update_from_caps(data)
     return JSONResponse(content=data)
 
 
@@ -248,6 +250,15 @@ async def capabilities_refresh():
     return JSONResponse(content={"refreshed": True, "served_count": data["served_count"]})
 
 
+@app.get("/prom_metrics")
+async def prom_metrics_endpoint():
+    data = prom_metrics.get_metrics_text()
+    if data is not None:
+        from fastapi.responses import Response
+        return Response(content=data, media_type="text/plain; version=0.0.4; charset=utf-8")
+    return {"error": "prometheus_client not installed"}
+
+
 @app.post("/capabilities/report_latency")
 async def report_latency_endpoint(request: Request):
     data = await request.json()
diff --git a/services/node-capabilities/prom_metrics.py b/services/node-capabilities/prom_metrics.py
new file mode 100644
index 00000000..060f4da8
--- /dev/null
+++ b/services/node-capabilities/prom_metrics.py
@@ -0,0 +1,55 @@
+"""Prometheus metrics for NCS."""
+import logging
+
+logger = logging.getLogger("ncs_prom")
+
+try:
+    from prometheus_client import Gauge, CollectorRegistry, generate_latest
+    PROM_AVAILABLE = True
+    REGISTRY = CollectorRegistry()
+
+    runtime_health = Gauge(
+        "ncs_runtime_health", "Runtime health (1=ok, 0=down)",
+        ["runtime"], registry=REGISTRY,
+    )
+    runtime_p50 = Gauge(
+        "ncs_runtime_p50_ms", "Runtime p50 latency",
+        ["runtime"], registry=REGISTRY,
+    )
+    runtime_p95 = Gauge(
+        "ncs_runtime_p95_ms", "Runtime p95 latency",
+        ["runtime"], registry=REGISTRY,
+    )
+    node_wait = Gauge(
+        "ncs_node_wait_ms", "Estimated wait for node",
+        registry=REGISTRY,
+    )
+
+except ImportError:
+    PROM_AVAILABLE = False
+    REGISTRY = None
+    logger.info("prometheus_client not installed, NCS metrics disabled")
+
+
+def update_from_caps(caps: dict):
+    """Copy node/runtime load figures from a capabilities payload into gauges."""
+    if not PROM_AVAILABLE:
+        return
+    # Tolerate explicit nulls: this runs inside the /capabilities handler and
+    # a metrics hiccup must not turn into a failed capabilities response.
+    nl = caps.get("node_load") or {}
+    node_wait.set(nl.get("estimated_wait_ms") or 0)
+    for rl in caps.get("runtime_load") or []:
+        rt = rl.get("runtime", "?")
+        runtime_health.labels(runtime=rt).set(1 if rl.get("healthy") else 0)
+        if rl.get("p50_ms") is not None:
+            runtime_p50.labels(runtime=rt).set(rl["p50_ms"])
+        if rl.get("p95_ms") is not None:
+            runtime_p95.labels(runtime=rt).set(rl["p95_ms"])
+
+
+def get_metrics_text():
+    """Return the registry in Prometheus text format, or None if disabled."""
+    if PROM_AVAILABLE and REGISTRY:
+        return generate_latest(REGISTRY)
+    return None
diff --git a/services/node-capabilities/requirements.txt b/services/node-capabilities/requirements.txt
index 8cdb0009..93264bb1 100644
--- a/services/node-capabilities/requirements.txt
+++ b/services/node-capabilities/requirements.txt
@@ -2,3 +2,4 @@ fastapi>=0.110.0
 uvicorn>=0.29.0
 httpx>=0.27.0
 nats-py>=2.7.0
+prometheus-client>=0.20.0
diff --git a/services/node-worker/fabric_metrics.py b/services/node-worker/fabric_metrics.py
new file mode 100644
index 00000000..2f599403
--- /dev/null
+++ b/services/node-worker/fabric_metrics.py
@@ -0,0 +1,54 @@
+"""Prometheus metrics for Node Worker."""
+import logging
+
+logger = logging.getLogger("worker_metrics")
+
+try:
+    from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, generate_latest
+    PROM_AVAILABLE = True
+    REGISTRY = CollectorRegistry()
+
+    jobs_total = Counter(
+        "node_worker_jobs_total", "Jobs processed",
+        ["type", "status"], registry=REGISTRY,
+    )
+    inflight_gauge = Gauge(
+        "node_worker_inflight", "Currently inflight jobs",
+        registry=REGISTRY,
+    )
+    latency_hist = Histogram(
+        "node_worker_latency_ms", "Job latency in ms",
+        ["type", "model"],
+        buckets=[100, 250, 500, 1000, 2500, 5000, 15000, 30000],
+        registry=REGISTRY,
+    )
+
+except ImportError:
+    PROM_AVAILABLE = False
+    REGISTRY = None
+    logger.info("prometheus_client not installed, worker metrics disabled")
+
+
+def inc_job(req_type: str, status: str):
+    """Count one processed job, labelled by request type and final status."""
+    if PROM_AVAILABLE:
+        jobs_total.labels(type=req_type, status=status).inc()
+
+
+def set_inflight(count: int):
+    """Set the inflight-jobs gauge to *count*."""
+    if PROM_AVAILABLE:
+        inflight_gauge.set(count)
+
+
+def observe_latency(req_type: str, model: str, latency_ms: int):
+    """Record one job latency (milliseconds) in the per-type/model histogram."""
+    if PROM_AVAILABLE:
+        latency_hist.labels(type=req_type, model=model).observe(latency_ms)
+
+
+def get_metrics_text():
+    """Return the registry in Prometheus text format, or None if disabled."""
+    if PROM_AVAILABLE and REGISTRY:
+        return generate_latest(REGISTRY)
+    return None
diff --git a/services/node-worker/main.py b/services/node-worker/main.py
index db9999e4..5abb099c 100644
--- a/services/node-worker/main.py
+++ b/services/node-worker/main.py
@@ -31,6 +31,16 @@ async def metrics():
     return worker.get_metrics()
 
 
+@app.get("/prom_metrics")
+async def prom_metrics():
+    from fastapi.responses import Response
+    import fabric_metrics as fm
+    data = fm.get_metrics_text()
+    if data is not None:
+        return Response(content=data, media_type="text/plain; version=0.0.4; charset=utf-8")
+    return {"error": "prometheus_client not installed"}
+
+
 @app.on_event("startup")
 async def startup():
     global _nats_client
diff --git a/services/node-worker/requirements.txt b/services/node-worker/requirements.txt
index 4a7c9644..2d4967f6 100644
--- a/services/node-worker/requirements.txt
+++ b/services/node-worker/requirements.txt
@@ -3,3 +3,4 @@ uvicorn>=0.29.0
 httpx>=0.27.0
 nats-py>=2.7.0
 pydantic>=2.5.0
+prometheus-client>=0.20.0
diff --git a/services/node-worker/worker.py b/services/node-worker/worker.py
index 5990b442..82118cc6 100644
--- a/services/node-worker/worker.py
+++ b/services/node-worker/worker.py
@@ -10,6 +10,7 @@ import config
 from models import JobRequest, JobResponse, JobError
 from idempotency import IdempotencyStore
 from providers import ollama, swapper_vision
+import fabric_metrics as fm
 
 logger = logging.getLogger("node-worker")
 
@@ -95,17 +96,21 @@ async def _handle_request(msg):
 
     global _inflight_count
     _inflight_count += 1
+    fm.set_inflight(_inflight_count)
     try:
         async with _semaphore:
             resp = await _execute(job, remaining)
     finally:
         _inflight_count -= 1
+        fm.set_inflight(_inflight_count)
 
     _idem.put(idem_key, resp)
     _idem.complete_inflight(idem_key, resp)
 
     resp.latency_ms = int((time.time() - t0) * 1000)
+    fm.inc_job(job.required_type, resp.status)
     if resp.status == "ok" and resp.latency_ms > 0:
+        fm.observe_latency(job.required_type, resp.model or "?", resp.latency_ms)
         buf = _latencies_llm if job.required_type in ("llm", "code") else _latencies_vision
         buf.append(resp.latency_ms)
         if len(buf) > _LATENCY_BUFFER:
diff --git a/services/router/fabric_metrics.py b/services/router/fabric_metrics.py
new file mode 100644
index 00000000..c364e0d4
--- /dev/null
+++ b/services/router/fabric_metrics.py
@@ -0,0 +1,89 @@
+"""Prometheus metrics for Fabric routing layer.
+
+Exposed via /fabric_metrics (separate from main /metrics to avoid conflicts).
+Falls back to no-op counters if prometheus_client is not installed.
+"""
+import logging
+import time
+from typing import Optional
+
+logger = logging.getLogger("fabric_metrics")
+
+try:
+    from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, generate_latest
+    PROM_AVAILABLE = True
+    REGISTRY = CollectorRegistry()
+
+    caps_refresh = Counter(
+        "fabric_caps_refresh_total", "Capabilities refresh attempts",
+        ["status"], registry=REGISTRY,
+    )
+    caps_stale = Counter(
+        "fabric_caps_stale_total", "Stale capabilities used",
+        registry=REGISTRY,
+    )
+    model_select = Counter(
+        "fabric_model_select_total", "Model selection decisions",
+        ["chosen_node", "chosen_runtime", "type"], registry=REGISTRY,
+    )
+    offload_total = Counter(
+        "fabric_offload_total", "Offload attempts",
+        ["status", "node", "type"], registry=REGISTRY,
+    )
+    breaker_state = Gauge(
+        "fabric_breaker_state", "Circuit breaker state (1=open)",
+        ["node", "type"], registry=REGISTRY,
+    )
+    score_hist = Histogram(
+        "fabric_score_ms", "Model selection score distribution",
+        buckets=[100, 250, 500, 1000, 2000, 5000, 10000],
+        registry=REGISTRY,
+    )
+
+except ImportError:
+    PROM_AVAILABLE = False
+    REGISTRY = None
+    logger.info("prometheus_client not installed, fabric metrics disabled")
+
+
+def inc_caps_refresh(status: str):
+    """Count one capabilities refresh attempt with the given status label."""
+    if PROM_AVAILABLE:
+        caps_refresh.labels(status=status).inc()
+
+
+def inc_caps_stale():
+    """Count one use of stale capabilities."""
+    if PROM_AVAILABLE:
+        caps_stale.inc()
+
+
+def inc_model_select(node: str, runtime: str, req_type: str):
+    """Count one model-selection decision for the chosen node/runtime/type."""
+    if PROM_AVAILABLE:
+        model_select.labels(chosen_node=node, chosen_runtime=runtime, type=req_type).inc()
+
+
+def inc_offload(status: str, node: str, req_type: str):
+    """Count one offload attempt, labelled by status, target node and type."""
+    if PROM_AVAILABLE:
+        offload_total.labels(status=status, node=node, type=req_type).inc()
+
+
+def set_breaker(node: str, req_type: str, is_open: bool):
+    """Set the breaker gauge for node/type to 1 when open, 0 otherwise."""
+    if PROM_AVAILABLE:
+        breaker_state.labels(node=node, type=req_type).set(1 if is_open else 0)
+
+
+def observe_score(score: int):
+    """Record one model-selection score in the score histogram."""
+    if PROM_AVAILABLE:
+        score_hist.observe(score)
+
+
+def get_metrics_text() -> Optional[bytes]:
+    """Return the registry in Prometheus text format, or None if disabled."""
+    if PROM_AVAILABLE and REGISTRY:
+        return generate_latest(REGISTRY)
+    return None
diff --git a/services/router/main.py b/services/router/main.py
index c96893fc..3e74d7d5 100644
--- a/services/router/main.py
+++ b/services/router/main.py
@@ -52,6 +52,7 @@ try:
     import global_capabilities_client
     from model_select import select_model_for_agent, ModelSelection, CLOUD_PROVIDERS as NCS_CLOUD_PROVIDERS
     import offload_client
+    import fabric_metrics as fm
     NCS_AVAILABLE = True
 except ImportError:
     NCS_AVAILABLE = False
@@ -940,6 +941,17 @@ async def healthz():
     return await health()
 
 
+@app.get("/fabric_metrics")
+async def fabric_metrics_endpoint():
+    """Prometheus metrics for Fabric routing layer."""
+    if NCS_AVAILABLE:
+        data = fm.get_metrics_text()
+        if data is not None:
+            from starlette.responses import Response
+            return Response(content=data, media_type="text/plain; version=0.0.4; charset=utf-8")
+    return {"error": "fabric metrics not available"}
+
+
 @app.get("/monitor/status")
 async def monitor_status(request: Request = None):
     """
@@ -1747,6 +1759,8 @@ async def agent_infer(agent_id: str, request: InferRequest):
                         timeout_ms=infer_timeout,
                     )
                     if offload_resp and offload_resp.get("status") == "ok":
+                        if NCS_AVAILABLE:
+                            fm.inc_offload("ok", ncs_selection.node, job_payload["required_type"])
                         result_text = offload_resp.get("result", {}).get("text", "")
                         return InferResponse(
                             response=result_text,
@@ -1756,6 +1770,8 @@ async def agent_infer(agent_id: str, request: InferRequest):
                         )
                     else:
                         offload_status = offload_resp.get("status", "none") if offload_resp else "no_reply"
+                        if NCS_AVAILABLE:
+                            fm.inc_offload(offload_status, ncs_selection.node, job_payload["required_type"])
                         logger.warning(
                             f"[fallback] offload to {ncs_selection.node} failed ({offload_status}) "
                             f"→ re-selecting with exclude={ncs_selection.node}, force_local"
diff --git a/services/router/model_select.py b/services/router/model_select.py
index 6fd02d37..a46da90e 100644
--- a/services/router/model_select.py
+++ b/services/router/model_select.py
@@ -326,6 +326,12 @@ async def select_model_for_agent(
                 f"{' (force_local)' if force_local else ''}"
                 f"{' (excluded: ' + ','.join(excl) + ')' if excl else ''}"
             )
+            try:
+                from fabric_metrics import inc_model_select, observe_score
+                inc_model_select(sel.node, sel.runtime, reqs.required_type)
+                observe_score(sel.score)
+            except ImportError:
+                pass
             return sel
     logger.warning(
         f"[select] agent={agent_id} profile={profile} → no match "