Files
microdao-daarion/tests/test_sofiia_docs.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

3225 lines
155 KiB
Python

"""
tests/test_sofiia_docs.py
Unit tests for sofiia-console Projects/Documents/Sessions/Dialog Map.
Tests:
- DB: projects CRUD
- DB: documents CRUD + SHA-256 stability
- DB: sessions upsert + turn count
- DB: messages + parent_msg_id branching
- DB: dialog map nodes/edges
- DB: fork_session copies ancestors
- API: upload size limits config
- API: mime validation (allowed/blocked)
- API: search documents (keyword)
- API: session fork returns new_session_id
"""
import asyncio
import json
import os
import sys
import tempfile
import unittest
import uuid
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# ── path setup ──────────────────────────────────────────────────────────────
# Put the sofiia-console service dir on sys.path so the `from app import db`
# fallback chain below can resolve the service's `app` package.
_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_ROOT / "services" / "sofiia-console"))
# Use a temp file DB for tests
# NOTE: SOFIIA_DATA_DIR must be set BEFORE the db module is imported below —
# presumably the module reads it at import time (TODO confirm in app/db.py).
# NOTE(review): the temp dir is never cleaned up; acceptable for test runs.
_TMP_DIR = tempfile.mkdtemp(prefix="sofiia_test_")
os.environ["SOFIIA_DATA_DIR"] = _TMP_DIR
def _run(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    Uses ``asyncio.run`` instead of the deprecated
    ``asyncio.get_event_loop().run_until_complete`` pattern, which emits
    DeprecationWarning on Python 3.10+ (and is slated for removal) when no
    event loop is already running in the calling thread.
    """
    return asyncio.run(coro)
# ── Import after env setup ───────────────────────────────────────────────────
try:
    import aiosqlite  # noqa: F401 — imported only to verify availability

    # Prefer the normal package import; fall back to loading db.py straight
    # from disk (the package may not import without the full app context).
    try:
        from app import db as _db_module
        _DB_AVAILABLE = True
    except ImportError:
        import importlib.util

        _spec = importlib.util.spec_from_file_location(
            "sofiia_db",
            str(_ROOT / "services" / "sofiia-console" / "app" / "db.py"),
        )
        _db_module = importlib.util.module_from_spec(_spec)
        _spec.loader.exec_module(_db_module)
        _DB_AVAILABLE = True
    _AIOSQLITE_AVAILABLE = True
except Exception:
    # A single `except Exception` suffices — the original
    # `except (ImportError, Exception)` was redundant since ImportError is a
    # subclass of Exception. Any failure above just disables the DB tests.
    _AIOSQLITE_AVAILABLE = False
    _DB_AVAILABLE = False
    _db_module = None
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestProjectsCRUD(unittest.IsolatedAsyncioTestCase):
    """CRUD behaviour of the projects table."""

    async def asyncSetUp(self):
        # Start each test from a fresh connection against the temp DB.
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_and_get_project(self):
        created = await self._db.create_project("Test Project", "desc")
        self.assertIn("project_id", created)
        self.assertEqual(created["name"], "Test Project")
        roundtrip = await self._db.get_project(created["project_id"])
        self.assertIsNotNone(roundtrip)
        self.assertEqual(roundtrip["name"], "Test Project")

    async def test_list_projects_includes_default(self):
        all_ids = [proj["project_id"] for proj in await self._db.list_projects()]
        self.assertIn("default", all_ids, "Default project must always exist")

    async def test_update_project(self):
        proj = await self._db.create_project("Old Name")
        self.assertTrue(await self._db.update_project(proj["project_id"], name="New Name"))
        refreshed = await self._db.get_project(proj["project_id"])
        self.assertEqual(refreshed["name"], "New Name")

    async def test_get_nonexistent_project_returns_none(self):
        self.assertIsNone(await self._db.get_project("nonexistent_xyz"))
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestDocumentsCRUD(unittest.IsolatedAsyncioTestCase):
    """CRUD, hashing and search behaviour of the documents table."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_document(self):
        created = await self._db.create_document(
            project_id="default",
            file_id="abc123def456",
            sha256="a" * 64,
            mime="application/pdf",
            size_bytes=1024,
            filename="test.pdf",
            title="My Test Doc",
            tags=["invoice", "2026"],
            extracted_text="Sample text content",
        )
        self.assertIn("doc_id", created)
        self.assertEqual(created["filename"], "test.pdf")
        self.assertEqual(created["tags"], ["invoice", "2026"])

    async def test_sha256_stability(self):
        """SHA-256 must be stored exactly as given (no mutation)."""
        digest = "b" * 64
        created = await self._db.create_document(
            "default", "fid", digest, "text/plain", 100, "file.txt"
        )
        stored = await self._db.get_document(created["doc_id"])
        self.assertEqual(stored["sha256"], digest)

    async def test_list_documents_by_project(self):
        pid = (await self._db.create_project("DocProject"))["project_id"]
        await self._db.create_document(pid, "f1", "c" * 64, "text/plain", 10, "a.txt")
        await self._db.create_document(pid, "f2", "d" * 64, "text/plain", 20, "b.txt")
        self.assertEqual(len(await self._db.list_documents(pid)), 2)

    async def test_search_documents_by_title(self):
        pid = (await self._db.create_project("SearchProject"))["project_id"]
        await self._db.create_document(pid, "f1", "e" * 64, "text/plain", 10, "budget.txt",
                                       title="Annual Budget 2026")
        await self._db.create_document(pid, "f2", "f" * 64, "text/plain", 10, "report.txt",
                                       title="Monthly Report")
        hits = await self._db.search_documents(pid, "Budget")
        self.assertEqual(len(hits), 1)
        self.assertIn("budget.txt", hits[0]["filename"])

    async def test_get_document_wrong_project(self):
        created = await self._db.create_document(
            "default", "gid", "g" * 64, "text/plain", 5, "test.txt"
        )
        stored = await self._db.get_document(created["doc_id"])
        self.assertIsNotNone(stored)
        # Mirrors the "wrong project" ownership guard done by the API endpoint.
        self.assertNotEqual(stored["project_id"], "nonexistent_project")
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestSessionsAndMessages(unittest.IsolatedAsyncioTestCase):
    """Session upsert, message persistence and parent-based branching."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_upsert_session_creates(self):
        created = await self._db.upsert_session("sess_test_001", project_id="default", title="Test Session")
        self.assertEqual(created["session_id"], "sess_test_001")
        self.assertEqual(created["title"], "Test Session")

    async def test_upsert_session_updates_last_active(self):
        await self._db.upsert_session("sess_002", project_id="default")
        again = await self._db.upsert_session("sess_002", project_id="default")
        self.assertEqual(again["session_id"], "sess_002")

    async def test_save_message_and_retrieve(self):
        await self._db.upsert_session("sess_003", project_id="default")
        saved = await self._db.save_message("sess_003", "user", "Hello Sofiia")
        self.assertIn("msg_id", saved)
        self.assertEqual(saved["role"], "user")
        self.assertEqual(saved["content"], "Hello Sofiia")

    async def test_message_branching_parent_msg_id(self):
        await self._db.upsert_session("sess_branch", project_id="default")
        root = await self._db.save_message("sess_branch", "user", "First message")
        await self._db.save_message("sess_branch", "assistant", "First reply", parent_msg_id=root["msg_id"])
        # Fork a second child off the root message.
        await self._db.save_message("sess_branch", "user", "Branch question", parent_msg_id=root["msg_id"], branch_label="branch-1")
        all_msgs = await self._db.list_messages("sess_branch", limit=10)
        self.assertEqual(len(all_msgs), 3)
        branched = [msg for msg in all_msgs if msg["branch_label"] == "branch-1"]
        self.assertEqual(len(branched), 1)
        self.assertEqual(branched[0]["parent_msg_id"], root["msg_id"])

    async def test_turn_count_increments(self):
        await self._db.upsert_session("sess_count", project_id="default")
        for n in range(3):
            await self._db.save_message("sess_count", "user", f"Message {n}")
        sess = await self._db.get_session("sess_count")
        self.assertGreaterEqual(sess["turn_count"], 3)
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestDialogMap(unittest.IsolatedAsyncioTestCase):
    """Per-session dialog map built from saved messages."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_dialog_map_nodes_and_edges(self):
        await self._db.upsert_session("sess_map", project_id="default")
        first = await self._db.save_message("sess_map", "user", "Hi there")
        second = await self._db.save_message("sess_map", "assistant", "Hello!", parent_msg_id=first["msg_id"])
        await self._db.save_message("sess_map", "user", "Follow-up", parent_msg_id=second["msg_id"])
        graph = await self._db.get_dialog_map("sess_map")
        self.assertEqual(graph["session_id"], "sess_map")
        self.assertEqual(len(graph["nodes"]), 3)
        self.assertEqual(len(graph["edges"]), 2)  # m1→m2, m2→m3

    async def test_dialog_map_empty_session(self):
        await self._db.upsert_session("sess_empty_map", project_id="default")
        graph = await self._db.get_dialog_map("sess_empty_map")
        self.assertEqual(graph["nodes"], [])
        self.assertEqual(graph["edges"], [])

    async def test_dialog_map_node_structure(self):
        await self._db.upsert_session("sess_map2", project_id="default")
        await self._db.save_message("sess_map2", "user", "Test node structure")
        graph = await self._db.get_dialog_map("sess_map2")
        first_node = graph["nodes"][0]
        # Every node must expose the minimal rendering contract.
        for key in ("id", "role", "preview", "ts"):
            self.assertIn(key, first_node)
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestForkSession(unittest.IsolatedAsyncioTestCase):
    """fork_session must copy the ancestor chain into an independent session."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_fork_creates_new_session(self):
        await self._db.upsert_session("sess_src", project_id="default")
        first = await self._db.save_message("sess_src", "user", "Message 1")
        second = await self._db.save_message("sess_src", "assistant", "Reply 1", parent_msg_id=first["msg_id"])
        await self._db.save_message("sess_src", "user", "Message 2", parent_msg_id=second["msg_id"])
        forked = await self._db.fork_session("sess_src", from_msg_id=second["msg_id"], new_title="Fork Test")
        self.assertIn("new_session_id", forked)
        self.assertNotEqual(forked["new_session_id"], "sess_src")
        self.assertGreaterEqual(forked["copied_turns"], 2)  # m1 and m2 are ancestors

    async def test_fork_messages_are_independent(self):
        await self._db.upsert_session("sess_src2", project_id="default")
        origin = await self._db.save_message("sess_src2", "user", "Original message")
        forked = await self._db.fork_session("sess_src2", from_msg_id=origin["msg_id"])
        fork_sid = forked["new_session_id"]
        # The fork must exist as its own session...
        self.assertIsNotNone(await self._db.get_session(fork_sid))
        # ...and appending to the source must not grow the fork.
        await self._db.save_message("sess_src2", "user", "New in original")
        fork_msgs = await self._db.list_messages(fork_sid)
        source_msgs = await self._db.list_messages("sess_src2")
        self.assertLess(len(fork_msgs), len(source_msgs))
class TestUploadSizeLimits(unittest.TestCase):
    """Upload size limit configuration tests (no DB needed)."""

    def _get_docs_router_module(self):
        """Load docs_router.py straight from the filesystem.

        Returns the loaded module, or None when it cannot be loaded
        (missing file, missing dependencies, ...) — callers then skip
        the test or fall back to default limits.

        Dummy ``app``/``app.db`` parent modules are registered only when
        absent so docs_router's own imports do not raise ImportError, and
        any dummy we inserted is removed afterwards. The original code
        unconditionally assigned ``sys.modules["app.db"]`` and never
        cleaned up, clobbering a real ``app.db`` module imported earlier
        in the process.
        """
        import importlib.util
        import types

        inserted = []
        for name in ("app", "app.db"):
            if name not in sys.modules:
                sys.modules[name] = types.ModuleType(name)
                inserted.append(name)
        try:
            spec = importlib.util.spec_from_file_location(
                "sofiia_docs_router",
                str(_ROOT / "services" / "sofiia-console" / "app" / "docs_router.py"),
            )
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            return mod
        except Exception:
            return None
        finally:
            # Remove only the dummies we added above — never a real module.
            for name in inserted:
                sys.modules.pop(name, None)

    def _get_docs_router_limits(self):
        """Return (image_mb, video_mb, doc_mb) limits, with hard-coded fallbacks."""
        mod = self._get_docs_router_module()
        if mod:
            return (
                getattr(mod, "_MAX_IMAGE_MB", 10),
                getattr(mod, "_MAX_VIDEO_MB", 200),
                getattr(mod, "_MAX_DOC_MB", 50),
            )
        return 10, 200, 50

    def test_default_image_limit_10mb(self):
        img, vid, doc = self._get_docs_router_limits()
        self.assertEqual(img, 10)

    def test_default_video_limit_200mb(self):
        img, vid, doc = self._get_docs_router_limits()
        self.assertEqual(vid, 200)

    def test_default_doc_limit_50mb(self):
        img, vid, doc = self._get_docs_router_limits()
        self.assertEqual(doc, 50)

    def test_allowed_mimes_includes_pdf(self):
        mod = self._get_docs_router_module()
        if not mod:
            self.skipTest("docs_router not importable")
        self.assertIn("application/pdf", mod._ALLOWED_MIMES)

    def test_allowed_mimes_includes_images(self):
        mod = self._get_docs_router_module()
        if not mod:
            self.skipTest("docs_router not importable")
        self.assertIn("image/jpeg", mod._ALLOWED_MIMES)
        self.assertIn("image/png", mod._ALLOWED_MIMES)

    def test_allowed_mimes_excludes_executables(self):
        mod = self._get_docs_router_module()
        if not mod:
            self.skipTest("docs_router not importable")
        self.assertNotIn("application/x-executable", mod._ALLOWED_MIMES)
        self.assertNotIn("application/x-sh", mod._ALLOWED_MIMES)
class TestSafeFilename(unittest.TestCase):
    """Filename sanitization tests for docs_router's _safe_filename helper."""

    def _get_safe_filename(self):
        """Return the _safe_filename callable, or None if docs_router is unavailable.

        The loader helper is invoked unbound with this test instance as the
        receiver: the original ``TestUploadSizeLimits()._get_docs_router_module()``
        instantiated a TestCase without a method name, which raises
        ValueError ("no such test method runTest") on Python < 3.11.
        """
        mod = TestUploadSizeLimits._get_docs_router_module(self)
        return getattr(mod, "_safe_filename", None) if mod else None

    def test_safe_filename_strips_path(self):
        fn = self._get_safe_filename()
        if not fn:
            self.skipTest("docs_router not importable")
        self.assertEqual(fn("../../../etc/passwd"), "passwd")
        self.assertEqual(fn("/absolute/path/file.txt"), "file.txt")

    def test_safe_filename_removes_dangerous_chars(self):
        fn = self._get_safe_filename()
        if not fn:
            self.skipTest("docs_router not importable")
        result = fn("file; rm -rf /; .txt")
        self.assertNotIn(";", result)
        self.assertNotIn(" ", result)

    def test_safe_filename_preserves_extension(self):
        fn = self._get_safe_filename()
        if not fn:
            self.skipTest("docs_router not importable")
        self.assertTrue(fn("report.pdf").endswith(".pdf"))
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestTasksCRUD(unittest.IsolatedAsyncioTestCase):
    """Tests for Tasks (Kanban) persistence layer."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module
        # Create a dedicated project and look its id back up by name.
        await self._db.create_project("TaskProject")
        self._pid = next(
            proj["project_id"]
            for proj in await self._db.list_projects()
            if proj["name"] == "TaskProject"
        )

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_task(self):
        created = await self._db.create_task(self._pid, "Fix the bug", description="Critical bug", priority="high")
        self.assertIn("task_id", created)
        self.assertEqual(created["title"], "Fix the bug")
        self.assertEqual(created["status"], "backlog")
        self.assertEqual(created["priority"], "high")

    async def test_list_tasks_by_project(self):
        await self._db.create_task(self._pid, "Task A")
        await self._db.create_task(self._pid, "Task B", status="in_progress")
        names = [task["title"] for task in await self._db.list_tasks(self._pid)]
        self.assertIn("Task A", names)
        self.assertIn("Task B", names)

    async def test_list_tasks_filtered_by_status(self):
        await self._db.create_task(self._pid, "Done task", status="done")
        await self._db.create_task(self._pid, "Backlog task", status="backlog")
        finished = await self._db.list_tasks(self._pid, status="done")
        self.assertTrue(all(task["status"] == "done" for task in finished))

    async def test_update_task_status(self):
        created = await self._db.create_task(self._pid, "Moveable task")
        self.assertTrue(await self._db.update_task(created["task_id"], status="in_progress"))
        moved = await self._db.get_task(created["task_id"])
        self.assertEqual(moved["status"], "in_progress")

    async def test_delete_task(self):
        created = await self._db.create_task(self._pid, "Deletable task")
        self.assertTrue(await self._db.delete_task(created["task_id"]))
        self.assertIsNone(await self._db.get_task(created["task_id"]))

    async def test_task_labels_round_trip(self):
        created = await self._db.create_task(self._pid, "Labeled task", labels=["bug", "ui", "P1"])
        stored = await self._db.get_task(created["task_id"])
        self.assertEqual(stored["labels"], ["bug", "ui", "P1"])
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestMeetingsCRUD(unittest.IsolatedAsyncioTestCase):
    """Tests for Meetings persistence layer."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module
        self._pid = (await self._db.create_project("MeetingProject"))["project_id"]

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_meeting(self):
        created = await self._db.create_meeting(
            self._pid, "Sprint Planning", "2026-03-01T10:00:00Z",
            agenda="Goals and backlog review", duration_min=60,
        )
        self.assertIn("meeting_id", created)
        self.assertEqual(created["title"], "Sprint Planning")
        self.assertEqual(created["duration_min"], 60)

    async def test_list_meetings(self):
        await self._db.create_meeting(self._pid, "Meeting A", "2026-03-01T09:00:00Z")
        await self._db.create_meeting(self._pid, "Meeting B", "2026-03-02T10:00:00Z")
        listed = await self._db.list_meetings(self._pid)
        self.assertEqual(len(listed), 2)
        # Listing is expected to be ordered by starts_at ascending.
        self.assertLess(listed[0]["starts_at"], listed[1]["starts_at"])

    async def test_update_meeting(self):
        created = await self._db.create_meeting(self._pid, "Old Title", "2026-03-01T10:00:00Z")
        self.assertTrue(await self._db.update_meeting(created["meeting_id"], title="New Title", duration_min=90))
        refreshed = await self._db.get_meeting(created["meeting_id"])
        self.assertEqual(refreshed["title"], "New Title")
        self.assertEqual(refreshed["duration_min"], 90)

    async def test_attendees_round_trip(self):
        people = ["user@a.com", "user@b.com"]
        created = await self._db.create_meeting(self._pid, "Team sync", "2026-03-03T14:00:00Z",
                                                attendees=people)
        stored = await self._db.get_meeting(created["meeting_id"])
        self.assertEqual(stored["attendees"], people)

    async def test_delete_meeting(self):
        created = await self._db.create_meeting(self._pid, "Deletable", "2026-03-10T10:00:00Z")
        self.assertTrue(await self._db.delete_meeting(created["meeting_id"]))
        self.assertIsNone(await self._db.get_meeting(created["meeting_id"]))
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestDialogGraph(unittest.IsolatedAsyncioTestCase):
    """Tests for Dialog Map graph (dialog_nodes + dialog_edges)."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module
        self._pid = (await self._db.create_project("GraphProject"))["project_id"]

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_upsert_dialog_node_creates(self):
        created = await self._db.upsert_dialog_node(
            self._pid, "task", "task_001", title="My task", summary="Do something"
        )
        self.assertIn("node_id", created)
        self.assertEqual(created["node_type"], "task")
        self.assertEqual(created["ref_id"], "task_001")

    async def test_upsert_dialog_node_deduplicates(self):
        first = await self._db.upsert_dialog_node(self._pid, "doc", "doc_001", title="First")
        second = await self._db.upsert_dialog_node(self._pid, "doc", "doc_001", title="Updated")
        # Same ref_id → the node is reused rather than duplicated...
        self.assertEqual(first["node_id"], second["node_id"])
        # ...while mutable fields such as the title are refreshed.
        self.assertEqual(second["title"], "Updated")

    async def test_create_dialog_edge(self):
        src = await self._db.upsert_dialog_node(self._pid, "message", "msg_001")
        dst = await self._db.upsert_dialog_node(self._pid, "task", "task_002")
        edge = await self._db.create_dialog_edge(
            self._pid, src["node_id"], dst["node_id"], "derives_task"
        )
        self.assertIn("edge_id", edge)
        self.assertEqual(edge["edge_type"], "derives_task")

    async def test_get_project_dialog_map(self):
        src = await self._db.upsert_dialog_node(self._pid, "message", "msg_map_001", title="Hello")
        dst = await self._db.upsert_dialog_node(self._pid, "task", "task_map_001", title="Do it")
        await self._db.create_dialog_edge(self._pid, src["node_id"], dst["node_id"], "derives_task")
        graph = await self._db.get_project_dialog_map(self._pid)
        self.assertIn("nodes", graph)
        self.assertIn("edges", graph)
        self.assertGreaterEqual(graph["node_count"], 2)
        self.assertGreaterEqual(graph["edge_count"], 1)

    async def test_no_self_loop_edges(self):
        node = await self._db.upsert_dialog_node(self._pid, "goal", "goal_001", title="Self loop test")
        # A self-loop attempt should fail silently (SQLite CHECK constraint)
        # and must never appear in the resulting graph.
        await self._db.create_dialog_edge(
            self._pid, node["node_id"], node["node_id"], "references"
        )
        graph = await self._db.get_project_dialog_map(self._pid)
        loops = [edge for edge in graph["edges"] if edge["from_node_id"] == edge["to_node_id"]]
        self.assertEqual(len(loops), 0)

    async def test_entity_link_created(self):
        created = await self._db.create_entity_link(
            self._pid, "message", "msg_x", "task", "task_x", "derives_task"
        )
        self.assertIn("link_id", created)
        self.assertEqual(created["link_type"], "derives_task")

    async def test_doc_version_round_trip(self):
        # Versioning needs an existing document to attach to.
        doc = await self._db.create_document(
            self._pid, "f_ver", "v" * 64, "text/plain", 100, "versioned.txt",
            extracted_text="original content"
        )
        version = await self._db.save_doc_version(doc["doc_id"], "new content v2", author_id="test_user")
        self.assertIn("version_id", version)
        self.assertEqual(
            await self._db.get_doc_version_content(version["version_id"]),
            "new content v2",
        )
        self.assertGreaterEqual(len(await self._db.list_doc_versions(doc["doc_id"])), 1)

    async def test_dialog_view_upsert(self):
        created = await self._db.upsert_dialog_view(
            self._pid, "default",
            filters={"node_types": ["task", "doc"]},
            layout={"zoom": 1.0, "pan": [0, 0]},
        )
        self.assertEqual(created["name"], "default")
        self.assertIn("task", created["filters"].get("node_types", []))
        # A second upsert with the same name updates in place.
        updated = await self._db.upsert_dialog_view(self._pid, "default", layout={"zoom": 2.0})
        self.assertEqual(updated["layout"].get("zoom"), 2.0)
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestTransactionalIntegrity(unittest.IsolatedAsyncioTestCase):
    """Graph Contract: every artifact creation is atomic with its dialog_node."""

    async def asyncSetUp(self):
        self._db = _db_module
        # Force a fresh connection, then build an isolated project per test.
        await self._db.close_db()
        self._db._db_conn = None
        self._pid = f"tx_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("TX Test Project", project_id=self._pid)

    async def asyncTearDown(self):
        pass  # Keep DB open across test classes (shared global connection)

    async def test_create_task_creates_node_atomically(self):
        """create_task must produce task + dialog_node in one transaction."""
        task = await self._db.create_task(
            self._pid, "Atomic Task", description="desc", created_by="test"
        )
        self.assertIn("node_id", task, "create_task must return node_id")
        self.assertIsNotNone(task["node_id"])
        graph = await self._db.get_project_dialog_map(self._pid)
        matches = [
            node for node in graph["nodes"]
            if node["node_type"] == "task" and node["ref_id"] == task["task_id"]
        ]
        self.assertEqual(len(matches), 1, "Task node must be in dialog map")

    async def test_create_meeting_creates_node_atomically(self):
        """create_meeting must produce meeting + dialog_node atomically."""
        meeting = await self._db.create_meeting(
            self._pid, "Atomic Meeting", starts_at="2026-03-01T10:00:00Z", created_by="test"
        )
        self.assertIn("node_id", meeting)
        graph = await self._db.get_project_dialog_map(self._pid)
        matches = [
            node for node in graph["nodes"]
            if node["node_type"] == "meeting" and node["ref_id"] == meeting["meeting_id"]
        ]
        self.assertEqual(len(matches), 1, "Meeting node must be in dialog map")

    async def test_create_task_with_source_msg_creates_edge(self):
        """create_task with source_msg_id must create derives_task edge."""
        source_msg = f"msg_{uuid.uuid4().hex[:8]}"
        task = await self._db.create_task(
            self._pid, "Task from msg", source_msg_id=source_msg, created_by="test"
        )
        graph = await self._db.get_project_dialog_map(self._pid)
        incoming = [
            edge for edge in graph["edges"]
            if edge["edge_type"] == "derives_task" and edge["to_node_id"] == task["node_id"]
        ]
        self.assertGreaterEqual(len(incoming), 1, "Must have derives_task edge from message")

    async def test_create_meeting_with_source_msg_creates_edge(self):
        """create_meeting with source_msg_id must create schedules_meeting edge."""
        source_msg = f"msg_{uuid.uuid4().hex[:8]}"
        meeting = await self._db.create_meeting(
            self._pid, "Meeting from msg",
            starts_at="2026-03-02T15:00:00Z",
            source_msg_id=source_msg,
            created_by="test",
        )
        graph = await self._db.get_project_dialog_map(self._pid)
        incoming = [
            edge for edge in graph["edges"]
            if edge["edge_type"] == "schedules_meeting" and edge["to_node_id"] == meeting["node_id"]
        ]
        self.assertGreaterEqual(len(incoming), 1, "Must have schedules_meeting edge from message")
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestGraphIntegrity(unittest.IsolatedAsyncioTestCase):
    """Graph Contract: check_graph_integrity detects violations."""

    async def asyncSetUp(self):
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        self._pid = f"integrity_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("Integrity Test", project_id=self._pid)

    async def asyncTearDown(self):
        pass  # Keep DB open

    async def test_clean_project_passes_integrity(self):
        """A freshly created project with proper artifacts should pass."""
        await self._db.create_task(self._pid, "Clean task")
        await self._db.create_meeting(self._pid, "Clean meeting", starts_at="2026-04-01T09:00:00Z")
        report = await self._db.check_graph_integrity(self._pid)
        self.assertTrue(report["ok"], f"Integrity must pass, violations: {report['violations']}")
        self.assertEqual(report["violations"], [])
        self.assertGreaterEqual(report["stats"]["node_count"], 2)

    async def test_integrity_detects_dangling_task_node(self):
        """Manually inserted task node without matching task row should be detected."""
        conn = await self._db.get_db()
        orphan_ref = f"fake_{uuid.uuid4().hex}"
        orphan_node_id = str(uuid.uuid4())
        ts = "2026-01-01T00:00:00Z"
        await conn.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,summary,props,created_by,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (orphan_node_id, self._pid, "task", orphan_ref, "Orphan", "", "{}", "test", ts, ts),
        )
        await conn.commit()
        report = await self._db.check_graph_integrity(self._pid)
        self.assertFalse(report["ok"], "Should detect dangling task node")
        self.assertIn("dangling_task_nodes", [v["type"] for v in report["violations"]])

    async def test_no_self_loops_after_operations(self):
        """After normal CRUD operations, there must be no self-loop edges."""
        task = await self._db.create_task(self._pid, "Loop check task")
        meeting = await self._db.create_meeting(
            self._pid, "Loop check meeting", starts_at="2026-05-01T08:00:00Z"
        )
        await self._db.create_dialog_edge(
            self._pid, task["node_id"], meeting["node_id"], "relates_to"
        )
        report = await self._db.check_graph_integrity(self._pid)
        self.assertGreaterEqual(report["stats"]["edge_count"], 1)
        self.assertEqual([v for v in report["violations"] if v["type"] == "self_loop_edges"], [])
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestEvidencePackEngine(unittest.IsolatedAsyncioTestCase):
    """Evidence Pack Engine: Supervisor run → node + tasks + edges atomically."""

    async def asyncSetUp(self):
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        self._pid = f"evidence_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("Evidence Test", project_id=self._pid)

    async def asyncTearDown(self):
        pass  # Keep DB open

    async def test_evidence_pack_creates_agent_run_node(self):
        """create_evidence_pack must create an agent_run node."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        pack = await self._db.create_evidence_pack(
            project_id=self._pid,
            run_id=run_id,
            graph_name="release_check",
            result_data={"status": "completed", "summary": "All checks passed"},
        )
        self.assertTrue(pack["ok"])
        self.assertIsNotNone(pack["node_id"])
        graph = await self._db.get_project_dialog_map(self._pid)
        run_nodes = [
            node for node in graph["nodes"]
            if node["node_type"] == "agent_run" and node["ref_id"] == run_id
        ]
        self.assertEqual(len(run_nodes), 1, "agent_run node must be in dialog map")

    async def test_evidence_pack_creates_follow_up_tasks(self):
        """create_evidence_pack with follow_up_tasks must create tasks + produced_by edges."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        pack = await self._db.create_evidence_pack(
            project_id=self._pid,
            run_id=run_id,
            graph_name="incident_triage",
            result_data={
                "status": "completed",
                "follow_up_tasks": [
                    {"title": "Fix DB index", "priority": "high"},
                    {"title": "Update runbook", "priority": "normal"},
                ],
            },
        )
        self.assertEqual(pack["tasks_created"], 2)
        self.assertEqual(len(pack["task_ids"]), 2)
        titles = {task["title"] for task in await self._db.list_tasks(self._pid)}
        self.assertIn("Fix DB index", titles)
        self.assertIn("Update runbook", titles)
        graph = await self._db.get_project_dialog_map(self._pid)
        produced = [edge for edge in graph["edges"] if edge["edge_type"] == "produced_by"]
        self.assertEqual(len(produced), 2, "Must have produced_by edges for each task")

    async def test_evidence_pack_idempotent_on_rerun(self):
        """Re-recording same run_id must not duplicate nodes (ON CONFLICT DO UPDATE)."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        first = await self._db.create_evidence_pack(
            self._pid, run_id, "release_check",
            {"status": "completed", "summary": "First run"}
        )
        second = await self._db.create_evidence_pack(
            self._pid, run_id, "release_check",
            {"status": "completed", "summary": "Updated summary"}
        )
        self.assertEqual(first["node_id"], second["node_id"], "Node ID must be stable on re-run")

    async def test_full_integrity_after_evidence_pack(self):
        """After creating an evidence pack, integrity check must still pass."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._db.create_evidence_pack(
            self._pid, run_id, "postmortem_draft",
            result_data={
                "status": "completed",
                "follow_up_tasks": [{"title": "Write postmortem", "priority": "urgent"}],
            },
        )
        report = await self._db.check_graph_integrity(self._pid)
        self.assertTrue(report["ok"], f"Integrity must pass after evidence pack: {report['violations']}")
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestGraphHygiene(unittest.IsolatedAsyncioTestCase):
    """Graph Hygiene Engine: fingerprints, dedup, lifecycle, importance."""

    async def asyncSetUp(self):
        """Reset the DB connection and create an isolated per-test project."""
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None  # drop any cached connection from a previous test
        self._pid = f"hygiene_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("Hygiene Test", project_id=self._pid)

    async def asyncTearDown(self):
        """Close the shared connection after each test."""
        await self._db.close_db()

    async def test_importance_baseline_scores(self):
        """Base importance scores must match contract values."""
        self.assertAlmostEqual(self._db._compute_importance("decision"), 0.95, places=2)
        self.assertAlmostEqual(self._db._compute_importance("goal"), 0.90, places=2)
        self.assertAlmostEqual(self._db._compute_importance("task"), 0.70, places=2)
        self.assertAlmostEqual(self._db._compute_importance("message"), 0.15, places=2)
        # Done task halved
        self.assertAlmostEqual(self._db._compute_importance("task", task_status="done"), 0.35, places=2)

    async def test_importance_lifecycle_multiplier(self):
        """Archived and superseded nodes must have reduced importance."""
        active = self._db._compute_importance("decision", lifecycle="active")
        superseded = self._db._compute_importance("decision", lifecycle="superseded")
        archived = self._db._compute_importance("decision", lifecycle="archived")
        # Strict ordering: active > superseded > archived.
        self.assertGreater(active, superseded)
        self.assertGreater(superseded, archived)

    async def test_importance_bump_factors(self):
        """High risk and pinned nodes get importance bumps."""
        base = self._db._compute_importance("task")
        with_risk = self._db._compute_importance("task", risk_level="high")
        pinned = self._db._compute_importance("task", pinned=True)
        self.assertGreater(with_risk, base)
        self.assertGreater(pinned, base)

    async def test_fingerprint_is_deterministic(self):
        """Same title+summary must always produce same fingerprint."""
        fp1 = self._db._compute_fingerprint("task", "Fix DB index", "Critical bug")
        fp2 = self._db._compute_fingerprint("task", "Fix DB index", "Critical bug")
        fp3 = self._db._compute_fingerprint("task", " Fix DB Index ", "critical bug")  # normalized
        self.assertEqual(fp1, fp2)
        self.assertEqual(fp1, fp3)  # lowercase + strip normalization

    async def test_fingerprint_differs_for_different_content(self):
        """Different titles must produce different fingerprints."""
        fp1 = self._db._compute_fingerprint("task", "Fix index", "")
        fp2 = self._db._compute_fingerprint("task", "Deploy service", "")
        self.assertNotEqual(fp1, fp2)

    async def test_hygiene_dry_run_detects_duplicates(self):
        """Dry-run must find duplicates without writing changes."""
        # Create two tasks with same title (will get same fingerprint);
        # the create_task return value is not needed here.
        await self._db.create_task(self._pid, "Duplicate Decision", created_by="test")
        # Manually insert second node with same fingerprint-equivalent title
        db = await self._db.get_db()
        n2_id = str(uuid.uuid4())
        now = "2025-01-01T00:00:00Z"
        fp = self._db._compute_fingerprint("task", "Duplicate Decision", "")
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,summary,props,fingerprint,lifecycle,importance,created_by,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (n2_id, self._pid, "task", "fake_task_dup", "Duplicate Decision", "", "{}", fp, "active", 0.7, "test", now, now),
        )
        await db.commit()
        result = await self._db.run_graph_hygiene(self._pid, dry_run=True)
        self.assertTrue(result["ok"])
        self.assertTrue(result["dry_run"])
        # Must find the duplicate
        self.assertGreater(result["stats"]["duplicates_found"], 0, "Should detect duplicates")
        # Dry-run: changes are reported but not applied
        archived_changes = [c for c in result["changes"] if c["action"] == "archive_duplicate"]
        self.assertGreater(len(archived_changes), 0)
        # Node lifecycle must NOT be changed (dry_run=True)
        async with db.execute("SELECT lifecycle FROM dialog_nodes WHERE node_id=?", (n2_id,)) as cur:
            row = await cur.fetchone()
        self.assertEqual(row[0], "active", "Dry-run must not modify lifecycle")

    async def test_hygiene_apply_archives_duplicates(self):
        """Non-dry-run hygiene must archive duplicate nodes."""
        # Create two nodes with identical fingerprint-equivalent titles;
        # the created task's return value is unused.
        await self._db.create_task(self._pid, "Archive Dup Task", created_by="test")
        db = await self._db.get_db()
        n2_id = str(uuid.uuid4())
        fp = self._db._compute_fingerprint("task", "Archive Dup Task", "")
        # older created_at → will be archived (canonical = latest)
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,summary,props,fingerprint,lifecycle,importance,created_by,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (n2_id, self._pid, "task", "fake_dup2", "Archive Dup Task", "", "{}", fp, "active", 0.7, "test", "2024-01-01T00:00:00Z", "2024-01-01T00:00:00Z"),
        )
        await db.commit()
        result = await self._db.run_graph_hygiene(self._pid, dry_run=False)
        self.assertFalse(result["dry_run"])
        self.assertGreater(result["stats"]["archived"], 0, "Should archive duplicates")
        # The older node must now be archived
        async with db.execute("SELECT lifecycle FROM dialog_nodes WHERE node_id=?", (n2_id,)) as cur:
            row = await cur.fetchone()
        self.assertIn(row[0], ("archived", "superseded"), "Duplicate must be archived/superseded")

    async def test_hygiene_idempotent(self):
        """Running hygiene twice must not create new violations."""
        await self._db.create_task(self._pid, "Idempotent Task")
        # First run may apply changes; only the second run's stats matter here.
        await self._db.run_graph_hygiene(self._pid, dry_run=False)
        r2 = await self._db.run_graph_hygiene(self._pid, dry_run=False)
        # Second run should find no new duplicates to archive
        self.assertEqual(r2["stats"]["archived"], 0, "Second hygiene run must be idempotent")

    async def test_hygiene_recomputes_importance(self):
        """Hygiene must update importance for nodes without it set."""
        # Create task node and manually clear its importance
        task = await self._db.create_task(self._pid, "Importance Test Task")
        db = await self._db.get_db()
        await db.execute("UPDATE dialog_nodes SET importance=0.0 WHERE node_id=?", (task["node_id"],))
        await db.commit()
        result = await self._db.run_graph_hygiene(self._pid, dry_run=False)
        importance_changes = [c for c in result["changes"] if c["action"] == "update_importance"]
        self.assertGreater(len(importance_changes), 0, "Hygiene must recompute importance")
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestSelfReflection(unittest.IsolatedAsyncioTestCase):
    """Self-Reflection Engine: supervisor run analysis."""

    async def asyncSetUp(self):
        """Reset the DB connection and create an isolated per-test project."""
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None  # drop any cached connection from a previous test
        self._pid = f"reflect_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("Reflection Test", project_id=self._pid)

    async def asyncTearDown(self):
        """Close the shared connection after each test."""
        await self._db.close_db()

    async def _create_run(self, run_id: str, graph: str = "release_check") -> dict:
        """Helper: create evidence pack (agent_run node) first."""
        return await self._db.create_evidence_pack(
            self._pid, run_id, graph,
            result_data={"status": "completed", "summary": f"Run {run_id[:8]}"},
        )

    async def test_reflection_creates_decision_node(self):
        """create_run_reflection must create a decision node."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        result = await self._db.create_run_reflection(
            self._pid, run_id,
            evidence_data={
                "summary": "Release checks passed",
                "findings": [{"name": "tests", "status": "pass"}, {"name": "lint", "status": "pass"}],
            },
        )
        self.assertTrue(result["ok"])
        self.assertIsNotNone(result["node_id"])
        graph = await self._db.get_project_dialog_map(self._pid)
        # Reflection nodes are decision nodes whose ref_id mentions "reflection".
        reflection_nodes = [n for n in graph["nodes"] if n["node_type"] == "decision" and "reflection" in n.get("ref_id", "")]
        self.assertGreaterEqual(len(reflection_nodes), 1)

    async def test_reflection_links_to_agent_run(self):
        """Reflection must create reflects_on edge to agent_run node."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        # Only the node's existence matters; the pack's return value is unused.
        await self._create_run(run_id)
        result = await self._db.create_run_reflection(self._pid, run_id, evidence_data={})
        self.assertIsNotNone(result["edge_id"])
        graph = await self._db.get_project_dialog_map(self._pid)
        reflects_edges = [e for e in graph["edges"] if e["edge_type"] == "reflects_on"]
        self.assertGreaterEqual(len(reflects_edges), 1)

    async def test_reflection_scores_completeness(self):
        """Reflection must compute plan_completeness_score from findings."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        result = await self._db.create_run_reflection(
            self._pid, run_id,
            evidence_data={
                "findings": [
                    {"name": "a", "status": "pass"},
                    {"name": "b", "status": "pass"},
                    {"name": "c", "status": "fail"},
                    {"name": "d", "status": "pass"},
                ],
            },
        )
        refl = result["reflection"]
        # 3/4 passed = 0.75
        self.assertAlmostEqual(refl["plan_completeness_score"], 0.75, places=2)
        self.assertEqual(refl["confidence"], "medium")
        self.assertEqual(len(refl["open_risks"]), 1)

    async def test_reflection_creates_risk_tasks(self):
        """Failed findings must auto-create risk tasks."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        result = await self._db.create_run_reflection(
            self._pid, run_id,
            evidence_data={
                "findings": [
                    {"name": "DB migration", "status": "fail", "detail": "Migration pending"},
                    {"name": "Security scan", "status": "error", "message": "CVE-2024-001"},
                ],
            },
        )
        self.assertGreater(result["risk_tasks_created"], 0)
        tasks = await self._db.list_tasks(self._pid)
        risk_titles = [t["title"] for t in tasks if "[RISK]" in t["title"]]
        self.assertGreater(len(risk_titles), 0)

    async def test_reflection_idempotent(self):
        """Reflecting on same run_id twice must not duplicate nodes."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        r1 = await self._db.create_run_reflection(self._pid, run_id, evidence_data={})
        r2 = await self._db.create_run_reflection(self._pid, run_id, evidence_data={})
        self.assertEqual(r1["node_id"], r2["node_id"], "Reflection node must be stable")

    async def test_full_integrity_after_reflection(self):
        """After reflection, graph integrity must still pass."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id, "incident_triage")
        await self._db.create_run_reflection(
            self._pid, run_id,
            evidence_data={
                "findings": [{"name": "x", "status": "fail", "detail": "Timeout"}],
            },
        )
        integrity = await self._db.check_graph_integrity(self._pid)
        self.assertTrue(integrity["ok"], f"Integrity must pass after reflection: {integrity['violations']}")
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestStrategicCTOLayer(unittest.IsolatedAsyncioTestCase):
    """Tests for graph_snapshots and graph_signals (Strategic CTO Layer)."""

    async def asyncSetUp(self):
        """Reset the DB connection and create an isolated per-test project."""
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        await self._db.init_db()
        # Use unique project_id per test to avoid UNIQUE conflicts across test methods
        self._pid = f"cto-{uuid.uuid4().hex[:10]}"
        r = await self._db.create_project("CTO Test Project", project_id=self._pid)
        self._pid = r["project_id"]

    async def asyncTearDown(self):
        """Close the shared connection and clear the cached handle."""
        await self._db.close_db()
        self._db._db_conn = None

    # ── Snapshot Tests ────────────────────────────────────────────────────────
    async def test_snapshot_empty_project(self):
        """Snapshot on an empty project must return zero metrics without errors."""
        result = await self._db.compute_graph_snapshot(self._pid, window="7d")
        self.assertTrue(result["ok"])
        m = result["metrics"]
        self.assertEqual(m["tasks_created"], 0)
        self.assertEqual(m["tasks_done"], 0)
        self.assertEqual(m["wip"], 0)
        self.assertEqual(m["risk_tasks_open"], 0)
        self.assertEqual(m["agent_runs_total"], 0)

    async def test_snapshot_with_tasks(self):
        """Snapshot correctly counts tasks_created and wip."""
        await self._db.create_task(self._pid, "Task A", status="backlog")
        await self._db.create_task(self._pid, "Task B", status="in_progress")
        await self._db.create_task(self._pid, "Task C", status="done")
        result = await self._db.compute_graph_snapshot(self._pid, window="7d")
        m = result["metrics"]
        self.assertEqual(m["tasks_created"], 3)
        self.assertGreaterEqual(m["wip"], 1)
        self.assertGreaterEqual(m["tasks_done"], 1)

    async def test_snapshot_risk_tasks_count(self):
        """Snapshot correctly counts open [RISK] tasks."""
        await self._db.create_task(self._pid, "[RISK] Critical vuln A", status="backlog", priority="high")
        await self._db.create_task(self._pid, "[RISK] Critical vuln B", status="done", priority="high")
        await self._db.create_task(self._pid, "Normal task", status="backlog")
        result = await self._db.compute_graph_snapshot(self._pid, window="7d")
        m = result["metrics"]
        self.assertEqual(m["risk_tasks_open"], 1, "Only non-done [RISK] tasks should count")

    async def test_snapshot_idempotent_same_day(self):
        """Two calls on same day produce single snapshot (ON CONFLICT DO UPDATE)."""
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        db = await self._db.get_db()
        async with db.execute(
            "SELECT COUNT(*) FROM graph_snapshots WHERE project_id=? AND window='7d'",
            (self._pid,),
        ) as cur:
            count = (await cur.fetchone())[0]
        self.assertEqual(count, 1, "Should upsert not duplicate snapshot")

    async def test_get_latest_snapshot(self):
        """get_latest_snapshot returns None before first compute, data after."""
        snap = await self._db.get_latest_snapshot(self._pid, window="7d")
        self.assertIsNone(snap)
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        snap = await self._db.get_latest_snapshot(self._pid, window="7d")
        self.assertIsNotNone(snap)
        self.assertIn("metrics", snap)
        self.assertIsInstance(snap["metrics"], dict)

    async def test_snapshot_graph_density(self):
        """graph_density metric equals edges/nodes ratio."""
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        snap = await self._db.get_latest_snapshot(self._pid, window="7d")
        m = snap["metrics"]
        # Density is only defined for a non-empty graph.
        if m["node_count"] > 0:
            expected = round(m["edge_count"] / m["node_count"], 3)
            self.assertAlmostEqual(m["graph_density"], expected, places=2)

    # ── Signals Tests ─────────────────────────────────────────────────────────
    async def test_signals_empty_project_no_signals(self):
        """Empty project generates no signals."""
        result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        self.assertTrue(result["ok"])
        self.assertEqual(result["signals_generated"], 0)
        self.assertEqual(result["signals_upserted"], 0)

    async def test_signal_dry_run_does_not_persist(self):
        """dry_run=True computes signals but does not write to DB."""
        # Create conditions for run_quality_regression rule
        for i in range(3):
            run_id = f"run_{uuid.uuid4().hex[:8]}"
            await self._db.create_evidence_pack(
                project_id=self._pid, run_id=run_id, graph_name="release_check",
                result_data={"status": "completed", "findings": [{"name": "test", "status": "fail", "detail": "bad"}]},
            )
            await self._db.create_run_reflection(self._pid, run_id, evidence_data={
                "findings": [{"name": "x", "status": "fail", "detail": "Critical fail"}] * 3,
            })
        dry = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=True)
        live = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        # Dry run: no upserts
        self.assertEqual(dry["signals_upserted"], 0)
        # Dry and live should detect same signals_generated count
        self.assertEqual(dry["signals_generated"], live["signals_generated"])

    async def test_signal_idempotency(self):
        """Running signals twice with same conditions must not create new signals."""
        # Create risk tasks for risk_cluster rule
        for i in range(3):
            await self._db.create_task(
                self._pid, f"[RISK] Issue {i}", status="backlog", priority="high",
                labels=["backend", "security"]
            )
        r1 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        r2 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        # Second run must not create new signals (may be skip_cooldown or refresh, but not new)
        new_in_r2 = [d for d in r2["diff"] if d["action"] == "new"]
        self.assertEqual(len(new_in_r2), 0, "Second run must not create new signals")
        non_new = [d for d in r2["diff"] if d["action"] in ("skip_cooldown", "refresh", "exists", "cooldown")]
        self.assertGreater(len(non_new), 0, "Should see non-new actions on second run")

    async def test_signal_risk_cluster_rule(self):
        """risk_cluster signal fires when 3+ [RISK] tasks share a label."""
        for i in range(4):
            await self._db.create_task(
                self._pid, f"[RISK] DB problem {i}", status="backlog", priority="high",
                labels=["database"]
            )
        result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=True)
        types = [d["signal_type"] for d in result["diff"]]
        self.assertIn("risk_cluster", types, "risk_cluster signal must fire for 4 tasks with shared label")

    async def test_signal_stale_goal(self):
        """stale_goal signal fires for goals not updated in 14 days."""
        import datetime
        db = await self._db.get_db()
        # Timezone-aware now(); datetime.utcnow() is deprecated since Python 3.12.
        # The "%Y-%m-%dT%H:%M:%SZ" rendering is identical for UTC-aware datetimes.
        old_date = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=20)).strftime("%Y-%m-%dT%H:%M:%SZ")
        node_id = str(uuid.uuid4())
        await db.execute(
            """INSERT INTO dialog_nodes(node_id, project_id, node_type, ref_id, title, lifecycle, importance, created_at, updated_at)
            VALUES(?,?,?,?,?,?,?,?,?)""",
            (node_id, self._pid, "goal", node_id, "Old Stale Goal", "active", 0.9, old_date, old_date),
        )
        await db.commit()
        result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=True)
        types = [d["signal_type"] for d in result["diff"]]
        self.assertIn("stale_goal", types, "stale_goal must fire for goal not updated in 20 days")

    async def test_signal_ack_changes_status(self):
        """ack action changes signal status to 'ack'."""
        # Create a signal manually
        db = await self._db.get_db()
        sig_id = str(uuid.uuid4())
        now = self._db._now()
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, "stale_goal", "medium", "Test Signal", "", "{}", "open", "fp123", now, now),
        )
        await db.commit()
        result = await self._db.update_signal_status(sig_id, "ack")
        self.assertIsNotNone(result)
        self.assertEqual(result["status"], "ack")

    async def test_signal_evidence_node_ids_valid(self):
        """risk_cluster signal evidence contains valid task IDs."""
        task_ids = []
        for i in range(3):
            t = await self._db.create_task(
                self._pid, f"[RISK] infra problem {i}", status="backlog", priority="high",
                labels=["infra"]
            )
            task_ids.append(t["task_id"])
        # Persist signals; the recompute summary itself is not needed here.
        await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        # Load saved signals
        signals = await self._db.get_graph_signals(self._pid, status="open")
        cluster = [s for s in signals if s["signal_type"] == "risk_cluster"]
        self.assertTrue(len(cluster) > 0, "risk_cluster signal must exist")
        ev_ids = cluster[0]["evidence"].get("task_ids", [])
        for eid in ev_ids[:3]:
            self.assertIn(eid, task_ids, f"Signal evidence must reference valid task IDs: {eid}")

    async def test_get_signals_by_status(self):
        """get_graph_signals filters correctly by status."""
        db = await self._db.get_db()
        now = self._db._now()
        for status, fp in [("open", "fp1"), ("ack", "fp2"), ("dismissed", "fp3")]:
            await db.execute(
                "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                (str(uuid.uuid4()), self._pid, "stale_goal", "medium", f"Sig {status}", "", "{}", status, fp, now, now),
            )
        await db.commit()
        open_sigs = await self._db.get_graph_signals(self._pid, status="open")
        self.assertTrue(all(s["status"] == "open" for s in open_sigs))
        all_sigs = await self._db.get_graph_signals(self._pid, status="all")
        self.assertGreaterEqual(len(all_sigs), 3)
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestOpsGraphBridging(unittest.IsolatedAsyncioTestCase):
    """Tests for upsert_ops_run_node (Ops Graph Bridging)."""

    async def asyncSetUp(self):
        """Reset the DB handle and create a fresh project for this test."""
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        await self._db.init_db()
        self._pid = f"ops-{uuid.uuid4().hex[:10]}"
        created = await self._db.create_project("Ops Test", project_id=self._pid)
        self._pid = created["project_id"]

    async def asyncTearDown(self):
        """Close the connection and drop the cached handle."""
        await self._db.close_db()
        self._db._db_conn = None

    async def test_ops_run_node_created(self):
        """upsert_ops_run_node creates dialog_node with node_type=ops_run."""
        rid = f"ops-{uuid.uuid4().hex[:8]}"
        upserted = await self._db.upsert_ops_run_node(
            project_id=self._pid,
            ops_run_id=rid,
            action_id="smoke_gateway",
            node_id="NODA1",
            status="ok",
            elapsed_ms=250,
        )
        self.assertIn("node_id", upserted)
        conn = await self._db.get_db()
        async with conn.execute(
            "SELECT node_type, title FROM dialog_nodes WHERE node_id=?", (upserted["node_id"],)
        ) as cur:
            fetched = await cur.fetchone()
        # The node must exist, be typed ops_run, and carry the action id in its title.
        self.assertIsNotNone(fetched)
        self.assertEqual(fetched[0], "ops_run")
        self.assertIn("smoke_gateway", fetched[1])

    async def test_ops_run_node_idempotent(self):
        """Calling upsert_ops_run_node twice with same ops_run_id updates, not duplicates."""
        rid = f"ops-{uuid.uuid4().hex[:8]}"
        first = await self._db.upsert_ops_run_node(self._pid, rid, "drift_check", "NODA1", "ok")
        second = await self._db.upsert_ops_run_node(self._pid, rid, "drift_check", "NODA1", "failed")
        self.assertEqual(first["node_id"], second["node_id"], "Same run_id must return same node_id")
        conn = await self._db.get_db()
        async with conn.execute(
            "SELECT COUNT(*) FROM dialog_nodes WHERE project_id=? AND node_type='ops_run' AND ref_id=?",
            (self._pid, rid),
        ) as cur:
            total = (await cur.fetchone())[0]
        self.assertEqual(total, 1, "No duplicate nodes on upsert")

    async def test_ops_run_failed_higher_importance(self):
        """Failed ops_run nodes have higher importance than successful ones."""
        ok_run = f"ops-ok-{uuid.uuid4().hex[:6]}"
        failed_run = f"ops-fail-{uuid.uuid4().hex[:6]}"
        ok_node = await self._db.upsert_ops_run_node(self._pid, ok_run, "smoke_all", "NODA1", "ok")
        failed_node = await self._db.upsert_ops_run_node(self._pid, failed_run, "smoke_all", "NODA1", "failed")
        conn = await self._db.get_db()

        async def _importance_of(node_id):
            # Fetch the stored importance score for a single node.
            async with conn.execute(
                "SELECT importance FROM dialog_nodes WHERE node_id=?", (node_id,)
            ) as cur:
                return (await cur.fetchone())[0]

        imp_ok = await _importance_of(ok_node["node_id"])
        imp_fail = await _importance_of(failed_node["node_id"])
        self.assertGreater(imp_fail, imp_ok, "Failed ops_run must have higher importance")

    async def test_ops_run_links_to_source_agent_run(self):
        """ops_run node gets a produced_by edge from source supervisor run."""
        # Create a source agent_run node
        supervisor_run = f"run-{uuid.uuid4().hex[:8]}"
        await self._db.create_evidence_pack(
            self._pid, supervisor_run, "release_check",
            result_data={"status": "completed"}
        )
        ops_run = f"ops-{uuid.uuid4().hex[:8]}"
        upserted = await self._db.upsert_ops_run_node(
            self._pid, ops_run, "smoke_gateway", "NODA1", "ok",
            source_run_id=supervisor_run,
        )
        self.assertIsNotNone(upserted["edge_id"], "Edge must be created when source_run_id is given")
        conn = await self._db.get_db()
        async with conn.execute(
            "SELECT edge_type FROM dialog_edges WHERE edge_id=?", (upserted["edge_id"],)
        ) as cur:
            fetched = await cur.fetchone()
        self.assertIsNotNone(fetched)
        self.assertEqual(fetched[0], "produced_by")
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestMitigationPlanner(unittest.IsolatedAsyncioTestCase):
    """Tests for create_mitigation_plan (Mitigation Planner)."""

    async def asyncSetUp(self):
        """Build a fresh project plus one open 'release_blocker' signal."""
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        await self._db.init_db()
        self._pid = f"mit-{uuid.uuid4().hex[:10]}"
        created = await self._db.create_project("Mit Test", project_id=self._pid)
        self._pid = created["project_id"]
        # Create a test signal
        conn = await self._db.get_db()
        self._sig_id = str(uuid.uuid4())
        ts = self._db._now()
        await conn.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?,?)",
            (self._sig_id, self._pid, "release_blocker", "critical",
             "Test Release Blocker", "Test summary", '{"blocker_count": 2}',
             "open", f"fp-{uuid.uuid4().hex[:8]}", ts, ts),
        )
        await conn.commit()

    async def asyncTearDown(self):
        """Close the connection and drop the cached handle."""
        await self._db.close_db()
        self._db._db_conn = None

    async def test_mitigation_creates_plan_node(self):
        """create_mitigation_plan creates a decision node for the plan."""
        plan = await self._db.create_mitigation_plan(self._pid, self._sig_id)
        self.assertTrue(plan["ok"])
        self.assertIn("plan_node_id", plan)
        conn = await self._db.get_db()
        async with conn.execute(
            "SELECT node_type, title FROM dialog_nodes WHERE node_id=?",
            (plan["plan_node_id"],),
        ) as cur:
            node_row = await cur.fetchone()
        self.assertIsNotNone(node_row)
        self.assertEqual(node_row[0], "decision")
        self.assertIn("Mitigation Plan", node_row[1])

    async def test_mitigation_creates_tasks_from_templates(self):
        """Mitigation plan creates tasks matching release_blocker templates."""
        plan = await self._db.create_mitigation_plan(self._pid, self._sig_id)
        self.assertGreater(plan["task_count"], 0)
        self.assertEqual(len(plan["task_ids"]), plan["task_count"])
        # Verify tasks exist in DB
        conn = await self._db.get_db()
        for task_id in plan["task_ids"]:
            async with conn.execute("SELECT title FROM tasks WHERE task_id=?", (task_id,)) as cur:
                task_row = await cur.fetchone()
            self.assertIsNotNone(task_row, f"Task {task_id} must exist in DB")
            self.assertIn("[Mitigation]", task_row[0])

    async def test_mitigation_task_count_by_signal_type(self):
        """Each signal_type has the expected number of mitigation templates."""
        expected_counts = {
            "release_blocker": 4,
            "ops_instability": 3,
            "stale_goal": 3,
            "risk_cluster": 4,
            "run_quality_regression": 3,
        }
        conn = await self._db.get_db()
        ts = self._db._now()
        for sig_type, expected in expected_counts.items():
            # Insert a signal of this type, then plan against it.
            signal_id = str(uuid.uuid4())
            await conn.execute(
                "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) "
                "VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                (signal_id, self._pid, sig_type, "high", f"Test {sig_type}", "", "{}",
                 "open", f"fp-{signal_id[:8]}", ts, ts),
            )
            await conn.commit()
            plan = await self._db.create_mitigation_plan(self._pid, signal_id)
            self.assertEqual(plan["task_count"], expected,
                             f"{sig_type} should have {expected} tasks, got {plan['task_count']}")

    async def test_mitigation_creates_plan_to_task_edges(self):
        """Each mitigation task has a derives_task edge from the plan node."""
        plan = await self._db.create_mitigation_plan(self._pid, self._sig_id)
        conn = await self._db.get_db()
        plan_node = plan["plan_node_id"]
        # Resolve each task_id to its dialog node id.
        task_node_ids = []
        for task_id in plan["task_ids"]:
            async with conn.execute(
                "SELECT node_id FROM dialog_nodes WHERE project_id=? AND node_type='task' AND ref_id=?",
                (self._pid, task_id),
            ) as cur:
                node_row = await cur.fetchone()
            if node_row:
                task_node_ids.append(node_row[0])
        # Every task node must be linked from the plan by exactly one derives_task edge.
        for task_node in task_node_ids:
            async with conn.execute(
                "SELECT COUNT(*) FROM dialog_edges WHERE project_id=? AND from_node_id=? AND to_node_id=? AND edge_type='derives_task'",
                (self._pid, plan_node, task_node),
            ) as cur:
                edge_count = (await cur.fetchone())[0]
            self.assertEqual(edge_count, 1, f"Missing derives_task edge for task node {task_node}")

    async def test_mitigation_updates_signal_evidence(self):
        """After mitigation, signal.evidence contains plan_node_id and mitigation_task_ids."""
        import json as _json
        plan = await self._db.create_mitigation_plan(self._pid, self._sig_id)
        conn = await self._db.get_db()
        async with conn.execute("SELECT evidence FROM graph_signals WHERE id=?", (self._sig_id,)) as cur:
            evidence_row = await cur.fetchone()
        evidence = _json.loads(evidence_row[0])
        self.assertIn("plan_node_id", evidence)
        self.assertEqual(evidence["plan_node_id"], plan["plan_node_id"])
        self.assertIn("mitigation_task_ids", evidence)

    async def test_mitigation_idempotent(self):
        """Running mitigation twice does not duplicate plan node."""
        first = await self._db.create_mitigation_plan(self._pid, self._sig_id)
        second = await self._db.create_mitigation_plan(self._pid, self._sig_id)
        self.assertEqual(first["plan_node_id"], second["plan_node_id"], "Plan node must be stable")

    async def test_mitigation_invalid_signal_raises(self):
        """create_mitigation_plan raises ValueError for unknown signal."""
        with self.assertRaises(ValueError):
            await self._db.create_mitigation_plan(self._pid, "nonexistent-signal-id")
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestSignalLifecycle(unittest.IsolatedAsyncioTestCase):
"""Tests for signal merge/reopen/cooldown and auto-resolve."""
async def asyncSetUp(self):
self._db = _db_module
await self._db.close_db()
self._db._db_conn = None
await self._db.init_db()
self._pid = f"slc-{uuid.uuid4().hex[:10]}"
r = await self._db.create_project("SLC Test", project_id=self._pid)
self._pid = r["project_id"]
async def asyncTearDown(self):
await self._db.close_db()
self._db._db_conn = None
async def _make_signal(self, sig_type="stale_goal", severity="medium", status="open", fp=None):
"""Helper: insert a signal directly."""
db = await self._db.get_db()
sid = str(uuid.uuid4())
now = self._db._now()
_fp = fp or f"fp-{uuid.uuid4().hex[:8]}"
await db.execute(
"INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) "
"VALUES(?,?,?,?,?,?,?,?,?,?,?)",
(sid, self._pid, sig_type, severity, f"Test {sig_type}", "",
'{"cooldown_hours": 24}', status, _fp, now, now),
)
await db.commit()
return sid, _fp
# ── Cooldown / Reopen ─────────────────────────────────────────────────────
async def test_signal_new_creates_correctly(self):
"""First-time signal is created with last_triggered_at in evidence."""
for i in range(3):
await self._db.create_task(
self._pid, f"[RISK] Cluster task {i}", status="backlog", priority="high",
labels=["infra-cluster"]
)
result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
diff_new = [d for d in result["diff"] if d["action"] == "new"]
self.assertGreater(len(diff_new), 0)
# Verify evidence has last_triggered_at
sigs = await self._db.get_graph_signals(self._pid)
for s in sigs:
self.assertIn("last_triggered_at", s["evidence"])
self.assertIn("cooldown_hours", s["evidence"])
async def test_cooldown_prevents_duplicate(self):
"""Second recompute within cooldown skips already-active signal."""
for i in range(3):
await self._db.create_task(
self._pid, f"[RISK] X {i}", status="backlog", priority="high", labels=["comp-x"]
)
r1 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
r2 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
# All r2 diff entries should be skip_cooldown (not new)
r2_new = [d for d in r2["diff"] if d["action"] == "new"]
self.assertEqual(len(r2_new), 0, "Second recompute in cooldown must not create new signals")
skip = [d for d in r2["diff"] if d["action"] == "skip_cooldown"]
self.assertGreater(len(skip), 0, "skip_cooldown must appear in diff")
async def test_resolved_signal_reopens_after_cooldown(self):
"""A resolved signal with expired cooldown gets reopened on next recompute."""
import datetime
db = await self._db.get_db()
# Create stale goal to trigger stale_goal rule
node_id = str(uuid.uuid4())
old_ts = (datetime.datetime.utcnow() - datetime.timedelta(days=20)).strftime("%Y-%m-%dT%H:%M:%SZ")
await db.execute(
"INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,lifecycle,importance,created_at,updated_at) "
"VALUES(?,?,?,?,?,?,?,?,?)",
(node_id, self._pid, "goal", node_id, "Stale Test Goal", "active", 0.9, old_ts, old_ts),
)
await db.commit()
# First recompute: creates the signal
r1 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
new_sigs = [d for d in r1["diff"] if d["action"] == "new" and d["signal_type"] == "stale_goal"]
self.assertGreater(len(new_sigs), 0, "stale_goal signal must be created")
# Find and mark it resolved, with old updated_at (cooldown expired)
sigs = await self._db.get_graph_signals(self._pid)
stale_sig = next((s for s in sigs if s["signal_type"] == "stale_goal"), None)
self.assertIsNotNone(stale_sig)
old_updated = (datetime.datetime.utcnow() - datetime.timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%SZ")
await db.execute(
"UPDATE graph_signals SET status='resolved', updated_at=?, evidence=? WHERE id=?",
(old_updated, '{"cooldown_hours": 1, "last_triggered_at": "' + old_updated + '"}', stale_sig["id"]),
)
await db.commit()
# Second recompute: should reopen (cooldown of 1h expired)
r2 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
reopen_entries = [d for d in r2["diff"] if d.get("action") == "reopen"]
self.assertGreater(len(reopen_entries), 0, "Resolved signal must be reopened after cooldown expires")
# ── Auto-resolve ──────────────────────────────────────────────────────────
async def test_auto_resolve_dry_run_does_not_change_status(self):
"""auto_resolve dry_run computes but does not change signal status."""
sid, _ = await self._make_signal("release_blocker", status="open")
result = await self._db.auto_resolve_signals(self._pid, dry_run=True)
self.assertTrue(result["ok"])
self.assertEqual(result["resolved"], 0)
# Status unchanged
db = await self._db.get_db()
async with db.execute("SELECT status FROM graph_signals WHERE id=?", (sid,)) as cur:
row = await cur.fetchone()
self.assertEqual(row[0], "open")
async def test_auto_resolve_release_blocker_when_no_risks(self):
"""release_blocker resolves when no open [RISK] tasks remain."""
sid, _ = await self._make_signal("release_blocker", status="open")
# No [RISK] tasks → criteria met
result = await self._db.auto_resolve_signals(self._pid, dry_run=False)
resolved = [d for d in result["diff"] if d.get("action") == "resolved" and d["signal_type"] == "release_blocker"]
self.assertGreater(len(resolved), 0, "release_blocker must resolve when no [RISK] tasks")
# Verify status in DB
db = await self._db.get_db()
async with db.execute("SELECT status, evidence FROM graph_signals WHERE id=?", (sid,)) as cur:
row = await cur.fetchone()
self.assertEqual(row[0], "resolved")
import json as _j
ev = _j.loads(row[1])
self.assertIn("resolved_at", ev)
self.assertIn("resolution_reason", ev)
async def test_auto_resolve_release_blocker_stays_open_with_risks(self):
"""release_blocker stays open when [RISK] tasks exist."""
await self._db.create_task(self._pid, "[RISK] Critical blocker", status="backlog", priority="high")
sid, _ = await self._make_signal("release_blocker", status="open")
result = await self._db.auto_resolve_signals(self._pid, dry_run=False)
still_open = [d for d in result["diff"] if d.get("action") == "still_open"]
self.assertGreater(len(still_open), 0)
async def test_auto_resolve_run_quality_regression_resolves(self):
"""run_quality_regression resolves when avg completeness >= 75%."""
sid, _ = await self._make_signal("run_quality_regression", status="open")
# Insert 3 high-quality reflections
db = await self._db.get_db()
now = self._db._now()
for i in range(3):
import json as _j
props = _j.dumps({"plan_completeness_score": 0.85, "confidence": "high"})
await db.execute(
"INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,props,lifecycle,importance,created_at,updated_at) "
"VALUES(?,?,?,?,?,?,?,?,?,?)",
(str(uuid.uuid4()), self._pid, "decision", f"refl-{i}-{uuid.uuid4().hex[:6]}",
f"Reflection: run{i}", props, "active", 0.7, now, now),
)
await db.commit()
result = await self._db.auto_resolve_signals(self._pid, dry_run=False)
resolved = [d for d in result["diff"] if d.get("action") == "resolved" and d["signal_type"] == "run_quality_regression"]
self.assertGreater(len(resolved), 0, "run_quality_regression must resolve with good quality")
async def test_auto_resolve_returns_correct_counts(self):
"""auto_resolve result has accurate checked/resolved counts."""
sid1, _ = await self._make_signal("release_blocker", status="open")
sid2, _ = await self._make_signal("stale_goal", status="ack")
result = await self._db.auto_resolve_signals(self._pid, dry_run=True)
self.assertEqual(result["checked"], 2)
self.assertEqual(len(result["diff"]), 2)
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestPlaybooks(unittest.IsolatedAsyncioTestCase):
    """Tests for Playbooks v1 (Graph Learning Layer).

    Playbooks are promoted from mitigated graph signals and keyed by
    (signal_type, context_key). Each test runs against a fresh in-memory
    SQLite database.
    """

    async def asyncSetUp(self):
        # Reset the module-level connection so each test gets a clean :memory: DB.
        self._pid = f"pb-{uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None
        _db_module._DB_PATH = ":memory:"
        await _db_module.get_db()
        await _db_module.init_db()
        await _db_module.create_project(name="PB Test", project_id=self._pid)
        self._db = _db_module

    async def asyncTearDown(self):
        # Best-effort close; drop the cached handle either way.
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None

    async def _make_mitigated_signal(self, signal_type: str = "risk_cluster", label: str = "auth"):
        """Create an open signal, run mitigation on it, return (sig_id, plan)."""
        sig_id = str(uuid.uuid4())
        now = "2026-02-26T12:00:00"
        evidence = {"label": label, "count": 3}
        db = await self._db.get_db()
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, signal_type, "high", f"Test {signal_type}", "",
             json.dumps(evidence), "open", now, now),
        )
        await db.commit()
        # A mitigation plan is required before a signal can be promoted.
        result = await self._db.create_mitigation_plan(self._pid, sig_id)
        return sig_id, result

    def test_context_key_risk_cluster(self):
        """compute_context_key returns label:<label> for risk_cluster."""
        ck = self._db._compute_context_key("risk_cluster", {"label": "auth"})
        self.assertEqual(ck, "label:auth")

    def test_context_key_ops_instability(self):
        """compute_context_key returns ops_action:<action> for ops_instability."""
        ck = self._db._compute_context_key("ops_instability", {"failed_ops_actions": ["smoke_gateway"]})
        self.assertEqual(ck, "ops_action:smoke_gateway")

    def test_context_key_release_blocker(self):
        """compute_context_key returns global for release_blocker."""
        ck = self._db._compute_context_key("release_blocker", {})
        self.assertEqual(ck, "global")

    async def test_upsert_playbook_creates_new(self):
        """upsert_playbook_from_signal creates a new playbook."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "auth")
        result = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        self.assertTrue(result["ok"])
        self.assertTrue(result["created"])
        self.assertIn("playbook_id", result)
        self.assertEqual(result["signal_type"], "risk_cluster")
        self.assertEqual(result["context_key"], "label:auth")

    async def test_upsert_playbook_idempotent(self):
        """Promoting the same signal twice does not create duplicate playbook or examples."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "payments")
        r1 = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        r2 = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        # Both should return the same playbook_id
        self.assertEqual(r1["playbook_id"], r2["playbook_id"])
        # First promotion creates; the second must report created=False.
        self.assertTrue(r1["created"])
        self.assertFalse(r2["created"])
        # uses should not be double-counted
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        self.assertEqual(len(pbs), 1)

    async def test_upsert_playbook_requires_mitigation(self):
        """upsert_playbook_from_signal raises ValueError if signal has no plan_node_id."""
        sig_id = str(uuid.uuid4())
        now = "2026-02-26T12:00:00"
        db = await self._db.get_db()
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, "stale_goal", "medium", "Stale", "", "{}", "open", now, now),
        )
        await db.commit()
        with self.assertRaises(ValueError):
            await self._db.upsert_playbook_from_signal(self._pid, sig_id)

    async def test_list_playbooks_ordered(self):
        """list_playbooks returns playbooks ordered by success_rate desc."""
        sig1, _ = await self._make_mitigated_signal("risk_cluster", "auth")
        sig2, _ = await self._make_mitigated_signal("risk_cluster", "payments")
        # Promote both with different outcomes so success rates diverge.
        await self._db.upsert_playbook_from_signal(self._pid, sig1, resolved=True, time_to_resolve_h=5.0)
        await self._db.upsert_playbook_from_signal(self._pid, sig2, resolved=False)
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        self.assertGreaterEqual(len(pbs), 1)
        # Verify order: higher success_rate first (resolved one should be first)
        if len(pbs) >= 2:
            self.assertGreaterEqual(pbs[0]["success_rate"], pbs[1]["success_rate"])

    async def test_apply_playbook_creates_tasks(self):
        """apply_playbook_to_signal creates tasks and links them to plan node."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "db")
        pb_res = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        playbook_id = pb_res["playbook_id"]
        # Create a fresh signal to apply the playbook to
        new_sig_id = str(uuid.uuid4())
        db = await self._db.get_db()
        now = "2026-02-26T13:00:00"
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (new_sig_id, self._pid, "risk_cluster", "high", "New cluster", "", '{"label":"db"}', "open", now, now),
        )
        await db.commit()
        result = await self._db.apply_playbook_to_signal(self._pid, new_sig_id, playbook_id)
        self.assertTrue(result["ok"])
        self.assertIn("plan_node_id", result)
        self.assertIn("task_ids", result)
        # Signal should be ACKed
        async with db.execute("SELECT status FROM graph_signals WHERE id=?", (new_sig_id,)) as cur:
            row = await cur.fetchone()
        self.assertEqual(row[0], "ack")

    async def test_apply_playbook_increments_uses(self):
        """apply_playbook_to_signal increments uses count."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "cache")
        pb_res = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        playbook_id = pb_res["playbook_id"]
        # Apply to a new signal
        new_sig_id = str(uuid.uuid4())
        db = await self._db.get_db()
        now = "2026-02-26T14:00:00"
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (new_sig_id, self._pid, "risk_cluster", "high", "Cache cluster", "", '{"label":"cache"}', "open", now, now),
        )
        await db.commit()
        await self._db.apply_playbook_to_signal(self._pid, new_sig_id, playbook_id)
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        pb = next((p for p in pbs if p["playbook_id"] == playbook_id), None)
        self.assertIsNotNone(pb)
        self.assertGreaterEqual(pb["uses"], 1)

    async def test_update_playbook_stats_on_resolve(self):
        """update_playbook_stats_on_resolve increments successes for a NEW signal (not yet in examples)."""
        # First create a playbook from one signal
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "api")
        pb_res = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        playbook_id = pb_res["playbook_id"]
        # Create a second mitigated signal with the same signal_type/label (same playbook)
        sig_id2, _ = await self._make_mitigated_signal("risk_cluster", "api")
        db = await self._db.get_db()
        async with db.execute("SELECT evidence FROM graph_signals WHERE id=?", (sig_id2,)) as cur:
            row = await cur.fetchone()
        evidence = json.loads(row[0]) if row else {}
        # update_playbook_stats_on_resolve with the NEW signal_id (not yet in examples)
        pb_id_updated = await self._db.update_playbook_stats_on_resolve(
            project_id=self._pid,
            signal_id=sig_id2,
            signal_type="risk_cluster",
            evidence=evidence,
            resolved_at="2026-02-26T15:00:00",
            created_at="2026-02-26T12:00:00",
        )
        self.assertEqual(pb_id_updated, playbook_id)
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        pb = next((p for p in pbs if p["playbook_id"] == playbook_id), None)
        self.assertIsNotNone(pb)
        self.assertGreater(pb["successes"], 0)

    async def test_update_playbook_stats_idempotent(self):
        """update_playbook_stats_on_resolve does not double-count the same signal."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "storage")
        await self._db.upsert_playbook_from_signal(self._pid, sig_id, resolved=True, time_to_resolve_h=2.0)
        db = await self._db.get_db()
        async with db.execute("SELECT evidence FROM graph_signals WHERE id=?", (sig_id,)) as cur:
            row = await cur.fetchone()
        evidence = json.loads(row[0]) if row else {}
        # Call twice with same signal_id
        await self._db.update_playbook_stats_on_resolve(
            self._pid, sig_id, "risk_cluster", evidence,
            "2026-02-26T15:00:00", "2026-02-26T12:00:00",
        )
        await self._db.update_playbook_stats_on_resolve(
            self._pid, sig_id, "risk_cluster", evidence,
            "2026-02-26T15:00:00", "2026-02-26T12:00:00",
        )
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        pb = next((p for p in pbs if p.get("context_key") == "label:storage"), None)
        # successes should not be inflated beyond what was set during upsert
        if pb:
            self.assertLessEqual(pb["successes"], pb["uses"])
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestLessons(unittest.IsolatedAsyncioTestCase):
    """Tests for Lessons v1 (weekly Lessons Learned)."""

    async def asyncSetUp(self):
        # Fresh in-memory DB and a dedicated project per test.
        self._pid = f"ls-{uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None
        _db_module._DB_PATH = ":memory:"
        await _db_module.get_db()
        await _db_module.init_db()
        await _db_module.create_project(name="Lesson Test", project_id=self._pid)
        self._db = _db_module

    async def asyncTearDown(self):
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None

    async def _lesson_row_count(self) -> int:
        """Count persisted lessons rows for this test's project."""
        conn = await self._db.get_db()
        async with conn.execute("SELECT COUNT(*) FROM lessons WHERE project_id=?", (self._pid,)) as cur:
            return (await cur.fetchone())[0]

    def test_compute_lesson_bucket(self):
        """compute_lesson_bucket returns valid ISO week string."""
        import re
        self.assertRegex(self._db.compute_lesson_bucket(), r"^\d{4}-W\d{2}$")

    def test_compute_lesson_bucket_specific_date(self):
        """compute_lesson_bucket with a known date returns correct week."""
        bucket = self._db.compute_lesson_bucket("2026-02-28T00:00:00")
        self.assertTrue(bucket.startswith("2026-W"), f"Expected 2026-Www, got {bucket}")

    async def test_dry_run_does_not_write(self):
        """generate with dry_run=True returns report but writes nothing to DB."""
        report = await self._db.upsert_lesson(self._pid, window="7d", dry_run=True)
        self.assertTrue(report["dry_run"])
        self.assertIn("markdown", report)
        self.assertIn("date_bucket", report)
        # No lessons row may exist after a dry run.
        self.assertEqual(await self._lesson_row_count(), 0)

    async def test_dry_run_returns_planned_tasks(self):
        """dry_run result includes planned_improvement_tasks list."""
        report = await self._db.upsert_lesson(self._pid, window="7d", dry_run=True)
        self.assertIn("planned_improvement_tasks", report)
        planned = report["planned_improvement_tasks"]
        self.assertIsInstance(planned, list)
        self.assertLessEqual(len(planned), 3)

    async def test_apply_creates_lesson(self):
        """apply (dry_run=False) creates lessons row and decision node."""
        report = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        self.assertFalse(report["dry_run"])
        for key in ("lesson_id", "lesson_node_id", "date_bucket"):
            self.assertIn(key, report)
        self.assertEqual(await self._lesson_row_count(), 1)

    async def test_apply_creates_decision_node(self):
        """apply creates a decision dialog_node with lesson title."""
        report = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        conn = await self._db.get_db()
        async with conn.execute(
            "SELECT title, node_type FROM dialog_nodes WHERE node_id=?",
            (report["lesson_node_id"],),
        ) as cur:
            row = await cur.fetchone()
        self.assertIsNotNone(row)
        self.assertEqual(row[1], "decision")
        self.assertIn("Lesson:", row[0])

    async def test_apply_idempotent_same_bucket(self):
        """Applying twice for the same bucket yields the same lesson_id."""
        first = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        second = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        self.assertEqual(first["lesson_id"], second["lesson_id"])
        self.assertEqual(first["lesson_node_id"], second["lesson_node_id"])
        self.assertEqual(await self._lesson_row_count(), 1)

    async def test_improvement_tasks_capped_at_3(self):
        """Improvement tasks are capped at 3 max."""
        report = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        self.assertLessEqual(len(report["created_task_ids"]), 3)

    async def test_improvement_tasks_deduped(self):
        """Calling apply twice does not duplicate improvement tasks."""
        first = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        second = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        # Same lesson + same fingerprints → identical task-id sets.
        self.assertEqual(set(first["created_task_ids"]), set(second["created_task_ids"]))

    async def test_triggers_with_signals(self):
        """Improvement triggers fire when signals exist in window."""
        now = "2026-02-28T12:00:00"
        conn = await self._db.get_db()
        # At least two high/critical ops_instability signals trip trigger 1.
        for i in range(3):
            await conn.execute(
                "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
                "VALUES(?,?,?,?,?,?,?,?,?,?)",
                (str(uuid.uuid4()), self._pid, "ops_instability", "high",
                 f"OpsInstab {i}", "", "{}", "open", now, now),
            )
        await conn.commit()
        report = await self._db.upsert_lesson(self._pid, window="7d", dry_run=True)
        planned = report["planned_improvement_tasks"]
        # Should have at least 1 task for ops resilience
        ops_related = [t for t in planned if "ops" in t["title"].lower() or "ops" in " ".join(t["labels"])]
        self.assertGreater(len(ops_related), 0)

    async def test_list_lessons(self):
        """list_lessons returns lessons in date_bucket desc order."""
        await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        lessons = await self._db.list_lessons(self._pid)
        self.assertGreaterEqual(len(lessons), 1)
        for key in ("lesson_id", "date_bucket", "metrics"):
            self.assertIn(key, lessons[0])

    async def test_get_lesson_detail(self):
        """get_lesson_detail returns markdown and evidence ids."""
        created = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        detail = await self._db.get_lesson_detail(self._pid, created["lesson_id"])
        self.assertIsNotNone(detail)
        self.assertEqual(detail["lesson_id"], created["lesson_id"])
        for key in ("markdown", "linked_signal_ids", "improvement_task_ids"):
            self.assertIn(key, detail)

    async def test_evidence_signal_links(self):
        """A signal created in-window shows up in the lesson's linked_signal_ids."""
        now = "2026-02-28T12:00:00"
        conn = await self._db.get_db()
        sig_id = str(uuid.uuid4())
        await conn.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, "ops_instability", "medium", "Test sig", "", "{}", "open", now, now),
        )
        await conn.commit()
        created = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        detail = await self._db.get_lesson_detail(self._pid, created["lesson_id"])
        self.assertIn(sig_id, detail["linked_signal_ids"])
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestLessonsDelta(unittest.IsolatedAsyncioTestCase):
    """Tests for Delta Intelligence and Impact Tracking (Lessons v2).

    These tests pin ``_db_module.compute_lesson_bucket`` to fixed ISO weeks;
    the patching is centralized in :meth:`_lesson_for_bucket`, which always
    restores the original function (even on failure) so a broken test cannot
    leak the stub into later tests.
    """

    async def asyncSetUp(self):
        self._pid = f"ld-{uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None
        _db_module._DB_PATH = ":memory:"
        await _db_module.get_db()
        await _db_module.init_db()
        self._db = _db_module
        # Create the project row directly (no create_project helper here).
        db = await _db_module.get_db()
        now_ts = _db_module._now()
        await db.execute(
            "INSERT INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            (self._pid, f"Delta Test {self._pid}", now_ts, now_ts),
        )
        await db.commit()

    async def asyncTearDown(self):
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None

    async def _lesson_for_bucket(self, bucket: str, dry_run: bool = False):
        """Run upsert_lesson with compute_lesson_bucket pinned to *bucket*.

        The monkeypatch is restored in ``finally`` so an exception inside
        upsert_lesson cannot leave the module in a patched state.
        """
        orig = _db_module.compute_lesson_bucket
        _db_module.compute_lesson_bucket = lambda now=None: bucket
        try:
            return await self._db.upsert_lesson(self._pid, window="7d", dry_run=dry_run)
        finally:
            _db_module.compute_lesson_bucket = orig

    async def test_delta_schema_present_after_two_lessons(self):
        """Second lesson has delta block with expected keys."""
        r1 = await self._lesson_for_bucket("2026-W01")
        self.assertFalse(r1.get("dry_run"))
        r2 = await self._lesson_for_bucket("2026-W02")
        detail2 = await self._db.get_lesson_detail(self._pid, r2["lesson_id"])
        metrics = detail2["metrics"]
        for key in ("current", "delta", "trend_flags"):
            self.assertIn(key, metrics)
        # previous should reference W01
        prev = metrics.get("previous")
        self.assertIsNotNone(prev)
        self.assertEqual(prev.get("date_bucket"), "2026-W01")

    async def test_trend_flags_correct_for_good_directions(self):
        """trend_flags structure is present and risk_regressing=True when risk_open increased."""
        r1 = await self._lesson_for_bucket("2026-W10")
        # Zero out risk in the first lesson's stored metrics so the second
        # lesson observes an increase.
        db = await _db_module.get_db()
        m1 = json.loads((await (await db.execute("SELECT metrics_json FROM lessons WHERE lesson_id=?", (r1["lesson_id"],))).fetchone())[0])
        m1.setdefault("current", {})["risk_open"] = 0
        m1["risk_tasks_open"] = 0
        await db.execute("UPDATE lessons SET metrics_json=? WHERE lesson_id=?", (json.dumps(m1), r1["lesson_id"]))
        await db.commit()
        # Inject a live [RISK] task so the current bucket has risk_open > 0.
        t_id = str(uuid.uuid4())
        n_id = str(uuid.uuid4())
        now_ts = _db_module._now()
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,title,summary,props,created_by,created_at,updated_at,lifecycle,importance,ref_id)"
            " VALUES(?,?,?,?,?,?,?,?,?,?,?,?)",
            (n_id, self._pid, "task", "[RISK] Critical auth issue", "", "{}", "test", now_ts, now_ts, "active", 0.9, t_id),
        )
        await db.execute(
            "INSERT INTO tasks(task_id,project_id,title,status,priority,description,labels,created_by,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (t_id, self._pid, "[RISK] Critical auth issue", "backlog", "high", "", '["risk"]', "test", now_ts, now_ts),
        )
        await db.commit()
        r2 = await self._lesson_for_bucket("2026-W11")
        detail2 = await self._db.get_lesson_detail(self._pid, r2["lesson_id"])
        metrics = detail2["metrics"]
        # trend_flags must have all expected keys
        tf = metrics.get("trend_flags", {})
        for key in ("risk_improving", "risk_regressing", "quality_improving", "quality_regressing"):
            self.assertIn(key, tf, f"Missing trend_flag key: {key}")
        # Previous risk_open=0, current risk_open>=1 → risk_regressing=True
        prev = metrics.get("previous", {})
        curr = metrics.get("current", {})
        if (prev.get("risk_open") or 0) == 0 and (curr.get("risk_open") or 0) >= 1:
            self.assertTrue(tf.get("risk_regressing"), "risk_regressing should be True when risk increased")

    async def test_delta_first_lesson_has_no_previous(self):
        """First lesson has delta=None or trend=new for all metrics."""
        r1 = await self._lesson_for_bucket("2026-W50")
        detail = await self._db.get_lesson_detail(self._pid, r1["lesson_id"])
        metrics = detail["metrics"]
        self.assertIsNone(metrics.get("previous"), "First lesson should have no previous block")
        # Delta may exist, but every entry must be flagged as a new metric.
        for k, v in (metrics.get("delta") or {}).items():
            self.assertEqual(v.get("trend"), "new", f"Expected trend=new for {k} in first lesson")

    async def test_impact_evaluated_when_next_bucket_exists(self):
        """evaluate_lesson_impact writes impact_score to prior lesson."""
        r1 = await self._lesson_for_bucket("2026-W20")
        await self._lesson_for_bucket("2026-W21")
        # After the W21 lesson, the W20 lesson's impact must be evaluated.
        db = await _db_module.get_db()
        row = await (await db.execute(
            "SELECT impact_score, impact_json FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
        )).fetchone()
        self.assertIsNotNone(row)
        # impact_json should reference evaluated_bucket W21
        ij = json.loads(row[1] or "{}")
        self.assertEqual(ij.get("evaluated_bucket"), "2026-W21")

    async def test_impact_idempotent(self):
        """Calling evaluate_lesson_impact twice does not change result."""
        r1 = await self._lesson_for_bucket("2026-W30")
        await self._lesson_for_bucket("2026-W31")
        # First evaluation happens automatically inside upsert_lesson.
        db = await _db_module.get_db()
        row1 = await (await db.execute(
            "SELECT impact_score FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
        )).fetchone()
        score1 = row1[0] if row1 else None
        # Second call with force=False (should skip)
        result2 = await self._db.evaluate_lesson_impact(self._pid, "7d", "2026-W31", force=False)
        self.assertIsNone(result2, "Second call without force should return None (idempotent)")
        row2 = await (await db.execute(
            "SELECT impact_score FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
        )).fetchone()
        self.assertEqual(score1, row2[0] if row2 else None)

    async def test_dry_run_no_write_for_impact_recompute(self):
        """evaluate_lesson_impact with force=True re-runs and rewrites impact_json."""
        r1 = await self._lesson_for_bucket("2026-W40")
        await self._lesson_for_bucket("2026-W41")
        db = await _db_module.get_db()
        # Force recompute
        res = await self._db.evaluate_lesson_impact(self._pid, "7d", "2026-W41", force=True)
        self.assertIsNotNone(res)
        row2 = await (await db.execute(
            "SELECT impact_json FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
        )).fetchone()
        ij2 = json.loads((row2[0] if row2 else None) or "{}")
        # evaluated_at must be set after a forced re-run
        self.assertIsNotNone(ij2.get("evaluated_at"))

    async def test_detail_includes_delta_and_impact_fields(self):
        """get_lesson_detail returns delta, trend_flags, current, previous, impact."""
        await self._lesson_for_bucket("2026-W60")
        r2 = await self._lesson_for_bucket("2026-W61")
        detail2 = await self._db.get_lesson_detail(self._pid, r2["lesson_id"])
        # Must have these keys
        for key in ("delta", "trend_flags", "current", "previous", "impact"):
            self.assertIn(key, detail2, f"Missing key: {key}")
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestLevelFive(unittest.IsolatedAsyncioTestCase):
"""Tests for Level 5: Streaks, Portfolio Drift Signals, Impact Attribution."""
async def asyncSetUp(self):
import uuid as _uuid
self._pid = f"lv5-{_uuid.uuid4().hex[:10]}"
self._pid2 = f"lv5b-{_uuid.uuid4().hex[:10]}"
_db_module._db_conn = None
try:
await _db_module.close_db()
except Exception:
pass
_db_module._db_conn = None
_db_module._DB_PATH = ":memory:"
await _db_module.get_db()
await _db_module.init_db()
self._db = _db_module
db = await _db_module.get_db()
now_ts = _db_module._now()
for pid in (self._pid, self._pid2):
await db.execute(
"INSERT INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
(pid, f"L5 Test {pid}", now_ts, now_ts),
)
await db.commit()
    async def asyncTearDown(self):
        # Best-effort close: the connection may already be closed (or never
        # opened) if setUp failed, so swallow any error here.
        try:
            await _db_module.close_db()
        except Exception:
            pass
        # Drop the cached module-level handle so the next test starts clean.
        _db_module._db_conn = None
async def _make_lesson_with_flags(self, pid, bucket, risk_reg=False, quality_reg=False, ops_reg=False,
risk_imp=False, quality_imp=False, ops_imp=False):
"""Insert a synthetic lesson with given trend_flags."""
import json
import uuid as _uuid
lesson_id = str(_uuid.uuid4())
tf = {
"risk_regressing": risk_reg, "risk_improving": risk_imp,
"quality_regressing": quality_reg, "quality_improving": quality_imp,
"ops_regressing": ops_reg, "ops_improving": ops_imp,
"delivery_improving": False, "delivery_regressing": False,
}
metrics = {
"kind": "lesson_metrics", "project_id": pid, "window": "7d", "date_bucket": bucket,
"current": {"risk_open": 5 if risk_reg else 0, "quality_avg": 0.5, "ops_failure_rate": 0.3},
"previous": None, "delta": {}, "trend_flags": tf,
"wip": 3, "tasks_done": 5, "risk_tasks_open": 2, "run_quality_avg": 0.6,
"ops_failure_rate": 0.2, "agent_runs_in_window": 4, "signals_in_window": 2,
"improvement_tasks_count": 0,
}
fp = _db_module._lesson_fingerprint(pid, "7d", bucket)
now_ts = _db_module._now()
db = await _db_module.get_db()
# Create a dummy decision node for lesson_node_id
node_id = str(_uuid.uuid4())
await db.execute(
"INSERT INTO dialog_nodes(node_id,project_id,node_type,title,summary,props,ref_id,created_by,created_at,updated_at,lifecycle,importance)"
" VALUES(?,?,?,?,?,?,?,?,?,?,?,?)",
(node_id, pid, "decision", f"Lesson: {bucket}", "", json.dumps({"markdown": "# Test"}),
node_id, "test", now_ts, now_ts, "active", 0.9),
)
await db.execute(
"INSERT INTO lessons(lesson_id,project_id,window,date_bucket,fingerprint,status,lesson_node_id,doc_version_id,metrics_json,impact_score,impact_json,created_at,updated_at)"
" VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
(lesson_id, pid, "7d", bucket, fp, "published", node_id, "", json.dumps(metrics), 0.0, "{}", now_ts, now_ts),
)
await db.commit()
return lesson_id
async def test_streak_len2_regressing(self):
"""Two consecutive regressing lessons → streak len=2."""
await self._make_lesson_with_flags(self._pid, "2026-W01", risk_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W02", risk_reg=True)
streaks = await self._db.compute_lesson_streaks(self._pid)
self.assertEqual(streaks["risk"]["dir"], "regressing")
self.assertEqual(streaks["risk"]["len"], 2)
self.assertEqual(streaks["risk"]["since_bucket"], "2026-W01")
async def test_streak_len3_improving(self):
"""Three consecutive improving lessons → streak len=3."""
await self._make_lesson_with_flags(self._pid, "2026-W10", ops_imp=True)
await self._make_lesson_with_flags(self._pid, "2026-W11", ops_imp=True)
await self._make_lesson_with_flags(self._pid, "2026-W12", ops_imp=True)
streaks = await self._db.compute_lesson_streaks(self._pid)
self.assertEqual(streaks["ops"]["dir"], "improving")
self.assertEqual(streaks["ops"]["len"], 3)
async def test_streak_flat_when_mixed(self):
"""Mixed directions → streak len=0 or len=1 (not a streak)."""
await self._make_lesson_with_flags(self._pid, "2026-W20", quality_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W21", quality_imp=True)
streaks = await self._db.compute_lesson_streaks(self._pid)
self.assertLess(streaks["quality"]["len"], 2, "Mixed directions should not produce len>=2 streak")
async def test_portfolio_drift_signal_created(self):
"""portfolio_risk_drift signal is created when a project has risk streak>=2."""
await self._make_lesson_with_flags(self._pid, "2026-W30", risk_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W31", risk_reg=True)
result = await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
self.assertFalse(result["dry_run"])
types = [c["signal_type"] for c in result["changes"]]
self.assertIn("portfolio_risk_drift", types)
async def test_portfolio_drift_signal_idempotent(self):
"""Running recompute twice does not create duplicate signals."""
import json
await self._make_lesson_with_flags(self._pid, "2026-W40", risk_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W41", risk_reg=True)
await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
db = await _db_module.get_db()
async with db.execute(
"SELECT COUNT(*) FROM graph_signals WHERE project_id=? AND signal_type=?",
("portfolio", "portfolio_risk_drift"),
) as cur:
row = await cur.fetchone()
self.assertEqual(row[0], 1, "Should have exactly 1 drift signal, not duplicates")
async def test_portfolio_drift_severity_by_len(self):
"""len=2 → high, len=3 → critical."""
# len=3 streak for ops
await self._make_lesson_with_flags(self._pid, "2026-W50", ops_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W51", ops_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W52", ops_reg=True)
result = await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
ops_change = next((c for c in result["changes"] if c["signal_type"] == "portfolio_ops_drift"), None)
self.assertIsNotNone(ops_change)
self.assertEqual(ops_change["severity"], "critical")
async def test_portfolio_drift_dry_run_no_write(self):
"""dry_run=True should not write signals to DB."""
await self._make_lesson_with_flags(self._pid, "2026-W60", risk_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W61", risk_reg=True)
result = await self._db.recompute_portfolio_signals(window="7d", dry_run=True)
self.assertTrue(result["dry_run"])
db = await _db_module.get_db()
async with db.execute(
"SELECT COUNT(*) FROM graph_signals WHERE project_id='portfolio'",
) as cur:
row = await cur.fetchone()
self.assertEqual(row[0], 0, "dry_run should not write any signals")
async def test_attribution_unknown_when_no_tasks(self):
"""Impact with no improvement tasks → attribution unknown."""
orig = _db_module.compute_lesson_bucket
_db_module.compute_lesson_bucket = lambda now=None: "2026-W70"
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
_db_module.compute_lesson_bucket = lambda now=None: "2026-W71"
r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
_db_module.compute_lesson_bucket = orig
detail = await self._db.get_lesson_detail(self._pid, r1["lesson_id"])
imp = detail.get("impact") or {}
attr = imp.get("attribution") or {}
self.assertEqual(attr.get("level"), "unknown", "No tasks → attribution unknown")
async def test_list_portfolio_signals(self):
"""list_portfolio_signals returns signals with project_id='portfolio'."""
await self._make_lesson_with_flags(self._pid, "2026-W80", quality_reg=True)
await self._make_lesson_with_flags(self._pid, "2026-W81", quality_reg=True)
await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
sigs = await self._db.list_portfolio_signals(status="open")
self.assertGreater(len(sigs), 0, "Should have at least one portfolio drift signal")
for s in sigs:
self.assertEqual(s["project_id"], "portfolio")
class TestLevel6(unittest.IsolatedAsyncioTestCase):
    """Tests for Level 6: Governance Gates + Auto-plan/Auto-run.

    Each test works against a freshly opened DB connection and a per-test
    project id, so rows never collide between tests.
    """
    async def asyncSetUp(self):
        # Random suffix keeps this project's rows isolated from other tests.
        import uuid as _uuid
        self._pid = f"lv6-{_uuid.uuid4().hex[:10]}"
        # Drop any cached module-level connection and reopen a fresh one.
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        db = await _db_module.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id, name, created_at, updated_at) VALUES(?,?,?,?)",
            (self._pid, "LV6 Test", _db_module._now(), _db_module._now()),
        )
        await db.commit()
        self._db = _db_module
    async def asyncTearDown(self):
        # Best-effort close; shutdown errors are irrelevant to the outcome.
        try:
            await _db_module.close_db()
        except Exception:
            pass
    # ── Gate: BLOCK_RELEASE ──────────────────────────────────────────────────
    async def test_block_release_gate_triggers(self):
        """BLOCK_RELEASE fires when open high/critical [RISK] tasks exist."""
        db = await self._db.get_db()
        # Seed an open urgent task labelled "risk" with a [RISK] title.
        task_id = "t-risk-" + self._pid[:8]
        await db.execute(
            "INSERT OR IGNORE INTO tasks(task_id, project_id, title, status, priority, labels, created_at, updated_at)"
            " VALUES(?,?,?,?,?,?,?,?)",
            (task_id, self._pid, "[RISK] critical auth bypass", "backlog", "urgent", "risk", self._db._now(), self._db._now()),
        )
        # dialog_node required for join
        node_id = "dn-risk-" + self._pid[:8]
        await db.execute(
            "INSERT OR IGNORE INTO dialog_nodes(node_id,project_id,node_type,title,ref_id,created_by,created_at,updated_at,lifecycle,importance)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (node_id, self._pid, "task", "[RISK] critical auth bypass", task_id, "test", self._db._now(), self._db._now(), "active", 0.7),
        )
        await db.commit()
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "BLOCK_RELEASE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "BLOCKED", "Should be BLOCKED with open [RISK] task")
    async def test_block_release_gate_passes_when_no_risks(self):
        """BLOCK_RELEASE passes when no open high/critical [RISK] tasks."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "BLOCK_RELEASE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "PASS")
    # ── Gate: DEGRADE_MODE ───────────────────────────────────────────────────
    async def test_degrade_mode_passes_when_no_snapshot(self):
        """DEGRADE_MODE should PASS (no data = conservative) when no snapshot."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "DEGRADE_MODE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "PASS", "No snapshot → conservative PASS")
    async def test_degrade_mode_triggers_from_snapshot(self):
        """DEGRADE_MODE fires when ops_failure_rate > 0.33 and runs >= 3."""
        snap_id = "snap-lv6-" + self._pid[:8]
        # 0.5 failure rate over 5 runs exceeds both thresholds in the docstring.
        metrics = {"ops_failure_rate": 0.5, "agent_runs_in_window": 5}
        db = await self._db.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO graph_snapshots(id, project_id, scope, window, date_bucket, metrics, created_at)"
            " VALUES(?,?,?,?,?,?,?)",
            (snap_id, self._pid, "project", "7d", "2026-W99", __import__("json").dumps(metrics), self._db._now()),
        )
        await db.commit()
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "DEGRADE_MODE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "DEGRADED")
    # ── Gate: PROMPT_FREEZE ──────────────────────────────────────────────────
    async def test_prompt_freeze_passes_without_streaks(self):
        """PROMPT_FREEZE passes when no quality regression streak."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "PROMPT_FREEZE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "PASS")
    # ── Gate persistence ─────────────────────────────────────────────────────
    async def test_evaluate_gates_idempotent_apply(self):
        """Applying gates twice for same bucket reuses existing node (upsert)."""
        r1 = await self._db.evaluate_governance_gates(self._pid, dry_run=False)
        node_id_1 = r1.get("gate_node_id")
        r2 = await self._db.evaluate_governance_gates(self._pid, dry_run=False)
        node_id_2 = r2.get("gate_node_id")
        self.assertEqual(node_id_1, node_id_2, "Same bucket → same node_id (upsert)")
    # ── Auto-plan ────────────────────────────────────────────────────────────
    async def test_auto_plan_creates_planned_entries(self):
        """auto_plan_drift_signal creates planned run entries in evidence."""
        db = await self._db.get_db()
        # Ensure portfolio project exists
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            ("portfolio", "portfolio", self._db._now(), self._db._now()),
        )
        import json, hashlib
        # Hand-crafted drift-signal evidence referencing this test's project.
        ev = {
            "bucket": "2026-W90",
            "metric": "ops",
            "projects": [{"project_id": self._pid, "streak": {"dir": "regressing", "len": 2}}],
        }
        sig_id = "sig-lv6-" + self._pid[:8]
        # fp computed for parity with production fingerprints; not asserted on.
        fp = hashlib.sha256(f"portfolio_ops_drift|2026-W90|{self._pid}".encode()).hexdigest()[:24]
        await db.execute(
            "INSERT OR IGNORE INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, "portfolio", "portfolio_ops_drift", "high", "Ops drift W90", "", json.dumps(ev), "open", self._db._now(), self._db._now()),
        )
        await db.commit()
        result = await self._db.auto_plan_drift_signal(sig_id)
        self.assertNotIn("error", result)
        self.assertGreater(result["total_planned"], 0, "Should plan at least one run")
        # Verify idempotency: planning again adds 0 new entries
        result2 = await self._db.auto_plan_drift_signal(sig_id)
        self.assertEqual(result2["total_planned"], 0, "Idempotent: no new entries on second call")
    async def test_auto_run_dry_run_no_writes(self):
        """auto_run_drift_signal with dry_run=True does not fire real runs."""
        db = await self._db.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            ("portfolio", "portfolio", self._db._now(), self._db._now()),
        )
        import json
        ev = {
            "bucket": "2026-W91",
            "metric": "risk",
            "projects": [{"project_id": self._pid, "streak": {"dir": "regressing", "len": 3}}],
        }
        sig_id = "sig-lv6b-" + self._pid[:8]
        await db.execute(
            "INSERT OR IGNORE INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, "portfolio", "portfolio_risk_drift", "critical", "Risk drift W91", "", json.dumps(ev), "open", self._db._now(), self._db._now()),
        )
        await db.commit()
        # supervisor_url points at port 0 so any accidental real call fails fast.
        result = await self._db.auto_run_drift_signal(sig_id, dry_run=True, supervisor_url="http://localhost:0")
        self.assertTrue(result["dry_run"])
        # All fired entries should be dry_run=True (planned) not real runs
        for f in result.get("fired", []):
            self.assertTrue(f.get("dry_run", True), "dry_run=True → no real fire")
    # ── Gates summary ────────────────────────────────────────────────────────
    async def test_gates_result_has_summary(self):
        """Gate evaluation returns summary with all_clear and blocked list."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        self.assertIn("summary", result)
        self.assertIn("all_clear", result["summary"])
        self.assertIn("blocked", result["summary"])
class TestLevel7(unittest.IsolatedAsyncioTestCase):
    """Tests for Level 7: Governance Audit Trail (append_governance_event, list, hooks)."""
    async def asyncSetUp(self):
        # Per-test project id plus a shared "portfolio" project for
        # portfolio-scoped events.
        import uuid as _uuid
        self._pid = f"lv7-{_uuid.uuid4().hex[:10]}"
        # Reset the cached connection so each test starts on a fresh DB handle.
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        db = await _db_module.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            (self._pid, "LV7 Test", _db_module._now(), _db_module._now()),
        )
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            ("portfolio", "portfolio", _db_module._now(), _db_module._now()),
        )
        await db.commit()
        self._db = _db_module
    async def asyncTearDown(self):
        # Best-effort close; ignore shutdown errors.
        try:
            await _db_module.close_db()
        except Exception:
            pass
    # ── append_governance_event ──────────────────────────────────────────────
    async def test_append_event_basic(self):
        """append_governance_event returns an event_id and is stored."""
        ev_id = await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="test_event",
            idempotency_key=f"test|basic|{self._pid}",
            severity="info", status="ok",
        )
        self.assertIsNotNone(ev_id)
        items = await self._db.list_governance_events(project_id=self._pid)
        self.assertTrue(any(i["event_type"] == "test_event" for i in items))
    async def test_append_event_idempotent(self):
        """Same idempotency_key does not create duplicate event."""
        ikey = f"test|idem|{self._pid}"
        # Append the same logical event twice under one idempotency key.
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="idem_event", idempotency_key=ikey,
        )
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="idem_event", idempotency_key=ikey,
        )
        items = await self._db.list_governance_events(project_id=self._pid, event_type="idem_event")
        self.assertEqual(len(items), 1, "Should have exactly 1 event despite 2 inserts")
    # ── list_governance_events filters ──────────────────────────────────────
    async def test_list_filter_by_event_type(self):
        """list_governance_events filters by event_type."""
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="system",
            event_type="type_a", idempotency_key=f"fa|{self._pid}",
        )
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="system",
            event_type="type_b", idempotency_key=f"fb|{self._pid}",
        )
        items_a = await self._db.list_governance_events(project_id=self._pid, event_type="type_a")
        items_b = await self._db.list_governance_events(project_id=self._pid, event_type="type_b")
        self.assertEqual(len(items_a), 1)
        self.assertEqual(len(items_b), 1)
    async def test_list_filter_by_status(self):
        """list_governance_events filters by status."""
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="system",
            event_type="err_event", idempotency_key=f"ferr|{self._pid}",
            status="error",
        )
        errors = await self._db.list_governance_events(project_id=self._pid, status="error")
        self.assertTrue(any(i["status"] == "error" for i in errors))
    async def test_list_portfolio_scope(self):
        """Events written to portfolio scope are queryable by scope=portfolio."""
        await self._db.append_governance_event(
            scope="portfolio", project_id="portfolio", actor_type="system",
            event_type="portfolio_event", idempotency_key=f"port|{self._pid}",
        )
        items = await self._db.list_governance_events(scope="portfolio", project_id="portfolio")
        self.assertTrue(any(i["event_type"] == "portfolio_event" for i in items))
    # ── Gate hooks ───────────────────────────────────────────────────────────
    async def test_gate_preview_logs_event(self):
        """evaluate_governance_gates(dry_run=True) logs gate_previewed event."""
        import asyncio
        await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        # Fire-and-forget tasks need a moment to run
        await asyncio.sleep(0.05)
        items = await self._db.list_governance_events(project_id=self._pid, event_type="gate_previewed")
        self.assertEqual(len(items), 1, "Should log gate_previewed on preview")
    async def test_gate_persist_logs_event(self):
        """evaluate_governance_gates(dry_run=False) logs gate_evaluated event."""
        import asyncio
        await self._db.evaluate_governance_gates(self._pid, dry_run=False)
        # Give the fire-and-forget audit task a moment to complete.
        await asyncio.sleep(0.05)
        items = await self._db.list_governance_events(project_id=self._pid, event_type="gate_evaluated")
        self.assertEqual(len(items), 1, "Should log gate_evaluated on persist")
    # ── Auto-plan hook ────────────────────────────────────────────────────────
    async def test_drift_plan_logs_event(self):
        """auto_plan_drift_signal logs drift_planned event."""
        import asyncio, json
        db = await self._db.get_db()
        # Seed an open drift signal whose evidence references this project.
        ev = {"bucket": "2026-W95", "metric": "ops",
              "projects": [{"project_id": self._pid, "streak": {"dir": "regressing", "len": 2}}]}
        sig_id = f"sig-lv7-{self._pid[:8]}"
        await db.execute(
            "INSERT OR IGNORE INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, "portfolio", "portfolio_ops_drift", "high", "Ops W95", "", json.dumps(ev), "open", self._db._now(), self._db._now()),
        )
        await db.commit()
        await self._db.auto_plan_drift_signal(sig_id)
        # Audit write is fire-and-forget; yield briefly before querying.
        await asyncio.sleep(0.05)
        items = await self._db.list_governance_events(scope="portfolio", project_id="portfolio", event_type="drift_planned")
        self.assertGreater(len(items), 0, "drift_planned event should be logged")
    # ── Evidence JSON schema ──────────────────────────────────────────────────
    async def test_evidence_schema_v1(self):
        """Stored evidence has v=1 schema with required keys."""
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="schema_check", idempotency_key=f"schema|{self._pid}",
            evidence=self._db._make_evidence("test schema", bucket="2026-W09"),
        )
        items = await self._db.list_governance_events(project_id=self._pid, event_type="schema_check")
        self.assertEqual(len(items), 1)
        ev = items[0]["evidence"]
        self.assertEqual(ev.get("v"), 1)
        self.assertIn("message", ev)
        self.assertIn("links", ev)
        self.assertIn("timings", ev)
class TestAgentOverrides(unittest.IsolatedAsyncioTestCase):
    """Tests for Level 8: Agent Overrides (Projects = Agents)."""
    async def asyncSetUp(self):
        """Reset the shared connection so every test starts on a clean handle."""
        self._db = _db_module
        _db_module._db_conn = None
        # Closing may fail when nothing was open yet — that is fine.
        try:
            await _db_module.close_db()
        except Exception:
            pass
        await _db_module.get_db()
    async def asyncTearDown(self):
        """Close the DB after each test, ignoring shutdown errors."""
        try:
            await _db_module.close_db()
        except Exception:
            pass
    # ── CRUD ─────────────────────────────────────────────────────────────────
    async def test_upsert_creates_override(self):
        """A first upsert_agent_override call creates a fresh row."""
        saved = await self._db.upsert_agent_override(
            "NODA1", "helion",
            display_name="Helion Test",
            domain="ai, strategy",
        )
        self.assertEqual(saved["node_id"], "NODA1")
        self.assertEqual(saved["agent_id"], "helion")
        self.assertEqual(saved["display_name"], "Helion Test")
    async def test_upsert_updates_override(self):
        """A second upsert for the same (node, agent) updates in place."""
        await self._db.upsert_agent_override("NODA1", "druid", display_name="Druid v1")
        second = await self._db.upsert_agent_override("NODA1", "druid", display_name="Druid v2")
        self.assertEqual(second["display_name"], "Druid v2")
    async def test_get_returns_none_for_missing(self):
        """get_agent_override yields None when nothing was stored."""
        missing = await self._db.get_agent_override("NODA1", "nonexistent_agent")
        self.assertIsNone(missing)
    async def test_upsert_partial_update(self):
        """An upsert carrying only some fields leaves the others untouched."""
        await self._db.upsert_agent_override("NODA1", "sofiia",
                                             display_name="Sophia", domain="arch")
        # Touch only system_prompt_md; display_name must survive unchanged.
        after = await self._db.upsert_agent_override("NODA1", "sofiia",
                                                     system_prompt_md="# You are Sophia")
        self.assertEqual(after["display_name"], "Sophia")
        self.assertEqual(after["system_prompt_md"], "# You are Sophia")
    async def test_delete_removes_override(self):
        """delete_agent_override drops the row entirely."""
        await self._db.upsert_agent_override("NODA1", "senpai", display_name="Senpai")
        await self._db.delete_agent_override("NODA1", "senpai")
        self.assertIsNone(await self._db.get_agent_override("NODA1", "senpai"))
    async def test_list_agent_overrides_filtered(self):
        """list_agent_overrides only returns rows for the requested node."""
        await self._db.upsert_agent_override("NODA1", "agent_a", display_name="A")
        await self._db.upsert_agent_override("NODA2", "agent_b", display_name="B")
        for node in ("NODA1", "NODA2"):
            rows = await self._db.list_agent_overrides(node)
            self.assertTrue(all(row["node_id"] == node for row in rows))
    async def test_is_hidden_flag(self):
        """The is_hidden flag round-trips through storage."""
        await self._db.upsert_agent_override("NODA1", "hidden_agent", is_hidden=True)
        row = await self._db.get_agent_override("NODA1", "hidden_agent")
        self.assertTrue(bool(row["is_hidden"]))
    # ── Registry merge ────────────────────────────────────────────────────────
    async def test_override_display_name_persists(self):
        """A display_name-only override persists and leaves the prompt unset."""
        await self._db.upsert_agent_override("NODA1", "helion", display_name="Helion Custom")
        row = await self._db.get_agent_override("NODA1", "helion")
        self.assertEqual(row["display_name"], "Helion Custom")
        self.assertIsNone(row.get("system_prompt_md"))
class TestAgentsOps(unittest.IsolatedAsyncioTestCase):
    """Tests for Agents Ops: versioning, safe apply, drift, bulk, audit events."""
    async def asyncSetUp(self):
        # Reset the cached connection so each test starts on a fresh handle.
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        await _db_module.get_db()
        self._db = _db_module
    async def asyncTearDown(self):
        # Best-effort close; ignore shutdown errors.
        try:
            await _db_module.close_db()
        except Exception:
            pass
    # ── Version snapshots ──────────────────────────────────────────────────────
    async def test_upsert_creates_version_snapshot(self):
        """upsert_agent_override creates a version row automatically."""
        ov = await self._db.upsert_agent_override(
            "NODA1", "helion",
            display_name="Helion", system_prompt_md="# Helion prompt",
        )
        self.assertIn("version_hash", ov)
        self.assertIsNotNone(ov["version_hash"])
        versions = await self._db.list_agent_versions("NODA1", "helion")
        self.assertGreaterEqual(len(versions), 1)
        # Newest version (index 0) carries the hash just returned by upsert.
        self.assertEqual(versions[0]["version_hash"], ov["version_hash"])
    async def test_versions_are_idempotent_by_content(self):
        """Same content twice → only one version row (idempotent by hash)."""
        await self._db.upsert_agent_override("NODA1", "druid",
                                             system_prompt_md="# same")
        await self._db.upsert_agent_override("NODA1", "druid",
                                             system_prompt_md="# same")
        versions = await self._db.list_agent_versions("NODA1", "druid")
        hashes = [v["version_hash"] for v in versions]
        # No two version rows may share a hash.
        self.assertEqual(len(hashes), len(set(hashes)), "duplicate version hash")
    async def test_different_content_creates_new_version(self):
        """Different content → new version row."""
        await self._db.upsert_agent_override("NODA1", "sofiia",
                                             system_prompt_md="# v1")
        await self._db.upsert_agent_override("NODA1", "sofiia",
                                             system_prompt_md="# v2")
        versions = await self._db.list_agent_versions("NODA1", "sofiia")
        self.assertGreaterEqual(len(versions), 2)
    async def test_get_agent_version_by_hash(self):
        """get_agent_version_by_hash returns correct payload."""
        ov = await self._db.upsert_agent_override(
            "NODA1", "senpai", system_prompt_md="# senpai v1",
        )
        vh = ov["version_hash"]
        version = await self._db.get_agent_version_by_hash("NODA1", "senpai", vh)
        self.assertIsNotNone(version)
        self.assertEqual(version["payload"].get("system_prompt_md"), "# senpai v1")
    async def test_get_agent_version_returns_none_for_unknown_hash(self):
        """Unknown version_hash returns None."""
        result = await self._db.get_agent_version_by_hash("NODA1", "nobody", "deadbeef")
        self.assertIsNone(result)
    # ── Payload hash ──────────────────────────────────────────────────────────
    async def test_payload_hash_ignores_timestamps(self):
        """_agent_payload_hash ignores updated_at and last_applied_at."""
        # Identical content, different timestamp fields → hashes must match.
        p1 = {"display_name": "X", "domain": "ai", "system_prompt_md": "# hi",
              "updated_at": "2026-01-01", "last_applied_at": "2026-02-01"}
        p2 = {"display_name": "X", "domain": "ai", "system_prompt_md": "# hi",
              "updated_at": "2026-05-05", "last_applied_at": None}
        h1 = self._db._agent_payload_hash(p1)
        h2 = self._db._agent_payload_hash(p2)
        self.assertEqual(h1, h2)
    async def test_payload_hash_differs_on_content_change(self):
        """Different content → different hash."""
        h1 = self._db._agent_payload_hash({"system_prompt_md": "v1"})
        h2 = self._db._agent_payload_hash({"system_prompt_md": "v2"})
        self.assertNotEqual(h1, h2)
    # ── last_applied_hash (mark applied) ──────────────────────────────────────
    async def test_mark_applied_hash_stored(self):
        """_mark_applied_hash is stored in last_applied_hash."""
        ov = await self._db.upsert_agent_override(
            "NODA1", "helion_apply", system_prompt_md="# prompt",
        )
        vh = ov["version_hash"]
        # Simulate successful apply
        updated = await self._db.upsert_agent_override(
            "NODA1", "helion_apply", _mark_applied_hash=vh,
        )
        self.assertEqual(updated["last_applied_hash"], vh)
        self.assertIsNotNone(updated.get("last_applied_at"))
    async def test_drift_detected_when_hash_differs(self):
        """Drift = last_applied_hash != desired_hash."""
        ov = await self._db.upsert_agent_override(
            "NODA1", "drift_agent", system_prompt_md="# original",
        )
        old_hash = ov["version_hash"]
        # Mark applied at old version
        await self._db.upsert_agent_override("NODA1", "drift_agent",
                                             _mark_applied_hash=old_hash)
        # Change content → new desired hash
        ov2 = await self._db.upsert_agent_override(
            "NODA1", "drift_agent", system_prompt_md="# changed",
        )
        new_hash = ov2["version_hash"]
        self.assertNotEqual(old_hash, new_hash, "should be different versions")
        fresh = await self._db.get_agent_override("NODA1", "drift_agent")
        # Applying content does not move last_applied_hash — only an apply does.
        self.assertEqual(fresh["last_applied_hash"], old_hash)
        # Drift: desired (new_hash) != last_applied (old_hash)
        desired = {"display_name": fresh.get("display_name"),
                   "domain": fresh.get("domain"),
                   "system_prompt_md": fresh.get("system_prompt_md")}
        desired_hash = self._db._agent_payload_hash(desired)
        self.assertNotEqual(desired_hash, fresh["last_applied_hash"])
    # ── Audit integration ─────────────────────────────────────────────────────
    async def test_audit_agent_override_saved_event(self):
        """agent_override_saved audit event is written correctly."""
        await self._db.append_governance_event(
            scope="project", project_id="helion",
            actor_type="user",
            event_type="agent_override_saved",
            idempotency_key="test|aos|NODA1|helion|abc123",
            severity="info", status="ok",
            ref_type="agent", ref_id="helion",
            evidence=self._db._make_evidence(
                message="Override saved",
                outputs={"version_hash": "abc123"},
            ),
        )
        events = await self._db.list_governance_events(
            scope="project", project_id="helion",
            event_type="agent_override_saved",
        )
        self.assertGreaterEqual(len(events), 1)
        self.assertEqual(events[0]["event_type"], "agent_override_saved")
    async def test_audit_agent_apply_planned_event(self):
        """agent_apply_planned audit event written for dry-run."""
        await self._db.append_governance_event(
            scope="project", project_id="druid",
            actor_type="user",
            event_type="agent_apply_planned",
            idempotency_key="test|aap|NODA1|druid|planXXX",
            severity="info", status="ok",
            ref_type="agent", ref_id="druid",
            evidence=self._db._make_evidence(
                message="Apply planned for druid@NODA1",
                outputs={"plan_id": "planXXX", "will_change": True, "diff_lines": 5},
            ),
        )
        events = await self._db.list_governance_events(
            scope="project", project_id="druid",
            event_type="agent_apply_planned",
        )
        self.assertGreaterEqual(len(events), 1)
        ev = events[0]
        self.assertEqual(ev["status"], "ok")
        # Evidence must round-trip the plan metadata under "outputs".
        ev_json = ev.get("evidence") or {}
        self.assertIn("plan_id", ev_json.get("outputs", {}))
    async def test_audit_idempotency_no_duplicate(self):
        """Same idempotency_key → only one event row."""
        key = "test|unique|ops|agent1|abc"
        # Three identical appends must collapse into a single stored event.
        for _ in range(3):
            await self._db.append_governance_event(
                scope="project", project_id="agent1",
                actor_type="user",
                event_type="agent_apply_executed",
                idempotency_key=key,
                severity="info", status="ok",
                ref_type="agent", ref_id="agent1",
                evidence={},
            )
        events = await self._db.list_governance_events(
            scope="project", project_id="agent1",
            event_type="agent_apply_executed",
        )
        matching = [e for e in events if e.get("idempotency_key") == key]
        self.assertEqual(len(matching), 1)
    async def test_audit_rollback_event(self):
        """agent_rollback_executed event is written."""
        await self._db.append_governance_event(
            scope="project", project_id="senpai",
            actor_type="user",
            event_type="agent_rollback_executed",
            idempotency_key="test|arb|NODA1|senpai|abc|t1",
            severity="warn", status="ok",
            ref_type="agent", ref_id="senpai",
            evidence=self._db._make_evidence(
                message="Rollback to abc",
                outputs={"version_hash": "abc"},
            ),
        )
        events = await self._db.list_governance_events(
            scope="project", project_id="senpai",
            event_type="agent_rollback_executed",
        )
        self.assertGreaterEqual(len(events), 1)
        # Rollbacks are logged at warn severity to stand out in the trail.
        self.assertEqual(events[0]["severity"], "warn")
class TestMultiNodeCanary(unittest.IsolatedAsyncioTestCase):
"""Tests for Multi-node fanout + Canary Apply (Agents Ops v2)."""
async def asyncSetUp(self):
_db_module._db_conn = None
try:
await _db_module.close_db()
except Exception:
pass
await _db_module.get_db()
self._db = _db_module
async def asyncTearDown(self):
try:
await _db_module.close_db()
except Exception:
pass
# ── node_policy helper ─────────────────────────────────────────────────────
def test_node_policy_prod_defaults(self):
"""get_node_policy returns prod defaults for NODA1."""
import sys
sys.path.insert(0, str(
__import__('pathlib').Path(__file__).resolve().parent.parent /
'services' / 'sofiia-console'
))
from app.config import get_node_policy
policy = get_node_policy("NODA1")
self.assertEqual(policy["node_role"], "prod")
self.assertGreaterEqual(policy["gateway_timeout_ms"], 1000)
def test_node_policy_dev_shorter_timeout(self):
"""NODA2 (dev) has shorter timeout than NODA1 (prod)."""
import sys
sys.path.insert(0, str(
__import__('pathlib').Path(__file__).resolve().parent.parent /
'services' / 'sofiia-console'
))
from app.config import get_node_policy
prod = get_node_policy("NODA1")
dev = get_node_policy("NODA2")
self.assertLessEqual(dev["gateway_timeout_ms"], prod["gateway_timeout_ms"])
# ── payload hash / drift ───────────────────────────────────────────────────
async def test_canary_selects_drift_agents_deterministically(self):
"""Canary candidate selection: only drift agents, sorted by agent_id."""
# Create 3 overrides; mark 2 as drift
for aid in ["beta_agent", "alpha_agent", "gamma_agent"]:
ov = await self._db.upsert_agent_override(
"NODA1", aid, system_prompt_md=f"# {aid} v1"
)
# mark alpha and gamma as applied (no drift), beta NOT applied (drift)
if aid in ("alpha_agent", "gamma_agent"):
await self._db.upsert_agent_override(
"NODA1", aid, _mark_applied_hash=ov["version_hash"]
)
overrides = await self._db.list_agent_overrides("NODA1")
candidates = sorted(
[o for o in overrides if not o.get("is_hidden")],
key=lambda o: o["agent_id"],
)
drift_candidates = []
for o in candidates:
desired = {"display_name": o.get("display_name"),
"domain": o.get("domain"),
"system_prompt_md": o.get("system_prompt_md")}
plan_id = self._db._agent_payload_hash(desired)
is_drift = bool(o.get("last_applied_hash") and
o["last_applied_hash"] != plan_id)
if is_drift:
drift_candidates.append(o["agent_id"])
# beta_agent has no last_applied_hash → no drift by the formula
# (drift = last_applied_hash exists AND != plan_id)
# alpha and gamma have last_applied_hash matching plan_id → no drift
# So for this test, change beta so it HAS last_applied_hash but outdated
await self._db.upsert_agent_override("NODA1", "beta_agent",
_mark_applied_hash="oldHash")
# Now update content so desired != old
await self._db.upsert_agent_override("NODA1", "beta_agent",
system_prompt_md="# beta v2")
overrides2 = await self._db.list_agent_overrides("NODA1")
drift2 = []
for o in sorted(overrides2, key=lambda x: x["agent_id"]):
desired = {"display_name": o.get("display_name"),
"domain": o.get("domain"),
"system_prompt_md": o.get("system_prompt_md")}
plan_id = self._db._agent_payload_hash(desired)
if o.get("last_applied_hash") and o["last_applied_hash"] != plan_id:
drift2.append(o["agent_id"])
# beta_agent should be the only drift candidate
self.assertIn("beta_agent", drift2)
self.assertNotIn("alpha_agent", drift2)
self.assertNotIn("gamma_agent", drift2)
# Sorted: beta comes after alpha alphabetically
self.assertEqual(drift2, sorted(drift2))
# ── audit events for bulk ──────────────────────────────────────────────────
async def test_bulk_plan_created_audit_event(self):
"""agent_bulk_plan_created event written correctly."""
run_id = "testrun001"
await self._db.append_governance_event(
scope="portfolio", project_id="portfolio",
actor_type="user",
event_type="agent_bulk_plan_created",
idempotency_key=f"abpc|{run_id}|NODA1|all",
severity="info", status="ok",
evidence=self._db._make_evidence(
message="Bulk apply planned",
outputs={"mode": "all", "nodes": ["NODA1"], "dry_run": True},
),
)
events = await self._db.list_governance_events(
scope="portfolio", project_id="portfolio",
event_type="agent_bulk_plan_created",
)
self.assertGreaterEqual(len(events), 1)
async def test_canary_stopped_audit_event(self):
"""agent_bulk_canary_stopped event has high severity."""
run_id = "testrun002"
await self._db.append_governance_event(
scope="portfolio", project_id="portfolio",
actor_type="user",
event_type="agent_bulk_canary_stopped",
idempotency_key=f"abcstop|{run_id}|NODA1|helion",
severity="high", status="error",
evidence=self._db._make_evidence(
message="Canary stopped on helion: HTTP 500",
outputs={"failed_agent": "helion", "error": "HTTP 500"},
),
)
events = await self._db.list_governance_events(
scope="portfolio", project_id="portfolio",
event_type="agent_bulk_canary_stopped",
)
self.assertGreaterEqual(len(events), 1)
self.assertEqual(events[0]["severity"], "high")
self.assertEqual(events[0]["status"], "error")
async def test_canary_completed_audit_event(self):
"""agent_bulk_apply_completed event written on success."""
run_id = "testrun003"
await self._db.append_governance_event(
scope="portfolio", project_id="portfolio",
actor_type="user",
event_type="agent_bulk_apply_completed",
idempotency_key=f"abac|{run_id}|NODA1",
severity="info", status="ok",
evidence=self._db._make_evidence(
message="Canary apply completed: 2 agents",
outputs={"agents_applied": ["sofiia", "helion"]},
),
)
events = await self._db.list_governance_events(
scope="portfolio", project_id="portfolio",
event_type="agent_bulk_apply_completed",
)
self.assertGreaterEqual(len(events), 1)
self.assertEqual(events[0]["status"], "ok")
async def test_bulk_audit_idempotent(self):
"""Bulk audit events with same key do not duplicate."""
key = "abpc|DUPTEST|NODA1|canary"
for _ in range(3):
await self._db.append_governance_event(
scope="portfolio", project_id="portfolio",
actor_type="user",
event_type="agent_bulk_plan_created",
idempotency_key=key,
severity="info", status="ok",
evidence={},
)
events = await self._db.list_governance_events(
scope="portfolio", project_id="portfolio",
event_type="agent_bulk_plan_created",
)
matching = [e for e in events if e.get("idempotency_key") == key]
self.assertEqual(len(matching), 1)
# ── partial failure isolation ──────────────────────────────────────────────
async def test_partial_failure_does_not_block_other_overrides(self):
"""If one agent has no prompt, others in overrides list are still processed."""
# Create 2 overrides: one with prompt, one without
await self._db.upsert_agent_override("NODA1", "agent_with_prompt",
system_prompt_md="# valid prompt")
await self._db.upsert_agent_override("NODA1", "agent_no_prompt",
domain="test-domain") # no system_prompt_md
overrides = await self._db.list_agent_overrides("NODA1")
results = []
for ov in overrides:
desired_prompt = (ov.get("system_prompt_md") or "")
gw_url = "" # no gateway in test
if not desired_prompt or not gw_url:
results.append({"agent_id": ov["agent_id"], "status": "skipped"})
else:
results.append({"agent_id": ov["agent_id"], "status": "planned"})
# Both processed, none throws
agent_ids = [r["agent_id"] for r in results]
self.assertIn("agent_with_prompt", agent_ids)
self.assertIn("agent_no_prompt", agent_ids)
# agent_with_prompt has prompt but no gw_url → skipped (same path as no-gw)
for r in results:
self.assertIn(r["status"], ("skipped", "planned"))
async def test_limit_canary_to_n_agents(self):
"""Canary with limit=2 selects at most 2 drift agents."""
for i in range(5):
ov = await self._db.upsert_agent_override(
"NODA1", f"canary_agent_{i}", system_prompt_md=f"# v{i}")
await self._db.upsert_agent_override(
"NODA1", f"canary_agent_{i}", _mark_applied_hash="oldhash{i}")
# Update to create drift
await self._db.upsert_agent_override(
"NODA1", f"canary_agent_{i}", system_prompt_md=f"# v{i} updated")
overrides = await self._db.list_agent_overrides("NODA1")
candidates = sorted(
[o for o in overrides if not o.get("is_hidden") and
o["agent_id"].startswith("canary_agent_")],
key=lambda o: o["agent_id"],
)
limit = 2
drift_cands = []
for o in candidates:
desired = {"display_name": o.get("display_name"),
"domain": o.get("domain"),
"system_prompt_md": o.get("system_prompt_md")}
plan_id = self._db._agent_payload_hash(desired)
if o.get("last_applied_hash") and o["last_applied_hash"] != plan_id:
drift_cands.append(o["agent_id"])
selected = drift_cands[:limit]
self.assertLessEqual(len(selected), limit)
# Sorted deterministically
self.assertEqual(selected, sorted(selected))
# Allow running this test module directly: `python tests/test_sofiia_docs.py`.
if __name__ == "__main__":
    unittest.main()