feat(sofiia-console): add structured json logging for chat ops

Made-with: Cursor
This commit is contained in:
Apple
2026-03-02 08:24:54 -08:00
parent 98555aa483
commit 0b30775ac1
3 changed files with 233 additions and 3 deletions

View File

@@ -0,0 +1,49 @@
from __future__ import annotations
import hashlib
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any
_LOGGER_NAME = "sofiia"
_JSON_ENABLED = os.getenv("SOFIIA_LOG_JSON", "1").strip().lower() not in {"0", "false", "no", "off"}
_LOG_LEVEL_NAME = os.getenv("SOFIIA_LOG_LEVEL", "INFO").strip().upper()
_LOG_LEVEL = getattr(logging, _LOG_LEVEL_NAME, logging.INFO)
def configure_sofiia_logger() -> logging.Logger:
logger = logging.getLogger(_LOGGER_NAME)
logger.setLevel(_LOG_LEVEL)
return logger
def hash_idempotency_key(key: str) -> str:
cleaned = (key or "").strip()
if not cleaned:
return ""
return hashlib.sha256(cleaned.encode("utf-8")).hexdigest()[:12]
def get_request_id(request: Any) -> str:
headers = getattr(request, "headers", None)
req_id = ""
if headers is not None:
req_id = (headers.get("X-Request-Id") or "").strip()
return req_id[:64] if req_id else uuid.uuid4().hex
def log_event(event: str, **fields: Any) -> None:
logger = logging.getLogger(_LOGGER_NAME)
payload = {
"event": event,
"ts": datetime.now(timezone.utc).isoformat(),
}
payload.update({k: v for k, v in fields.items() if v is not None})
if _JSON_ENABLED:
logger.info(json.dumps(payload, ensure_ascii=True, separators=(",", ":"), default=str))
else:
logger.info("%s %s", event, payload)

View File

@@ -64,8 +64,15 @@ from .metrics import (
render_metrics,
)
from .idempotency import get_idempotency_store, ReplayEntry
from .logging import (
configure_sofiia_logger,
get_request_id,
hash_idempotency_key,
log_event,
)
logger = logging.getLogger(__name__)
configure_sofiia_logger()
# ── Build info ────────────────────────────────────────────────────────────────
_VERSION = "0.4.0"
@@ -3106,6 +3113,7 @@ def _cursor_decode(cursor: Optional[str]) -> Dict[str, Any]:
@app.get("/api/chats")
async def api_chats_list(
request: Request,
nodes: str = Query("NODA1,NODA2"),
agent_id: Optional[str] = Query(None),
q: Optional[str] = Query(None),
@@ -3184,6 +3192,17 @@ async def api_chats_list(
}
)
has_more = len(sessions) >= fetch_limit or len(items) >= limit
log_event(
"chat.list",
request_id=get_request_id(request),
node_id=",".join(sorted(node_filter)) if node_filter else None,
agent_id=(agent_id or None),
cursor_present=bool(cursor),
limit=limit,
has_more=has_more,
next_cursor_present=bool(next_cursor),
status="ok",
)
return {
"items": items,
"count": len(items),
@@ -3212,6 +3231,7 @@ async def api_chat_create(body: ChatCreateBody, _auth: str = Depends(require_aut
@app.get("/api/chats/{chat_id}/messages")
async def api_chat_messages(
chat_id: str,
request: Request,
limit: int = Query(100, ge=1, le=500),
cursor: Optional[str] = Query(None),
_auth: str = Depends(require_auth),
@@ -3249,6 +3269,18 @@ async def api_chat_messages(
if has_more and page_desc:
tail = page_desc[-1]
next_cursor = _cursor_encode({"ts": tail.get("ts"), "message_id": tail.get("msg_id")})
log_event(
"chat.messages.list",
request_id=get_request_id(request),
chat_id=chat_id,
node_id=info["node_id"],
agent_id=info["agent_id"],
cursor_present=bool(cursor),
limit=limit,
has_more=has_more,
next_cursor_present=bool(next_cursor),
status="ok",
)
return {
"items": messages,
"count": len(messages),
@@ -3260,12 +3292,14 @@ async def api_chat_messages(
@app.post("/api/chats/{chat_id}/send")
async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Request, _auth: str = Depends(require_auth)):
started_at = time.monotonic()
client_ip = request.client.host if request.client else "unknown"
if not _check_rate(f"chat_v2:{client_ip}", max_calls=30, window_sec=60):
raise HTTPException(status_code=429, detail="Rate limit: 30 messages/min")
text = (body.text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="text is required")
request_id = get_request_id(request)
idem_key = (
(
request.headers.get("Idempotency-Key")
@@ -3273,19 +3307,41 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
or ""
).strip()
)[:128]
idem_hash = hash_idempotency_key(idem_key)
info = _parse_chat_id(chat_id)
target_node = ((body.routing or {}).get("force_node_id") or info["node_id"] or "NODA2").upper()
target_agent = info["agent_id"] or "sofiia"
log_event(
"chat.send",
request_id=request_id,
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
idempotency_key_hash=(idem_hash or None),
replayed=False,
status="ok",
)
if idem_key:
cache_key = f"{chat_id}::{idem_key}"
cached = _idempotency_store.get(cache_key)
if cached:
SOFIIA_IDEMPOTENCY_REPLAYS_TOTAL.inc()
log_event(
"chat.send.replay",
request_id=request_id,
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
idempotency_key_hash=(idem_hash or None),
replayed=True,
message_id=cached.message_id,
status="ok",
)
replay = dict(cached.response_body)
replay["idempotency"] = {"replayed": True, "key": idem_key}
return replay
await _ensure_chat_project()
info = _parse_chat_id(chat_id)
target_node = ((body.routing or {}).get("force_node_id") or info["node_id"] or "NODA2").upper()
target_agent = info["agent_id"] or "sofiia"
SOFIIA_SEND_REQUESTS_TOTAL.labels(node_id=target_node).inc()
project_id = body.project_id or CHAT_PROJECT_ID
session_id = body.session_id or chat_id
@@ -3309,6 +3365,18 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
}
base_url = get_router_url(target_node)
if not base_url:
duration_ms = int((time.monotonic() - started_at) * 1000)
log_event(
"chat.send.error",
request_id=request_id,
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
idempotency_key_hash=(idem_hash or None),
status="error",
error_code="router_url_not_configured",
duration_ms=duration_ms,
)
raise HTTPException(status_code=400, detail=f"router_url is not configured for node {target_node}")
try:
out = await infer(
@@ -3321,6 +3389,19 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
api_key=ROUTER_API_KEY,
)
except Exception as e:
duration_ms = int((time.monotonic() - started_at) * 1000)
log_event(
"chat.send.error",
request_id=request_id,
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
idempotency_key_hash=(idem_hash or None),
status="error",
error_code="upstream_error",
error=str(e)[:180],
duration_ms=duration_ms,
)
_broadcast_bg(
_make_event(
"error",
@@ -3367,6 +3448,18 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
),
)
result["idempotency"] = {"replayed": False, "key": idem_key}
duration_ms = int((time.monotonic() - started_at) * 1000)
log_event(
"chat.send.ok",
request_id=request_id,
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
idempotency_key_hash=(idem_hash or None),
message_id=(result.get("message") or {}).get("message_id"),
status="ok",
duration_ms=duration_ms,
)
return result

View File

@@ -0,0 +1,88 @@
from __future__ import annotations
import json
import logging
def _create_chat(client, agent_id: str, node_id: str, ref: str) -> str:
r = client.post(
"/api/chats",
json={
"agent_id": agent_id,
"node_id": node_id,
"source": "web",
"external_chat_ref": ref,
},
)
assert r.status_code == 200, r.text
return r.json()["chat"]["chat_id"]
def _event_messages(caplog, event: str):
out = []
for rec in caplog.records:
try:
payload = json.loads(rec.getMessage())
except Exception:
continue
if payload.get("event") == event:
out.append(payload)
return out
def test_structured_logging_send_and_replay(sofiia_client, sofiia_module, monkeypatch, caplog):
def _router_url(node_id: str) -> str:
return {"NODA1": "http://noda1-router.test", "NODA2": "http://noda2-router.test"}.get(node_id, "")
async def _fake_infer(base_url, agent_id, text, **kwargs):
return {"response": "ok-structured", "backend": "fake", "model": "fake-model"}
monkeypatch.setattr(sofiia_module, "get_router_url", _router_url)
monkeypatch.setattr(sofiia_module, "infer", _fake_infer)
chat_id = _create_chat(sofiia_client, "sofiia", "NODA1", "log-send")
headers = {"Idempotency-Key": "idem-log-1", "X-Request-Id": "req-123"}
with caplog.at_level(logging.INFO, logger="sofiia"):
r1 = sofiia_client.post(f"/api/chats/{chat_id}/send", json={"text": "hello"}, headers=headers)
r2 = sofiia_client.post(f"/api/chats/{chat_id}/send", json={"text": "hello"}, headers=headers)
assert r1.status_code == 200, r1.text
assert r2.status_code == 200, r2.text
assert r2.json().get("idempotency", {}).get("replayed") is True
send_events = _event_messages(caplog, "chat.send")
replay_events = _event_messages(caplog, "chat.send.replay")
ok_events = _event_messages(caplog, "chat.send.ok")
assert send_events, "Expected chat.send structured log"
assert replay_events, "Expected chat.send.replay structured log"
assert ok_events, "Expected chat.send.ok structured log"
first_send = send_events[0]
assert first_send["chat_id"] == chat_id
assert first_send["node_id"] == "NODA1"
assert first_send["event"] == "chat.send"
assert first_send["request_id"] == "req-123"
assert "idempotency_key_hash" in first_send
replay = replay_events[0]
assert replay["chat_id"] == chat_id
assert replay["status"] == "ok"
assert replay["replayed"] is True
def test_structured_logging_pagination_events(sofiia_client, caplog):
with caplog.at_level(logging.INFO, logger="sofiia"):
r = sofiia_client.get("/api/chats?nodes=NODA1,NODA2&limit=5")
assert r.status_code == 200, r.text
list_events = _event_messages(caplog, "chat.list")
assert list_events, "Expected chat.list structured log"
entry = list_events[0]
assert entry["event"] == "chat.list"
assert entry["limit"] == 5
assert "cursor_present" in entry
assert "has_more" in entry
assert entry["status"] == "ok"