feat(sofiia-console): add audit trail for operator actions

Made-with: Cursor
This commit is contained in:
Apple
2026-03-02 09:29:14 -08:00
parent 9b89ace2fc
commit 3246440ac8
4 changed files with 407 additions and 2 deletions

View File

@@ -0,0 +1,37 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from . import db as _app_db
@dataclass
class AuditEvent:
event: str
operator_id: str
operator_id_missing: bool = False
ip: Optional[str] = None
chat_id: Optional[str] = None
node_id: Optional[str] = None
agent_id: Optional[str] = None
status: str = "ok"
error_code: Optional[str] = None
duration_ms: Optional[int] = None
data: Dict[str, Any] = field(default_factory=dict)
async def audit_log(audit_event: AuditEvent) -> Dict[str, Any]:
return await _app_db.append_audit_event(
audit_event.event,
audit_event.operator_id,
operator_id_missing=bool(audit_event.operator_id_missing),
ip=audit_event.ip,
chat_id=audit_event.chat_id,
node_id=audit_event.node_id,
agent_id=audit_event.agent_id,
status=audit_event.status,
error_code=audit_event.error_code,
duration_ms=audit_event.duration_ms,
data=audit_event.data,
)

View File

@@ -329,6 +329,27 @@ CREATE INDEX IF NOT EXISTS idx_governance_events_scope_time
CREATE INDEX IF NOT EXISTS idx_governance_events_type_time
ON governance_events(event_type, created_at DESC);
-- ── Operator Audit Trail (Sofiia Console) ───────────────────────────────────
CREATE TABLE IF NOT EXISTS audit_events (
id TEXT PRIMARY KEY,
ts TEXT NOT NULL,
event TEXT NOT NULL,
operator_id TEXT NOT NULL,
operator_id_missing INTEGER NOT NULL DEFAULT 0,
ip TEXT,
chat_id TEXT,
node_id TEXT,
agent_id TEXT,
status TEXT NOT NULL DEFAULT 'ok',
error_code TEXT,
duration_ms INTEGER,
data_json TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit_events(ts DESC);
CREATE INDEX IF NOT EXISTS idx_audit_operator_ts ON audit_events(operator_id, ts DESC);
CREATE INDEX IF NOT EXISTS idx_audit_chat_ts ON audit_events(chat_id, ts DESC);
CREATE INDEX IF NOT EXISTS idx_audit_event_ts ON audit_events(event, ts DESC);
-- ── Graph Intelligence (Hygiene + Reflection) ──────────────────────────────
-- These ADD COLUMN statements are idempotent (IF NOT EXISTS requires SQLite 3.37+).
-- On older SQLite they fail silently — init_db() wraps them in a separate try block.
@@ -740,6 +761,93 @@ async def list_messages_page(
return [dict(r) for r in rows]
async def append_audit_event(
event: str,
operator_id: str,
*,
operator_id_missing: bool = False,
ip: Optional[str] = None,
chat_id: Optional[str] = None,
node_id: Optional[str] = None,
agent_id: Optional[str] = None,
status: str = "ok",
error_code: Optional[str] = None,
duration_ms: Optional[int] = None,
data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
db = await get_db()
event_id = str(uuid.uuid4())
now = _now()
payload = json.dumps(data or {}, ensure_ascii=True, separators=(",", ":"))
await db.execute(
"INSERT INTO audit_events("
"id,ts,event,operator_id,operator_id_missing,ip,chat_id,node_id,agent_id,"
"status,error_code,duration_ms,data_json"
") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
(
event_id,
now,
str(event or "").strip(),
(str(operator_id or "").strip() or "unknown")[:128],
1 if operator_id_missing else 0,
(str(ip or "").strip() or None),
(str(chat_id or "").strip() or None),
(str(node_id or "").strip() or None),
(str(agent_id or "").strip() or None),
(str(status or "ok").strip() or "ok"),
(str(error_code or "").strip() or None),
int(duration_ms) if duration_ms is not None else None,
payload,
),
)
await db.commit()
return {
"id": event_id,
"ts": now,
"event": event,
"operator_id": operator_id,
"status": status,
}
async def list_audit_events(
*,
event: Optional[str] = None,
operator_id: Optional[str] = None,
chat_id: Optional[str] = None,
limit: int = 100,
) -> List[Dict[str, Any]]:
db = await get_db()
clauses = ["1=1"]
params: List[Any] = []
if event:
clauses.append("event=?")
params.append(event)
if operator_id:
clauses.append("operator_id=?")
params.append(operator_id)
if chat_id:
clauses.append("chat_id=?")
params.append(chat_id)
params.append(max(1, min(int(limit), 500)))
sql = (
"SELECT * FROM audit_events WHERE "
+ " AND ".join(clauses)
+ " ORDER BY ts DESC, id DESC LIMIT ?"
)
async with db.execute(sql, tuple(params)) as cur:
rows = await cur.fetchall()
out: List[Dict[str, Any]] = []
for r in rows:
row = dict(r)
try:
row["data_json"] = json.loads(row.get("data_json") or "{}")
except Exception:
row["data_json"] = {}
out.append(row)
return out
async def get_dialog_map(session_id: str) -> Dict[str, Any]:
"""Return nodes and edges for the dialog map tree.

View File

@@ -66,6 +66,7 @@ from .metrics import (
)
from .idempotency import get_idempotency_store, ReplayEntry
from .rate_limit import get_rate_limiter
from .audit import audit_log, AuditEvent
from .logging import (
configure_sofiia_logger,
get_request_id,
@@ -131,6 +132,18 @@ def _rate_limited_http(scope: str, retry_after_s: int) -> HTTPException:
)
def _resolve_operator_from_request(request: Request, request_id: str) -> Tuple[str, bool]:
operator_id = (
str(request.headers.get("X-Operator-Id") or "").strip()
or str(request.headers.get("X-User-Id") or "").strip()
)
if operator_id:
return operator_id[:128], False
client_ip = request.client.host if request.client else "unknown"
fallback = f"ip:{client_ip}" if client_ip else f"req:{request_id}"
return fallback[:128], True
# ── Voice error rings (repro pack for incident diagnosis) ─────────────────────
# Circular buffers: last 5 TTS errors and last 5 LLM errors.
# Populated by all voice endpoints. Read by /api/voice/degradation_status.
@@ -3247,8 +3260,10 @@ async def api_chats_list(
@app.post("/api/chats")
async def api_chat_create(body: ChatCreateBody, _auth: str = Depends(require_auth)):
async def api_chat_create(body: ChatCreateBody, request: Request, _auth: str = Depends(require_auth)):
await _ensure_chat_project()
request_id = get_request_id(request)
operator_id, operator_id_missing = _resolve_operator_from_request(request, request_id)
cid = _make_chat_id(
node_id=body.node_id,
agent_id=body.agent_id,
@@ -3258,6 +3273,23 @@ async def api_chat_create(body: ChatCreateBody, _auth: str = Depends(require_aut
info = _parse_chat_id(cid)
title = (body.title or f"{info['agent_id']}{info['node_id']}{info['source']}").strip()
sess = await _app_db.upsert_session(cid, project_id=CHAT_PROJECT_ID, title=title)
await audit_log(
AuditEvent(
event="chat.create",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=cid,
node_id=info["node_id"],
agent_id=info["agent_id"],
status="ok",
data={
"request_id": request_id,
"source": info["source"],
"external_chat_ref": info["external_chat_ref"],
},
)
)
return {"ok": True, "chat": {"chat_id": cid, "title": title, "agent_id": info["agent_id"], "node_id": info["node_id"], "source": info["source"], "external_chat_ref": info["external_chat_ref"], "updated_at": sess.get("last_active")}}
@@ -3269,6 +3301,8 @@ async def api_chat_messages(
cursor: Optional[str] = Query(None),
_auth: str = Depends(require_auth),
):
request_id = get_request_id(request)
operator_id, operator_id_missing = _resolve_operator_from_request(request, request_id)
SOFIIA_CURSOR_REQUESTS_TOTAL.labels(resource="messages").inc()
cur = _cursor_decode(cursor)
before_ts = str(cur.get("ts") or "").strip() or None
@@ -3304,7 +3338,7 @@ async def api_chat_messages(
next_cursor = _cursor_encode({"ts": tail.get("ts"), "message_id": tail.get("msg_id")})
log_event(
"chat.messages.list",
request_id=get_request_id(request),
request_id=request_id,
chat_id=chat_id,
node_id=info["node_id"],
agent_id=info["agent_id"],
@@ -3314,6 +3348,25 @@ async def api_chat_messages(
next_cursor_present=bool(next_cursor),
status="ok",
)
await audit_log(
AuditEvent(
event="chat.messages.list",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=info["node_id"],
agent_id=info["agent_id"],
status="ok",
data={
"request_id": request_id,
"limit": limit,
"cursor_present": bool(cursor),
"count": len(messages),
"has_more": has_more,
},
)
)
return {
"items": messages,
"count": len(messages),
@@ -3345,6 +3398,26 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
target_node = ((body.routing or {}).get("force_node_id") or info["node_id"] or "NODA2").upper()
target_agent = info["agent_id"] or "sofiia"
operator_id, operator_id_missing = _resolve_operator_id(request, body, request_id)
payload_size_bytes = len(text.encode("utf-8"))
attachments_count = len(body.attachments or [])
await audit_log(
AuditEvent(
event="chat.send.requested",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
status="ok",
data={
"request_id": request_id,
"idempotency_key_hash": (idem_hash or None),
"payload_size_bytes": payload_size_bytes,
"attachments_count": attachments_count,
},
)
)
chat_rl = _rate_limiter.consume(f"rl:chat:{chat_id}", rps=_RL_CHAT_RPS, burst=_RL_CHAT_BURST)
if not chat_rl.allowed:
SOFIIA_RATE_LIMITED_TOTAL.labels(scope="chat").inc()
@@ -3363,6 +3436,27 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
status="error",
error_code="rate_limited",
)
await audit_log(
AuditEvent(
event="chat.send.rate_limited",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
status="error",
error_code="rate_limited",
duration_ms=int((time.monotonic() - started_at) * 1000),
data={
"request_id": request_id,
"scope": "chat",
"limit_rps": _RL_CHAT_RPS,
"burst": _RL_CHAT_BURST,
"retry_after_s": chat_rl.retry_after_s,
},
)
)
raise _rate_limited_http("chat", chat_rl.retry_after_s)
op_rl = _rate_limiter.consume(f"rl:op:{operator_id}", rps=_RL_OP_RPS, burst=_RL_OP_BURST)
if not op_rl.allowed:
@@ -3382,6 +3476,27 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
status="error",
error_code="rate_limited",
)
await audit_log(
AuditEvent(
event="chat.send.rate_limited",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
status="error",
error_code="rate_limited",
duration_ms=int((time.monotonic() - started_at) * 1000),
data={
"request_id": request_id,
"scope": "operator",
"limit_rps": _RL_OP_RPS,
"burst": _RL_OP_BURST,
"retry_after_s": op_rl.retry_after_s,
},
)
)
raise _rate_limited_http("operator", op_rl.retry_after_s)
log_event(
"chat.send",
@@ -3413,6 +3528,24 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
)
replay = dict(cached.response_body)
replay["idempotency"] = {"replayed": True, "key": idem_key}
await audit_log(
AuditEvent(
event="chat.send.result",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
status="ok",
duration_ms=int((time.monotonic() - started_at) * 1000),
data={
"request_id": request_id,
"message_id": cached.message_id,
"replayed": True,
},
)
)
return replay
await _ensure_chat_project()
@@ -3452,6 +3585,21 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
error_code="router_url_not_configured",
duration_ms=duration_ms,
)
await audit_log(
AuditEvent(
event="chat.send.result",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
status="error",
error_code="router_url_not_configured",
duration_ms=duration_ms,
data={"request_id": request_id},
)
)
raise HTTPException(status_code=400, detail=f"router_url is not configured for node {target_node}")
try:
out = await infer(
@@ -3477,6 +3625,21 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
error=str(e)[:180],
duration_ms=duration_ms,
)
await audit_log(
AuditEvent(
event="chat.send.result",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
status="error",
error_code="upstream_error",
duration_ms=duration_ms,
data={"request_id": request_id},
)
)
_broadcast_bg(
_make_event(
"error",
@@ -3535,6 +3698,24 @@ async def api_chat_send_v2(chat_id: str, body: ChatMessageSendBody, request: Req
status="ok",
duration_ms=duration_ms,
)
await audit_log(
AuditEvent(
event="chat.send.result",
operator_id=operator_id,
operator_id_missing=operator_id_missing,
ip=(request.client.host if request.client else None),
chat_id=chat_id,
node_id=target_node,
agent_id=target_agent,
status="ok",
duration_ms=duration_ms,
data={
"request_id": request_id,
"message_id": (result.get("message") or {}).get("message_id"),
"replayed": False,
},
)
)
return result