NCS (services/node-capabilities/metrics.py): - NodeLoad: inflight_jobs, queue_depth, concurrency_limit, estimated_wait_ms, cpu_load_1m, mem_pressure (macOS + Linux), rtt_ms_to_hub - RuntimeLoad: per-runtime healthy, p50_ms, p95_ms from rolling 50-sample window - POST /capabilities/report_latency for node-worker → NCS reporting - NCS fetches worker metrics via NODE_WORKER_URL Node Worker: - GET /metrics endpoint (inflight, concurrency, latency buffers) - Latency tracking per job type (llm/vision) with rolling buffer - Fire-and-forget latency reporting to NCS after each successful job Router (model_select v3): - score_candidate(): wait + model_latency + cross_node_penalty + prefer_bonus - LOCAL_THRESHOLD_MS=250: prefer local if within threshold of remote - ModelSelection.score field for observability - Structured [score] logs with chosen node, model, and score breakdown Tests: 19 new (12 scoring + 7 NCS metrics), 36 total pass Docs: ops/runbook_p3_1.md, ops/CHANGELOG_FABRIC.md No breaking changes to JobRequest/JobResponse or capabilities schema. Made-with: Cursor
165 lines
5.4 KiB
Python
165 lines
5.4 KiB
Python
"""Runtime health and load metrics for NCS capabilities payload."""
|
|
import logging
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
import time
|
|
from collections import deque
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import httpx
|
|
|
|
# Module-level logger for NCS metrics collection.
logger = logging.getLogger("ncs-metrics")

# Base URL of the co-located node-worker; overridable via environment for tests/deploys.
NODE_WORKER_URL = os.getenv("NODE_WORKER_URL", "http://127.0.0.1:8109")

# Rolling latency samples, bounded per key by deque(maxlen=LATENCY_BUFFER_SIZE)
# in record_latency(); read by get_latency_stats().
_latency_buffer: Dict[str, deque] = {}  # key: "runtime:type" → deque of (latency_ms, ts)

# Maximum samples retained per (runtime, request-type) key.
LATENCY_BUFFER_SIZE = 50
|
|
|
|
|
|
def record_latency(runtime: str, req_type: str, latency_ms: int):
    """Append one latency sample (ms, wall-clock ts) to the rolling window for runtime:req_type."""
    bucket_key = f"{runtime}:{req_type}"
    if bucket_key not in _latency_buffer:
        # Bounded deque: oldest samples fall off automatically once full.
        _latency_buffer[bucket_key] = deque(maxlen=LATENCY_BUFFER_SIZE)
    _latency_buffer[bucket_key].append((latency_ms, time.time()))
|
|
|
|
|
|
def _percentile(values: List[int], p: float) -> int:
|
|
if not values:
|
|
return 0
|
|
s = sorted(values)
|
|
idx = int(len(s) * p / 100)
|
|
return s[min(idx, len(s) - 1)]
|
|
|
|
|
|
def get_latency_stats(runtime: str, req_type: str) -> Dict[str, Optional[int]]:
    """Return rolling p50/p95 latency (ms) and sample count for runtime:req_type.

    Only samples recorded within the last 10 minutes are counted; p50_ms and
    p95_ms are None (with samples=0) when no recent samples exist.
    """
    buf = _latency_buffer.get(f"{runtime}:{req_type}")
    # `not buf` covers both a missing key and an empty deque; the original
    # extra `len(buf) == 0` check was redundant.
    if not buf:
        return {"p50_ms": None, "p95_ms": None, "samples": 0}
    window_start = time.time() - 600  # 10-minute freshness window
    recent = [lat for lat, ts in buf if ts > window_start]
    if not recent:
        return {"p50_ms": None, "p95_ms": None, "samples": 0}
    return {
        "p50_ms": _percentile(recent, 50),
        "p95_ms": _percentile(recent, 95),
        "samples": len(recent),
    }
|
|
|
|
|
|
async def fetch_worker_metrics() -> Dict[str, Any]:
    """Fetch inflight/concurrency from the local node-worker /metrics endpoint.

    Returns safe defaults (idle node, concurrency 1) when the worker is
    unreachable, slow (>2s), or responds non-200 — best-effort by design.
    """
    fallback: Dict[str, Any] = {
        "inflight_jobs": 0,
        "concurrency_limit": 1,
        "queue_depth": 0,
        "last_latencies_llm": [],
        "last_latencies_vision": [],
    }
    try:
        async with httpx.AsyncClient(timeout=2) as client:
            resp = await client.get(f"{NODE_WORKER_URL}/metrics")
            if resp.status_code == 200:
                return resp.json()
    except Exception as e:
        # Degrade quietly: an absent worker is an expected condition.
        logger.debug(f"Node-worker metrics unavailable: {e}")
    return fallback
|
|
|
|
|
|
def get_cpu_load() -> Optional[float]:
    """1-minute load average rounded to 2 decimals; None where unsupported (e.g. Windows)."""
    try:
        one_minute_avg = os.getloadavg()[0]
    except (OSError, AttributeError):
        # getloadavg is missing on some platforms and may fail at runtime.
        return None
    return round(one_minute_avg, 2)
|
|
|
|
|
|
def get_mem_pressure() -> Optional[str]:
    """Coarse memory-pressure level, or None when it cannot be determined.

    macOS: parses `memory_pressure -Q` system-wide level (critical/warn/normal
    → "critical"/"high"/"low"); if that tool fails but `vm_stat` runs, reports
    "low". Linux: buckets MemAvailable/MemTotal from /proc/meminfo into
    "critical"/"high"/"medium"/"low". Other platforms: None.
    """
    system = platform.system()

    if system == "Darwin":
        try:
            report = subprocess.check_output(
                ["memory_pressure", "-Q"], timeout=2, stderr=subprocess.DEVNULL
            ).decode()
        except Exception:
            # memory_pressure unavailable — if vm_stat at least runs, assume "low".
            try:
                subprocess.check_output(
                    ["vm_stat"], timeout=2, stderr=subprocess.DEVNULL
                ).decode()
                return "low"
            except Exception:
                return None
        for raw_line in report.splitlines():
            lowered = raw_line.lower()
            if "system-wide" in lowered and "level" in lowered:
                if "critical" in lowered:
                    return "critical"
                if "warn" in lowered:
                    return "high"
                if "normal" in lowered:
                    return "low"
        # No recognizable level line: treat as unpressured.
        return "low"

    if system == "Linux":
        try:
            meminfo: Dict[str, int] = {}
            with open("/proc/meminfo") as fh:
                for raw_line in fh:
                    pieces = raw_line.split(":")
                    if len(pieces) == 2:
                        # Values look like "16384 kB" — keep the numeric part.
                        meminfo[pieces[0].strip()] = int(pieces[1].strip().split()[0])
            total_kb = meminfo.get("MemTotal", 1)
            avail_kb = meminfo.get("MemAvailable", total_kb)
            available_ratio = avail_kb / total_kb
        except Exception:
            return None
        if available_ratio < 0.05:
            return "critical"
        if available_ratio < 0.15:
            return "high"
        if available_ratio < 0.30:
            return "medium"
        return "low"

    return None
|
|
|
|
|
|
async def build_node_load(worker_metrics: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the NodeLoad object for the capabilities payload.

    Uses *worker_metrics* when a truthy dict is supplied; otherwise polls the
    local node-worker. estimated_wait_ms is a queueing heuristic: each job
    beyond the concurrency limit waits roughly one p50 LLM latency
    (1500 ms default when no recent samples exist).
    """
    metrics = worker_metrics or await fetch_worker_metrics()

    active_jobs = metrics.get("inflight_jobs", 0)
    job_limit = metrics.get("concurrency_limit", 1)
    depth = metrics.get("queue_depth", 0)

    # Fall back to 1500 ms when there are no recent "ollama" LLM samples.
    p50_llm = get_latency_stats("ollama", "llm")["p50_ms"] or 1500

    if active_jobs < job_limit:
        wait_ms = 0
    else:
        wait_ms = (active_jobs - job_limit + 1) * p50_llm

    return {
        "ts": int(time.time() * 1000),
        "inflight_jobs": active_jobs,
        "queue_depth": depth,
        "concurrency_limit": job_limit,
        "estimated_wait_ms": wait_ms,
        "cpu_load_1m": get_cpu_load(),
        "mem_pressure": get_mem_pressure(),
        # NOTE(review): RTT to hub is not measured here — presumably filled in
        # by a separate probe; confirm with the caller.
        "rtt_ms_to_hub": None,
    }
|
|
|
|
|
|
async def build_runtime_load(runtimes: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build the RuntimeLoad list from collected runtimes.

    For each runtime, reports health (status == "ok") and the p50/p95 of
    whichever request type — llm or vision — has more recent samples.
    """
    loads: List[Dict[str, Any]] = []
    for name, data in runtimes.items():
        llm = get_latency_stats(name, "llm")
        vision = get_latency_stats(name, "vision")
        # Prefer the request type with the larger recent-sample count.
        chosen = vision if vision["samples"] > llm["samples"] else llm

        loads.append({
            "runtime": name,
            "healthy": data.get("status", "unknown") == "ok",
            "last_check_ms": int(time.time() * 1000),
            "p50_ms": chosen["p50_ms"],
            "p95_ms": chosen["p95_ms"],
        })
    return loads
|