feat(sofiia-console): idempotency_key, cursor pagination, and NODA2 router fallback

Add BFF runtime support for chat idempotency (header priority over body) with bounded in-memory TTL/LRU replay cache, implement cursor-based pagination for chats and messages, and add a safe NODA2 local router fallback for legacy runs without NODE_ID.

Made-with: Cursor
This commit is contained in:
Apple
2026-03-02 04:14:58 -08:00
parent 5a886a56ca
commit d9ce366538
3 changed files with 6326 additions and 8 deletions

View File

@@ -0,0 +1,184 @@
"""Load nodes_registry and env."""
import os
from pathlib import Path
from typing import Any, Dict
try:
import yaml
except ImportError:
yaml = None
# Resolve the configuration directory once at import time.
# In Docker: set CONFIG_DIR=/app/config. Else: repo root / config
if os.getenv("CONFIG_DIR"):
    _CONFIG_DIR = Path(os.getenv("CONFIG_DIR")).resolve()
else:
    # Four `.parent` hops up from this module is assumed to be the repo
    # root — TODO confirm this matches the actual package layout.
    _REPO_ROOT = Path(__file__).resolve().parent.parent.parent.parent
    _CONFIG_DIR = _REPO_ROOT / "config"
    if not _CONFIG_DIR.exists():
        # Fall back to a CWD-relative "config" dir for ad-hoc local runs.
        _CONFIG_DIR = Path("config")
# Primary registry path, plus a CWD-relative alternate used as fallback.
_NODES_PATH = _CONFIG_DIR / "nodes_registry.yml"
_NODES_ALT = Path("config/nodes_registry.yml")
def get_nodes_registry_path() -> Path:
    """Return the first existing nodes_registry.yml path.

    Checks the resolved config-dir path first, then the CWD-relative
    alternate. Falls back to the primary path even when it does not
    exist yet (save_nodes_registry() will create it).

    Note: the original candidate tuple listed the CWD-relative path
    twice (``_NODES_ALT`` is already ``Path("config/nodes_registry.yml")``);
    the duplicate has been removed with no behavior change.
    """
    for p in (_NODES_PATH, _NODES_ALT):
        if p.exists():
            return p
    return _NODES_PATH
def load_nodes_registry() -> Dict[str, Any]:
    """Load nodes_registry.yml as a dict (best-effort, never raises).

    Returns a minimal default structure when PyYAML is unavailable, no
    registry file exists, or every candidate fails to parse.

    Note: the original candidate tuple duplicated the CWD-relative path
    and re-tested ``yaml`` on every iteration; both redundancies are
    removed with no behavior change.
    """
    if yaml:
        for p in (_NODES_PATH, _NODES_ALT):
            if p.exists():
                try:
                    with open(p) as f:
                        return yaml.safe_load(f) or {}
                except Exception:
                    # Corrupt/unreadable candidate: try the next one.
                    pass
    return {"nodes": {}, "defaults": {"health_timeout_sec": 10, "tools_timeout_sec": 30}}
def save_nodes_registry(data: Dict[str, Any]) -> Path:
    """Serialize *data* to the registry YAML file and return its path.

    Raises RuntimeError when PyYAML is not installed. Parent directories
    are created as needed.
    """
    if yaml is None:
        raise RuntimeError("PyYAML is not available")
    target = get_nodes_registry_path()
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as fh:
        yaml.safe_dump(data, fh, allow_unicode=True, sort_keys=False)
    return target
def get_gateway_url(node_id: str) -> str:
    """Gateway URL for node; env override: NODES_<NODEID>_GATEWAY_URL."""
    override = os.getenv(f"NODES_{node_id}_GATEWAY_URL")
    if override:
        return override
    registry = load_nodes_registry()
    node_cfg = registry.get("nodes", {}).get(node_id) or {}
    return node_cfg.get("gateway_url", "")
def get_node_policy(node_id: str) -> Dict[str, Any]:
    """Return operational policy for a node (timeouts, role, retry)."""
    registry = load_nodes_registry()
    defaults = registry.get("defaults", {})
    cfg = registry.get("nodes", {}).get(node_id) or {}

    def _int_opt(key: str, fallback: int) -> int:
        # Node-level value wins over defaults; coerce to int defensively.
        return int(cfg.get(key, defaults.get(key, fallback)))

    return {
        "node_role": cfg.get("node_role", "prod"),
        "gateway_timeout_ms": _int_opt("gateway_timeout_ms", 2500),
        "apply_timeout_ms": _int_opt("apply_timeout_ms", 10000),
        "get_retry": _int_opt("get_retry", 1),
        "post_retry": _int_opt("post_retry", 0),
        "enabled": cfg.get("enabled", True),
    }
def get_router_url(node_id: str) -> str:
    """Router URL for node.

    Priority:
      1) NODES_<NODEID>_ROUTER_URL
      2) ROUTER_URL for current NODE_ID (single-node/local dev fallback)
      3) nodes_registry.yml value
      4) hardcoded localhost fallback
    """
    override = os.getenv(f"NODES_{node_id}_ROUTER_URL")
    if override:
        return override
    # Local fallback: when running the console outside Docker, NODE_ID may
    # be absent while only ROUTER_URL is configured (no per-node override).
    local_router = os.getenv("ROUTER_URL", "").strip()
    this_node = os.getenv("NODE_ID", "").strip().upper()
    wanted = str(node_id).strip().upper()
    if local_router:
        if this_node and this_node == wanted:
            return local_router
        # Compatibility fallback for legacy local startup scripts that
        # target NODA2 but do not export NODE_ID.
        if not this_node and wanted == "NODA2":
            return local_router
    registry = load_nodes_registry()
    node_cfg = registry.get("nodes", {}).get(node_id) or {}
    return node_cfg.get("router_url", "http://localhost:8000")
def get_node_ssh_profile(node_id: str) -> Dict[str, Any]:
    """SSH profile for node with env overrides.

    Env overrides:
        NODES_<NODEID>_SSH_HOST
        NODES_<NODEID>_SSH_PORT
        NODES_<NODEID>_SSH_USER
        NODES_<NODEID>_SSH_PASSWORD
        NODES_<NODEID>_SSH_PRIVATE_KEY

    Secrets themselves are never returned — only ``*_set`` booleans
    indicating their presence.
    """
    reg = load_nodes_registry()
    nodes = reg.get("nodes", {})
    node = nodes.get(node_id, {}) or {}
    ssh = dict(node.get("ssh") or {})
    auth = dict(ssh.get("auth") or {})
    prefix = f"NODES_{node_id}_SSH_"
    # Guard against explicit YAML nulls (e.g. "host:") which yield None and
    # would previously crash .strip(); treat them as empty strings.
    host = (os.getenv(f"{prefix}HOST", ssh.get("host") or "") or "").strip()
    user = (os.getenv(f"{prefix}USER", ssh.get("user") or "") or "").strip()
    private_key = (os.getenv(f"{prefix}PRIVATE_KEY", auth.get("private_key") or "") or "").strip()
    password_env = (auth.get("password_env") or f"{prefix}PASSWORD").strip()
    # Direct env var wins; otherwise read the configured password_env name.
    password = os.getenv(f"{prefix}PASSWORD", os.getenv(password_env, "")).strip()
    try:
        port = int(os.getenv(f"{prefix}PORT", str(ssh.get("port", 22))))
    except Exception:
        port = 22  # malformed port: fall back to the SSH default
    return {
        "configured": bool(host and user),
        "host": host,
        "ipv6": ssh.get("ipv6", ""),
        "port": port,
        "user": user,
        "host_keys": ssh.get("host_keys", []),
        "auth": {
            "password_env": password_env,
            "password_set": bool(password),
            "private_key_set": bool(private_key),
        },
    }
def get_memory_service_url() -> str:
    """Memory-service URL; env override: MEMORY_SERVICE_URL."""
    env_url = os.getenv("MEMORY_SERVICE_URL")
    if env_url:
        return env_url.rstrip("/")
    defaults = load_nodes_registry().get("defaults", {})
    configured = defaults.get("memory_service_url")
    if configured:
        return configured.rstrip("/")
    return "http://localhost:8000"
def get_ollama_url() -> str:
    """Ollama URL; env override: OLLAMA_URL."""
    url = os.getenv("OLLAMA_URL", "http://localhost:11434")
    return url.rstrip("/")
def is_voice_ha_enabled() -> bool:
    """Voice HA feature flag. Set VOICE_HA_ENABLED=true to opt-in.

    When enabled, /api/voice/tts and /api/voice/chat/stream use Router
    /v1/capability/voice_tts and /v1/capability/voice_llm endpoints for
    multi-node failover instead of calling memory-service directly.
    Default: False (safe for existing deployments).
    """
    flag = os.getenv("VOICE_HA_ENABLED", "false").lower()
    return flag in {"1", "true", "yes"}
def get_voice_ha_router_url(node_id: str = "NODA2") -> str:
    """Router URL used for Voice HA offload. Defaults to same router as LLM."""
    url = os.getenv("VOICE_HA_ROUTER_URL")
    if url:
        return url.rstrip("/")
    return get_router_url(node_id).rstrip("/")

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,7 @@ Runtime contract (project/session/user), full status, WebSocket events,
voice proxy, ops, nodes. UI never calls external services directly. voice proxy, ops, nodes. UI never calls external services directly.
""" """
import asyncio import asyncio
import base64
import io import io
import json import json
import os import os
@@ -70,6 +71,10 @@ _NODE_ID = os.getenv("NODE_ID", os.getenv("HOSTNAME", "noda2"))
# ── Rate limiter ────────────────────────────────────────────────────────────── # ── Rate limiter ──────────────────────────────────────────────────────────────
_rate_buckets: Dict[str, collections.deque] = {} _rate_buckets: Dict[str, collections.deque] = {}
# ── Chat idempotency cache (TTL in-memory) ───────────────────────────────────
# Replay TTL in seconds; override via CHAT_IDEMPOTENCY_TTL_SEC (default 900).
_IDEMPOTENCY_TTL_SEC = int(os.getenv("CHAT_IDEMPOTENCY_TTL_SEC", "900"))
# LRU-ordered map: "chat_id::idem_key" -> {"expires_at": monotonic ts, "payload": dict}
_idempotency_cache: "collections.OrderedDict[str, Dict[str, Any]]" = collections.OrderedDict()
def _check_rate(key: str, max_calls: int, window_sec: int = 60) -> bool: def _check_rate(key: str, max_calls: int, window_sec: int = 60) -> bool:
now = time.monotonic() now = time.monotonic()
dq = _rate_buckets.setdefault(key, collections.deque()) dq = _rate_buckets.setdefault(key, collections.deque())
@@ -80,6 +85,44 @@ def _check_rate(key: str, max_calls: int, window_sec: int = 60) -> bool:
dq.append(now) dq.append(now)
return True return True
def _idem_cleanup(now: Optional[float] = None) -> None:
    """Evict expired idempotency-cache entries.

    The cache is LRU-ordered (_idem_get moves entries to the end on read),
    so expiry order is NOT front-to-back: the previous head-only scan
    stopped at the first live entry and left expired entries stranded
    behind any recently-touched one. Scan the whole dict instead — it is
    bounded to 5000 entries by _idem_put, so a full pass stays cheap.
    """
    ts = time.monotonic() if now is None else now
    expired = [
        key
        for key, entry in _idempotency_cache.items()
        if float((entry or {}).get("expires_at", 0)) <= ts
    ]
    for key in expired:
        _idempotency_cache.pop(key, None)
def _idem_get(chat_id: str, idem_key: str) -> Optional[Dict[str, Any]]:
    """Return the cached response payload for (chat_id, idem_key), if any."""
    _idem_cleanup()
    cache_key = f"{chat_id}::{idem_key}"
    entry = _idempotency_cache.get(cache_key)
    if not entry:
        return None
    # Refresh LRU position so frequently-replayed keys survive eviction.
    _idempotency_cache.move_to_end(cache_key, last=True)
    payload = entry.get("payload")
    if isinstance(payload, dict):
        return payload
    return None
def _idem_put(chat_id: str, idem_key: str, payload: Dict[str, Any]) -> None:
    """Store a response payload for replay under (chat_id, idem_key)."""
    if not idem_key:
        return
    ts = time.monotonic()
    _idem_cleanup(ts)
    cache_key = f"{chat_id}::{idem_key}"
    ttl = max(60, _IDEMPOTENCY_TTL_SEC)  # enforce a sane minimum TTL
    _idempotency_cache[cache_key] = {
        "expires_at": ts + ttl,
        "payload": payload,
    }
    _idempotency_cache.move_to_end(cache_key, last=True)
    # Bound memory growth: evict least-recently-used entries beyond 5000.
    while len(_idempotency_cache) > 5000:
        _idempotency_cache.popitem(last=False)
# ── Voice error rings (repro pack for incident diagnosis) ───────────────────── # ── Voice error rings (repro pack for incident diagnosis) ─────────────────────
# Circular buffers: last 5 TTS errors and last 5 LLM errors. # Circular buffers: last 5 TTS errors and last 5 LLM errors.
# Populated by all voice endpoints. Read by /api/voice/degradation_status. # Populated by all voice endpoints. Read by /api/voice/degradation_status.
@@ -1400,7 +1443,7 @@ async def _smart_monitor_run(run_id: str) -> None:
run["kling"] = { run["kling"] = {
**kling, **kling,
"status": "failed", "status": "failed",
"error": str(exc)[:320], "error": str(exc)[:640],
} }
run["status"] = "completed" run["status"] = "completed"
run["phase"] = "completed_with_kling_failure" run["phase"] = "completed_with_kling_failure"
@@ -1415,6 +1458,7 @@ async def _smart_monitor_run(run_id: str) -> None:
**kling, **kling,
"task_id": task_id, "task_id": task_id,
"status": str(submit.get("status") or "submitted").lower(), "status": str(submit.get("status") or "submitted").lower(),
"endpoint": str(submit.get("kling_endpoint") or "video2video"),
"submitted_at": _smart_now_iso(), "submitted_at": _smart_now_iso(),
} }
_smart_append_audit(run, "kling.submitted", {"task_id": task_id}) _smart_append_audit(run, "kling.submitted", {"task_id": task_id})
@@ -2638,7 +2682,7 @@ async def api_aurora_report_pdf(job_id: str) -> StreamingResponse:
@app.get("/api/aurora/files/{job_id}/{file_name:path}") @app.get("/api/aurora/files/{job_id}/{file_name:path}")
async def api_aurora_file(job_id: str, file_name: str) -> StreamingResponse: async def api_aurora_file(job_id: str, file_name: str, request: Request) -> StreamingResponse:
encoded_job = quote(job_id, safe="") encoded_job = quote(job_id, safe="")
encoded_name = quote(file_name, safe="") encoded_name = quote(file_name, safe="")
paths = [AURORA_SERVICE_URL] paths = [AURORA_SERVICE_URL]
@@ -2649,7 +2693,13 @@ async def api_aurora_file(job_id: str, file_name: str) -> StreamingResponse:
url = f"{base}/api/aurora/files/{encoded_job}/{encoded_name}" url = f"{base}/api/aurora/files/{encoded_job}/{encoded_name}"
client = httpx.AsyncClient(timeout=httpx.Timeout(10.0, read=300.0)) client = httpx.AsyncClient(timeout=httpx.Timeout(10.0, read=300.0))
try: try:
resp = await client.send(client.build_request("GET", url), stream=True) upstream_headers: Dict[str, str] = {}
for name in ("range", "if-range", "if-none-match", "if-modified-since"):
value = request.headers.get(name)
if value:
upstream_headers[name] = value
resp = await client.send(client.build_request("GET", url, headers=upstream_headers), stream=True)
if resp.status_code >= 400: if resp.status_code >= 400:
body = (await resp.aread()).decode(errors="replace")[:400] body = (await resp.aread()).decode(errors="replace")[:400]
await resp.aclose() await resp.aclose()
@@ -2659,7 +2709,22 @@ async def api_aurora_file(job_id: str, file_name: str) -> StreamingResponse:
continue continue
raise HTTPException(status_code=resp.status_code, detail=body or f"Aurora file error {resp.status_code}") raise HTTPException(status_code=resp.status_code, detail=body or f"Aurora file error {resp.status_code}")
ct = resp.headers.get("content-type", "application/octet-stream") ct = resp.headers.get("content-type", "application/octet-stream")
disp = resp.headers.get("content-disposition", f'inline; filename="{Path(file_name).name}"') passthrough_headers: Dict[str, str] = {}
for name in (
"content-disposition",
"content-length",
"content-range",
"accept-ranges",
"etag",
"last-modified",
"cache-control",
):
value = resp.headers.get(name)
if value:
passthrough_headers[name] = value
if "content-disposition" not in passthrough_headers:
passthrough_headers["content-disposition"] = f'inline; filename="{Path(file_name).name}"'
passthrough_headers.setdefault("cache-control", "no-store")
async def _stream(): async def _stream():
try: try:
@@ -2671,8 +2736,9 @@ async def api_aurora_file(job_id: str, file_name: str) -> StreamingResponse:
return StreamingResponse( return StreamingResponse(
_stream(), _stream(),
status_code=resp.status_code,
media_type=ct, media_type=ct,
headers={"Content-Disposition": disp, "Cache-Control": "no-store"}, headers=passthrough_headers,
) )
except HTTPException: except HTTPException:
raise raise
@@ -2977,6 +3043,340 @@ class ChatSendBody(BaseModel):
voice_profile: Optional[str] = None voice_profile: Optional[str] = None
# Dedicated project bucket under which all console chats are indexed.
CHAT_PROJECT_ID = "chats"
class ChatCreateBody(BaseModel):
    """Request body for POST /api/chats (create/upsert a chat)."""
    agent_id: str  # target agent, e.g. "sofiia"
    node_id: str = "NODA2"  # owning node; defaults to the primary node
    source: str = "console"  # originating surface of the chat
    external_chat_ref: Optional[str] = None  # upstream chat id, if bridged
    title: Optional[str] = None  # optional display-title override
class ChatMessageSendBody(BaseModel):
    """Request body for POST /api/chats/{chat_id}/send."""
    text: str  # user message text; must be non-empty after strip
    attachments: List[Dict[str, Any]] = []  # pydantic copies defaults per-instance
    project_id: Optional[str] = None  # override; defaults to CHAT_PROJECT_ID
    session_id: Optional[str] = None  # override; defaults to chat_id
    user_id: Optional[str] = None  # override; defaults to "console_user"
    routing: Optional[Dict[str, Any]] = None  # e.g. {"force_node_id": "NODA1"}
    client: Optional[Dict[str, Any]] = None  # opaque client metadata, echoed to router
    idempotency_key: Optional[str] = None  # body-level key; Idempotency-Key header wins
def _make_chat_id(node_id: str, agent_id: str, source: str = "console", external_chat_ref: Optional[str] = None) -> str:
ext = (external_chat_ref or "main").strip() or "main"
return f"chat:{node_id.upper()}:{agent_id.strip().lower()}:{source.strip().lower()}:{ext}"
def _parse_chat_id(chat_id: str) -> Dict[str, str]:
raw = (chat_id or "").strip()
parts = raw.split(":", 4)
if len(parts) == 5 and parts[0] == "chat":
return {
"chat_id": raw,
"node_id": parts[1].upper(),
"agent_id": parts[2].lower(),
"source": parts[3].lower(),
"external_chat_ref": parts[4],
}
# Legacy fallback: treat arbitrary session_id as local NODA2 chat with sofiia
return {
"chat_id": raw,
"node_id": "NODA2",
"agent_id": "sofiia",
"source": "console",
"external_chat_ref": raw or "main",
}
async def _ensure_chat_project() -> None:
    """Create the shared chats project in _app_db if it does not exist yet."""
    existing = await _app_db.get_project(CHAT_PROJECT_ID)
    if existing:
        return
    await _app_db.create_project(
        name="Chats",
        description="Cross-node chat index for Sofiia Console",
        project_id=CHAT_PROJECT_ID,
    )
def _clean_chat_reply(text: str) -> str:
import re
cleaned = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL | re.IGNORECASE)
if "<think>" in cleaned.lower():
cleaned = re.split(r"(?i)<think>", cleaned)[0]
return cleaned.strip()
def _cursor_encode(payload: Dict[str, Any]) -> str:
raw = json.dumps(payload, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("ascii")
def _cursor_decode(cursor: Optional[str]) -> Dict[str, Any]:
if not cursor:
return {}
try:
decoded = base64.urlsafe_b64decode(cursor.encode("ascii")).decode("utf-8")
data = json.loads(decoded)
return data if isinstance(data, dict) else {}
except Exception:
return {}
@app.get("/api/chats")
async def api_chats_list(
    nodes: str = Query("NODA1,NODA2"),
    agent_id: Optional[str] = Query(None),
    q: Optional[str] = Query(None),
    limit: int = Query(50, ge=1, le=200),
    cursor: Optional[str] = Query(None),
    _auth: str = Depends(require_auth),
):
    """List chats across nodes with keyset pagination and optional filters.

    Filters: ``nodes`` (comma-separated node ids), ``agent_id``, and
    free-text ``q`` matched against title/agent/node/last-message text.
    ``cursor`` is an opaque token from a previous response
    (keyset on last_active + chat_id).
    """
    await _ensure_chat_project()
    node_filter = {n.strip().upper() for n in nodes.split(",") if n.strip()}
    # Decode the keyset cursor: position of the last item of the prior page.
    cur = _cursor_decode(cursor)
    before_last_active = str(cur.get("last_active") or "").strip() or None
    before_chat_id = str(cur.get("chat_id") or "").strip() or None
    # Over-fetch from storage because node/agent/q filtering below can
    # discard rows before the response page is filled.
    fetch_limit = max(limit * 5, limit + 1)
    sessions = await _app_db.list_sessions_page(
        CHAT_PROJECT_ID,
        limit=fetch_limit,
        before_last_active=before_last_active,
        before_session_id=before_chat_id,
    )
    items: List[Dict[str, Any]] = []
    agent_filter = (agent_id or "").strip().lower()
    q_filter = (q or "").strip().lower()
    for s in sessions:
        sid = str(s.get("session_id") or "")
        if not sid:
            continue
        info = _parse_chat_id(sid)
        if node_filter and info["node_id"] not in node_filter:
            continue
        if agent_filter and info["agent_id"] != agent_filter:
            continue
        # NOTE(review): one list_messages call per session is an N+1 pattern;
        # acceptable at page size, revisit if chat counts grow.
        msgs = await _app_db.list_messages(sid, limit=200)
        last = msgs[-1] if msgs else None
        item = {
            "chat_id": sid,
            # NOTE(review): fallback title concatenates agent+node with no
            # separator — confirm a separator wasn't lost.
            "title": (s.get("title") or f"{info['agent_id']}{info['node_id']}").strip(),
            "agent_id": info["agent_id"],
            "node_id": info["node_id"],
            "source": info["source"],
            "external_chat_ref": info["external_chat_ref"],
            "updated_at": s.get("last_active"),
            "last_message": (
                {
                    "message_id": last.get("msg_id"),
                    "role": last.get("role"),
                    "text": (last.get("content") or "")[:280],  # preview only
                    "ts": last.get("ts"),
                } if last else None
            ),
            "turn_count": s.get("turn_count", 0),
        }
        if q_filter:
            # Free-text match over title/agent/node/last-message preview.
            hay = " ".join(
                [
                    item["title"],
                    item["agent_id"],
                    item["node_id"],
                    (item["last_message"] or {}).get("text", ""),
                ]
            ).lower()
            if q_filter not in hay:
                continue
        items.append(item)
        if len(items) >= limit:
            break
    next_cursor = None
    if items:
        last_item = items[-1]
        next_cursor = _cursor_encode(
            {
                "last_active": last_item.get("updated_at"),
                "chat_id": last_item.get("chat_id"),
            }
        )
    # Heuristic: more rows may exist if the storage page came back full or
    # the response page filled before the fetched rows were exhausted.
    has_more = len(sessions) >= fetch_limit or len(items) >= limit
    return {
        "items": items,
        "count": len(items),
        "nodes": sorted(node_filter),
        "project_id": CHAT_PROJECT_ID,
        "next_cursor": next_cursor,
        "has_more": has_more,
    }
@app.post("/api/chats")
async def api_chat_create(body: ChatCreateBody, _auth: str = Depends(require_auth)):
    """Create (or idempotently upsert) a chat and return its descriptor."""
    await _ensure_chat_project()
    # Deterministic canonical id: re-posting the same coordinates yields the
    # same chat rather than creating a duplicate.
    cid = _make_chat_id(
        node_id=body.node_id,
        agent_id=body.agent_id,
        source=body.source,
        external_chat_ref=body.external_chat_ref,
    )
    info = _parse_chat_id(cid)
    # NOTE(review): fallback title concatenates agent+node+source with no
    # separator — confirm the intended format (a separator may have been lost).
    title = (body.title or f"{info['agent_id']}{info['node_id']}{info['source']}").strip()
    sess = await _app_db.upsert_session(cid, project_id=CHAT_PROJECT_ID, title=title)
    return {"ok": True, "chat": {"chat_id": cid, "title": title, "agent_id": info["agent_id"], "node_id": info["node_id"], "source": info["source"], "external_chat_ref": info["external_chat_ref"], "updated_at": sess.get("last_active")}}
@app.get("/api/chats/{chat_id}/messages")
async def api_chat_messages(
    chat_id: str,
    limit: int = Query(100, ge=1, le=500),
    cursor: Optional[str] = Query(None),
    _auth: str = Depends(require_auth),
):
    """Return a page of chat messages in chronological order.

    Pagination is keyset-based (ts + message_id), walking backwards from
    the newest message; ``next_cursor`` fetches the next-older page.
    """
    cur = _cursor_decode(cursor)
    before_ts = str(cur.get("ts") or "").strip() or None
    before_message_id = str(cur.get("message_id") or "").strip() or None
    # Fetch one extra row so we can tell whether an older page exists.
    rows_desc = await _app_db.list_messages_page(
        chat_id,
        limit=limit + 1,
        before_ts=before_ts,
        before_msg_id=before_message_id,
    )
    has_more = len(rows_desc) > limit
    page_desc = rows_desc[:limit]
    # Storage returns newest-first; present oldest-first to the client.
    rows = list(reversed(page_desc))
    info = _parse_chat_id(chat_id)
    messages = [
        {
            "message_id": r.get("msg_id"),
            "chat_id": chat_id,
            "role": r.get("role"),
            "text": r.get("content", ""),
            "ts": r.get("ts"),
            "meta": {
                "node_id": info["node_id"],
                "agent_id": info["agent_id"],
                "source": info["source"],
            },
        }
        for r in rows
    ]
    next_cursor = None
    if has_more and page_desc:
        # Cursor anchors on the oldest row of this page (last of the
        # newest-first slice).
        tail = page_desc[-1]
        next_cursor = _cursor_encode({"ts": tail.get("ts"), "message_id": tail.get("msg_id")})
    return {
        "items": messages,
        "count": len(messages),
        "chat_id": chat_id,
        "next_cursor": next_cursor,
        "has_more": has_more,
    }
@app.post("/api/chats/{chat_id}/send")
async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Request, _auth: str = Depends(require_auth)):
    """Send a user message to a chat and return the assistant reply.

    Flow: per-IP rate limit -> idempotency replay check (``Idempotency-Key``
    header wins over body field) -> persist user message -> route to the
    target node's router via ``infer`` -> persist and return the cleaned
    assistant reply. Successful results are cached for replay under the
    idempotency key.
    """
    client_ip = request.client.host if request.client else "unknown"
    if not _check_rate(f"chat_v2:{client_ip}", max_calls=30, window_sec=60):
        raise HTTPException(status_code=429, detail="Rate limit: 30 messages/min")
    text = (body.text or "").strip()
    if not text:
        raise HTTPException(status_code=400, detail="text is required")
    # Header takes priority over the body field; cap key length to bound
    # the cache-key size.
    idem_key = (
        (
            request.headers.get("Idempotency-Key")
            or body.idempotency_key
            or ""
        ).strip()
    )[:128]
    if idem_key:
        cached = _idem_get(chat_id, idem_key)
        if cached:
            # Replay: return the cached result without re-invoking the model.
            replay = dict(cached)
            replay["idempotency"] = {"replayed": True, "key": idem_key}
            return replay
    await _ensure_chat_project()
    info = _parse_chat_id(chat_id)
    # routing.force_node_id overrides the node encoded in the chat id.
    target_node = ((body.routing or {}).get("force_node_id") or info["node_id"] or "NODA2").upper()
    target_agent = info["agent_id"] or "sofiia"
    project_id = body.project_id or CHAT_PROJECT_ID
    session_id = body.session_id or chat_id
    user_id = body.user_id or "console_user"
    # NOTE(review): title concatenates agent+node+source with no separator —
    # confirm the intended format.
    title = f"{target_agent}{target_node}{info['source']}"
    await _app_db.upsert_session(chat_id, project_id=CHAT_PROJECT_ID, title=title)
    # Persist the user turn first so it is never lost even if routing fails.
    user_saved = await _app_db.save_message(chat_id, "user", text[:4096])
    metadata: Dict[str, Any] = {
        "project_id": project_id,
        "session_id": session_id,
        "user_id": user_id,
        "client": "sofiia-console",
        "chat_id": chat_id,
        "node_id": target_node,
        "agent_id": target_agent,
        "source": info["source"],
        "external_chat_ref": info["external_chat_ref"],
        "attachments": body.attachments or [],
        "client_meta": body.client or {},
    }
    base_url = get_router_url(target_node)
    if not base_url:
        raise HTTPException(status_code=400, detail=f"router_url is not configured for node {target_node}")
    try:
        out = await infer(
            base_url,
            target_agent,
            text,
            model=None,
            metadata=metadata,
            timeout=300.0,
            api_key=ROUTER_API_KEY,
        )
    except Exception as e:
        # Broadcast the failure to connected UI clients, then surface a 502.
        _broadcast_bg(
            _make_event(
                "error",
                {"where": "chat_v2.router", "message": str(e)[:180], "chat_id": chat_id, "node_id": target_node, "agent_id": target_agent},
                project_id=project_id,
                session_id=session_id,
                user_id=user_id,
            )
        )
        raise HTTPException(status_code=502, detail=str(e)[:300])
    # Strip model <think> sections before persisting/returning the reply.
    reply = _clean_chat_reply(out.get("response", out.get("text", "")))
    assistant_saved = await _app_db.save_message(chat_id, "assistant", (reply or "")[:4096], parent_msg_id=user_saved.get("msg_id"))
    trace_id = f"chatv2_{session_id}_{uuid.uuid4().hex[:8]}"
    result = {
        "ok": True,
        "accepted": True,
        "chat_id": chat_id,
        "node_id": target_node,
        "agent_id": target_agent,
        "trace_id": trace_id,
        "message": {
            "message_id": assistant_saved.get("msg_id"),
            "role": "assistant",
            "text": reply,
            "ts": assistant_saved.get("ts"),
            "meta": {
                "node_id": target_node,
                "agent_id": target_agent,
                "backend": out.get("backend"),
                "model": out.get("model"),
            },
        },
    }
    if idem_key:
        # Cache only successful results; failures above raised before here.
        _idem_put(chat_id, idem_key, result)
        result["idempotency"] = {"replayed": False, "key": idem_key}
    return result
@app.post("/api/chat/send") @app.post("/api/chat/send")
async def api_chat_send(body: ChatSendBody, request: Request): async def api_chat_send(body: ChatSendBody, request: Request):
"""BFF chat: Ollama or router. Returns runtime contract fields. Rate: 30/min.""" """BFF chat: Ollama or router. Returns runtime contract fields. Rate: 30/min."""
@@ -6821,9 +7221,9 @@ async def console_kling_health() -> Dict[str, Any]:
return {"ok": False, "error": str(exc)} return {"ok": False, "error": str(exc)}
@app.post("/api/aurora/kling/enhance/{job_id}") @app.post("/api/aurora/kling/enhance")
async def console_kling_enhance( async def console_kling_enhance_plain(
job_id: str, job_id: str = Form(...),
prompt: str = Form("enhance video quality, improve sharpness and clarity"), prompt: str = Form("enhance video quality, improve sharpness and clarity"),
negative_prompt: str = Form("noise, blur, artifacts, distortion"), negative_prompt: str = Form("noise, blur, artifacts, distortion"),
mode: str = Form("pro"), mode: str = Form("pro"),
@@ -6846,6 +7246,25 @@ async def console_kling_enhance(
) )
@app.post("/api/aurora/kling/enhance/{job_id}")
async def console_kling_enhance(
    job_id: str,
    prompt: str = Form("enhance video quality, improve sharpness and clarity"),
    negative_prompt: str = Form("noise, blur, artifacts, distortion"),
    mode: str = Form("pro"),
    duration: str = Form("5"),
    cfg_scale: float = Form(0.5),
) -> Dict[str, Any]:
    """Path-parameter variant of the Kling enhance endpoint.

    Kept for backward compatibility with clients that pass job_id in the
    URL; delegates to console_kling_enhance_plain (form-field variant)
    with identical defaults.
    """
    return await console_kling_enhance_plain(
        job_id=job_id,
        prompt=prompt,
        negative_prompt=negative_prompt,
        mode=mode,
        duration=duration,
        cfg_scale=cfg_scale,
    )
@app.get("/api/aurora/kling/status/{job_id}") @app.get("/api/aurora/kling/status/{job_id}")
async def console_kling_status(job_id: str) -> Dict[str, Any]: async def console_kling_status(job_id: str) -> Dict[str, Any]:
return await _aurora_request_json("GET", f"/api/aurora/kling/status/{job_id}", timeout=20.0, retries=2) return await _aurora_request_json("GET", f"/api/aurora/kling/status/{job_id}", timeout=20.0, retries=2)