From 0b30775ac164c985c81e53576ae3e175f56bb012 Mon Sep 17 00:00:00 2001 From: Apple Date: Mon, 2 Mar 2026 08:24:54 -0800 Subject: [PATCH] feat(sofiia-console): add structured json logging for chat ops Made-with: Cursor --- services/sofiia-console/app/logging.py | 49 ++++++++++++ services/sofiia-console/app/main.py | 99 ++++++++++++++++++++++++- tests/test_sofiia_structured_logging.py | 88 ++++++++++++++++++++++ 3 files changed, 233 insertions(+), 3 deletions(-) create mode 100644 services/sofiia-console/app/logging.py create mode 100644 tests/test_sofiia_structured_logging.py diff --git a/services/sofiia-console/app/logging.py b/services/sofiia-console/app/logging.py new file mode 100644 index 00000000..10806d3f --- /dev/null +++ b/services/sofiia-console/app/logging.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import hashlib +import json +import logging +import os +import uuid +from datetime import datetime, timezone +from typing import Any + + +_LOGGER_NAME = "sofiia" +_JSON_ENABLED = os.getenv("SOFIIA_LOG_JSON", "1").strip().lower() not in {"0", "false", "no", "off"} +_LOG_LEVEL_NAME = os.getenv("SOFIIA_LOG_LEVEL", "INFO").strip().upper() +_LOG_LEVEL = getattr(logging, _LOG_LEVEL_NAME, logging.INFO) + + +def configure_sofiia_logger() -> logging.Logger: + logger = logging.getLogger(_LOGGER_NAME) + logger.setLevel(_LOG_LEVEL) + return logger + + +def hash_idempotency_key(key: str) -> str: + cleaned = (key or "").strip() + if not cleaned: + return "" + return hashlib.sha256(cleaned.encode("utf-8")).hexdigest()[:12] + + +def get_request_id(request: Any) -> str: + headers = getattr(request, "headers", None) + req_id = "" + if headers is not None: + req_id = (headers.get("X-Request-Id") or "").strip() + return req_id[:64] if req_id else uuid.uuid4().hex + + +def log_event(event: str, **fields: Any) -> None: + logger = logging.getLogger(_LOGGER_NAME) + payload = { + "event": event, + "ts": datetime.now(timezone.utc).isoformat(), + } + payload.update({k: v for k, v in fields.items() if v is not None}) + if _JSON_ENABLED: + logger.info(json.dumps(payload, ensure_ascii=True, separators=(",", ":"), default=str)) + else: + logger.info("%s %s", event, payload) diff --git a/services/sofiia-console/app/main.py b/services/sofiia-console/app/main.py index d4935447..cf1632bf 100644 --- a/services/sofiia-console/app/main.py +++ b/services/sofiia-console/app/main.py @@ -64,8 +64,15 @@ from .metrics import ( render_metrics, ) from .idempotency import get_idempotency_store, ReplayEntry +from .logging import ( + configure_sofiia_logger, + get_request_id, + hash_idempotency_key, + log_event, +) logger = logging.getLogger(__name__) +configure_sofiia_logger() # ── Build info ──────────────────────────────────────────────────────────────── _VERSION = "0.4.0" @@ -3106,6 +3113,7 @@ def _cursor_decode(cursor: Optional[str]) -> Dict[str, Any]: @app.get("/api/chats") async def api_chats_list( + request: Request, nodes: str = Query("NODA1,NODA2"), agent_id: Optional[str] = Query(None), q: Optional[str] = Query(None), @@ -3184,6 +3192,17 @@ async def api_chats_list( } ) has_more = len(sessions) >= fetch_limit or len(items) >= limit + log_event( + "chat.list", + request_id=get_request_id(request), + node_id=",".join(sorted(node_filter)) if node_filter else None, + agent_id=(agent_id or None), + cursor_present=bool(cursor), + limit=limit, + has_more=has_more, + next_cursor_present=bool(next_cursor), + status="ok", + ) return { "items": items, "count": len(items), @@ -3212,6 +3231,7 @@ async def api_chat_create(body: ChatCreateBody, _auth: str = Depends(require_aut @app.get("/api/chats/{chat_id}/messages") async def api_chat_messages( chat_id: str, + request: Request, limit: int = Query(100, ge=1, le=500), cursor: Optional[str] = Query(None), _auth: str = Depends(require_auth), @@ -3249,6 +3269,18 @@ async def api_chat_messages( if has_more and page_desc: tail = page_desc[-1] next_cursor = _cursor_encode({"ts": tail.get("ts"), "message_id": tail.get("msg_id")}) + log_event( + "chat.messages.list", + request_id=get_request_id(request), + chat_id=chat_id, + node_id=info["node_id"], + agent_id=info["agent_id"], + cursor_present=bool(cursor), + limit=limit, + has_more=has_more, + next_cursor_present=bool(next_cursor), + status="ok", + ) return { "items": messages, "count": len(messages), @@ -3260,12 +3292,14 @@ async def api_chat_messages( @app.post("/api/chats/{chat_id}/send") async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Request, _auth: str = Depends(require_auth)): + started_at = time.monotonic() client_ip = request.client.host if request.client else "unknown" if not _check_rate(f"chat_v2:{client_ip}", max_calls=30, window_sec=60): raise HTTPException(status_code=429, detail="Rate limit: 30 messages/min") text = (body.text or "").strip() if not text: raise HTTPException(status_code=400, detail="text is required") + request_id = get_request_id(request) idem_key = ( ( request.headers.get("Idempotency-Key") @@ -3273,19 +3307,41 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req or "" ).strip() )[:128] + idem_hash = hash_idempotency_key(idem_key) + info = _parse_chat_id(chat_id) + target_node = ((body.routing or {}).get("force_node_id") or info["node_id"] or "NODA2").upper() + target_agent = info["agent_id"] or "sofiia" + log_event( + "chat.send", + request_id=request_id, + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + idempotency_key_hash=(idem_hash or None), + replayed=False, + status="ok", + ) if idem_key: cache_key = f"{chat_id}::{idem_key}" cached = _idempotency_store.get(cache_key) if cached: SOFIIA_IDEMPOTENCY_REPLAYS_TOTAL.inc() + log_event( + "chat.send.replay", + request_id=request_id, + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + idempotency_key_hash=(idem_hash or None), + replayed=True, + message_id=cached.message_id, + status="ok", + ) replay = dict(cached.response_body) replay["idempotency"] = {"replayed": True, "key": idem_key} return replay await _ensure_chat_project() - info = _parse_chat_id(chat_id) - target_node = ((body.routing or {}).get("force_node_id") or info["node_id"] or "NODA2").upper() - target_agent = info["agent_id"] or "sofiia" SOFIIA_SEND_REQUESTS_TOTAL.labels(node_id=target_node).inc() project_id = body.project_id or CHAT_PROJECT_ID session_id = body.session_id or chat_id @@ -3309,6 +3365,18 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req } base_url = get_router_url(target_node) if not base_url: + duration_ms = int((time.monotonic() - started_at) * 1000) + log_event( + "chat.send.error", + request_id=request_id, + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + idempotency_key_hash=(idem_hash or None), + status="error", + error_code="router_url_not_configured", + duration_ms=duration_ms, + ) raise HTTPException(status_code=400, detail=f"router_url is not configured for node {target_node}") try: out = await infer( @@ -3321,6 +3389,19 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req api_key=ROUTER_API_KEY, ) except Exception as e: + duration_ms = int((time.monotonic() - started_at) * 1000) + log_event( + "chat.send.error", + request_id=request_id, + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + idempotency_key_hash=(idem_hash or None), + status="error", + error_code="upstream_error", + error=str(e)[:180], + duration_ms=duration_ms, + ) _broadcast_bg( _make_event( "error", @@ -3367,6 +3448,18 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req ), ) result["idempotency"] = {"replayed": False, "key": idem_key} + duration_ms = int((time.monotonic() - started_at) * 1000) + log_event( + "chat.send.ok", + request_id=request_id, + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + idempotency_key_hash=(idem_hash or None), + message_id=(result.get("message") or {}).get("message_id"), + status="ok", + duration_ms=duration_ms, + ) return result diff --git a/tests/test_sofiia_structured_logging.py b/tests/test_sofiia_structured_logging.py new file mode 100644 index 00000000..95eea872 --- /dev/null +++ b/tests/test_sofiia_structured_logging.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import json +import logging + + +def _create_chat(client, agent_id: str, node_id: str, ref: str) -> str: + r = client.post( + "/api/chats", + json={ + "agent_id": agent_id, + "node_id": node_id, + "source": "web", + "external_chat_ref": ref, + }, + ) + assert r.status_code == 200, r.text + return r.json()["chat"]["chat_id"] + + +def _event_messages(caplog, event: str): + out = [] + for rec in caplog.records: + try: + payload = json.loads(rec.getMessage()) + except Exception: + continue + if payload.get("event") == event: + out.append(payload) + return out + + +def test_structured_logging_send_and_replay(sofiia_client, sofiia_module, monkeypatch, caplog): + def _router_url(node_id: str) -> str: + return {"NODA1": "http://noda1-router.test", "NODA2": "http://noda2-router.test"}.get(node_id, "") + + async def _fake_infer(base_url, agent_id, text, **kwargs): + return {"response": "ok-structured", "backend": "fake", "model": "fake-model"} + + monkeypatch.setattr(sofiia_module, "get_router_url", _router_url) + monkeypatch.setattr(sofiia_module, "infer", _fake_infer) + + chat_id = _create_chat(sofiia_client, "sofiia", "NODA1", "log-send") + headers = {"Idempotency-Key": "idem-log-1", "X-Request-Id": "req-123"} + + with caplog.at_level(logging.INFO, logger="sofiia"): + r1 = sofiia_client.post(f"/api/chats/{chat_id}/send", json={"text": "hello"}, headers=headers) + r2 = sofiia_client.post(f"/api/chats/{chat_id}/send", json={"text": "hello"}, headers=headers) + + assert r1.status_code == 200, r1.text + assert r2.status_code == 200, r2.text + assert r2.json().get("idempotency", {}).get("replayed") is True + + send_events = _event_messages(caplog, "chat.send") + replay_events = _event_messages(caplog, "chat.send.replay") + ok_events = _event_messages(caplog, "chat.send.ok") + + assert send_events, "Expected chat.send structured log" + assert replay_events, "Expected chat.send.replay structured log" + assert ok_events, "Expected chat.send.ok structured log" + + first_send = send_events[0] + assert first_send["chat_id"] == chat_id + assert first_send["node_id"] == "NODA1" + assert first_send["event"] == "chat.send" + assert first_send["request_id"] == "req-123" + assert "idempotency_key_hash" in first_send + + replay = replay_events[0] + assert replay["chat_id"] == chat_id + assert replay["status"] == "ok" + assert replay["replayed"] is True + + +def test_structured_logging_pagination_events(sofiia_client, caplog): + with caplog.at_level(logging.INFO, logger="sofiia"): + r = sofiia_client.get("/api/chats?nodes=NODA1,NODA2&limit=5") + + assert r.status_code == 200, r.text + list_events = _event_messages(caplog, "chat.list") + assert list_events, "Expected chat.list structured log" + + entry = list_events[0] + assert entry["event"] == "chat.list" + assert entry["limit"] == 5 + assert "cursor_present" in entry + assert "has_more" in entry + assert entry["status"] == "ok"