feat(sofiia-console): add audit query endpoint with cursor pagination

Made-with: Cursor
This commit is contained in:
Apple
2026-03-02 09:36:11 -08:00
parent 9e70fc83d2
commit 11e0ba7264
3 changed files with 145 additions and 2 deletions

View File

@@ -814,8 +814,31 @@ async def list_audit_events(
*, *,
event: Optional[str] = None, event: Optional[str] = None,
operator_id: Optional[str] = None, operator_id: Optional[str] = None,
status: Optional[str] = None,
node_id: Optional[str] = None,
chat_id: Optional[str] = None, chat_id: Optional[str] = None,
limit: int = 100, limit: int = 100,
) -> List[Dict[str, Any]]:
return await list_audit_events_page(
event=event,
operator_id=operator_id,
status=status,
node_id=node_id,
chat_id=chat_id,
limit=limit,
)
async def list_audit_events_page(
*,
event: Optional[str] = None,
operator_id: Optional[str] = None,
status: Optional[str] = None,
node_id: Optional[str] = None,
chat_id: Optional[str] = None,
limit: int = 100,
before_ts: Optional[str] = None,
before_id: Optional[str] = None,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
db = await get_db() db = await get_db()
clauses = ["1=1"] clauses = ["1=1"]
@@ -826,9 +849,19 @@ async def list_audit_events(
if operator_id: if operator_id:
clauses.append("operator_id=?") clauses.append("operator_id=?")
params.append(operator_id) params.append(operator_id)
if status:
clauses.append("status=?")
params.append(status)
if node_id:
clauses.append("node_id=?")
params.append(node_id)
if chat_id: if chat_id:
clauses.append("chat_id=?") clauses.append("chat_id=?")
params.append(chat_id) params.append(chat_id)
if before_ts:
bid = before_id or "~~~~~~~~"
clauses.append("(ts < ? OR (ts = ? AND id < ?))")
params.extend([before_ts, before_ts, bid])
params.append(max(1, min(int(limit), 500))) params.append(max(1, min(int(limit), 500)))
sql = ( sql = (
"SELECT * FROM audit_events WHERE " "SELECT * FROM audit_events WHERE "

View File

@@ -3587,7 +3587,7 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
) )
await audit_log( await audit_log(
AuditEvent( AuditEvent(
event="chat.send.result", event="chat.send.error",
operator_id=operator_id, operator_id=operator_id,
operator_id_missing=operator_id_missing, operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None), ip=(request.client.host if request.client else None),
@@ -3627,7 +3627,7 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
) )
await audit_log( await audit_log(
AuditEvent( AuditEvent(
event="chat.send.result", event="chat.send.error",
operator_id=operator_id, operator_id=operator_id,
operator_id_missing=operator_id_missing, operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None), ip=(request.client.host if request.client else None),
@@ -3719,6 +3719,63 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
return result return result
@app.get("/api/audit")
async def api_audit_list(
chat_id: Optional[str] = Query(None),
operator_id: Optional[str] = Query(None),
event: Optional[str] = Query(None),
status: Optional[str] = Query(None),
node_id: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
cursor: Optional[str] = Query(None),
_auth: str = Depends(require_auth),
):
SOFIIA_CURSOR_REQUESTS_TOTAL.labels(resource="audit").inc()
cur = _cursor_decode(cursor)
before_ts = str(cur.get("ts") or "").strip() or None
before_id = str(cur.get("id") or "").strip() or None
rows = await _app_db.list_audit_events_page(
event=(event or None),
operator_id=(operator_id or None),
status=(status or None),
node_id=(node_id or None),
chat_id=(chat_id or None),
limit=limit + 1,
before_ts=before_ts,
before_id=before_id,
)
has_more = len(rows) > limit
page = rows[:limit]
next_cursor = None
if has_more and page:
tail = page[-1]
next_cursor = _cursor_encode({"ts": tail.get("ts"), "id": tail.get("id")})
items = [
{
"id": r.get("id"),
"ts": r.get("ts"),
"event": r.get("event"),
"operator_id": r.get("operator_id"),
"operator_id_missing": bool(r.get("operator_id_missing")),
"ip": r.get("ip"),
"chat_id": r.get("chat_id"),
"node_id": r.get("node_id"),
"agent_id": r.get("agent_id"),
"status": r.get("status"),
"error_code": r.get("error_code"),
"duration_ms": r.get("duration_ms"),
"data": r.get("data_json") or {},
}
for r in page
]
return {
"items": items,
"has_more": has_more,
"next_cursor": next_cursor,
}
@app.get("/metrics") @app.get("/metrics")
def metrics(): def metrics():
data, content_type = render_metrics() data, content_type = render_metrics()

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import asyncio
import app.db as db_mod # type: ignore
def _append(event: str, *, chat_id: str, operator_id: str = "op-a", status: str = "ok", node_id: str = "NODA2"):
return asyncio.run(
db_mod.append_audit_event(
event=event,
operator_id=operator_id,
chat_id=chat_id,
node_id=node_id,
agent_id="sofiia",
status=status,
data={"tag": event},
)
)
def test_audit_read_cursor_pagination(sofiia_client):
chat_id = "chat:NODA2:sofiia:web:audit-read"
for i in range(5):
_append("chat.send.result", chat_id=chat_id, operator_id=f"op-{i}")
r1 = sofiia_client.get(f"/api/audit?chat_id={chat_id}&limit=2")
assert r1.status_code == 200, r1.text
j1 = r1.json()
assert len(j1["items"]) == 2
assert j1["has_more"] is True
assert j1["next_cursor"]
r2 = sofiia_client.get(f"/api/audit?chat_id={chat_id}&limit=2&cursor={j1['next_cursor']}")
assert r2.status_code == 200, r2.text
j2 = r2.json()
assert len(j2["items"]) >= 1
ids_1 = {x["id"] for x in j1["items"]}
ids_2 = {x["id"] for x in j2["items"]}
assert ids_1.isdisjoint(ids_2)
def test_audit_read_filter_by_event(sofiia_client):
chat_id = "chat:NODA2:sofiia:web:audit-filter"
_append("chat.send.error", chat_id=chat_id, status="error")
_append("chat.send.result", chat_id=chat_id, status="ok")
_append("chat.create", chat_id=chat_id, status="ok")
r = sofiia_client.get("/api/audit?event=chat.send.error&limit=50")
assert r.status_code == 200, r.text
items = r.json()["items"]
assert items, "Expected at least one audit item for event filter"
assert all((it.get("event") == "chat.send.error") for it in items)