Files
microdao-daarion/tests/test_audit_backend_auto.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

252 lines
8.7 KiB
Python

"""
tests/test_audit_backend_auto.py
─────────────────────────────────
Unit tests for AutoAuditStore and AUDIT_BACKEND=auto logic.
No real Postgres is needed — we mock PostgresAuditStore.
"""
from __future__ import annotations
import importlib
import os
import sys
import tempfile
import threading
from pathlib import Path
from typing import Dict, List, Optional
from unittest.mock import MagicMock, patch
import pytest
# ── Make sure the router package is importable ────────────────────────────────
# Location of the router service relative to this file: two directory levels
# up, then services/router.
ROUTER = Path(__file__).resolve().parent.parent.joinpath("services", "router")
if str(ROUTER) not in sys.path:
    # Prepend so modules in the router directory are found first on import.
    sys.path.insert(0, str(ROUTER))
def _reload_audit_store():
    """Re-import ``audit_store`` so that env-var changes take effect.

    The module-level ``_store`` singleton is cleared both before and after
    the reload.

    Returns:
        The reloaded ``audit_store`` module object.
    """
    module = importlib.import_module("audit_store")
    # Reset global singleton on the currently loaded module state.
    module._store = None
    importlib.reload(module)
    # Reset again in case the reload left a value in place.
    module._store = None
    return module
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _make_event(**kwargs) -> Dict:
base = dict(
ts="2026-02-23T10:00:00Z",
req_id="r1",
workspace_id="ws1",
user_id="u1",
agent_id="sofiia",
tool="observability_tool",
action="service_overview",
status="succeeded",
duration_ms=42,
in_size=10,
out_size=50,
input_hash="abc",
)
base.update(kwargs)
return base
# ─── 1. AutoAuditStore: Postgres available ────────────────────────────────────
class TestAutoAuditStorePostgresAvailable:
    """AutoAuditStore tests where the injected primary backend does not fail."""

    @staticmethod
    def _store_with_primary(primary, jsonl_dir):
        """Return an AutoAuditStore whose ``_primary`` is replaced by *primary*."""
        import audit_store

        auto_store = audit_store.AutoAuditStore(
            pg_dsn="postgresql://test/test",
            jsonl_dir=str(jsonl_dir),
        )
        auto_store._primary = primary  # inject mock directly
        return auto_store

    def test_writes_to_postgres_when_available(self, tmp_path):
        """``write`` must call the primary's ``write`` once with the same event."""
        primary = MagicMock()
        primary.read.return_value = [_make_event()]
        auto_store = self._store_with_primary(primary, tmp_path)

        event = _make_event()
        auto_store.write(event)

        primary.write.assert_called_once_with(event)

    def test_reads_from_postgres_when_available(self, tmp_path):
        """``read`` must return the primary's events and call it exactly once."""
        primary = MagicMock()
        primary.read.return_value = [_make_event(tool="kb_tool")]
        auto_store = self._store_with_primary(primary, tmp_path)

        fetched = auto_store.read(limit=10)

        assert len(fetched) == 1
        assert fetched[0]["tool"] == "kb_tool"
        primary.read.assert_called_once()

    def test_active_backend_returns_postgres(self, tmp_path):
        """With no failure recorded, ``active_backend`` must report "postgres"."""
        auto_store = self._store_with_primary(MagicMock(), tmp_path)

        assert auto_store.active_backend() == "postgres"
# ─── 2. AutoAuditStore: Postgres unavailable → fallback to JSONL ──────────────
class TestAutoAuditStoreFallback:
    """AutoAuditStore tests where the injected primary backend raises."""

    @staticmethod
    def _failing_write_primary():
        """Return a mock primary whose ``write`` raises ``ConnectionError``."""
        primary = MagicMock()
        primary.write.side_effect = ConnectionError("pg down")
        return primary

    @staticmethod
    def _store_with_primary(primary, jsonl_dir):
        """Return an AutoAuditStore whose ``_primary`` is replaced by *primary*."""
        import audit_store

        auto_store = audit_store.AutoAuditStore(
            pg_dsn="postgresql://test/test",
            jsonl_dir=str(jsonl_dir),
        )
        auto_store._primary = primary
        return auto_store

    def test_fallback_on_write_failure(self, tmp_path):
        """A failing primary write must not raise and must flip to fallback."""
        auto_store = self._store_with_primary(self._failing_write_primary(), tmp_path)

        event = _make_event()
        auto_store.write(event)  # should not raise

        # Check _using_fallback is set
        assert auto_store._using_fallback is True
        assert auto_store.active_backend() == "jsonl_fallback"

    def test_fallback_writes_jsonl_file(self, tmp_path):
        """After a failing primary write, a ``*.jsonl`` file must exist."""
        auto_store = self._store_with_primary(self._failing_write_primary(), tmp_path)

        event = _make_event()
        auto_store.write(event)

        # There should be at least one JSONL file in tmp_path
        found = list(tmp_path.glob("*.jsonl"))
        assert len(found) >= 1, "Expected JSONL fallback file to be created"

    def test_read_falls_back_to_jsonl_on_pg_error(self, tmp_path):
        """When the primary's ``read`` raises, events come from the JSONL dir."""
        import audit_store

        primary = MagicMock()
        primary.read.side_effect = RuntimeError("pg read error")

        # Pre-create a JSONL file with one event
        seed_store = audit_store.JsonlAuditStore(str(tmp_path))
        seed_store.write(_make_event(tool="kb_tool"))

        auto_store = self._store_with_primary(primary, tmp_path)
        fetched = auto_store.read(limit=100)

        assert any(item["tool"] == "kb_tool" for item in fetched)

    def test_fallback_recovery_after_interval(self, tmp_path):
        """After _RECOVERY_INTERVAL_S passes, AutoAuditStore tries Postgres again."""
        import time

        auto_store = self._store_with_primary(self._failing_write_primary(), tmp_path)

        # Trigger fallback
        auto_store.write(_make_event())
        assert auto_store._using_fallback is True

        # Simulate recovery interval elapsed by back-dating the fallback start.
        auto_store._fallback_since = time.monotonic() - auto_store._RECOVERY_INTERVAL_S - 1
        auto_store._maybe_recover()

        assert auto_store._using_fallback is False
# ─── 3. _create_store() with AUDIT_BACKEND=auto ──────────────────────────────
class TestCreateStoreAuto:
    """Checks which store class ``_create_store()`` returns per environment."""

    @staticmethod
    def _create_with_env(monkeypatch, set_vars, unset_vars=()):
        """Apply env changes, reload ``audit_store`` and build a store.

        Args:
            monkeypatch: The pytest ``monkeypatch`` fixture.
            set_vars: Mapping of environment variable names to values to set.
            unset_vars: Names of environment variables to remove if present.

        Returns:
            A ``(module, store)`` pair: the reloaded module and the result of
            its ``_create_store()``.
        """
        for name, value in set_vars.items():
            monkeypatch.setenv(name, value)
        for name in unset_vars:
            monkeypatch.delenv(name, raising=False)
        module = _reload_audit_store()
        return module, module._create_store()

    def test_auto_with_dsn_creates_auto_store(self, tmp_path, monkeypatch):
        """AUDIT_BACKEND=auto with DATABASE_URL set must give AutoAuditStore."""
        module, created = self._create_with_env(
            monkeypatch,
            {
                "AUDIT_BACKEND": "auto",
                "DATABASE_URL": "postgresql://user:pass@localhost/test",
                "AUDIT_JSONL_DIR": str(tmp_path),
            },
        )
        assert isinstance(created, module.AutoAuditStore)

    def test_auto_without_dsn_creates_jsonl_store(self, tmp_path, monkeypatch):
        """AUDIT_BACKEND=auto with no DSN variables must give JsonlAuditStore."""
        module, created = self._create_with_env(
            monkeypatch,
            {
                "AUDIT_BACKEND": "auto",
                "AUDIT_JSONL_DIR": str(tmp_path),
            },
            unset_vars=("DATABASE_URL", "POSTGRES_DSN"),
        )
        assert isinstance(created, module.JsonlAuditStore)

    def test_postgres_backend_creates_pg_store(self, tmp_path, monkeypatch):
        """AUDIT_BACKEND=postgres with DATABASE_URL must give PostgresAuditStore."""
        module, created = self._create_with_env(
            monkeypatch,
            {
                "AUDIT_BACKEND": "postgres",
                "DATABASE_URL": "postgresql://user:pass@localhost/test",
                "AUDIT_JSONL_DIR": str(tmp_path),
            },
        )
        assert isinstance(created, module.PostgresAuditStore)

    def test_null_backend(self, tmp_path, monkeypatch):
        """AUDIT_BACKEND=null must give NullAuditStore."""
        module, created = self._create_with_env(
            monkeypatch,
            {"AUDIT_BACKEND": "null"},
        )
        assert isinstance(created, module.NullAuditStore)

    def test_jsonl_backend_default(self, tmp_path, monkeypatch):
        """AUDIT_BACKEND=jsonl must give JsonlAuditStore."""
        module, created = self._create_with_env(
            monkeypatch,
            {
                "AUDIT_BACKEND": "jsonl",
                "AUDIT_JSONL_DIR": str(tmp_path),
            },
        )
        assert isinstance(created, module.JsonlAuditStore)
# ─── 4. Thread-safety: concurrent writes don't crash ─────────────────────────
class TestAutoAuditStoreThreadSafety:
    """Concurrent ``write`` calls against a flaky primary must not raise."""

    def test_concurrent_writes_no_exception(self, tmp_path):
        """Run 5 threads x 20 writes while every 3rd primary write fails."""
        import audit_store

        calls = 0
        guard = threading.Lock()

        def flaky_write(ev):
            nonlocal calls
            with guard:
                calls += 1
                # Fail every 3rd call to simulate intermittent error
                if calls % 3 == 0:
                    raise ConnectionError("intermittent")

        primary = MagicMock()
        primary.write.side_effect = flaky_write

        auto_store = audit_store.AutoAuditStore(
            pg_dsn="postgresql://test/test",
            jsonl_dir=str(tmp_path),
        )
        auto_store._primary = primary

        errors = []

        def write_n(n: int):
            # Collect anything that escapes write() instead of killing the thread.
            for _ in range(n):
                try:
                    auto_store.write(_make_event())
                except Exception as exc:
                    errors.append(exc)

        workers = [threading.Thread(target=write_n, args=(20,)) for _ in range(5)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert not errors, f"Unexpected exceptions: {errors}"