From 3246440ac8dd1708ef50ac2f81e7e9f1b6c15f1e Mon Sep 17 00:00:00 2001 From: Apple Date: Mon, 2 Mar 2026 09:29:14 -0800 Subject: [PATCH] feat(sofiia-console): add audit trail for operator actions Made-with: Cursor --- services/sofiia-console/app/audit.py | 37 ++++++ services/sofiia-console/app/db.py | 108 ++++++++++++++++ services/sofiia-console/app/main.py | 185 ++++++++++++++++++++++++++- tests/test_sofiia_audit_trail.py | 79 ++++++++++++ 4 files changed, 407 insertions(+), 2 deletions(-) create mode 100644 services/sofiia-console/app/audit.py create mode 100644 tests/test_sofiia_audit_trail.py diff --git a/services/sofiia-console/app/audit.py b/services/sofiia-console/app/audit.py new file mode 100644 index 00000000..e31a90f9 --- /dev/null +++ b/services/sofiia-console/app/audit.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +from . import db as _app_db + + +@dataclass +class AuditEvent: + event: str + operator_id: str + operator_id_missing: bool = False + ip: Optional[str] = None + chat_id: Optional[str] = None + node_id: Optional[str] = None + agent_id: Optional[str] = None + status: str = "ok" + error_code: Optional[str] = None + duration_ms: Optional[int] = None + data: Dict[str, Any] = field(default_factory=dict) + + +async def audit_log(audit_event: AuditEvent) -> Dict[str, Any]: + return await _app_db.append_audit_event( + audit_event.event, + audit_event.operator_id, + operator_id_missing=bool(audit_event.operator_id_missing), + ip=audit_event.ip, + chat_id=audit_event.chat_id, + node_id=audit_event.node_id, + agent_id=audit_event.agent_id, + status=audit_event.status, + error_code=audit_event.error_code, + duration_ms=audit_event.duration_ms, + data=audit_event.data, + ) diff --git a/services/sofiia-console/app/db.py b/services/sofiia-console/app/db.py index 8b12775f..9ab38228 100644 --- a/services/sofiia-console/app/db.py +++ b/services/sofiia-console/app/db.py @@ -329,6 +329,27 @@ CREATE INDEX IF NOT EXISTS idx_governance_events_scope_time CREATE INDEX IF NOT EXISTS idx_governance_events_type_time ON governance_events(event_type, created_at DESC); +-- ── Operator Audit Trail (Sofiia Console) ─────────────────────────────────── +CREATE TABLE IF NOT EXISTS audit_events ( + id TEXT PRIMARY KEY, + ts TEXT NOT NULL, + event TEXT NOT NULL, + operator_id TEXT NOT NULL, + operator_id_missing INTEGER NOT NULL DEFAULT 0, + ip TEXT, + chat_id TEXT, + node_id TEXT, + agent_id TEXT, + status TEXT NOT NULL DEFAULT 'ok', + error_code TEXT, + duration_ms INTEGER, + data_json TEXT NOT NULL DEFAULT '{}' +); +CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit_events(ts DESC); +CREATE INDEX IF NOT EXISTS idx_audit_operator_ts ON audit_events(operator_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_audit_chat_ts ON audit_events(chat_id, ts DESC); +CREATE INDEX IF NOT EXISTS idx_audit_event_ts ON audit_events(event, ts DESC); + -- ── Graph Intelligence (Hygiene + Reflection) ────────────────────────────── -- These ADD COLUMN statements are idempotent (IF NOT EXISTS requires SQLite 3.37+). -- On older SQLite they fail silently — init_db() wraps them in a separate try block. @@ -740,6 +761,93 @@ async def list_messages_page( return [dict(r) for r in rows] +async def append_audit_event( + event: str, + operator_id: str, + *, + operator_id_missing: bool = False, + ip: Optional[str] = None, + chat_id: Optional[str] = None, + node_id: Optional[str] = None, + agent_id: Optional[str] = None, + status: str = "ok", + error_code: Optional[str] = None, + duration_ms: Optional[int] = None, + data: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + db = await get_db() + event_id = str(uuid.uuid4()) + now = _now() + payload = json.dumps(data or {}, ensure_ascii=True, separators=(",", ":")) + await db.execute( + "INSERT INTO audit_events(" + "id,ts,event,operator_id,operator_id_missing,ip,chat_id,node_id,agent_id," + "status,error_code,duration_ms,data_json" + ") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)", + ( + event_id, + now, + str(event or "").strip(), + (str(operator_id or "").strip() or "unknown")[:128], + 1 if operator_id_missing else 0, + (str(ip or "").strip() or None), + (str(chat_id or "").strip() or None), + (str(node_id or "").strip() or None), + (str(agent_id or "").strip() or None), + (str(status or "ok").strip() or "ok"), + (str(error_code or "").strip() or None), + int(duration_ms) if duration_ms is not None else None, + payload, + ), + ) + await db.commit() + return { + "id": event_id, + "ts": now, + "event": event, + "operator_id": operator_id, + "status": status, + } + + +async def list_audit_events( + *, + event: Optional[str] = None, + operator_id: Optional[str] = None, + chat_id: Optional[str] = None, + limit: int = 100, +) -> List[Dict[str, Any]]: + db = await get_db() + clauses = ["1=1"] + params: List[Any] = [] + if event: + clauses.append("event=?") + params.append(event) + if operator_id: + clauses.append("operator_id=?") + params.append(operator_id) + if chat_id: + clauses.append("chat_id=?") + params.append(chat_id) + params.append(max(1, min(int(limit), 500))) + sql = ( + "SELECT * FROM audit_events WHERE " + + " AND ".join(clauses) + + " ORDER BY ts DESC, id DESC LIMIT ?" + ) + async with db.execute(sql, tuple(params)) as cur: + rows = await cur.fetchall() + out: List[Dict[str, Any]] = [] + for r in rows: + row = dict(r) + try: + row["data_json"] = json.loads(row.get("data_json") or "{}") + except Exception: + row["data_json"] = {} + out.append(row) + return out + + async def get_dialog_map(session_id: str) -> Dict[str, Any]: """Return nodes and edges for the dialog map tree. diff --git a/services/sofiia-console/app/main.py b/services/sofiia-console/app/main.py index e8ea500b..3b7ca88d 100644 --- a/services/sofiia-console/app/main.py +++ b/services/sofiia-console/app/main.py @@ -66,6 +66,7 @@ from .metrics import ( ) from .idempotency import get_idempotency_store, ReplayEntry from .rate_limit import get_rate_limiter +from .audit import audit_log, AuditEvent from .logging import ( configure_sofiia_logger, get_request_id, @@ -131,6 +132,18 @@ def _rate_limited_http(scope: str, retry_after_s: int) -> HTTPException: ) +def _resolve_operator_from_request(request: Request, request_id: str) -> Tuple[str, bool]: + operator_id = ( + str(request.headers.get("X-Operator-Id") or "").strip() + or str(request.headers.get("X-User-Id") or "").strip() + ) + if operator_id: + return operator_id[:128], False + client_ip = request.client.host if request.client else "unknown" + fallback = f"ip:{client_ip}" if client_ip else f"req:{request_id}" + return fallback[:128], True + + # ── Voice error rings (repro pack for incident diagnosis) ───────────────────── # Circular buffers: last 5 TTS errors and last 5 LLM errors. # Populated by all voice endpoints. Read by /api/voice/degradation_status. @@ -3247,8 +3260,10 @@ async def api_chats_list( @app.post("/api/chats") -async def api_chat_create(body: ChatCreateBody, _auth: str = Depends(require_auth)): +async def api_chat_create(body: ChatCreateBody, request: Request, _auth: str = Depends(require_auth)): await _ensure_chat_project() + request_id = get_request_id(request) + operator_id, operator_id_missing = _resolve_operator_from_request(request, request_id) cid = _make_chat_id( node_id=body.node_id, agent_id=body.agent_id, @@ -3258,6 +3273,23 @@ async def api_chat_create(body: ChatCreateBody, _auth: str = Depends(require_aut info = _parse_chat_id(cid) title = (body.title or f"{info['agent_id']} • {info['node_id']} • {info['source']}").strip() sess = await _app_db.upsert_session(cid, project_id=CHAT_PROJECT_ID, title=title) + await audit_log( + AuditEvent( + event="chat.create", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=cid, + node_id=info["node_id"], + agent_id=info["agent_id"], + status="ok", + data={ + "request_id": request_id, + "source": info["source"], + "external_chat_ref": info["external_chat_ref"], + }, + ) + ) return {"ok": True, "chat": {"chat_id": cid, "title": title, "agent_id": info["agent_id"], "node_id": info["node_id"], "source": info["source"], "external_chat_ref": info["external_chat_ref"], "updated_at": sess.get("last_active")}} @@ -3269,6 +3301,8 @@ async def api_chat_messages( cursor: Optional[str] = Query(None), _auth: str = Depends(require_auth), ): + request_id = get_request_id(request) + operator_id, operator_id_missing = _resolve_operator_from_request(request, request_id) SOFIIA_CURSOR_REQUESTS_TOTAL.labels(resource="messages").inc() cur = _cursor_decode(cursor) before_ts = str(cur.get("ts") or "").strip() or None @@ -3304,7 +3338,7 @@ async def api_chat_messages( next_cursor = _cursor_encode({"ts": tail.get("ts"), "message_id": tail.get("msg_id")}) log_event( "chat.messages.list", - request_id=get_request_id(request), + request_id=request_id, chat_id=chat_id, node_id=info["node_id"], agent_id=info["agent_id"], @@ -3314,6 +3348,25 @@ async def api_chat_messages( next_cursor_present=bool(next_cursor), status="ok", ) + await audit_log( + AuditEvent( + event="chat.messages.list", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=info["node_id"], + agent_id=info["agent_id"], + status="ok", + data={ + "request_id": request_id, + "limit": limit, + "cursor_present": bool(cursor), + "count": len(messages), + "has_more": has_more, + }, + ) + ) return { "items": messages, "count": len(messages), @@ -3345,6 +3398,26 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req target_node = ((body.routing or {}).get("force_node_id") or info["node_id"] or "NODA2").upper() target_agent = info["agent_id"] or "sofiia" operator_id, operator_id_missing = _resolve_operator_id(request, body, request_id) + payload_size_bytes = len(text.encode("utf-8")) + attachments_count = len(body.attachments or []) + await audit_log( + AuditEvent( + event="chat.send.requested", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + status="ok", + data={ + "request_id": request_id, + "idempotency_key_hash": (idem_hash or None), + "payload_size_bytes": payload_size_bytes, + "attachments_count": attachments_count, + }, + ) + ) chat_rl = _rate_limiter.consume(f"rl:chat:{chat_id}", rps=_RL_CHAT_RPS, burst=_RL_CHAT_BURST) if not chat_rl.allowed: SOFIIA_RATE_LIMITED_TOTAL.labels(scope="chat").inc() @@ -3363,6 +3436,27 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req status="error", error_code="rate_limited", ) + await audit_log( + AuditEvent( + event="chat.send.rate_limited", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + status="error", + error_code="rate_limited", + duration_ms=int((time.monotonic() - started_at) * 1000), + data={ + "request_id": request_id, + "scope": "chat", + "limit_rps": _RL_CHAT_RPS, + "burst": _RL_CHAT_BURST, + "retry_after_s": chat_rl.retry_after_s, + }, + ) + ) raise _rate_limited_http("chat", chat_rl.retry_after_s) op_rl = _rate_limiter.consume(f"rl:op:{operator_id}", rps=_RL_OP_RPS, burst=_RL_OP_BURST) if not op_rl.allowed: @@ -3382,6 +3476,27 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req status="error", error_code="rate_limited", ) + await audit_log( + AuditEvent( + event="chat.send.rate_limited", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + status="error", + error_code="rate_limited", + duration_ms=int((time.monotonic() - started_at) * 1000), + data={ + "request_id": request_id, + "scope": "operator", + "limit_rps": _RL_OP_RPS, + "burst": _RL_OP_BURST, + "retry_after_s": op_rl.retry_after_s, + }, + ) + ) raise _rate_limited_http("operator", op_rl.retry_after_s) log_event( "chat.send", @@ -3413,6 +3528,24 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req ) replay = dict(cached.response_body) replay["idempotency"] = {"replayed": True, "key": idem_key} + await audit_log( + AuditEvent( + event="chat.send.result", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + status="ok", + duration_ms=int((time.monotonic() - started_at) * 1000), + data={ + "request_id": request_id, + "message_id": cached.message_id, + "replayed": True, + }, + ) + ) return replay await _ensure_chat_project() @@ -3452,6 +3585,21 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req error_code="router_url_not_configured", duration_ms=duration_ms, ) + await audit_log( + AuditEvent( + event="chat.send.result", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + status="error", + error_code="router_url_not_configured", + duration_ms=duration_ms, + data={"request_id": request_id}, + ) + ) raise HTTPException(status_code=400, detail=f"router_url is not configured for node {target_node}") try: out = await infer( @@ -3477,6 +3625,21 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req error=str(e)[:180], duration_ms=duration_ms, ) + await audit_log( + AuditEvent( + event="chat.send.result", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + status="error", + error_code="upstream_error", + duration_ms=duration_ms, + data={"request_id": request_id}, + ) + ) _broadcast_bg( _make_event( "error", @@ -3535,6 +3698,24 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req status="ok", duration_ms=duration_ms, ) + await audit_log( + AuditEvent( + event="chat.send.result", + operator_id=operator_id, + operator_id_missing=operator_id_missing, + ip=(request.client.host if request.client else None), + chat_id=chat_id, + node_id=target_node, + agent_id=target_agent, + status="ok", + duration_ms=duration_ms, + data={ + "request_id": request_id, + "message_id": (result.get("message") or {}).get("message_id"), + "replayed": False, + }, + ) + ) return result diff --git a/tests/test_sofiia_audit_trail.py b/tests/test_sofiia_audit_trail.py new file mode 100644 index 00000000..6e311039 --- /dev/null +++ b/tests/test_sofiia_audit_trail.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import asyncio + +from app.rate_limit import InMemoryRateLimiter # type: ignore +import app.db as db_mod # type: ignore + + +def _create_chat(client, agent_id: str, node_id: str, ref: str): + return client.post( + "/api/chats", + json={ + "agent_id": agent_id, + "node_id": node_id, + "source": "web", + "external_chat_ref": ref, + }, + ) + + +def _audit_events(event: str, chat_id: str | None = None): + return asyncio.run(db_mod.list_audit_events(event=event, chat_id=chat_id, limit=200)) + + +def test_audit_trail_records_create_and_send(sofiia_client, sofiia_module, monkeypatch): + async def _fake_infer(base_url, agent_id, text, **kwargs): + return {"response": f"ok:{agent_id}:{text}", "backend": "fake", "model": "fake-model"} + + monkeypatch.setattr(sofiia_module, "infer", _fake_infer) + monkeypatch.setattr(sofiia_module, "_rate_limiter", InMemoryRateLimiter()) + monkeypatch.setattr(sofiia_module, "_RL_CHAT_RPS", 100.0) + monkeypatch.setattr(sofiia_module, "_RL_CHAT_BURST", 100) + monkeypatch.setattr(sofiia_module, "_RL_OP_RPS", 100.0) + monkeypatch.setattr(sofiia_module, "_RL_OP_BURST", 100) + + r_create = _create_chat(sofiia_client, "sofiia", "NODA2", "audit-create-send") + assert r_create.status_code == 200, r_create.text + chat_id = r_create.json()["chat"]["chat_id"] + + r_send = sofiia_client.post( + f"/api/chats/{chat_id}/send", + json={"text": "ping", "user_id": "operator-1"}, + ) + assert r_send.status_code == 200, r_send.text + + ev_create = _audit_events("chat.create", chat_id=chat_id) + ev_req = _audit_events("chat.send.requested", chat_id=chat_id) + ev_res = _audit_events("chat.send.result", chat_id=chat_id) + + assert ev_create, "Expected chat.create audit event" + assert ev_req, "Expected chat.send.requested audit event" + assert ev_res, "Expected chat.send.result audit event" + assert ev_res[0]["status"] == "ok" + assert "message_id" in (ev_res[0].get("data_json") or {}) + + +def test_audit_trail_records_rate_limited_send(sofiia_client, sofiia_module, monkeypatch): + async def _fake_infer(base_url, agent_id, text, **kwargs): + return {"response": f"ok:{agent_id}:{text}", "backend": "fake", "model": "fake-model"} + + monkeypatch.setattr(sofiia_module, "infer", _fake_infer) + monkeypatch.setattr(sofiia_module, "_rate_limiter", InMemoryRateLimiter()) + monkeypatch.setattr(sofiia_module, "_RL_CHAT_RPS", 0.001) + monkeypatch.setattr(sofiia_module, "_RL_CHAT_BURST", 1) + monkeypatch.setattr(sofiia_module, "_RL_OP_RPS", 100.0) + monkeypatch.setattr(sofiia_module, "_RL_OP_BURST", 100) + + r_create = _create_chat(sofiia_client, "sofiia", "NODA2", "audit-rl") + assert r_create.status_code == 200, r_create.text + chat_id = r_create.json()["chat"]["chat_id"] + + r1 = sofiia_client.post(f"/api/chats/{chat_id}/send", json={"text": "one", "user_id": "operator-rl"}) + r2 = sofiia_client.post(f"/api/chats/{chat_id}/send", json={"text": "two", "user_id": "operator-rl"}) + assert r1.status_code == 200, r1.text + assert r2.status_code == 429, r2.text + + ev_rl = _audit_events("chat.send.rate_limited", chat_id=chat_id) + assert ev_rl, "Expected chat.send.rate_limited audit event" + assert ev_rl[0]["error_code"] == "rate_limited"