From a605b8c43eb22017fbba24cb5c6f3fa0e92fda95 Mon Sep 17 00:00:00 2001 From: Apple Date: Fri, 27 Feb 2026 02:55:44 -0800 Subject: [PATCH] =?UTF-8?q?P3.1:=20GPU/Queue-aware=20routing=20=E2=80=94?= =?UTF-8?q?=20NCS=20metrics=20+=20scoring-based=20model=20selection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NCS (services/node-capabilities/metrics.py): - NodeLoad: inflight_jobs, queue_depth, concurrency_limit, estimated_wait_ms, cpu_load_1m, mem_pressure (macOS + Linux), rtt_ms_to_hub - RuntimeLoad: per-runtime healthy, p50_ms, p95_ms from rolling 50-sample window - POST /capabilities/report_latency for node-worker → NCS reporting - NCS fetches worker metrics via NODE_WORKER_URL Node Worker: - GET /metrics endpoint (inflight, concurrency, latency buffers) - Latency tracking per job type (llm/vision) with rolling buffer - Fire-and-forget latency reporting to NCS after each successful job Router (model_select v3): - score_candidate(): wait + model_latency + cross_node_penalty + prefer_bonus - LOCAL_THRESHOLD_MS=250: prefer local if within threshold of remote - ModelSelection.score field for observability - Structured [score] logs with chosen node, model, and score breakdown Tests: 19 new (12 scoring + 7 NCS metrics), 36 total pass Docs: ops/runbook_p3_1.md, ops/CHANGELOG_FABRIC.md No breaking changes to JobRequest/JobResponse or capabilities schema. Made-with: Cursor --- docker-compose.node2-sofiia.yml | 2 + ops/CHANGELOG_FABRIC.md | 54 ++++++++ ops/runbook_p3_1.md | 77 +++++++++++ services/node-capabilities/Dockerfile | 2 +- services/node-capabilities/main.py | 33 ++++- services/node-capabilities/metrics.py | 164 ++++++++++++++++++++++++ services/node-worker/main.py | 5 + services/node-worker/worker.py | 53 +++++++- services/router/model_select.py | 110 +++++++++++----- tests/test_ncs_metrics.py | 69 ++++++++++ tests/test_scoring.py | 177 ++++++++++++++++++++++++++ 11 files changed, 706 insertions(+), 40 deletions(-) create mode 100644 ops/CHANGELOG_FABRIC.md create mode 100644 ops/runbook_p3_1.md create mode 100644 services/node-capabilities/metrics.py create mode 100644 tests/test_ncs_metrics.py create mode 100644 tests/test_scoring.py diff --git a/docker-compose.node2-sofiia.yml b/docker-compose.node2-sofiia.yml index 61e72e1e..564cf77d 100644 --- a/docker-compose.node2-sofiia.yml +++ b/docker-compose.node2-sofiia.yml @@ -126,6 +126,7 @@ services: - CACHE_TTL_SEC=15 - ENABLE_NATS_CAPS=true - NATS_URL=nats://dagi-nats:4222 + - NODE_WORKER_URL=http://node-worker:8109 depends_on: - swapper-service - dagi-nats @@ -150,6 +151,7 @@ services: - NODE_DEFAULT_LLM=qwen3:14b - NODE_DEFAULT_VISION=llava:13b - NODE_WORKER_MAX_CONCURRENCY=2 + - NCS_REPORT_URL=http://node-capabilities:8099 depends_on: - dagi-nats - swapper-service diff --git a/ops/CHANGELOG_FABRIC.md b/ops/CHANGELOG_FABRIC.md new file mode 100644 index 00000000..1265be93 --- /dev/null +++ b/ops/CHANGELOG_FABRIC.md @@ -0,0 +1,54 @@ +# Agent Fabric Layer — Changelog + +## v0.3 — P3.1 GPU/Queue-aware Routing (2026-02-27) + +### NCS (Node Capabilities Service) +- **NEW** `metrics.py` module: NodeLoad + RuntimeLoad collection +- Capabilities payload now includes `node_load` and `runtime_load` +- `node_load`: inflight_jobs, queue_depth, concurrency_limit, estimated_wait_ms, cpu_load_1m, mem_pressure +- `runtime_load`: per-runtime healthy status, p50_ms, p95_ms from rolling window +- **NEW** `POST /capabilities/report_latency` — accepts latency reports from node-worker +- NCS fetches worker metrics via `NODE_WORKER_URL` env + +### Node Worker +- **NEW** `GET /metrics` endpoint: inflight_jobs, concurrency_limit, last_latencies_llm/vision +- Latency tracking: rolling buffer of last 50 latencies per type +- Fire-and-forget latency reporting to NCS after each successful job + +### Router (model_select v3) +- **NEW** `score_candidate()` function: wait + model_latency + cross_penalty + prefer_bonus +- Selection uses scoring instead of simple local-first ordering +- `LOCAL_THRESHOLD_MS = 250`: prefer local if within threshold of remote +- `ModelSelection.score` field added +- Structured log format: `[score] agent=X type=Y chosen=LOCAL:node/model score=N` + +### Tests +- 12 scoring tests (local wins, remote wins, exclude, breaker, type filter, prefer list, cross penalty, wait, threshold) +- 7 NCS metrics tests (latency stats, cpu load, mem pressure, node load, runtime load) + +### No Breaking Changes +- JobRequest/JobResponse envelope unchanged +- Existing capabilities fields preserved +- All new fields are optional/additive + +--- + +## v0.2 — P2.2+P2.3 NATS Offload (2026-02-26) + +- Node Worker service (NATS offload executor) +- offload_client.py (circuit breaker, retries, deadline) +- model_select with exclude_nodes + force_local +- Router /infer remote offload path + +## v0.1 — P2 Global Capabilities (2026-02-26) + +- Node Capabilities Service (NCS) on each node +- global_capabilities_client.py (NATS scatter-gather discovery) +- model_select v2 (multi-node aware) +- NATS wildcard discovery: node.*.capabilities.get + +## v0.0 — P1 NCS-first Selection (2026-02-26) + +- capabilities_client.py (single-node HTTP) +- model_select v1 (profile → NCS → static fallback) +- Grok API integration fix diff --git a/ops/runbook_p3_1.md b/ops/runbook_p3_1.md new file mode 100644 index 00000000..71df19a5 --- /dev/null +++ b/ops/runbook_p3_1.md @@ -0,0 +1,77 @@ +# P3.1 — GPU/Queue-aware Routing Runbook + +## What Changed + +NCS now exposes **runtime health and load metrics** alongside model inventory. +Router uses a **scoring function** to pick the fastest node+model combo. +Node-worker reports latencies back to NCS for p50/p95 calculation. + +## Verification Commands + +### 1. NCS capabilities with load metrics +```bash +curl -s http://127.0.0.1:8099/capabilities | jq '.node_load' +``` +Expected: `inflight_jobs`, `estimated_wait_ms`, `cpu_load_1m`, `mem_pressure` + +### 2. Runtime load (p50/p95) +```bash +curl -s http://127.0.0.1:8099/capabilities | jq '.runtime_load' +``` +Expected: per-runtime `p50_ms`, `p95_ms` after some traffic + +### 3. Node-worker metrics +```bash +curl -s http://127.0.0.1:8109/metrics | jq +``` +Expected: `inflight_jobs`, `concurrency_limit`, `last_latencies_llm` + +### 4. NATS capabilities (includes metrics) +```bash +nats req node.noda2.capabilities.get '{}' +``` + +### 5. Router scoring logs +```bash +docker logs dagi-router-node2 2>&1 | grep '\[score\]' +``` +Expected: `chosen=LOCAL:nodeX/modelY score=NNN` + +### 6. Report latency manually +```bash +curl -s -X POST http://127.0.0.1:8099/capabilities/report_latency \ + -H "Content-Type: application/json" \ + -d '{"runtime":"ollama","type":"llm","latency_ms":450}' +``` + +## Scoring Formula + +``` +score = wait + model_latency + cross_node_penalty + prefer_bonus + +wait = node_load.estimated_wait_ms (0 if idle) +model_latency = model_p50_ms or runtime p50_ms or 1500 (default) +cross_penalty = 0 if local, else rtt_ms * 2 +prefer_bonus = -1000 for first prefer match, -900 for second, etc. +``` + +If best_local_score <= best_remote_score + 250ms → prefer local. + +## Estimated Wait Formula + +``` +if inflight_jobs < concurrency_limit: + estimated_wait = 0 +else: + estimated_wait = (inflight - concurrency + 1) * p50_ms +``` + +## Troubleshooting + +| Symptom | Check | Fix | +|---------|-------|-----| +| NCS shows `p50=null` | No traffic yet | Send test requests | +| `estimated_wait_ms` always 0 | Inflight < limit | Expected if not saturated | +| `mem_pressure=null` | Container lacks `memory_pressure` | Expected in Docker | +| Scoring always picks local | Remote score higher | Check remote rtt/wait | +| Node-worker latencies empty | NCS can't reach worker | Check `NODE_WORKER_URL` env | diff --git a/services/node-capabilities/Dockerfile b/services/node-capabilities/Dockerfile index 6fad1d0b..69e955a3 100644 --- a/services/node-capabilities/Dockerfile +++ b/services/node-capabilities/Dockerfile @@ -2,6 +2,6 @@ FROM python:3.11-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -COPY main.py . +COPY . . EXPOSE 8099 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8099"] diff --git a/services/node-capabilities/main.py b/services/node-capabilities/main.py index 7e552507..340c9138 100644 --- a/services/node-capabilities/main.py +++ b/services/node-capabilities/main.py @@ -4,10 +4,14 @@ import time import logging from typing import Any, Dict, List, Optional -from fastapi import FastAPI +from fastapi import FastAPI, Request from fastapi.responses import JSONResponse import httpx +from metrics import ( + build_node_load, build_runtime_load, record_latency, +) + logging.basicConfig(level=logging.INFO) logger = logging.getLogger("node-capabilities") @@ -195,20 +199,24 @@ async def _build_capabilities() -> Dict[str, Any]: disk = _collect_disk_inventory() served = _build_served_models(ollama, swapper, llama) + runtimes = {"ollama": ollama, "swapper": swapper} + if llama: + runtimes["llama_server"] = llama + + node_load = await build_node_load() + runtime_load = await build_runtime_load(runtimes) + result = { "node_id": NODE_ID, "updated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - "runtimes": { - "ollama": ollama, - "swapper": swapper, - }, + "runtimes": runtimes, "served_models": served, "served_count": len(served), + "node_load": node_load, + "runtime_load": runtime_load, "inventory_only": disk, "inventory_count": len(disk), } - if llama: - result["runtimes"]["llama_server"] = llama _cache = result _cache_ts = time.time() @@ -240,6 +248,17 @@ async def capabilities_refresh(): return JSONResponse(content={"refreshed": True, "served_count": data["served_count"]}) +@app.post("/capabilities/report_latency") +async def report_latency_endpoint(request: Request): + data = await request.json() + runtime = data.get("runtime", "ollama") + req_type = data.get("type", "llm") + latency_ms = data.get("latency_ms", 0) + if latency_ms > 0: + record_latency(runtime, req_type, latency_ms) + return {"ok": True} + + # ── NATS request/reply (optional) ───────────────────────────────────────────── ENABLE_NATS = os.getenv("ENABLE_NATS_CAPS", "false").lower() in ("true", "1", "yes") diff --git a/services/node-capabilities/metrics.py b/services/node-capabilities/metrics.py new file mode 100644 index 00000000..cb15b21f --- /dev/null +++ b/services/node-capabilities/metrics.py @@ -0,0 +1,164 @@ +"""Runtime health and load metrics for NCS capabilities payload.""" +import logging +import os +import platform +import subprocess +import time +from collections import deque +from typing import Any, Dict, List, Optional + +import httpx + +logger = logging.getLogger("ncs-metrics") + +NODE_WORKER_URL = os.getenv("NODE_WORKER_URL", "http://127.0.0.1:8109") +_latency_buffer: Dict[str, deque] = {} # key: "runtime:type" → deque of (latency_ms, ts) +LATENCY_BUFFER_SIZE = 50 + + +def record_latency(runtime: str, req_type: str, latency_ms: int): + key = f"{runtime}:{req_type}" + buf = _latency_buffer.setdefault(key, deque(maxlen=LATENCY_BUFFER_SIZE)) + buf.append((latency_ms, time.time())) + + +def _percentile(values: List[int], p: float) -> int: + if not values: + return 0 + s = sorted(values) + idx = int(len(s) * p / 100) + return s[min(idx, len(s) - 1)] + + +def get_latency_stats(runtime: str, req_type: str) -> Dict[str, Optional[int]]: + key = f"{runtime}:{req_type}" + buf = _latency_buffer.get(key) + if not buf or len(buf) == 0: + return {"p50_ms": None, "p95_ms": None, "samples": 0} + cutoff = time.time() - 600 + recent = [lat for lat, ts in buf if ts > cutoff] + if not recent: + return {"p50_ms": None, "p95_ms": None, "samples": 0} + return { + "p50_ms": _percentile(recent, 50), + "p95_ms": _percentile(recent, 95), + "samples": len(recent), + } + + +async def fetch_worker_metrics() -> Dict[str, Any]: + """Fetch inflight/concurrency from local node-worker /metrics.""" + defaults = {"inflight_jobs": 0, "concurrency_limit": 1, "queue_depth": 0, + "last_latencies_llm": [], "last_latencies_vision": []} + try: + async with httpx.AsyncClient(timeout=2) as c: + r = await c.get(f"{NODE_WORKER_URL}/metrics") + if r.status_code == 200: + return r.json() + except Exception as e: + logger.debug(f"Node-worker metrics unavailable: {e}") + return defaults + + +def get_cpu_load() -> Optional[float]: + try: + return round(os.getloadavg()[0], 2) + except (OSError, AttributeError): + return None + + +def get_mem_pressure() -> Optional[str]: + """macOS: use memory_pressure -Q or vm_stat. Linux: /proc/meminfo.""" + if platform.system() == "Darwin": + try: + out = subprocess.check_output( + ["memory_pressure", "-Q"], timeout=2, stderr=subprocess.DEVNULL + ).decode() + for line in out.splitlines(): + ll = line.lower() + if "system-wide" in ll and "level" in ll: + if "critical" in ll: + return "critical" + if "warn" in ll: + return "high" + if "normal" in ll: + return "low" + return "low" + except Exception: + try: + out = subprocess.check_output( + ["vm_stat"], timeout=2, stderr=subprocess.DEVNULL + ).decode() + return "low" + except Exception: + return None + elif platform.system() == "Linux": + try: + with open("/proc/meminfo") as f: + info = {} + for line in f: + parts = line.split(":") + if len(parts) == 2: + info[parts[0].strip()] = int(parts[1].strip().split()[0]) + total = info.get("MemTotal", 1) + avail = info.get("MemAvailable", total) + ratio = avail / total + if ratio < 0.05: + return "critical" + elif ratio < 0.15: + return "high" + elif ratio < 0.30: + return "medium" + return "low" + except Exception: + return None + return None + + +async def build_node_load(worker_metrics: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Build NodeLoad object for capabilities payload.""" + wm = worker_metrics or await fetch_worker_metrics() + + inflight = wm.get("inflight_jobs", 0) + concurrency = wm.get("concurrency_limit", 1) + queue_depth = wm.get("queue_depth", 0) + + llm_stats = get_latency_stats("ollama", "llm") + p50 = llm_stats["p50_ms"] or 1500 + + if inflight < concurrency: + estimated_wait = 0 + else: + estimated_wait = (inflight - concurrency + 1) * p50 + + return { + "ts": int(time.time() * 1000), + "inflight_jobs": inflight, + "queue_depth": queue_depth, + "concurrency_limit": concurrency, + "estimated_wait_ms": estimated_wait, + "cpu_load_1m": get_cpu_load(), + "mem_pressure": get_mem_pressure(), + "rtt_ms_to_hub": None, + } + + +async def build_runtime_load(runtimes: Dict[str, Any]) -> List[Dict[str, Any]]: + """Build RuntimeLoad list from collected runtimes.""" + result = [] + for rt_name, rt_data in runtimes.items(): + status = rt_data.get("status", "unknown") + healthy = status == "ok" + + llm_stats = get_latency_stats(rt_name, "llm") + vis_stats = get_latency_stats(rt_name, "vision") + best_stats = vis_stats if vis_stats["samples"] > llm_stats["samples"] else llm_stats + + result.append({ + "runtime": rt_name, + "healthy": healthy, + "last_check_ms": int(time.time() * 1000), + "p50_ms": best_stats["p50_ms"], + "p95_ms": best_stats["p95_ms"], + }) + return result diff --git a/services/node-worker/main.py b/services/node-worker/main.py index f85b3211..db9999e4 100644 --- a/services/node-worker/main.py +++ b/services/node-worker/main.py @@ -26,6 +26,11 @@ async def healthz(): } +@app.get("/metrics") +async def metrics(): + return worker.get_metrics() + + @app.on_event("startup") async def startup(): global _nats_client diff --git a/services/node-worker/worker.py b/services/node-worker/worker.py index a0fd2a0a..5990b442 100644 --- a/services/node-worker/worker.py +++ b/services/node-worker/worker.py @@ -2,6 +2,7 @@ import asyncio import json import logging +import os import time from typing import Any, Dict @@ -15,6 +16,10 @@ logger = logging.getLogger("node-worker") _idem = IdempotencyStore() _semaphore: asyncio.Semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY) _nats_client = None +_inflight_count: int = 0 +_latencies_llm: list = [] +_latencies_vision: list = [] +_LATENCY_BUFFER = 50 async def start(nats_client): @@ -88,12 +93,25 @@ async def _handle_request(msg): await _reply(msg, resp) return - async with _semaphore: - resp = await _execute(job, remaining) + global _inflight_count + _inflight_count += 1 + try: + async with _semaphore: + resp = await _execute(job, remaining) + finally: + _inflight_count -= 1 _idem.put(idem_key, resp) _idem.complete_inflight(idem_key, resp) resp.latency_ms = int((time.time() - t0) * 1000) + + if resp.status == "ok" and resp.latency_ms > 0: + buf = _latencies_llm if job.required_type in ("llm", "code") else _latencies_vision + buf.append(resp.latency_ms) + if len(buf) > _LATENCY_BUFFER: + del buf[:len(buf) - _LATENCY_BUFFER] + _report_latency_async(job.required_type, resp.provider or "ollama", resp.latency_ms) + await _reply(msg, resp) except Exception as e: @@ -179,6 +197,37 @@ async def _execute(job: JobRequest, remaining_ms: int) -> JobResponse: ) +def get_metrics() -> Dict[str, Any]: + return { + "inflight_jobs": _inflight_count, + "concurrency_limit": config.MAX_CONCURRENCY, + "queue_depth": 0, + "last_latencies_llm": list(_latencies_llm[-_LATENCY_BUFFER:]), + "last_latencies_vision": list(_latencies_vision[-_LATENCY_BUFFER:]), + } + + +def _report_latency_async(req_type: str, runtime: str, latency_ms: int): + """Fire-and-forget latency report to local NCS.""" + import httpx as _httpx + + ncs_url = os.getenv("NCS_REPORT_URL", "http://node-capabilities:8099") + + async def _do(): + try: + async with _httpx.AsyncClient(timeout=1) as c: + await c.post(f"{ncs_url}/capabilities/report_latency", json={ + "runtime": runtime, "type": req_type, "latency_ms": latency_ms, + }) + except Exception: + pass + + try: + asyncio.get_event_loop().create_task(_do()) + except RuntimeError: + pass + + async def _reply(msg, resp: JobResponse): if msg.reply: await _nats_client.publish(msg.reply, resp.model_dump_json().encode()) diff --git a/services/router/model_select.py b/services/router/model_select.py index 7d9a173f..6fd02d37 100644 --- a/services/router/model_select.py +++ b/services/router/model_select.py @@ -26,6 +26,9 @@ class ProfileRequirements: constraints: Dict[str, Any] = field(default_factory=dict) +LOCAL_THRESHOLD_MS = 250 + + @dataclass class ModelSelection: runtime: str # ollama | swapper | llama_server | cloud @@ -39,6 +42,7 @@ class ModelSelection: via_nats: bool = False fallback_reason: str = "" caps_age_s: float = 0.0 + score: int = 0 # lower = faster # ── Profile resolution ──────────────────────────────────────────────────────── @@ -105,6 +109,56 @@ def profile_requirements( ) +# ── Scoring ─────────────────────────────────────────────────────────────────── + +def score_candidate( + model: Dict[str, Any], + capabilities: Dict[str, Any], + prefer: List[str], + rtt_hint_ms: int = 60, +) -> int: + """Lower score = better candidate. + + Formula: wait + model_latency + cross_node_penalty + prefer_bonus + """ + is_local = model.get("local", False) + node_id = model.get("node", "") + + node_load = capabilities.get("node_load", {}) + if not is_local: + for ndata in capabilities.get("nodes", {}).values(): + if ndata.get("node_id") == node_id: + node_load = ndata.get("node_load", {}) + break + + wait = node_load.get("estimated_wait_ms", 0) + + model_lat = model.get("model_p50_ms") or 0 + if not model_lat: + runtime_loads = capabilities.get("runtime_load", []) + rt = model.get("runtime", "ollama") + for rl in runtime_loads: + if rl.get("runtime") == rt: + model_lat = rl.get("p50_ms") or 0 + break + if not model_lat: + model_lat = 1500 + + rtt = 0 if is_local else (node_load.get("rtt_ms_to_hub") or rtt_hint_ms or 60) + cross_penalty = 0 if is_local else (rtt * 2) + + prefer_bonus = 0 + name = model.get("name", "") + for i, pref in enumerate(prefer): + if pref == "*": + break + if pref == name or pref in name: + prefer_bonus = -(1000 - i * 100) + break + + return wait + model_lat + cross_penalty + prefer_bonus + + # ── Multi-node model selection ──────────────────────────────────────────────── def select_best_model( @@ -114,10 +168,8 @@ def select_best_model( ) -> Optional[ModelSelection]: """Choose the best served model from global (multi-node) capabilities. - Selection order: - 1. Prefer list matches (local first, then remote) - 2. Best candidate by size (local first, then remote) - 3. None → caller should try static fallback + Uses scoring: wait + model_latency + cross_node_rtt + prefer_bonus. + If best local score <= best remote score + LOCAL_THRESHOLD_MS, prefer local. exclude_nodes: set of node_ids to skip (e.g. circuit-broken nodes). """ @@ -140,35 +192,34 @@ def select_best_model( if not candidates: return None - local_candidates = [m for m in candidates if m.get("local", False)] - remote_candidates = [m for m in candidates if not m.get("local", False)] - prefer = reqs.prefer if reqs.prefer else [] + scored = [(score_candidate(m, capabilities, prefer), m) for m in candidates] + scored.sort(key=lambda x: x[0]) - for pref in prefer: - if pref == "*": - break - for m in local_candidates: - if pref == m.get("name") or pref in m.get("name", ""): - return _make_selection(m, capabilities) - for m in remote_candidates: - if pref == m.get("name") or pref in m.get("name", ""): - return _make_selection(m, capabilities) + local_scored = [(s, m) for s, m in scored if m.get("local", False)] + remote_scored = [(s, m) for s, m in scored if not m.get("local", False)] - if local_candidates: - return _make_selection(_pick_best(local_candidates), capabilities) - if remote_candidates: - return _make_selection(_pick_best(remote_candidates), capabilities) + best_local = local_scored[0] if local_scored else None + best_remote = remote_scored[0] if remote_scored else None + + if best_local and best_remote: + if best_local[0] <= best_remote[0] + LOCAL_THRESHOLD_MS: + sel = _make_selection(best_local[1], capabilities) + sel.score = best_local[0] + return sel + sel = _make_selection(best_remote[1], capabilities) + sel.score = best_remote[0] + return sel + + winner = (best_local or best_remote) + if winner: + sel = _make_selection(winner[1], capabilities) + sel.score = winner[0] + return sel return None -def _pick_best(candidates: List[Dict[str, Any]]) -> Dict[str, Any]: - running = [m for m in candidates if m.get("running")] - pool = running if running else candidates - return max(pool, key=lambda m: m.get("size_gb", 0)) - - def _make_selection( model: Dict[str, Any], capabilities: Dict[str, Any], @@ -269,10 +320,9 @@ async def select_model_for_agent( ) if sel: logger.info( - f"[select] agent={agent_id} profile={profile} → " - f"{'LOCAL' if sel.local else 'REMOTE'} " - f"node={sel.node} runtime={sel.runtime} " - f"model={sel.name} caps_age={sel.caps_age_s}s" + f"[score] agent={agent_id} type={reqs.required_type} " + f"chosen={'LOCAL' if sel.local else 'REMOTE'}:{sel.node}/{sel.name} " + f"score={sel.score} caps_age={sel.caps_age_s}s" f"{' (force_local)' if force_local else ''}" f"{' (excluded: ' + ','.join(excl) + ')' if excl else ''}" ) diff --git a/tests/test_ncs_metrics.py b/tests/test_ncs_metrics.py new file mode 100644 index 00000000..fa03a08d --- /dev/null +++ b/tests/test_ncs_metrics.py @@ -0,0 +1,69 @@ +"""Tests for NCS metrics module.""" +import sys +import os +import asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "node-capabilities")) + +from metrics import ( + record_latency, get_latency_stats, get_cpu_load, get_mem_pressure, + build_node_load, build_runtime_load, _latency_buffer, +) + + +def setup_function(): + _latency_buffer.clear() + + +def test_record_and_get_latency(): + record_latency("ollama", "llm", 500) + record_latency("ollama", "llm", 300) + record_latency("ollama", "llm", 700) + stats = get_latency_stats("ollama", "llm") + assert stats["samples"] == 3 + assert stats["p50_ms"] == 500 + assert stats["p95_ms"] == 700 + + +def test_empty_latency_stats(): + stats = get_latency_stats("nonexistent", "llm") + assert stats["p50_ms"] is None + assert stats["samples"] == 0 + + +def test_cpu_load_returns_float_or_none(): + result = get_cpu_load() + assert result is None or isinstance(result, float) + + +def test_mem_pressure_returns_valid_or_none(): + result = get_mem_pressure() + assert result is None or result in ("low", "medium", "high", "critical") + + +def test_build_node_load_defaults(): + result = asyncio.run(build_node_load(worker_metrics={ + "inflight_jobs": 0, "concurrency_limit": 2, "queue_depth": 0, + })) + assert result["inflight_jobs"] == 0 + assert result["estimated_wait_ms"] == 0 + assert result["concurrency_limit"] == 2 + assert "ts" in result + + +def test_build_node_load_wait_when_busy(): + record_latency("ollama", "llm", 1000) + result = asyncio.run(build_node_load(worker_metrics={ + "inflight_jobs": 5, "concurrency_limit": 2, "queue_depth": 0, + })) + assert result["estimated_wait_ms"] == 4 * 1000 + + +def test_build_runtime_load(): + runtimes = {"ollama": {"status": "ok"}, "swapper": {"status": "error: timeout"}} + result = asyncio.run(build_runtime_load(runtimes)) + assert len(result) == 2 + ollama_rl = next(r for r in result if r["runtime"] == "ollama") + assert ollama_rl["healthy"] is True + swapper_rl = next(r for r in result if r["runtime"] == "swapper") + assert swapper_rl["healthy"] is False diff --git a/tests/test_scoring.py b/tests/test_scoring.py new file mode 100644 index 00000000..b3282f67 --- /dev/null +++ b/tests/test_scoring.py @@ -0,0 +1,177 @@ +"""Tests for P3.1 scoring-based model selection.""" +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router")) + +from model_select import ( + score_candidate, + select_best_model, + ProfileRequirements, + ModelSelection, + LOCAL_THRESHOLD_MS, +) + + +def _caps(served, node_load=None, runtime_load=None, nodes=None): + return { + "served_models": served, + "node_load": node_load or {}, + "runtime_load": runtime_load or [], + "nodes": nodes or {}, + } + + +def _model(name, typ="llm", local=True, node="n1", runtime="ollama", **kw): + return {"name": name, "type": typ, "local": local, "node": node, + "runtime": runtime, "base_url": "http://x", **kw} + + +def _reqs(typ="llm", prefer=None): + return ProfileRequirements("test", typ, prefer or []) + + +# ── 1) local wins when scores close ──────────────────────────────────────── + +def test_local_wins_when_scores_close(): + caps = _caps( + served=[ + _model("qwen3:14b", local=True, node="n1"), + _model("qwen3:14b", local=False, node="n2"), + ], + node_load={"estimated_wait_ms": 0, "rtt_ms_to_hub": None}, + ) + sel = select_best_model(_reqs(), caps) + assert sel is not None + assert sel.local is True + assert sel.node == "n1" + + +# ── 2) remote wins when local wait is high ───────────────────────────────── + +def test_remote_wins_when_local_wait_high(): + caps = _caps( + served=[ + _model("qwen3:14b", local=True, node="n1"), + _model("qwen3:14b", local=False, node="n2"), + ], + node_load={"estimated_wait_ms": 5000, "rtt_ms_to_hub": None}, + nodes={"n2": {"node_id": "n2", "node_load": {"estimated_wait_ms": 0, "rtt_ms_to_hub": 50}}}, + ) + sel = select_best_model(_reqs(), caps) + assert sel is not None + assert sel.local is False + assert sel.node == "n2" + + +# ── 3) exclude_nodes works ───────────────────────────────────────────────── + +def test_exclude_nodes_works(): + caps = _caps(served=[ + _model("qwen3:14b", local=False, node="n2"), + _model("qwen3:14b", local=False, node="n3"), + ]) + sel = select_best_model(_reqs(), caps, exclude_nodes={"n2"}) + assert sel is not None + assert sel.node == "n3" + + +# ── 4) breaker open → node excluded (via exclude_nodes) ─────────────────── + +def test_breaker_excludes_node(): + caps = _caps(served=[ + _model("qwen3:14b", local=False, node="broken"), + _model("qwen3:14b", local=True, node="n1"), + ]) + sel = select_best_model(_reqs(), caps, exclude_nodes={"broken"}) + assert sel is not None + assert sel.node == "n1" + + +# ── 5) required_type filter ──────────────────────────────────────────────── + +def test_required_type_filter(): + caps = _caps(served=[ + _model("qwen3:14b", typ="llm"), + _model("llava:13b", typ="vision"), + ]) + sel = select_best_model(_reqs(typ="vision"), caps) + assert sel is not None + assert sel.name == "llava:13b" + + +# ── 6) prefer list filter ───────────────────────────────────────────────── + +def test_prefer_list_selects_preferred(): + caps = _caps(served=[ + _model("qwen3:14b"), + _model("qwen3.5:35b"), + ]) + sel = select_best_model(_reqs(prefer=["qwen3.5:35b"]), caps) + assert sel is not None + assert sel.name == "qwen3.5:35b" + + +# ── 7) score formula — prefer bonus lowers score ────────────────────────── + +def test_prefer_bonus_lowers_score(): + m1 = _model("qwen3:14b") + m2 = _model("qwen3.5:35b") + caps = _caps(served=[m1, m2]) + s1 = score_candidate(m1, caps, prefer=["qwen3:14b"]) + s2 = score_candidate(m2, caps, prefer=["qwen3:14b"]) + assert s1 < s2 + + +# ── 8) score formula — cross_penalty for remote ────────────────────────── + +def test_cross_penalty_for_remote(): + local = _model("m", local=True) + remote = _model("m", local=False, node="r1") + caps = _caps(served=[local, remote]) + sl = score_candidate(local, caps, prefer=[]) + sr = score_candidate(remote, caps, prefer=[], rtt_hint_ms=50) + assert sr > sl + + +# ── 9) score formula — wait increases score ────────────────────────────── + +def test_wait_increases_score(): + m = _model("m", local=True) + caps_idle = _caps(served=[m], node_load={"estimated_wait_ms": 0}) + caps_busy = _caps(served=[m], node_load={"estimated_wait_ms": 3000}) + s_idle = score_candidate(m, caps_idle, prefer=[]) + s_busy = score_candidate(m, caps_busy, prefer=[]) + assert s_busy > s_idle + + +# ── 10) no candidates → None ───────────────────────────────────────────── + +def test_no_candidates_returns_none(): + caps = _caps(served=[_model("m", typ="stt")]) + sel = select_best_model(_reqs(typ="llm"), caps) + assert sel is None + + +# ── 11) local threshold: local wins within threshold even if remote lower ─ + +def test_local_threshold(): + caps = _caps( + served=[ + _model("qwen3:14b", local=True, node="n1"), + _model("qwen3:14b", local=False, node="n2"), + ], + node_load={"estimated_wait_ms": 100}, + nodes={"n2": {"node_id": "n2", "node_load": {"estimated_wait_ms": 0, "rtt_ms_to_hub": 10}}}, + ) + sel = select_best_model(_reqs(), caps) + assert sel.local is True + + +# ── 12) code type cross-filters with llm ───────────────────────────────── + +def test_code_type_finds_llm_models(): + caps = _caps(served=[_model("qwen3:14b", typ="llm")]) + sel = select_best_model(_reqs(typ="code"), caps) + assert sel is not None + assert sel.name == "qwen3:14b"