Files
microdao-daarion/services/router/audit_store.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

574 lines
21 KiB
Python

"""
Audit Store — persistence layer for ToolGovernance audit events.
Backends:
memory — in-process list (testing; not persistent)
jsonl — append-only JSONL file with daily rotation (default, zero-config)
postgres — asyncpg INSERT into tool_audit_events table
Selection: env var AUDIT_BACKEND=jsonl|postgres|memory (default: jsonl)
Security / Privacy:
- Payload is NEVER written (only hash + sizes)
- Each write is fire-and-forget: errors → log warning, do NOT raise
- Postgres writes are non-blocking (asyncio task)
JSONL schema per line (matches AuditEvent fields):
{ts, req_id, workspace_id, user_id, agent_id, tool, action,
status, duration_ms, in_size, out_size, input_hash,
graph_run_id?, graph_node?, job_id?}
Postgres DDL (run once — or apply via migration):
See _POSTGRES_DDL constant below.
"""
from __future__ import annotations

import asyncio
import datetime
import json
import logging
import os
import threading
import time
from abc import ABC, abstractmethod
from collections import deque
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# ─── DDL ──────────────────────────────────────────────────────────────────────
# Schema for the Postgres backend. Applied idempotently (IF NOT EXISTS) by
# PostgresAuditStore._get_pool on first pool creation, so re-running is safe.
# Column names mirror the canonical dict produced by _event_to_dict; the last
# three columns are nullable correlation ids. Indexes cover the read() filter
# combinations (ts range, plus tool/agent/workspace each paired with ts).
_POSTGRES_DDL = """
CREATE TABLE IF NOT EXISTS tool_audit_events (
id BIGSERIAL PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL,
req_id TEXT NOT NULL,
workspace_id TEXT NOT NULL,
user_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
tool TEXT NOT NULL,
action TEXT NOT NULL,
status TEXT NOT NULL,
duration_ms INT NOT NULL,
in_size INT NOT NULL,
out_size INT NOT NULL,
input_hash TEXT NOT NULL,
graph_run_id TEXT,
graph_node TEXT,
job_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_tool_audit_ts ON tool_audit_events(ts);
CREATE INDEX IF NOT EXISTS idx_tool_audit_tool_ts ON tool_audit_events(tool, ts);
CREATE INDEX IF NOT EXISTS idx_tool_audit_agent_ts ON tool_audit_events(agent_id, ts);
CREATE INDEX IF NOT EXISTS idx_tool_audit_ws_ts ON tool_audit_events(workspace_id, ts);
"""
# ─── Canonical event dict ─────────────────────────────────────────────────────
def _event_to_dict(event: "AuditEventLike") -> Dict[str, Any]:
"""Convert an AuditEvent (dataclass) or dict to canonical storage dict."""
if isinstance(event, dict):
return event
return {
"ts": getattr(event, "ts", ""),
"req_id": getattr(event, "req_id", ""),
"workspace_id": getattr(event, "workspace_id", ""),
"user_id": getattr(event, "user_id", ""),
"agent_id": getattr(event, "agent_id", ""),
"tool": getattr(event, "tool", ""),
"action": getattr(event, "action", ""),
"status": getattr(event, "status", ""),
"duration_ms": round(float(getattr(event, "duration_ms", 0))),
"in_size": int(getattr(event, "input_chars", 0)),
"out_size": int(getattr(event, "output_size_bytes", 0)),
"input_hash": getattr(event, "input_hash", ""),
"graph_run_id": getattr(event, "graph_run_id", None),
"graph_node": getattr(event, "graph_node", None),
"job_id": getattr(event, "job_id", None),
}
# Type alias (avoid circular imports)
AuditEventLike = Any
# ─── Interface ────────────────────────────────────────────────────────────────
class AuditStore(ABC):
    """Abstract persistence backend for ToolGovernance audit events.

    Concrete backends (memory / jsonl / postgres / auto / null) implement the
    same contract: write() must never raise, read() returns canonical dicts.
    """

    @abstractmethod
    def write(self, event: AuditEventLike) -> None:
        """Non-blocking write. MUST NOT raise on error."""
        ...

    @abstractmethod
    def read(
        self,
        from_ts: Optional[str] = None,
        to_ts: Optional[str] = None,
        tool: Optional[str] = None,
        agent_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
        limit: int = 50000,
    ) -> List[Dict[str, Any]]:
        """Read events matching filters. Returns list of dicts."""
        ...

    def close(self) -> None:
        # Default is a no-op; backends holding resources override this.
        pass
# ─── Memory store ─────────────────────────────────────────────────────────────
class MemoryAuditStore(AuditStore):
    """In-process store for testing (not persistent). Thread-safe.

    Events are held in a bounded ``deque``: once *max_events* is reached, the
    oldest entries are evicted automatically in O(1).  The previous list-based
    implementation re-sliced the whole list on every overflowing write (O(n))
    and, for ``max_events=0``, the ``[-0:]`` slice kept *everything* instead
    of nothing — both fixed by ``deque(maxlen=...)``.
    """

    def __init__(self, max_events: int = 100_000):
        # maxlen makes append() evict the oldest event automatically.
        self._events: deque = deque(maxlen=max_events)
        self._lock = threading.Lock()
        self._max = max_events  # retained for backward compatibility

    def write(self, event: AuditEventLike) -> None:
        """Append one event. Never raises; errors are logged as warnings."""
        try:
            d = _event_to_dict(event)
            with self._lock:
                self._events.append(d)
        except Exception as e:
            logger.warning("MemoryAuditStore.write error: %s", e)

    def read(
        self,
        from_ts: Optional[str] = None,
        to_ts: Optional[str] = None,
        tool: Optional[str] = None,
        agent_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
        limit: int = 50000,
    ) -> List[Dict]:
        """Return up to *limit* most-recent events matching the filters.

        Timestamp filters compare the ``ts`` strings lexicographically
        (chronologically correct for a uniform ISO-like format — TODO confirm
        producers always use one format).
        """
        with self._lock:
            rows = list(self._events)  # snapshot; filter outside the lock
        if from_ts:
            rows = [r for r in rows if r.get("ts", "") >= from_ts]
        if to_ts:
            rows = [r for r in rows if r.get("ts", "") <= to_ts]
        if tool:
            rows = [r for r in rows if r.get("tool") == tool]
        if agent_id:
            rows = [r for r in rows if r.get("agent_id") == agent_id]
        if workspace_id:
            rows = [r for r in rows if r.get("workspace_id") == workspace_id]
        return rows[-limit:]

    def clear(self) -> None:
        """Drop all stored events (test helper)."""
        with self._lock:
            self._events.clear()
# ─── JSONL store ──────────────────────────────────────────────────────────────
class JsonlAuditStore(AuditStore):
    """
    Append-only JSONL file with daily rotation.

    File pattern: ops/audit/tool_audit_YYYY-MM-DD.jsonl
    Writes are serialised through a threading.Lock (safe for multi-thread,
    not multi-process).
    """

    def __init__(self, directory: str = "ops/audit"):
        self._dir = Path(directory)
        self._dir.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        self._current_file: Optional[Path] = None   # path of the open file
        self._current_date: Optional[str] = None    # YYYY-MM-DD of the open file
        self._fh = None                             # open handle, or None

    def _get_fh(self, date_str: str):
        """Return an open, line-buffered handle for *date_str*, rotating files
        when the date changes. Caller must hold ``self._lock``.

        BUGFIX: also reopen when ``self._fh is None``.  Previously close()
        cleared the handle but left ``_current_date`` set, so a same-day
        write() after close() got ``None`` back and every subsequent event
        was silently dropped via the AttributeError -> warning path.
        """
        if self._fh is None or date_str != self._current_date:
            if self._fh:
                try:
                    self._fh.close()
                except Exception:
                    pass
            path = self._dir / f"tool_audit_{date_str}.jsonl"
            # buffering=1 -> line-buffered: each event reaches the OS promptly.
            self._fh = open(path, "a", encoding="utf-8", buffering=1)
            self._current_date = date_str
            self._current_file = path
        return self._fh

    def write(self, event: AuditEventLike) -> None:
        """Append one event as a JSON line. Never raises; errors are logged."""
        try:
            d = _event_to_dict(event)
            # Rotate by the event's own date when present, else today.
            # NOTE(review): date.today() is local time, not UTC — acceptable
            # for rotation granularity, but confirm if strict UTC is needed.
            date_str = (d.get("ts") or "")[:10] or datetime.date.today().isoformat()
            line = json.dumps(d, ensure_ascii=False)
            with self._lock:
                fh = self._get_fh(date_str)
                fh.write(line + "\n")
        except Exception as e:
            logger.warning("JsonlAuditStore.write error: %s", e)

    def read(
        self,
        from_ts: Optional[str] = None,
        to_ts: Optional[str] = None,
        tool: Optional[str] = None,
        agent_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
        limit: int = 50000,
    ) -> List[Dict]:
        """Stream-read JSONL files in the [from_ts, to_ts] date range,
        applying the same filters as the other backends.

        Malformed lines are skipped silently; per-file errors are logged and
        the scan continues with the next file. Stops at *limit* rows.
        """
        # Prune candidate files by the date embedded in their name.
        files = sorted(self._dir.glob("tool_audit_*.jsonl"))
        if from_ts:
            from_date = from_ts[:10]
            files = [f for f in files if f.stem[-10:] >= from_date]
        if to_ts:
            to_date = to_ts[:10]
            files = [f for f in files if f.stem[-10:] <= to_date]
        rows: List[Dict] = []
        for fpath in files:
            try:
                with open(fpath, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            d = json.loads(line)
                        except Exception:
                            continue  # tolerate torn/corrupt lines
                        ts = d.get("ts", "")
                        if from_ts and ts < from_ts:
                            continue
                        if to_ts and ts > to_ts:
                            continue
                        if tool and d.get("tool") != tool:
                            continue
                        if agent_id and d.get("agent_id") != agent_id:
                            continue
                        if workspace_id and d.get("workspace_id") != workspace_id:
                            continue
                        rows.append(d)
                        if len(rows) >= limit:
                            break
            except Exception as e:
                logger.warning("JsonlAuditStore.read error %s: %s", fpath, e)
            if len(rows) >= limit:
                break
        return rows

    def close(self) -> None:
        """Close the current file handle; a later write() reopens it."""
        with self._lock:
            if self._fh:
                try:
                    self._fh.close()
                except Exception:
                    pass
                self._fh = None
# ─── Postgres store ───────────────────────────────────────────────────────────
class PostgresAuditStore(AuditStore):
    """
    Async Postgres store using asyncpg.

    Writes are enqueued to an asyncio queue and flushed in batches (<=50) by a
    background task. Falls back gracefully — events are dropped with a warning
    — if Postgres is unavailable, the queue is full, or no event loop is
    running yet.
    """

    def __init__(self, dsn: str):
        self._dsn = dsn
        self._pool = None                                   # asyncpg pool, lazy
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=10_000)
        self._task: Optional[asyncio.Task] = None           # background flusher
        self._started = False                               # flusher scheduled?

    def _ensure_started(self) -> None:
        """Schedule the background flush task once an event loop is running.

        FIX: uses ``asyncio.get_running_loop()`` instead of the deprecated
        ``asyncio.get_event_loop()`` — the latter could return (or create) a
        loop that is not running, and outside async contexts it is deprecated.
        If no loop is running we simply stay un-started; the next write()
        retries.
        """
        if self._started:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return  # not inside an event loop yet; retry on next write()
        self._task = loop.create_task(self._flush_loop())
        self._started = True

    async def _get_pool(self):
        """Lazily create the asyncpg pool and apply the idempotent DDL."""
        if self._pool is None:
            import asyncpg
            self._pool = await asyncpg.create_pool(self._dsn, min_size=1, max_size=3)
            async with self._pool.acquire() as conn:
                await conn.execute(_POSTGRES_DDL)
        return self._pool

    async def _flush_loop(self):
        """Drain the queue forever, batching up to 50 events per INSERT."""
        while True:
            events = []
            try:
                # Block up to 2s for the first event, then greedily take more.
                evt = await asyncio.wait_for(self._queue.get(), timeout=2.0)
                events.append(evt)
                # Single consumer, so empty()/get_nowait() cannot race here.
                while not self._queue.empty() and len(events) < 50:
                    events.append(self._queue.get_nowait())
            except asyncio.TimeoutError:
                pass  # nothing arrived in this window
            except Exception:
                pass  # best-effort: never let the flusher die
            if not events:
                continue
            try:
                pool = await self._get_pool()
                async with pool.acquire() as conn:
                    await conn.executemany(
                        """
                        INSERT INTO tool_audit_events
                        (ts, req_id, workspace_id, user_id, agent_id, tool, action,
                        status, duration_ms, in_size, out_size, input_hash,
                        graph_run_id, graph_node, job_id)
                        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)
                        """,
                        [
                            (
                                e["ts"], e["req_id"], e["workspace_id"], e["user_id"],
                                e["agent_id"], e["tool"], e["action"], e["status"],
                                e["duration_ms"], e["in_size"], e["out_size"],
                                e["input_hash"], e.get("graph_run_id"),
                                e.get("graph_node"), e.get("job_id"),
                            )
                            for e in events
                        ],
                    )
            except Exception as ex:
                # Batch is dropped on failure — audit writes are best-effort.
                logger.warning("PostgresAuditStore flush error: %s", ex)

    def write(self, event: AuditEventLike) -> None:
        """Enqueue one event for background insertion. Never raises.

        Events are silently dropped when the flusher has not started (no
        running loop) or the queue is full (backpressure).
        """
        try:
            d = _event_to_dict(event)
            self._ensure_started()
            if self._started and not self._queue.full():
                self._queue.put_nowait(d)
        except Exception as e:
            logger.warning("PostgresAuditStore.write error: %s", e)

    def read(
        self,
        from_ts: Optional[str] = None,
        to_ts: Optional[str] = None,
        tool: Optional[str] = None,
        agent_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
        limit: int = 50000,
    ) -> List[Dict]:
        """Synchronous read via asyncio.run() — for analyzer queries.

        Returns [] (with a warning) on any error, including being called
        from inside a running event loop, where asyncio.run() raises.
        """
        try:
            return asyncio.run(self._async_read(from_ts, to_ts, tool, agent_id, workspace_id, limit))
        except Exception as e:
            logger.warning("PostgresAuditStore.read error: %s", e)
            return []

    async def _async_read(self, from_ts, to_ts, tool, agent_id, workspace_id, limit):
        """Build and run the filtered SELECT. Filters bind as parameters;
        limit is interpolated but is always an int from the caller."""
        pool = await self._get_pool()
        conditions = ["TRUE"]
        params = []
        p = 1
        if from_ts:
            conditions.append(f"ts >= ${p}"); params.append(from_ts); p += 1
        if to_ts:
            conditions.append(f"ts <= ${p}"); params.append(to_ts); p += 1
        if tool:
            conditions.append(f"tool = ${p}"); params.append(tool); p += 1
        if agent_id:
            conditions.append(f"agent_id = ${p}"); params.append(agent_id); p += 1
        if workspace_id:
            conditions.append(f"workspace_id = ${p}"); params.append(workspace_id); p += 1
        sql = f"SELECT * FROM tool_audit_events WHERE {' AND '.join(conditions)} ORDER BY ts LIMIT {limit}"
        async with pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
        return [dict(r) for r in rows]
# ─── Null store ───────────────────────────────────────────────────────────────
class NullAuditStore(AuditStore):
    """No-op store used when auditing is disabled: writes are discarded and
    reads always yield an empty list."""

    def write(self, event: AuditEventLike) -> None:
        # Intentionally discard the event.
        return None

    def read(self, **kwargs) -> List[Dict]:
        # Nothing is ever stored, so nothing can ever match.
        return []
# ─── Global singleton ─────────────────────────────────────────────────────────
_store: Optional[AuditStore] = None
_store_lock = threading.Lock()


def get_audit_store() -> AuditStore:
    """Lazily initialise and return the global audit store.

    Double-checked locking: the lock-free fast path serves the common case,
    and the re-check under the lock ensures a single backend is built even
    when several threads race the first call.
    """
    global _store
    if _store is not None:  # fast path: already built
        return _store
    with _store_lock:
        if _store is None:  # re-check: another thread may have won the race
            _store = _create_store()
        return _store


def set_audit_store(store: AuditStore) -> None:
    """Override the global store (used in tests)."""
    global _store
    with _store_lock:
        _store = store
class AutoAuditStore(AuditStore):
    """
    Smart backend: tries Postgres first, falls back to JSONL on failure.

    Used when AUDIT_BACKEND=auto (or unset with DATABASE_URL present).
    - Writes go to whichever backend is currently healthy.
    - On Postgres failure, transparently falls back to JsonlAuditStore.
    - Recovers to Postgres on next health check (every ~5 min).
    Non-fatal: write errors are logged as warnings.
    """

    _RECOVERY_INTERVAL_S = 300  # retry Postgres after 5 minutes

    def __init__(self, pg_dsn: str, jsonl_dir: str):
        self._pg_dsn = pg_dsn
        self._jsonl_dir = jsonl_dir
        self._primary: Optional[PostgresAuditStore] = None
        self._fallback: Optional[JsonlAuditStore] = None
        self._using_fallback = False
        self._fallback_since: float = 0.0
        self._init_lock = threading.Lock()

    def _get_primary(self) -> Optional[PostgresAuditStore]:
        """Lazily build the Postgres store (double-checked under the lock)."""
        if self._primary is not None:
            return self._primary
        with self._init_lock:
            if self._primary is None:
                self._primary = PostgresAuditStore(self._pg_dsn)
        return self._primary

    def _get_fallback(self) -> JsonlAuditStore:
        """Lazily build the JSONL store (double-checked under the lock)."""
        if self._fallback is not None:
            return self._fallback
        with self._init_lock:
            if self._fallback is None:
                self._fallback = JsonlAuditStore(self._jsonl_dir)
        return self._fallback

    def _maybe_recover(self) -> None:
        """Flip back to Postgres once the recovery cool-down has elapsed."""
        if not self._using_fallback or self._fallback_since <= 0:
            return
        if time.monotonic() - self._fallback_since >= self._RECOVERY_INTERVAL_S:
            logger.info("AutoAuditStore: attempting Postgres recovery")
            self._using_fallback = False
            self._fallback_since = 0.0

    def write(self, event: AuditEventLike) -> None:
        """Write to the healthy backend; switch to JSONL on Postgres errors."""
        self._maybe_recover()
        if not self._using_fallback:
            try:
                primary = self._get_primary()
                if primary:
                    primary.write(event)
                    return
            except Exception as exc:
                logger.warning(
                    "AutoAuditStore: Postgres write failed (%s), switching to JSONL fallback", exc
                )
                self._using_fallback = True
                self._fallback_since = time.monotonic()
        # Write to JSONL fallback
        try:
            self._get_fallback().write(event)
        except Exception as exc:
            logger.warning("AutoAuditStore: JSONL fallback write failed: %s", exc)

    def read(
        self,
        from_ts: Optional[str] = None,
        to_ts: Optional[str] = None,
        tool: Optional[str] = None,
        agent_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
        limit: int = 50000,
    ) -> List[Dict]:
        """Read from Postgres if available, else JSONL."""
        self._maybe_recover()
        query = dict(
            from_ts=from_ts, to_ts=to_ts, tool=tool,
            agent_id=agent_id, workspace_id=workspace_id, limit=limit,
        )
        if not self._using_fallback:
            try:
                primary = self._get_primary()
                if primary:
                    return primary.read(**query)
            except Exception as exc:
                logger.warning("AutoAuditStore: Postgres read failed (%s), using JSONL", exc)
                self._using_fallback = True
                self._fallback_since = time.monotonic()
        return self._get_fallback().read(**query)

    def active_backend(self) -> str:
        """Return the name of the currently active backend."""
        if self._using_fallback:
            return "jsonl_fallback"
        return "postgres"

    def close(self) -> None:
        """Close whichever backends were actually instantiated."""
        for store in (self._primary, self._fallback):
            if store is None:
                continue
            try:
                store.close()
            except Exception:
                pass
def _create_store() -> AuditStore:
    """Build the audit store selected by the AUDIT_BACKEND env var.

    Backends: memory | postgres | auto | null | jsonl (default).  Unknown
    values, "postgres" without a DSN, and "auto" without a DSN all land on
    the JSONL backend.
    """
    backend = os.getenv("AUDIT_BACKEND", "jsonl").lower()
    dsn = os.getenv("DATABASE_URL") or os.getenv("POSTGRES_DSN", "")
    audit_dir = os.getenv(
        "AUDIT_JSONL_DIR",
        str(Path(os.getenv("REPO_ROOT", ".")) / "ops" / "audit"),
    )
    if backend == "memory":
        logger.info("AuditStore: in-memory (testing only)")
        return MemoryAuditStore()
    if backend == "null":
        return NullAuditStore()
    if backend == "postgres":
        if dsn:
            logger.info("AuditStore: postgres dsn=%s", dsn[:30])
            return PostgresAuditStore(dsn)
        logger.warning("AUDIT_BACKEND=postgres but DATABASE_URL not set; falling back to jsonl")
    elif backend == "auto":
        if dsn:
            logger.info("AuditStore: auto (postgres→jsonl fallback) dsn=%s", dsn[:30])
            return AutoAuditStore(pg_dsn=dsn, jsonl_dir=audit_dir)
        logger.info("AuditStore: auto — no DATABASE_URL, using jsonl")
    # Default / jsonl
    logger.info("AuditStore: jsonl dir=%s", audit_dir)
    return JsonlAuditStore(audit_dir)