Files
Apple d9ce366538 feat(sofiia-console): idempotency_key, cursor pagination, and noda2 router fallback
Add BFF runtime support for chat idempotency (header priority over body) with bounded in-memory TTL/LRU replay cache, implement cursor-based pagination for chats and messages, and add a safe NODA2 local router fallback for legacy runs without NODE_ID.

Made-with: Cursor
2026-03-02 04:14:58 -08:00

185 lines
6.4 KiB
Python

"""Load nodes_registry and env."""
import logging
import os
from pathlib import Path
from typing import Any, Dict

try:
    import yaml
except ImportError:
    # PyYAML is optional at import time; helpers below check for ``yaml`` being None.
    yaml = None
# Resolve the directory that holds nodes_registry.yml.
# In Docker: set CONFIG_DIR=/app/config. Else: repo root / config
if os.getenv("CONFIG_DIR"):
    # Explicit override wins and is made absolute.
    _CONFIG_DIR = Path(os.getenv("CONFIG_DIR")).resolve()
else:
    # The directory four levels above this file is treated as the repo root.
    _REPO_ROOT = Path(__file__).resolve().parent.parent.parent.parent
    _CONFIG_DIR = _REPO_ROOT / "config"
    # NOTE(review): indentation was lost in this copy of the file; the
    # existence check is assumed to belong to the repo-root branch only
    # (an explicit CONFIG_DIR is honoured as-is) -- confirm against the
    # original source.
    if not _CONFIG_DIR.exists():
        # Last resort: a "config" directory relative to the working directory.
        _CONFIG_DIR = Path("config")
# Primary registry location, derived from the resolved config directory.
_NODES_PATH = _CONFIG_DIR / "nodes_registry.yml"
# Working-directory-relative alternative tried when the primary file is missing.
_NODES_ALT = Path("config/nodes_registry.yml")
def get_nodes_registry_path() -> Path:
    """Return the path of the nodes registry file.

    The first existing candidate among the primary location and the
    working-directory-relative alternative is returned.  When neither
    exists, the primary location is returned so callers can create it.
    """
    # The alternative path used to be listed twice; one check is enough.
    for candidate in (_NODES_PATH, _NODES_ALT):
        if candidate.exists():
            return candidate
    return _NODES_PATH
def load_nodes_registry() -> Dict[str, Any]:
    """Load the nodes registry from YAML.

    Candidates are tried in order (primary path, then the
    working-directory-relative alternative).  A candidate that cannot be
    read or parsed, or whose top-level value is not a mapping, is logged
    and skipped.  An empty file yields an empty dict.

    Returns:
        The parsed registry mapping, or built-in defaults when PyYAML is
        unavailable or no candidate could be loaded.
    """
    fallback: Dict[str, Any] = {
        "nodes": {},
        "defaults": {"health_timeout_sec": 10, "tools_timeout_sec": 30},
    }
    if not yaml:
        return fallback

    log = logging.getLogger(__name__)
    for candidate in (_NODES_PATH, _NODES_ALT):
        if not candidate.exists():
            continue
        try:
            # save_nodes_registry() writes UTF-8, so read it back the same way
            # instead of relying on the platform default encoding.
            with open(candidate, encoding="utf-8") as f:
                loaded = yaml.safe_load(f) or {}
        except Exception:
            # Loading stays best-effort, but the failure is no longer silent.
            log.warning("Failed to read nodes registry %s", candidate, exc_info=True)
            continue
        if isinstance(loaded, dict):
            return loaded
        # Callers use dict methods on the result; a list/scalar document is unusable.
        log.warning(
            "Ignoring nodes registry %s: top-level value is not a mapping", candidate
        )
    return fallback
def save_nodes_registry(data: Dict[str, Any]) -> Path:
    """Serialize *data* as YAML into the registry file and return its path.

    The parent directory is created when missing.  Keys keep their
    insertion order and non-ASCII text is written unescaped.

    Raises:
        RuntimeError: if PyYAML could not be imported.
    """
    if not yaml:
        raise RuntimeError("PyYAML is not available")

    target = get_nodes_registry_path()
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as stream:
        yaml.safe_dump(data, stream, allow_unicode=True, sort_keys=False)
    return target
def get_gateway_url(node_id: str) -> str:
    """Gateway URL for node; env override: NODES_<NODEID>_GATEWAY_URL.

    A non-empty environment override wins.  Otherwise the node's
    ``gateway_url`` from the registry is returned, or ``""`` when the node
    or the value is missing.
    """
    override = os.getenv(f"NODES_{node_id}_GATEWAY_URL")
    if override:
        return override

    reg = load_nodes_registry()
    # ``nodes:`` or a node entry may be present but null in YAML; treat as empty.
    nodes = reg.get("nodes") or {}
    node_cfg = nodes.get(node_id) or {}
    # A null ``gateway_url`` must still produce a string.
    return node_cfg.get("gateway_url") or ""
def get_node_policy(node_id: str) -> Dict[str, Any]:
    """Return operational policy for a node (timeouts, role, retry).

    Numeric settings are looked up on the node entry first, then in the
    registry ``defaults`` section, then fall back to built-in values.
    A null value at either level is treated as "not set".

    Raises:
        ValueError: if a configured numeric setting cannot be converted
            to ``int``.
    """
    reg = load_nodes_registry()
    # Sections that are present but null in YAML are treated as empty.
    defaults = reg.get("defaults") or {}
    node_cfg = (reg.get("nodes") or {}).get(node_id) or {}

    def _int_setting(key: str, builtin: int) -> int:
        # Node-level value wins over registry defaults; None means unset.
        for scope in (node_cfg, defaults):
            value = scope.get(key)
            if value is not None:
                return int(value)
        return builtin

    return {
        "node_role": node_cfg.get("node_role", "prod"),
        "gateway_timeout_ms": _int_setting("gateway_timeout_ms", 2500),
        "apply_timeout_ms": _int_setting("apply_timeout_ms", 10000),
        "get_retry": _int_setting("get_retry", 1),
        "post_retry": _int_setting("post_retry", 0),
        "enabled": node_cfg.get("enabled", True),
    }
def get_router_url(node_id: str) -> str:
    """Router URL for node.

    Priority:
      1) NODES_<NODEID>_ROUTER_URL
      2) ROUTER_URL when NODE_ID matches the requested node
         (case-insensitive), or when NODE_ID is unset and the requested
         node is NODA2 (legacy local startup scripts)
      3) nodes_registry.yml value
      4) hardcoded localhost fallback
    """
    per_node_override = os.getenv(f"NODES_{node_id}_ROUTER_URL")
    if per_node_override:
        return per_node_override

    current_node = os.getenv("NODE_ID", "").strip().upper()
    router_url = os.getenv("ROUTER_URL", "").strip()
    target_node = str(node_id).strip().upper()
    if router_url:
        # Single-node/local dev: ROUTER_URL describes the node we run as.
        if current_node and current_node == target_node:
            return router_url
        # Compatibility fallback for legacy local startup scripts that
        # target NODA2 but do not export NODE_ID.
        if not current_node and target_node == "NODA2":
            return router_url

    reg = load_nodes_registry()
    # ``nodes:`` or a node entry may be present but null in YAML; treat as empty.
    nodes = reg.get("nodes") or {}
    node_cfg = nodes.get(node_id) or {}
    # An empty/null registry value falls through to the localhost default.
    return node_cfg.get("router_url") or "http://localhost:8000"
def get_node_ssh_profile(node_id: str) -> Dict[str, Any]:
    """SSH profile for node with env overrides.

    Env overrides:
      NODES_<NODEID>_SSH_HOST
      NODES_<NODEID>_SSH_PORT
      NODES_<NODEID>_SSH_USER
      NODES_<NODEID>_SSH_PASSWORD
      NODES_<NODEID>_SSH_PRIVATE_KEY

    The returned dict never contains the password or private key itself,
    only ``password_set`` / ``private_key_set`` flags.  ``configured`` is
    true when both host and user are non-empty.  An unparsable port falls
    back to 22.
    """
    reg = load_nodes_registry()
    # Sections that are present but null in YAML are treated as empty.
    node = (reg.get("nodes") or {}).get(node_id) or {}
    ssh = dict(node.get("ssh") or {})
    auth = dict(ssh.get("auth") or {})
    prefix = f"NODES_{node_id}_SSH_"

    def _setting(suffix: str, registry_value: Any) -> str:
        # The env var wins whenever it is set (even to an empty string);
        # a null registry value becomes "" instead of crashing on .strip().
        value = os.getenv(f"{prefix}{suffix}")
        if value is None:
            value = registry_value
        return str(value if value is not None else "").strip()

    host = _setting("HOST", ssh.get("host"))
    user = _setting("USER", ssh.get("user"))
    private_key = _setting("PRIVATE_KEY", auth.get("private_key"))

    # The registry may name the env var holding the password; the
    # per-node NODES_<NODEID>_SSH_PASSWORD variable takes precedence over it.
    password_env = (auth.get("password_env") or f"{prefix}PASSWORD").strip()
    password = os.getenv(f"{prefix}PASSWORD", os.getenv(password_env, "")).strip()

    try:
        port = int(os.getenv(f"{prefix}PORT", str(ssh.get("port", 22))))
    except ValueError:
        # int() only ever receives a string here, so ValueError is the sole failure.
        port = 22

    return {
        "configured": bool(host and user),
        "host": host,
        "ipv6": ssh.get("ipv6", ""),
        "port": port,
        "user": user,
        "host_keys": ssh.get("host_keys", []),
        "auth": {
            "password_env": password_env,
            "password_set": bool(password),
            "private_key_set": bool(private_key),
        },
    }
def get_memory_service_url() -> str:
    """Memory-service URL; env override: MEMORY_SERVICE_URL.

    Falls back to ``defaults.memory_service_url`` from the registry and
    finally to ``http://localhost:8000``.  Trailing slashes are removed
    from env and registry values.
    """
    override = os.getenv("MEMORY_SERVICE_URL")
    if override:
        return override.rstrip("/")

    # ``defaults:`` may be present but null in YAML; treat it as empty.
    defaults = load_nodes_registry().get("defaults") or {}
    configured = defaults.get("memory_service_url")
    if configured:
        return str(configured).rstrip("/")
    return "http://localhost:8000"
def get_ollama_url() -> str:
    """Ollama URL; env override: OLLAMA_URL.

    An unset or empty OLLAMA_URL yields ``http://localhost:11434``, matching
    the other URL getters in this module, which ignore empty overrides.
    Trailing slashes are removed.
    """
    configured = os.getenv("OLLAMA_URL") or "http://localhost:11434"
    return configured.rstrip("/")
def is_voice_ha_enabled() -> bool:
    """Voice HA feature flag. Set VOICE_HA_ENABLED=true to opt-in.

    When enabled, /api/voice/tts and /api/voice/chat/stream use Router
    /v1/capability/voice_tts and /v1/capability/voice_llm endpoints for
    multi-node failover instead of calling memory-service directly.

    Accepted truthy values (case-insensitive): "1", "true", "yes".
    Default: False (safe for existing deployments).
    """
    raw_flag = os.getenv("VOICE_HA_ENABLED", "false")
    normalized = raw_flag.lower()
    return normalized in ("1", "true", "yes")
def get_voice_ha_router_url(node_id: str = "NODA2") -> str:
    """Router URL used for Voice HA offload.

    A non-empty VOICE_HA_ROUTER_URL takes precedence; otherwise the regular
    router URL of *node_id* is used.  Trailing slashes are removed either way.
    """
    explicit = os.getenv("VOICE_HA_ROUTER_URL")
    chosen = explicit if explicit else get_router_url(node_id)
    return chosen.rstrip("/")