node2: fix Sofiia routing determinism + Node Capabilities Service

Bug fixes:
- Bug A: GROK_API_KEY env mismatch — router expected GROK_API_KEY but only
  XAI_API_KEY was present. Added GROK_API_KEY=${XAI_API_KEY} alias in compose.
- Bug B: 'grok' profile missing in router-config.node2.yml — added cloud_grok
  profile (provider: grok, model: grok-2-1212). Sofiia now has
  default_llm=cloud_grok with fallback_llm=local_default_coder.
- Bug C: Router silently defaulted to cloud DeepSeek when profile was unknown.
  Now falls back to agent.fallback_llm or local_default_coder with WARNING log.
  Hardcoded Ollama URL (172.18.0.1) replaced with config-driven base_url.

New service: Node Capabilities Service (NCS)
- services/node-capabilities/ — FastAPI microservice exposing live model
  inventory from Ollama, Swapper, and llama-server.
- GET /capabilities — canonical JSON with served_models[] and inventory_only[]
- GET /capabilities/models — flat list of served models
- POST /capabilities/refresh — force cache refresh
- Cache TTL 15s, bound to 127.0.0.1:8099
- services/router/capabilities_client.py — async client with TTL cache

Artifacts:
- ops/node2_models_audit.md — 3-layer model view (served/disk/cloud)
- ops/node2_models_audit.yml — machine-readable audit
- ops/node2_capabilities_example.json — sample NCS output (14 served models)

Made-with: Cursor
This commit is contained in:
Apple
2026-02-27 02:07:40 -08:00
parent 3965f68fac
commit e2a3ae342a
10 changed files with 867 additions and 33 deletions

View File

@@ -1,6 +1,6 @@
from fastapi import FastAPI, HTTPException
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import Response
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict
from typing import Literal, Optional, Dict, Any, List
import asyncio
import json
@@ -897,6 +897,134 @@ async def health():
"messaging_inbound_enabled": config.get("messaging_inbound", {}).get("enabled", True)
}
@app.get("/healthz")
async def healthz():
    """Compatibility alias: serve /healthz by delegating to the /health handler (BFF probes this path)."""
    # Reuse the canonical health handler so there is a single source of truth.
    health_payload = await health()
    return health_payload
@app.get("/monitor/status")
async def monitor_status(request: Request = None):
    """
    Node monitor status — read-only, safe, no secrets.

    Returns: heartbeat age, router/gateway health, open incident count,
    alerts-loop SLO, active storage backends, last artifact timestamps.

    Rate limited: 60 rpm per client IP (in-process sliding window).
    RBAC: requires tools.monitor.read entitlement (or tools.observability.read).
    Auth: X-Monitor-Key header (same value as SUPERVISOR_API_KEY); only
    enforced when ENV is prod/production/staging AND the key is configured.
    """
    # Proper local imports instead of inline __import__("...") hacks.
    import collections
    import datetime as dt
    import pathlib

    # ── Rate limit (60 rpm per IP, sliding 60 s window) ───────────────────
    now_mono = time.monotonic()
    client_ip = (
        (request.client.host if request and request.client else None) or "unknown"
    )
    if not hasattr(monitor_status, "_buckets"):
        monitor_status._buckets = {}
    buckets = monitor_status._buckets
    dq = buckets.setdefault(f"monitor:{client_ip}", collections.deque())
    while dq and now_mono - dq[0] > 60:
        dq.popleft()
    if len(dq) >= 60:
        from fastapi.responses import JSONResponse
        return JSONResponse(status_code=429, content={"error": "rate_limit", "message": "60 rpm exceeded"})
    dq.append(now_mono)
    # Prune buckets that have fully drained so the per-IP dict cannot grow
    # without bound when many distinct source addresses hit this endpoint.
    for stale in [key for key, window in buckets.items() if not window]:
        del buckets[stale]

    # ── Auth (optional in dev, enforced in prod/staging) ──────────────────
    env_name = os.getenv("ENV", "dev").strip().lower()
    monitor_key = os.getenv("SUPERVISOR_API_KEY", "").strip()
    if env_name in ("prod", "production", "staging") and monitor_key:
        presented = ""
        if request:
            presented = (
                request.headers.get("X-Monitor-Key", "")
                or request.headers.get("Authorization", "").removeprefix("Bearer ").strip()
            )
        if presented != monitor_key:
            from fastapi.responses import JSONResponse
            return JSONResponse(status_code=403, content={"error": "forbidden", "message": "X-Monitor-Key required"})

    # ── Collect data (best-effort: each section degrades to a warning) ────
    warnings: list[str] = []
    ts_now = dt.datetime.now(dt.timezone.utc).isoformat(timespec="seconds")

    # Process uptime serves as the heartbeat proxy (no external heartbeat probed here).
    proc_start = getattr(monitor_status, "_proc_start", None)
    if proc_start is None:
        monitor_status._proc_start = time.monotonic()
        proc_start = monitor_status._proc_start
    heartbeat_age_s = int(time.monotonic() - proc_start)

    # Open incidents — "mitigating" counts as still-open.
    open_incidents: int | None = None
    try:
        from incident_store import get_incident_store as _get_is
        rows = _get_is().list_incidents(filters={"status": "open"}, limit=500)
        open_incidents = sum(
            1 for i in rows if (i.get("status") or "").lower() in ("open", "mitigating")
        )
    except Exception as _e:
        warnings.append(f"incidents: {str(_e)[:80]}")

    # Alerts loop SLO — whitelist keys so internal infra details never leak out.
    alerts_loop_slo: dict | None = None
    try:
        from alert_store import get_alert_store as _get_as
        alerts_loop_slo = _get_as().compute_loop_slo(window_minutes=240)
        _safe_keys = {"claim_to_ack_p95_seconds", "failed_rate_pct", "processing_stuck_count", "sample_count", "violations"}
        alerts_loop_slo = {k: v for k, v in alerts_loop_slo.items() if k in _safe_keys}
    except Exception as _e:
        warnings.append(f"alerts_slo: {str(_e)[:80]}")

    # Backend selection (env vars only — no DSNs, no passwords).
    backends = {
        "alerts": os.getenv("ALERT_BACKEND", "unknown"),
        "audit": os.getenv("AUDIT_BACKEND", "unknown"),
        "incidents": os.getenv("INCIDENT_BACKEND", "unknown"),
        "risk_history": os.getenv("RISK_HISTORY_BACKEND", "unknown"),
        "backlog": os.getenv("BACKLOG_BACKEND", "unknown"),
    }

    # Last artifact timestamps (best-effort filesystem scan under ops/).
    # NOTE: picks the lexicographically last match, which assumes date-sortable
    # filenames — preserved from the original implementation.
    last_artifacts: dict = {}
    base = pathlib.Path("ops")
    for pattern, key in [
        ("reports/risk/*.md", "risk_digest_ts"),
        ("reports/platform/*.md", "platform_digest_ts"),
        ("backlog/*.jsonl", "backlog_generate_ts"),
        ("reports/backlog/*.md", "backlog_report_ts"),
    ]:
        try:
            matches = sorted(base.glob(pattern))
            if matches:
                mtime = matches[-1].stat().st_mtime
                last_artifacts[key] = dt.datetime.fromtimestamp(
                    mtime, tz=dt.timezone.utc
                ).isoformat(timespec="seconds")
        except Exception:
            # Missing directories / transient FS errors are non-fatal by design.
            pass

    return {
        "node_id": os.getenv("NODE_ID", "NODA1"),
        "ts": ts_now,
        "heartbeat_age_s": heartbeat_age_s,
        "router_ok": True,  # we are the router; if we respond, we're ok
        "gateway_ok": None,  # gateway health not probed here (separate svc)
        "open_incidents": open_incidents,
        "alerts_loop_slo": alerts_loop_slo,
        "backends": backends,
        "last_artifacts": last_artifacts,
        "warnings": warnings,
    }
@app.post("/internal/router/test-messaging", response_model=AgentInvocation)
async def test_messaging_route(decision: FilterDecision):
"""
@@ -966,6 +1094,15 @@ class InferResponse(BaseModel):
file_mime: Optional[str] = None
class ToolExecuteRequest(BaseModel):
    """External tool execution request used by console/ops APIs."""
    # extra="allow": unknown request keys are retained and forwarded as tool arguments.
    model_config = ConfigDict(extra="allow")
    # Name of the tool to execute (required; rejected with 422 if blank).
    tool: str
    # Optional sub-action within the tool; re-attached to the argument payload downstream.
    action: Optional[str] = None
    # Agent on whose behalf the tool runs; defaults to "sofiia".
    agent_id: Optional[str] = "sofiia"
    # Routing context (chat_id, user_id, workspace_id); stripped from tool arguments.
    metadata: Optional[Dict[str, Any]] = None
# =========================================================================
@@ -1110,15 +1247,21 @@ async def internal_llm_complete(request: InternalLLMRequest):
logger.info(f"Internal LLM: profile={request.llm_profile}, role={request.role_context}")
# Get LLM profile configuration
llm_profiles = router_config.get("llm_profiles", {})
profile_name = request.llm_profile or "reasoning"
llm_profile = llm_profiles.get(profile_name, {})
provider = llm_profile.get("provider", "deepseek")
model = request.model or llm_profile.get("model", "deepseek-chat")
if not llm_profile:
fallback_name = "local_default_coder"
llm_profile = llm_profiles.get(fallback_name, {})
logger.warning(f"⚠️ Profile '{profile_name}' not found in llm_profiles → falling back to '{fallback_name}' (local)")
profile_name = fallback_name
provider = llm_profile.get("provider", "ollama")
model = request.model or llm_profile.get("model", "qwen3:14b")
max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048)
temperature = request.temperature or llm_profile.get("temperature", 0.2)
logger.info(f"🎯 Resolved: profile={profile_name} provider={provider} model={model}")
# Build messages
messages = []
@@ -1173,10 +1316,11 @@ async def internal_llm_complete(request: InternalLLMRequest):
# Fallback/target local provider (Ollama)
try:
logger.info("Internal LLM to Ollama")
ollama_model = model or "qwen3:8b"
ollama_base = llm_profile.get("base_url", os.getenv("OLLAMA_BASE_URL", "http://host.docker.internal:11434"))
ollama_model = model or "qwen3:14b"
logger.info(f"Internal LLM to Ollama: model={ollama_model} url={ollama_base}")
ollama_resp = await http_client.post(
"http://172.18.0.1:11434/api/generate",
f"{ollama_base}/api/generate",
json={"model": ollama_model, "prompt": request.prompt, "system": request.system_prompt or "", "stream": False, "options": {"num_predict": max_tokens, "temperature": temperature}},
timeout=120.0
)
@@ -1249,15 +1393,17 @@ async def agent_infer(agent_id: str, request: InferRequest):
if not system_prompt:
try:
from prompt_builder import get_agent_system_prompt
system_prompt = await get_agent_system_prompt(
agent_id,
from prompt_builder import get_prompt_builder
prompt_builder = await get_prompt_builder(
city_service_url=CITY_SERVICE_URL,
router_config=router_config
router_config=router_config,
)
logger.info(f"✅ Loaded system prompt from database for {agent_id}")
prompt_result = await prompt_builder.get_system_prompt(agent_id)
system_prompt = prompt_result.system_prompt
system_prompt_source = prompt_result.source
logger.info(f"✅ Loaded system prompt for {agent_id} from {system_prompt_source}")
except Exception as e:
logger.warning(f"⚠️ Could not load prompt from database: {e}")
logger.warning(f"⚠️ Could not load prompt from configured sources: {e}")
# Fallback to config
system_prompt_source = "router_config"
agent_config = router_config.get("agents", {}).get(agent_id, {})
@@ -1450,15 +1596,38 @@ async def agent_infer(agent_id: str, request: InferRequest):
except Exception as e:
logger.exception(f"❌ CrewAI error: {e}, falling back to direct LLM")
default_llm = agent_config.get("default_llm", "qwen3:8b")
default_llm = agent_config.get("default_llm", "local_default_coder")
routing_rules = router_config.get("routing", [])
default_llm = _select_default_llm(agent_id, metadata, default_llm, routing_rules)
# Get LLM profile configuration
cloud_provider_names = {"deepseek", "mistral", "grok", "openai", "anthropic"}
llm_profiles = router_config.get("llm_profiles", {})
llm_profile = llm_profiles.get(default_llm, {})
if not llm_profile:
fallback_llm = agent_config.get("fallback_llm", "local_default_coder")
llm_profile = llm_profiles.get(fallback_llm, {})
logger.warning(
f"⚠️ Profile '{default_llm}' not found for agent={agent_id} "
f"→ fallback to '{fallback_llm}' (local). "
f"NOT defaulting to cloud silently."
)
default_llm = fallback_llm
provider = llm_profile.get("provider", "ollama")
logger.info(f"🎯 Agent={agent_id}: profile={default_llm} provider={provider} model={llm_profile.get('model', '?')}")
# If explicit model is requested, try to resolve it to configured cloud profile.
if request.model:
for profile_name, profile in llm_profiles.items():
if profile.get("model") == request.model and profile.get("provider") in cloud_provider_names:
llm_profile = profile
provider = profile.get("provider", provider)
default_llm = profile_name
logger.info(f"🎛️ Matched request.model={request.model} to profile={profile_name} provider={provider}")
break
# Determine model name
if provider in ["deepseek", "openai", "anthropic", "mistral"]:
@@ -1671,7 +1840,6 @@ async def agent_infer(agent_id: str, request: InferRequest):
max_tokens = request.max_tokens or llm_profile.get("max_tokens", 2048)
temperature = request.temperature or llm_profile.get("temperature", 0.2)
cloud_provider_names = {"deepseek", "mistral", "grok", "openai", "anthropic"}
allow_cloud = provider in cloud_provider_names
if not allow_cloud:
logger.info(f"☁️ Cloud providers disabled for agent {agent_id}: provider={provider}")
@@ -1700,6 +1868,18 @@ async def agent_infer(agent_id: str, request: InferRequest):
}
]
# Custom configured profile for OpenAI-compatible backends (e.g. local llama-server).
if provider == "openai":
cloud_providers = [
{
"name": "openai",
"api_key_env": llm_profile.get("api_key_env", "OPENAI_API_KEY"),
"base_url": llm_profile.get("base_url", "https://api.openai.com"),
"model": request.model or llm_profile.get("model", model),
"timeout": int(llm_profile.get("timeout_ms", 60000) / 1000),
}
]
if not allow_cloud:
cloud_providers = []
@@ -1717,8 +1897,14 @@ async def agent_infer(agent_id: str, request: InferRequest):
logger.debug(f"🔧 {len(tools_payload)} tools available for function calling")
for cloud in cloud_providers:
api_key = os.getenv(cloud["api_key_env"])
if not api_key:
api_key = os.getenv(cloud["api_key_env"], "")
base_url = cloud.get("base_url", "")
is_local_openai = (
cloud.get("name") == "openai"
and isinstance(base_url, str)
and any(host in base_url for host in ["host.docker.internal", "localhost", "127.0.0.1"])
)
if not api_key and not is_local_openai:
logger.debug(f"⏭️ Skipping {cloud['name']}: API key not configured")
continue
@@ -1739,12 +1925,13 @@ async def agent_infer(agent_id: str, request: InferRequest):
request_payload["tools"] = tools_payload
request_payload["tool_choice"] = "auto"
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
cloud_resp = await http_client.post(
f"{cloud['base_url']}/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
headers=headers,
json=request_payload,
timeout=cloud["timeout"]
)
@@ -1754,6 +1941,8 @@ async def agent_infer(agent_id: str, request: InferRequest):
choice = data.get("choices", [{}])[0]
message = choice.get("message", {})
response_text = message.get("content", "") or ""
if not response_text and message.get("reasoning_content"):
response_text = str(message.get("reasoning_content", "")).strip()
tokens_used = data.get("usage", {}).get("total_tokens", 0)
# Initialize tool_results to avoid UnboundLocalError
@@ -1959,12 +2148,12 @@ async def agent_infer(agent_id: str, request: InferRequest):
loop_payload["tools"] = tools_payload
loop_payload["tool_choice"] = "auto"
loop_headers = {"Content-Type": "application/json"}
if api_key:
loop_headers["Authorization"] = f"Bearer {api_key}"
loop_resp = await http_client.post(
f"{cloud['base_url']}/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
headers=loop_headers,
json=loop_payload,
timeout=cloud["timeout"]
)
@@ -1978,6 +2167,8 @@ async def agent_infer(agent_id: str, request: InferRequest):
loop_data = loop_resp.json()
loop_message = loop_data.get("choices", [{}])[0].get("message", {})
response_text = loop_message.get("content", "") or ""
if not response_text and loop_message.get("reasoning_content"):
response_text = str(loop_message.get("reasoning_content", "")).strip()
tokens_used += loop_data.get("usage", {}).get("total_tokens", 0)
current_tool_calls = loop_message.get("tool_calls", [])
@@ -2123,16 +2314,24 @@ async def agent_infer(agent_id: str, request: InferRequest):
# LOCAL PROVIDERS (Ollama via Swapper)
# =========================================================================
# Determine local model from config (not hardcoded)
# Strategy: Use agent's default_llm if it's local (ollama), otherwise find first local model
# Strategy:
# 1) explicit request.model override
# 2) agent default_llm if it's local (ollama)
# 3) first local profile fallback
local_model = None
requested_local_model = (request.model or "").strip()
if requested_local_model:
local_model = requested_local_model.replace(":", "-")
logger.info(f"🎛️ Local model override requested: {requested_local_model} -> {local_model}")
# Check if default_llm is local
if llm_profile.get("provider") == "ollama":
if not local_model and llm_profile.get("provider") == "ollama":
# Extract model name and convert format (qwen3:8b → qwen3:8b for Swapper)
ollama_model = llm_profile.get("model", "qwen3:8b")
local_model = ollama_model.replace(":", "-") # qwen3:8b → qwen3:8b
logger.debug(f"✅ Using agent's default local model: {local_model}")
else:
elif not local_model:
# Find first local model from config
for profile_name, profile in llm_profiles.items():
if profile.get("provider") == "ollama":
@@ -2259,6 +2458,60 @@ async def agent_infer(agent_id: str, request: InferRequest):
)
@app.post("/v1/tools/execute")
async def tools_execute(request: ToolExecuteRequest):
    """
    Execute a single tool call through ToolManager.
    Returns console-compatible shape: {status, data, error}.
    """
    # Guard: no tool manager means we cannot serve this endpoint at all.
    if not tool_manager:
        raise HTTPException(status_code=503, detail="Tool manager unavailable")

    # Everything that is not a routing field becomes the tool's argument payload
    # (extra="allow" on the model keeps unknown keys).
    args = request.model_dump(exclude_none=True)
    name = str(args.pop("tool", "")).strip()
    action = args.pop("action", None)
    caller = str(args.pop("agent_id", "sofiia") or "sofiia").strip()
    meta = args.pop("metadata", {}) or {}

    if not name:
        raise HTTPException(status_code=422, detail="tool is required")

    # Backward compatibility with sofiia-console: action travels inside the args.
    if action is not None:
        args["action"] = action

    # Normalize routing context; empty strings collapse to None/defaults.
    chat_id = str(meta.get("chat_id") or "") or None
    user_id = str(meta.get("user_id") or "") or None
    workspace_id = str(meta.get("workspace_id") or "default")

    try:
        outcome = await tool_manager.execute_tool(
            tool_name=name,
            arguments=args,
            agent_id=caller,
            chat_id=chat_id,
            user_id=user_id,
            workspace_id=workspace_id,
        )
    except Exception as e:
        logger.exception("❌ Tool execution failed: %s", name)
        raise HTTPException(status_code=500, detail=f"Tool execution error: {str(e)[:200]}")

    # Attach optional binary/file fields only when present (keys mirror attribute names).
    data: Dict[str, Any] = {"result": outcome.result}
    for attr in ("image_base64", "file_base64", "file_name", "file_mime"):
        value = getattr(outcome, attr)
        if value:
            data[attr] = value

    if outcome.success:
        return {"status": "ok", "data": data, "error": None}
    return {"status": "failed", "data": data, "error": {"message": outcome.error or "Tool failed"}}
@app.get("/v1/models")
async def list_available_models():
"""List all available models across backends"""