From 11e0ba7264e711e2d1971be6555443633e47e280 Mon Sep 17 00:00:00 2001 From: Apple Date: Mon, 2 Mar 2026 09:36:11 -0800 Subject: [PATCH] feat(sofiia-console): add audit query endpoint with cursor pagination Made-with: Cursor --- services/sofiia-console/app/db.py | 33 ++++++++++++++++ services/sofiia-console/app/main.py | 61 ++++++++++++++++++++++++++++- tests/test_sofiia_audit_read.py | 53 +++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 tests/test_sofiia_audit_read.py diff --git a/services/sofiia-console/app/db.py b/services/sofiia-console/app/db.py index 9ab38228..f16130aa 100644 --- a/services/sofiia-console/app/db.py +++ b/services/sofiia-console/app/db.py @@ -814,8 +814,31 @@ async def list_audit_events( *, event: Optional[str] = None, operator_id: Optional[str] = None, + status: Optional[str] = None, + node_id: Optional[str] = None, chat_id: Optional[str] = None, limit: int = 100, +) -> List[Dict[str, Any]]: + return await list_audit_events_page( + event=event, + operator_id=operator_id, + status=status, + node_id=node_id, + chat_id=chat_id, + limit=limit, + ) + + +async def list_audit_events_page( + *, + event: Optional[str] = None, + operator_id: Optional[str] = None, + status: Optional[str] = None, + node_id: Optional[str] = None, + chat_id: Optional[str] = None, + limit: int = 100, + before_ts: Optional[str] = None, + before_id: Optional[str] = None, ) -> List[Dict[str, Any]]: db = await get_db() clauses = ["1=1"] @@ -826,9 +849,19 @@ async def list_audit_events( if operator_id: clauses.append("operator_id=?") params.append(operator_id) + if status: + clauses.append("status=?") + params.append(status) + if node_id: + clauses.append("node_id=?") + params.append(node_id) if chat_id: clauses.append("chat_id=?") params.append(chat_id) + if before_ts: + bid = before_id or "~~~~~~~~" + clauses.append("(ts < ? OR (ts = ? AND id < ?))") + params.extend([before_ts, before_ts, bid]) params.append(max(1, min(int(limit), 500))) sql = ( "SELECT * FROM audit_events WHERE " diff --git a/services/sofiia-console/app/main.py b/services/sofiia-console/app/main.py index 3b7ca88d..4908084b 100644 --- a/services/sofiia-console/app/main.py +++ b/services/sofiia-console/app/main.py @@ -3587,7 +3587,7 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req ) await audit_log( AuditEvent( - event="chat.send.result", + event="chat.send.error", operator_id=operator_id, operator_id_missing=operator_id_missing, ip=(request.client.host if request.client else None), @@ -3627,7 +3627,7 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req ) await audit_log( AuditEvent( - event="chat.send.result", + event="chat.send.error", operator_id=operator_id, operator_id_missing=operator_id_missing, ip=(request.client.host if request.client else None), @@ -3719,6 +3719,63 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req return result +@app.get("/api/audit") +async def api_audit_list( + chat_id: Optional[str] = Query(None), + operator_id: Optional[str] = Query(None), + event: Optional[str] = Query(None), + status: Optional[str] = Query(None), + node_id: Optional[str] = Query(None), + limit: int = Query(50, ge=1, le=200), + cursor: Optional[str] = Query(None), + _auth: str = Depends(require_auth), +): + SOFIIA_CURSOR_REQUESTS_TOTAL.labels(resource="audit").inc() + cur = _cursor_decode(cursor) + before_ts = str(cur.get("ts") or "").strip() or None + before_id = str(cur.get("id") or "").strip() or None + rows = await _app_db.list_audit_events_page( + event=(event or None), + operator_id=(operator_id or None), + status=(status or None), + node_id=(node_id or None), + chat_id=(chat_id or None), + limit=limit + 1, + before_ts=before_ts, + before_id=before_id, + ) + has_more = len(rows) > limit + page = rows[:limit] + next_cursor = None + if has_more and page: + tail = page[-1] + next_cursor = _cursor_encode({"ts": tail.get("ts"), "id": tail.get("id")}) + + items = [ + { + "id": r.get("id"), + "ts": r.get("ts"), + "event": r.get("event"), + "operator_id": r.get("operator_id"), + "operator_id_missing": bool(r.get("operator_id_missing")), + "ip": r.get("ip"), + "chat_id": r.get("chat_id"), + "node_id": r.get("node_id"), + "agent_id": r.get("agent_id"), + "status": r.get("status"), + "error_code": r.get("error_code"), + "duration_ms": r.get("duration_ms"), + "data": r.get("data_json") or {}, + } + for r in page + ] + return { + "items": items, + "has_more": has_more, + "next_cursor": next_cursor, + } + + @app.get("/metrics") def metrics(): data, content_type = render_metrics() diff --git a/tests/test_sofiia_audit_read.py b/tests/test_sofiia_audit_read.py new file mode 100644 index 00000000..b37ded0c --- /dev/null +++ b/tests/test_sofiia_audit_read.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import asyncio + +import app.db as db_mod # type: ignore + + +def _append(event: str, *, chat_id: str, operator_id: str = "op-a", status: str = "ok", node_id: str = "NODA2"): + return asyncio.run( + db_mod.append_audit_event( + event=event, + operator_id=operator_id, + chat_id=chat_id, + node_id=node_id, + agent_id="sofiia", + status=status, + data={"tag": event}, + ) + ) + + +def test_audit_read_cursor_pagination(sofiia_client): + chat_id = "chat:NODA2:sofiia:web:audit-read" + for i in range(5): + _append("chat.send.result", chat_id=chat_id, operator_id=f"op-{i}") + + r1 = sofiia_client.get(f"/api/audit?chat_id={chat_id}&limit=2") + assert r1.status_code == 200, r1.text + j1 = r1.json() + assert len(j1["items"]) == 2 + assert j1["has_more"] is True + assert j1["next_cursor"] + + r2 = sofiia_client.get(f"/api/audit?chat_id={chat_id}&limit=2&cursor={j1['next_cursor']}") + assert r2.status_code == 200, r2.text + j2 = r2.json() + assert len(j2["items"]) >= 1 + ids_1 = {x["id"] for x in j1["items"]} + ids_2 = {x["id"] for x in j2["items"]} + assert ids_1.isdisjoint(ids_2) + + +def test_audit_read_filter_by_event(sofiia_client): + chat_id = "chat:NODA2:sofiia:web:audit-filter" + _append("chat.send.error", chat_id=chat_id, status="error") + _append("chat.send.result", chat_id=chat_id, status="ok") + _append("chat.create", chat_id=chat_id, status="ok") + + r = sofiia_client.get("/api/audit?event=chat.send.error&limit=50") + assert r.status_code == 200, r.text + items = r.json()["items"] + assert items, "Expected at least one audit item for event filter" + assert all((it.get("event") == "chat.send.error") for it in items)