New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
3225 lines
155 KiB
Python
"""
|
|
tests/test_sofiia_docs.py
|
|
|
|
Unit tests for sofiia-console Projects/Documents/Sessions/Dialog Map.
|
|
|
|
Tests:
|
|
- DB: projects CRUD
|
|
- DB: documents CRUD + SHA-256 stability
|
|
- DB: sessions upsert + turn count
|
|
- DB: messages + parent_msg_id branching
|
|
- DB: dialog map nodes/edges
|
|
- DB: fork_session copies ancestors
|
|
- API: upload size limits config
|
|
- API: mime validation (allowed/blocked)
|
|
- API: search documents (keyword)
|
|
- API: session fork returns new_session_id
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
import uuid
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
# ── path setup ──────────────────────────────────────────────────────────────
|
|
_ROOT = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(_ROOT / "services" / "sofiia-console"))
|
|
|
|
# Use a temp file DB for tests
|
|
_TMP_DIR = tempfile.mkdtemp(prefix="sofiia_test_")
|
|
os.environ["SOFIIA_DATA_DIR"] = _TMP_DIR
|
|
|
|
|
|
def _run(coro):
|
|
return asyncio.get_event_loop().run_until_complete(coro)
|
|
|
|
|
|
# ── Import after env setup ───────────────────────────────────────────────────
# Probe for aiosqlite and the sofiia-console db module; record availability in
# module flags that the skipUnless decorators below consult.
try:
    import aiosqlite  # noqa: F401 — availability probe only

    _AIOSQLITE_AVAILABLE = True
    # Try to import db module directly (may fail without full app context)
    try:
        from app import db as _db_module
    except ImportError:
        # Fall back to loading the module straight from its file path.
        import importlib.util

        _spec = importlib.util.spec_from_file_location(
            "sofiia_db",
            str(_ROOT / "services" / "sofiia-console" / "app" / "db.py"),
        )
        _db_module = importlib.util.module_from_spec(_spec)
        _spec.loader.exec_module(_db_module)
    # Reached only when one of the two import paths succeeded; the old code
    # duplicated this assignment in both branches.
    _DB_AVAILABLE = True
except Exception:
    # `except (ImportError, Exception)` was redundant — Exception already
    # covers ImportError. Any failure (missing package, broken module, bad
    # path) simply disables the DB-backed test classes.
    _AIOSQLITE_AVAILABLE = False
    _DB_AVAILABLE = False
    _db_module = None
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestProjectsCRUD(unittest.IsolatedAsyncioTestCase):
    """CRUD behaviour of the projects table."""

    async def asyncSetUp(self):
        # Drop any stale shared connection, then run schema init fresh.
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_and_get_project(self):
        created = await self._db.create_project("Test Project", "desc")
        self.assertIn("project_id", created)
        self.assertEqual(created["name"], "Test Project")
        loaded = await self._db.get_project(created["project_id"])
        self.assertIsNotNone(loaded)
        self.assertEqual(loaded["name"], "Test Project")

    async def test_list_projects_includes_default(self):
        all_ids = [row["project_id"] for row in await self._db.list_projects()]
        self.assertIn("default", all_ids, "Default project must always exist")

    async def test_update_project(self):
        created = await self._db.create_project("Old Name")
        renamed = await self._db.update_project(created["project_id"], name="New Name")
        self.assertTrue(renamed)
        after = await self._db.get_project(created["project_id"])
        self.assertEqual(after["name"], "New Name")

    async def test_get_nonexistent_project_returns_none(self):
        self.assertIsNone(await self._db.get_project("nonexistent_xyz"))
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestDocumentsCRUD(unittest.IsolatedAsyncioTestCase):
    """CRUD and hashing behaviour of the documents table."""

    async def asyncSetUp(self):
        _db_module._db_conn = None  # force a fresh connection per test
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_document(self):
        created = await self._db.create_document(
            project_id="default",
            file_id="abc123def456",
            sha256="a" * 64,
            mime="application/pdf",
            size_bytes=1024,
            filename="test.pdf",
            title="My Test Doc",
            tags=["invoice", "2026"],
            extracted_text="Sample text content",
        )
        self.assertIn("doc_id", created)
        self.assertEqual(created["filename"], "test.pdf")
        self.assertEqual(created["tags"], ["invoice", "2026"])

    async def test_sha256_stability(self):
        """SHA-256 must be stored exactly as given (no mutation)."""
        digest = "b" * 64
        created = await self._db.create_document(
            "default", "fid", digest, "text/plain", 100, "file.txt"
        )
        stored = await self._db.get_document(created["doc_id"])
        self.assertEqual(stored["sha256"], digest)

    async def test_list_documents_by_project(self):
        proj = await self._db.create_project("DocProject")
        pid = proj["project_id"]
        await self._db.create_document(pid, "f1", "c" * 64, "text/plain", 10, "a.txt")
        await self._db.create_document(pid, "f2", "d" * 64, "text/plain", 20, "b.txt")
        self.assertEqual(len(await self._db.list_documents(pid)), 2)

    async def test_search_documents_by_title(self):
        proj = await self._db.create_project("SearchProject")
        pid = proj["project_id"]
        await self._db.create_document(pid, "f1", "e" * 64, "text/plain", 10, "budget.txt",
                                       title="Annual Budget 2026")
        await self._db.create_document(pid, "f2", "f" * 64, "text/plain", 10, "report.txt",
                                       title="Monthly Report")
        hits = await self._db.search_documents(pid, "Budget")
        self.assertEqual(len(hits), 1)
        self.assertIn("budget.txt", hits[0]["filename"])

    async def test_get_document_wrong_project(self):
        created = await self._db.create_document(
            "default", "gid", "g" * 64, "text/plain", 5, "test.txt"
        )
        stored = await self._db.get_document(created["doc_id"])
        self.assertIsNotNone(stored)
        # Mirror the API-level ownership check: the doc belongs to "default",
        # not to some other project.
        self.assertNotEqual(stored["project_id"], "nonexistent_project")
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestSessionsAndMessages(unittest.IsolatedAsyncioTestCase):
    """Session upsert, message persistence and branch bookkeeping."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_upsert_session_creates(self):
        sess = await self._db.upsert_session("sess_test_001", project_id="default", title="Test Session")
        self.assertEqual(sess["session_id"], "sess_test_001")
        self.assertEqual(sess["title"], "Test Session")

    async def test_upsert_session_updates_last_active(self):
        await self._db.upsert_session("sess_002", project_id="default")
        again = await self._db.upsert_session("sess_002", project_id="default")
        self.assertEqual(again["session_id"], "sess_002")

    async def test_save_message_and_retrieve(self):
        await self._db.upsert_session("sess_003", project_id="default")
        saved = await self._db.save_message("sess_003", "user", "Hello Sofiia")
        self.assertIn("msg_id", saved)
        self.assertEqual(saved["role"], "user")
        self.assertEqual(saved["content"], "Hello Sofiia")

    async def test_message_branching_parent_msg_id(self):
        await self._db.upsert_session("sess_branch", project_id="default")
        root = await self._db.save_message("sess_branch", "user", "First message")
        await self._db.save_message("sess_branch", "assistant", "First reply", parent_msg_id=root["msg_id"])
        # Fork a second child off the root message.
        await self._db.save_message("sess_branch", "user", "Branch question", parent_msg_id=root["msg_id"], branch_label="branch-1")

        history = await self._db.list_messages("sess_branch", limit=10)
        self.assertEqual(len(history), 3)
        branched = [msg for msg in history if msg["branch_label"] == "branch-1"]
        self.assertEqual(len(branched), 1)
        self.assertEqual(branched[0]["parent_msg_id"], root["msg_id"])

    async def test_turn_count_increments(self):
        await self._db.upsert_session("sess_count", project_id="default")
        for idx in range(3):
            await self._db.save_message("sess_count", "user", f"Message {idx}")
        sess = await self._db.get_session("sess_count")
        self.assertGreaterEqual(sess["turn_count"], 3)
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestDialogMap(unittest.IsolatedAsyncioTestCase):
    """Per-session dialog map derived from saved messages."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_dialog_map_nodes_and_edges(self):
        await self._db.upsert_session("sess_map", project_id="default")
        first = await self._db.save_message("sess_map", "user", "Hi there")
        second = await self._db.save_message("sess_map", "assistant", "Hello!", parent_msg_id=first["msg_id"])
        await self._db.save_message("sess_map", "user", "Follow-up", parent_msg_id=second["msg_id"])

        graph = await self._db.get_dialog_map("sess_map")
        self.assertEqual(graph["session_id"], "sess_map")
        self.assertEqual(len(graph["nodes"]), 3)
        self.assertEqual(len(graph["edges"]), 2)  # m1→m2, m2→m3

    async def test_dialog_map_empty_session(self):
        await self._db.upsert_session("sess_empty_map", project_id="default")
        graph = await self._db.get_dialog_map("sess_empty_map")
        self.assertEqual(graph["nodes"], [])
        self.assertEqual(graph["edges"], [])

    async def test_dialog_map_node_structure(self):
        await self._db.upsert_session("sess_map2", project_id="default")
        await self._db.save_message("sess_map2", "user", "Test node structure")
        graph = await self._db.get_dialog_map("sess_map2")
        first_node = graph["nodes"][0]
        # Every node exposes the same minimal key set.
        for key in ("id", "role", "preview", "ts"):
            self.assertIn(key, first_node)
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestForkSession(unittest.IsolatedAsyncioTestCase):
    """fork_session must copy only ancestor messages into a new session."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_fork_creates_new_session(self):
        await self._db.upsert_session("sess_src", project_id="default")
        first = await self._db.save_message("sess_src", "user", "Message 1")
        second = await self._db.save_message("sess_src", "assistant", "Reply 1", parent_msg_id=first["msg_id"])
        await self._db.save_message("sess_src", "user", "Message 2", parent_msg_id=second["msg_id"])

        forked = await self._db.fork_session("sess_src", from_msg_id=second["msg_id"], new_title="Fork Test")
        self.assertIn("new_session_id", forked)
        self.assertNotEqual(forked["new_session_id"], "sess_src")
        self.assertGreaterEqual(forked["copied_turns"], 2)  # m1 and m2 are ancestors

    async def test_fork_messages_are_independent(self):
        await self._db.upsert_session("sess_src2", project_id="default")
        origin = await self._db.save_message("sess_src2", "user", "Original message")
        forked = await self._db.fork_session("sess_src2", from_msg_id=origin["msg_id"])
        fork_sid = forked["new_session_id"]

        # The forked session must exist on its own.
        self.assertIsNotNone(await self._db.get_session(fork_sid))

        # Appending to the original must not leak into the fork.
        await self._db.save_message("sess_src2", "user", "New in original")
        fork_history = await self._db.list_messages(fork_sid)
        src_history = await self._db.list_messages("sess_src2")
        self.assertLess(len(fork_history), len(src_history))
|
|
|
|
|
|
class TestUploadSizeLimits(unittest.TestCase):
    """Upload size limit configuration tests (no DB needed)."""

    def _get_docs_router_module(self):
        """Load docs_router directly from its file path.

        Returns the loaded module, or None when it (or a dependency) cannot
        be imported in this environment — callers then skip their test.
        """
        try:
            import importlib.util
            import types

            spec = importlib.util.spec_from_file_location(
                "sofiia_docs_router",
                str(_ROOT / "services" / "sofiia-console" / "app" / "docs_router.py"),
            )
            mod = importlib.util.module_from_spec(spec)
            # Pre-populate with dummy deps to avoid ImportError on `from app import db`
            dummy = types.ModuleType("app.db")
            sys.modules.setdefault("app", types.ModuleType("app"))
            sys.modules["app.db"] = dummy
            spec.loader.exec_module(mod)
            return mod
        except Exception:
            # Best-effort loader: any failure means "not importable here".
            return None

    def _get_docs_router_limits(self):
        """Return (_MAX_IMAGE_MB, _MAX_VIDEO_MB, _MAX_DOC_MB) from docs_router.

        Skips the calling test when docs_router cannot be loaded. Previously
        this returned the hard-coded defaults (10, 200, 50) in that case,
        which made the three limit tests below pass vacuously — they would go
        green even with the module entirely absent.
        """
        mod = self._get_docs_router_module()
        if not mod:
            self.skipTest("docs_router not importable")
        return (
            getattr(mod, "_MAX_IMAGE_MB", 10),
            getattr(mod, "_MAX_VIDEO_MB", 200),
            getattr(mod, "_MAX_DOC_MB", 50),
        )

    def test_default_image_limit_10mb(self):
        img, _vid, _doc = self._get_docs_router_limits()
        self.assertEqual(img, 10)

    def test_default_video_limit_200mb(self):
        _img, vid, _doc = self._get_docs_router_limits()
        self.assertEqual(vid, 200)

    def test_default_doc_limit_50mb(self):
        _img, _vid, doc = self._get_docs_router_limits()
        self.assertEqual(doc, 50)

    def test_allowed_mimes_includes_pdf(self):
        mod = self._get_docs_router_module()
        if not mod:
            self.skipTest("docs_router not importable")
        self.assertIn("application/pdf", mod._ALLOWED_MIMES)

    def test_allowed_mimes_includes_images(self):
        mod = self._get_docs_router_module()
        if not mod:
            self.skipTest("docs_router not importable")
        self.assertIn("image/jpeg", mod._ALLOWED_MIMES)
        self.assertIn("image/png", mod._ALLOWED_MIMES)

    def test_allowed_mimes_excludes_executables(self):
        mod = self._get_docs_router_module()
        if not mod:
            self.skipTest("docs_router not importable")
        self.assertNotIn("application/x-executable", mod._ALLOWED_MIMES)
        self.assertNotIn("application/x-sh", mod._ALLOWED_MIMES)
|
|
|
|
|
|
class TestSafeFilename(unittest.TestCase):
    """Filename sanitization tests."""

    def _get_safe_filename(self):
        """Return docs_router._safe_filename, or None when not importable.

        NOTE: instantiating a TestCase with no methodName raises ValueError
        on Python < 3.11 ("no such test method 'runTest'"), so we must name
        an existing test method when borrowing the loader helper from
        TestUploadSizeLimits.
        """
        loader = TestUploadSizeLimits("test_default_image_limit_10mb")
        mod = loader._get_docs_router_module()
        return getattr(mod, "_safe_filename", None) if mod else None

    def test_safe_filename_strips_path(self):
        fn = self._get_safe_filename()
        if not fn:
            self.skipTest("docs_router not importable")
        self.assertEqual(fn("../../../etc/passwd"), "passwd")
        self.assertEqual(fn("/absolute/path/file.txt"), "file.txt")

    def test_safe_filename_removes_dangerous_chars(self):
        fn = self._get_safe_filename()
        if not fn:
            self.skipTest("docs_router not importable")
        result = fn("file; rm -rf /; .txt")
        self.assertNotIn(";", result)
        self.assertNotIn(" ", result)

    def test_safe_filename_preserves_extension(self):
        fn = self._get_safe_filename()
        if not fn:
            self.skipTest("docs_router not importable")
        self.assertTrue(fn("report.pdf").endswith(".pdf"))
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestTasksCRUD(unittest.IsolatedAsyncioTestCase):
    """Tests for Tasks (Kanban) persistence layer."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module
        # create_project returns the created row, so take project_id from it
        # directly. The old code discarded the return value and re-found the
        # project via next() over list_projects() by name — a bare
        # StopIteration if the name was missing, and an arbitrary row on
        # duplicate names.
        p = await self._db.create_project("TaskProject")
        self._pid = p["project_id"]

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_task(self):
        task = await self._db.create_task(self._pid, "Fix the bug", description="Critical bug", priority="high")
        self.assertIn("task_id", task)
        self.assertEqual(task["title"], "Fix the bug")
        # New tasks land in the backlog column by default.
        self.assertEqual(task["status"], "backlog")
        self.assertEqual(task["priority"], "high")

    async def test_list_tasks_by_project(self):
        await self._db.create_task(self._pid, "Task A")
        await self._db.create_task(self._pid, "Task B", status="in_progress")
        tasks = await self._db.list_tasks(self._pid)
        titles = [t["title"] for t in tasks]
        self.assertIn("Task A", titles)
        self.assertIn("Task B", titles)

    async def test_list_tasks_filtered_by_status(self):
        await self._db.create_task(self._pid, "Done task", status="done")
        await self._db.create_task(self._pid, "Backlog task", status="backlog")
        done = await self._db.list_tasks(self._pid, status="done")
        self.assertTrue(all(t["status"] == "done" for t in done))

    async def test_update_task_status(self):
        task = await self._db.create_task(self._pid, "Moveable task")
        ok = await self._db.update_task(task["task_id"], status="in_progress")
        self.assertTrue(ok)
        updated = await self._db.get_task(task["task_id"])
        self.assertEqual(updated["status"], "in_progress")

    async def test_delete_task(self):
        task = await self._db.create_task(self._pid, "Deletable task")
        ok = await self._db.delete_task(task["task_id"])
        self.assertTrue(ok)
        fetched = await self._db.get_task(task["task_id"])
        self.assertIsNone(fetched)

    async def test_task_labels_round_trip(self):
        # Labels must survive the JSON round-trip through storage untouched.
        task = await self._db.create_task(self._pid, "Labeled task", labels=["bug", "ui", "P1"])
        fetched = await self._db.get_task(task["task_id"])
        self.assertEqual(fetched["labels"], ["bug", "ui", "P1"])
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestMeetingsCRUD(unittest.IsolatedAsyncioTestCase):
    """Tests for Meetings persistence layer."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module
        proj = await self._db.create_project("MeetingProject")
        self._pid = proj["project_id"]

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_create_meeting(self):
        meeting = await self._db.create_meeting(
            self._pid, "Sprint Planning", "2026-03-01T10:00:00Z",
            agenda="Goals and backlog review", duration_min=60,
        )
        self.assertIn("meeting_id", meeting)
        self.assertEqual(meeting["title"], "Sprint Planning")
        self.assertEqual(meeting["duration_min"], 60)

    async def test_list_meetings(self):
        await self._db.create_meeting(self._pid, "Meeting A", "2026-03-01T09:00:00Z")
        await self._db.create_meeting(self._pid, "Meeting B", "2026-03-02T10:00:00Z")
        listed = await self._db.list_meetings(self._pid)
        self.assertEqual(len(listed), 2)
        # Ascending starts_at ordering is part of the contract.
        self.assertLess(listed[0]["starts_at"], listed[1]["starts_at"])

    async def test_update_meeting(self):
        meeting = await self._db.create_meeting(self._pid, "Old Title", "2026-03-01T10:00:00Z")
        changed = await self._db.update_meeting(meeting["meeting_id"], title="New Title", duration_min=90)
        self.assertTrue(changed)
        after = await self._db.get_meeting(meeting["meeting_id"])
        self.assertEqual(after["title"], "New Title")
        self.assertEqual(after["duration_min"], 90)

    async def test_attendees_round_trip(self):
        expected = ["user@a.com", "user@b.com"]
        meeting = await self._db.create_meeting(self._pid, "Team sync", "2026-03-03T14:00:00Z",
                                                attendees=expected)
        stored = await self._db.get_meeting(meeting["meeting_id"])
        self.assertEqual(stored["attendees"], expected)

    async def test_delete_meeting(self):
        meeting = await self._db.create_meeting(self._pid, "Deletable", "2026-03-10T10:00:00Z")
        self.assertTrue(await self._db.delete_meeting(meeting["meeting_id"]))
        self.assertIsNone(await self._db.get_meeting(meeting["meeting_id"]))
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestDialogGraph(unittest.IsolatedAsyncioTestCase):
    """Tests for Dialog Map graph (dialog_nodes + dialog_edges)."""

    async def asyncSetUp(self):
        _db_module._db_conn = None
        await _db_module.init_db()
        self._db = _db_module
        proj = await self._db.create_project("GraphProject")
        self._pid = proj["project_id"]

    async def asyncTearDown(self):
        await self._db.close_db()

    async def test_upsert_dialog_node_creates(self):
        created = await self._db.upsert_dialog_node(
            self._pid, "task", "task_001", title="My task", summary="Do something"
        )
        self.assertIn("node_id", created)
        self.assertEqual(created["node_type"], "task")
        self.assertEqual(created["ref_id"], "task_001")

    async def test_upsert_dialog_node_deduplicates(self):
        original = await self._db.upsert_dialog_node(self._pid, "doc", "doc_001", title="First")
        replayed = await self._db.upsert_dialog_node(self._pid, "doc", "doc_001", title="Updated")
        # Same (type, ref_id) pair must resolve to a single node...
        self.assertEqual(original["node_id"], replayed["node_id"])
        # ...whose title reflects the latest upsert.
        self.assertEqual(replayed["title"], "Updated")

    async def test_create_dialog_edge(self):
        src = await self._db.upsert_dialog_node(self._pid, "message", "msg_001")
        dst = await self._db.upsert_dialog_node(self._pid, "task", "task_002")
        edge = await self._db.create_dialog_edge(
            self._pid, src["node_id"], dst["node_id"], "derives_task"
        )
        self.assertIn("edge_id", edge)
        self.assertEqual(edge["edge_type"], "derives_task")

    async def test_get_project_dialog_map(self):
        src = await self._db.upsert_dialog_node(self._pid, "message", "msg_map_001", title="Hello")
        dst = await self._db.upsert_dialog_node(self._pid, "task", "task_map_001", title="Do it")
        await self._db.create_dialog_edge(self._pid, src["node_id"], dst["node_id"], "derives_task")
        graph = await self._db.get_project_dialog_map(self._pid)
        self.assertIn("nodes", graph)
        self.assertIn("edges", graph)
        self.assertGreaterEqual(graph["node_count"], 2)
        self.assertGreaterEqual(graph["edge_count"], 1)

    async def test_no_self_loop_edges(self):
        node = await self._db.upsert_dialog_node(self._pid, "goal", "goal_001", title="Self loop test")
        # A self-loop is silently rejected by the SQLite CHECK constraint.
        await self._db.create_dialog_edge(
            self._pid, node["node_id"], node["node_id"], "references"
        )
        graph = await self._db.get_project_dialog_map(self._pid)
        loops = [e for e in graph["edges"] if e["from_node_id"] == e["to_node_id"]]
        self.assertEqual(len(loops), 0)

    async def test_entity_link_created(self):
        link = await self._db.create_entity_link(
            self._pid, "message", "msg_x", "task", "task_x", "derives_task"
        )
        self.assertIn("link_id", link)
        self.assertEqual(link["link_type"], "derives_task")

    async def test_doc_version_round_trip(self):
        # A document row is required before versions can attach to it.
        doc = await self._db.create_document(
            self._pid, "f_ver", "v" * 64, "text/plain", 100, "versioned.txt",
            extracted_text="original content"
        )
        version = await self._db.save_doc_version(doc["doc_id"], "new content v2", author_id="test_user")
        self.assertIn("version_id", version)
        stored = await self._db.get_doc_version_content(version["version_id"])
        self.assertEqual(stored, "new content v2")
        self.assertGreaterEqual(len(await self._db.list_doc_versions(doc["doc_id"])), 1)

    async def test_dialog_view_upsert(self):
        view = await self._db.upsert_dialog_view(
            self._pid, "default",
            filters={"node_types": ["task", "doc"]},
            layout={"zoom": 1.0, "pan": [0, 0]},
        )
        self.assertEqual(view["name"], "default")
        self.assertIn("task", view["filters"].get("node_types", []))
        # A second upsert under the same name updates in place.
        updated = await self._db.upsert_dialog_view(self._pid, "default", layout={"zoom": 2.0})
        self.assertEqual(updated["layout"].get("zoom"), 2.0)
|
|
|
|
|
|
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestTransactionalIntegrity(unittest.IsolatedAsyncioTestCase):
    """Graph Contract: every artifact creation is atomic with its dialog_node."""

    async def asyncSetUp(self):
        # NOTE(review): this class deliberately manages the module-global
        # connection differently from the CRUD classes above — it closes and
        # nulls the shared handle, then re-inits, and tearDown leaves the DB
        # open. A unique project id per run keeps the assertions isolated.
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        self._pid = f"tx_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("TX Test Project", project_id=self._pid)

    async def asyncTearDown(self):
        pass  # Keep DB open across test classes (shared global connection)

    async def test_create_task_creates_node_atomically(self):
        """create_task must produce task + dialog_node in one transaction."""
        task = await self._db.create_task(
            self._pid, "Atomic Task", description="desc", created_by="test"
        )
        self.assertIn("node_id", task, "create_task must return node_id")
        self.assertIsNotNone(task["node_id"])
        # The companion node must be visible in the project-wide dialog map,
        # keyed by node_type "task" and ref_id == the new task's id.
        graph = await self._db.get_project_dialog_map(self._pid)
        task_nodes = [n for n in graph["nodes"] if n["node_type"] == "task" and n["ref_id"] == task["task_id"]]
        self.assertEqual(len(task_nodes), 1, "Task node must be in dialog map")

    async def test_create_meeting_creates_node_atomically(self):
        """create_meeting must produce meeting + dialog_node atomically."""
        meeting = await self._db.create_meeting(
            self._pid, "Atomic Meeting", starts_at="2026-03-01T10:00:00Z", created_by="test"
        )
        self.assertIn("node_id", meeting)
        # Exactly one "meeting" node referencing this meeting must exist.
        graph = await self._db.get_project_dialog_map(self._pid)
        m_nodes = [n for n in graph["nodes"] if n["node_type"] == "meeting" and n["ref_id"] == meeting["meeting_id"]]
        self.assertEqual(len(m_nodes), 1, "Meeting node must be in dialog map")

    async def test_create_task_with_source_msg_creates_edge(self):
        """create_task with source_msg_id must create derives_task edge."""
        # The message id does not need to exist as a row here; only the edge
        # from its node to the task node is asserted.
        msg_id = f"msg_{uuid.uuid4().hex[:8]}"
        task = await self._db.create_task(
            self._pid, "Task from msg", source_msg_id=msg_id, created_by="test"
        )
        graph = await self._db.get_project_dialog_map(self._pid)
        derives_edges = [
            e for e in graph["edges"]
            if e["edge_type"] == "derives_task" and e["to_node_id"] == task["node_id"]
        ]
        self.assertGreaterEqual(len(derives_edges), 1, "Must have derives_task edge from message")

    async def test_create_meeting_with_source_msg_creates_edge(self):
        """create_meeting with source_msg_id must create schedules_meeting edge."""
        msg_id = f"msg_{uuid.uuid4().hex[:8]}"
        meeting = await self._db.create_meeting(
            self._pid, "Meeting from msg",
            starts_at="2026-03-02T15:00:00Z",
            source_msg_id=msg_id,
            created_by="test",
        )
        # Edge direction: source message node → meeting node.
        graph = await self._db.get_project_dialog_map(self._pid)
        sched_edges = [
            e for e in graph["edges"]
            if e["edge_type"] == "schedules_meeting" and e["to_node_id"] == meeting["node_id"]
        ]
        self.assertGreaterEqual(len(sched_edges), 1, "Must have schedules_meeting edge from message")
|
|
|
|
|
|
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestGraphIntegrity(unittest.IsolatedAsyncioTestCase):
    """Graph Contract: check_graph_integrity detects violations."""

    async def asyncSetUp(self):
        # Same shared-connection lifecycle as TestTransactionalIntegrity:
        # close + null the global handle, re-init, unique project per run,
        # and tearDown intentionally leaves the DB open.
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        self._pid = f"integrity_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("Integrity Test", project_id=self._pid)

    async def asyncTearDown(self):
        pass  # Keep DB open

    async def test_clean_project_passes_integrity(self):
        """A freshly created project with proper artifacts should pass."""
        await self._db.create_task(self._pid, "Clean task")
        await self._db.create_meeting(self._pid, "Clean meeting", starts_at="2026-04-01T09:00:00Z")
        result = await self._db.check_graph_integrity(self._pid)
        self.assertTrue(result["ok"], f"Integrity must pass, violations: {result['violations']}")
        self.assertEqual(result["violations"], [])
        # Task node + meeting node at minimum.
        self.assertGreaterEqual(result["stats"]["node_count"], 2)

    async def test_integrity_detects_dangling_task_node(self):
        """Manually inserted task node without matching task row should be detected."""
        # Bypass the public API and insert an orphan node via raw SQL so the
        # checker has a genuine violation to find.
        db = await self._db.get_db()
        fake_task_id = f"fake_{uuid.uuid4().hex}"
        node_id = str(uuid.uuid4())
        now = "2026-01-01T00:00:00Z"
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,summary,props,created_by,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (node_id, self._pid, "task", fake_task_id, "Orphan", "", "{}", "test", now, now),
        )
        await db.commit()
        result = await self._db.check_graph_integrity(self._pid)
        self.assertFalse(result["ok"], "Should detect dangling task node")
        violation_types = [v["type"] for v in result["violations"]]
        self.assertIn("dangling_task_nodes", violation_types)

    async def test_no_self_loops_after_operations(self):
        """After normal CRUD operations, there must be no self-loop edges."""
        task = await self._db.create_task(self._pid, "Loop check task")
        meeting = await self._db.create_meeting(
            self._pid, "Loop check meeting", starts_at="2026-05-01T08:00:00Z"
        )
        # A legitimate (non-loop) edge between two distinct nodes.
        await self._db.create_dialog_edge(
            self._pid, task["node_id"], meeting["node_id"], "relates_to"
        )
        result = await self._db.check_graph_integrity(self._pid)
        self.assertGreaterEqual(result["stats"]["edge_count"], 1)
        loop_violations = [v for v in result["violations"] if v["type"] == "self_loop_edges"]
        self.assertEqual(loop_violations, [])
|
|
|
|
|
|
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestEvidencePackEngine(unittest.IsolatedAsyncioTestCase):
    """Evidence Pack Engine: Supervisor run → node + tasks + edges atomically."""

    async def asyncSetUp(self):
        """Reset the module-level DB handle and create a fresh test project."""
        self._db = _db_module
        await self._db.close_db()
        # Force re-open on next get_db() so each test class starts clean.
        self._db._db_conn = None
        self._pid = f"evidence_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("Evidence Test", project_id=self._pid)

    async def asyncTearDown(self):
        # Fix: previously `pass` ("keep DB open"), which leaked the shared
        # aiosqlite connection between test classes. Close it like the sibling
        # classes (TestGraphHygiene, TestSelfReflection, ...) do; asyncSetUp
        # re-initialises the connection anyway.
        await self._db.close_db()

    async def test_evidence_pack_creates_agent_run_node(self):
        """create_evidence_pack must create an agent_run node."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        pack = await self._db.create_evidence_pack(
            project_id=self._pid,
            run_id=run_id,
            graph_name="release_check",
            result_data={"status": "completed", "summary": "All checks passed"},
        )
        self.assertTrue(pack["ok"])
        self.assertIsNotNone(pack["node_id"])
        # The run must be visible in the project's dialog map as an agent_run node.
        graph = await self._db.get_project_dialog_map(self._pid)
        run_nodes = [n for n in graph["nodes"] if n["node_type"] == "agent_run" and n["ref_id"] == run_id]
        self.assertEqual(len(run_nodes), 1, "agent_run node must be in dialog map")

    async def test_evidence_pack_creates_follow_up_tasks(self):
        """create_evidence_pack with follow_up_tasks must create tasks + produced_by edges."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        pack = await self._db.create_evidence_pack(
            project_id=self._pid,
            run_id=run_id,
            graph_name="incident_triage",
            result_data={
                "status": "completed",
                "follow_up_tasks": [
                    {"title": "Fix DB index", "priority": "high"},
                    {"title": "Update runbook", "priority": "normal"},
                ],
            },
        )
        self.assertEqual(pack["tasks_created"], 2)
        self.assertEqual(len(pack["task_ids"]), 2)
        # Tasks must exist in the task store...
        tasks = await self._db.list_tasks(self._pid)
        task_titles = {t["title"] for t in tasks}
        self.assertIn("Fix DB index", task_titles)
        self.assertIn("Update runbook", task_titles)
        # ...and each must be linked to the run via a produced_by edge.
        graph = await self._db.get_project_dialog_map(self._pid)
        produced_edges = [e for e in graph["edges"] if e["edge_type"] == "produced_by"]
        self.assertEqual(len(produced_edges), 2, "Must have produced_by edges for each task")

    async def test_evidence_pack_idempotent_on_rerun(self):
        """Re-recording same run_id must not duplicate nodes (ON CONFLICT DO UPDATE)."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        pack1 = await self._db.create_evidence_pack(
            self._pid, run_id, "release_check",
            {"status": "completed", "summary": "First run"}
        )
        pack2 = await self._db.create_evidence_pack(
            self._pid, run_id, "release_check",
            {"status": "completed", "summary": "Updated summary"}
        )
        self.assertEqual(pack1["node_id"], pack2["node_id"], "Node ID must be stable on re-run")

    async def test_full_integrity_after_evidence_pack(self):
        """After creating an evidence pack, integrity check must still pass."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._db.create_evidence_pack(
            self._pid, run_id, "postmortem_draft",
            result_data={
                "status": "completed",
                "follow_up_tasks": [{"title": "Write postmortem", "priority": "urgent"}],
            },
        )
        result = await self._db.check_graph_integrity(self._pid)
        self.assertTrue(result["ok"], f"Integrity must pass after evidence pack: {result['violations']}")
|
|
|
|
|
|
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestGraphHygiene(unittest.IsolatedAsyncioTestCase):
    """Graph Hygiene Engine: fingerprints, dedup, lifecycle, importance."""

    async def asyncSetUp(self):
        # Reset the module-level connection so each test starts on a fresh handle.
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        self._pid = f"hygiene_proj_{uuid.uuid4().hex[:8]}"
        await self._db.init_db()
        await self._db.create_project("Hygiene Test", project_id=self._pid)

    async def asyncTearDown(self):
        # Release the shared connection after each test.
        await self._db.close_db()

    async def test_importance_baseline_scores(self):
        """Base importance scores must match contract values."""
        self.assertAlmostEqual(self._db._compute_importance("decision"), 0.95, places=2)
        self.assertAlmostEqual(self._db._compute_importance("goal"), 0.90, places=2)
        self.assertAlmostEqual(self._db._compute_importance("task"), 0.70, places=2)
        self.assertAlmostEqual(self._db._compute_importance("message"), 0.15, places=2)
        # Done task halved
        self.assertAlmostEqual(self._db._compute_importance("task", task_status="done"), 0.35, places=2)

    async def test_importance_lifecycle_multiplier(self):
        """Archived and superseded nodes must have reduced importance."""
        active = self._db._compute_importance("decision", lifecycle="active")
        superseded = self._db._compute_importance("decision", lifecycle="superseded")
        archived = self._db._compute_importance("decision", lifecycle="archived")
        # Strict ordering: active > superseded > archived.
        self.assertGreater(active, superseded)
        self.assertGreater(superseded, archived)

    async def test_importance_bump_factors(self):
        """High risk and pinned nodes get importance bumps."""
        base = self._db._compute_importance("task")
        with_risk = self._db._compute_importance("task", risk_level="high")
        pinned = self._db._compute_importance("task", pinned=True)
        self.assertGreater(with_risk, base)
        self.assertGreater(pinned, base)

    async def test_fingerprint_is_deterministic(self):
        """Same title+summary must always produce same fingerprint."""
        fp1 = self._db._compute_fingerprint("task", "Fix DB index", "Critical bug")
        fp2 = self._db._compute_fingerprint("task", "Fix DB index", "Critical bug")
        fp3 = self._db._compute_fingerprint("task", "  Fix DB Index  ", "critical bug")  # normalized
        self.assertEqual(fp1, fp2)
        self.assertEqual(fp1, fp3)  # lowercase + strip normalization

    async def test_fingerprint_differs_for_different_content(self):
        """Different titles must produce different fingerprints."""
        fp1 = self._db._compute_fingerprint("task", "Fix index", "")
        fp2 = self._db._compute_fingerprint("task", "Deploy service", "")
        self.assertNotEqual(fp1, fp2)

    async def test_hygiene_dry_run_detects_duplicates(self):
        """Dry-run must find duplicates without writing changes."""
        # Create two tasks with same title (will get same fingerprint)
        t1 = await self._db.create_task(self._pid, "Duplicate Decision", created_by="test")
        # Manually insert second node with same fingerprint-equivalent title
        db = await self._db.get_db()
        n2_id = str(uuid.uuid4())
        now = "2025-01-01T00:00:00Z"
        fp = self._db._compute_fingerprint("task", "Duplicate Decision", "")
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,summary,props,fingerprint,lifecycle,importance,created_by,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (n2_id, self._pid, "task", "fake_task_dup", "Duplicate Decision", "", "{}", fp, "active", 0.7, "test", now, now),
        )
        await db.commit()

        result = await self._db.run_graph_hygiene(self._pid, dry_run=True)
        self.assertTrue(result["ok"])
        self.assertTrue(result["dry_run"])
        # Must find the duplicate
        self.assertGreater(result["stats"]["duplicates_found"], 0, "Should detect duplicates")
        # Dry-run: no changes applied
        archived_changes = [c for c in result["changes"] if c["action"] == "archive_duplicate"]
        self.assertGreater(len(archived_changes), 0)
        # Node lifecycle must NOT be changed (dry_run=True)
        async with db.execute("SELECT lifecycle FROM dialog_nodes WHERE node_id=?", (n2_id,)) as cur:
            row = await cur.fetchone()
            self.assertEqual(row[0], "active", "Dry-run must not modify lifecycle")

    async def test_hygiene_apply_archives_duplicates(self):
        """Non-dry-run hygiene must archive duplicate nodes."""
        # Create two nodes with identical fingerprint-equivalent titles
        t1 = await self._db.create_task(self._pid, "Archive Dup Task", created_by="test")
        db = await self._db.get_db()
        n2_id = str(uuid.uuid4())
        fp = self._db._compute_fingerprint("task", "Archive Dup Task", "")
        # older created_at → will be archived (canonical = latest)
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,summary,props,fingerprint,lifecycle,importance,created_by,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (n2_id, self._pid, "task", "fake_dup2", "Archive Dup Task", "", "{}", fp, "active", 0.7, "test", "2024-01-01T00:00:00Z", "2024-01-01T00:00:00Z"),
        )
        await db.commit()

        result = await self._db.run_graph_hygiene(self._pid, dry_run=False)
        self.assertFalse(result["dry_run"])
        self.assertGreater(result["stats"]["archived"], 0, "Should archive duplicates")
        # The older node must now be archived
        async with db.execute("SELECT lifecycle FROM dialog_nodes WHERE node_id=?", (n2_id,)) as cur:
            row = await cur.fetchone()
            self.assertIn(row[0], ("archived", "superseded"), "Duplicate must be archived/superseded")

    async def test_hygiene_idempotent(self):
        """Running hygiene twice must not create new violations."""
        await self._db.create_task(self._pid, "Idempotent Task")
        r1 = await self._db.run_graph_hygiene(self._pid, dry_run=False)
        r2 = await self._db.run_graph_hygiene(self._pid, dry_run=False)
        # Second run should find no new duplicates to archive
        self.assertEqual(r2["stats"]["archived"], 0, "Second hygiene run must be idempotent")

    async def test_hygiene_recomputes_importance(self):
        """Hygiene must update importance for nodes without it set."""
        # Create task node and manually clear its importance
        task = await self._db.create_task(self._pid, "Importance Test Task")
        db = await self._db.get_db()
        await db.execute("UPDATE dialog_nodes SET importance=0.0 WHERE node_id=?", (task["node_id"],))
        await db.commit()

        result = await self._db.run_graph_hygiene(self._pid, dry_run=False)
        # Hygiene must report an update_importance change for the zeroed node.
        importance_changes = [c for c in result["changes"] if c["action"] == "update_importance"]
        self.assertGreater(len(importance_changes), 0, "Hygiene must recompute importance")
|
|
|
|
|
|
@unittest.skipUnless(_DB_AVAILABLE, "aiosqlite not available")
class TestSelfReflection(unittest.IsolatedAsyncioTestCase):
    """Self-Reflection Engine: supervisor run analysis."""

    async def asyncSetUp(self):
        """Start each test from a freshly initialised DB and a dedicated project."""
        self._db = _db_module
        self._pid = f"reflect_proj_{uuid.uuid4().hex[:8]}"
        await self._db.close_db()
        self._db._db_conn = None
        await self._db.init_db()
        await self._db.create_project("Reflection Test", project_id=self._pid)

    async def asyncTearDown(self):
        """Release the shared DB connection."""
        await self._db.close_db()

    async def _create_run(self, run_id: str, graph: str = "release_check") -> dict:
        """Helper: create evidence pack (agent_run node) first."""
        payload = {"status": "completed", "summary": f"Run {run_id[:8]}"}
        return await self._db.create_evidence_pack(
            self._pid, run_id, graph, result_data=payload
        )

    async def test_reflection_creates_decision_node(self):
        """create_run_reflection must create a decision node."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        evidence = {
            "summary": "Release checks passed",
            "findings": [{"name": "tests", "status": "pass"}, {"name": "lint", "status": "pass"}],
        }
        outcome = await self._db.create_run_reflection(self._pid, run_id, evidence_data=evidence)
        self.assertTrue(outcome["ok"])
        self.assertIsNotNone(outcome["node_id"])
        dialog_map = await self._db.get_project_dialog_map(self._pid)
        reflections = [
            node
            for node in dialog_map["nodes"]
            if node["node_type"] == "decision" and "reflection" in node.get("ref_id", "")
        ]
        self.assertGreaterEqual(len(reflections), 1)

    async def test_reflection_links_to_agent_run(self):
        """Reflection must create reflects_on edge to agent_run node."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        outcome = await self._db.create_run_reflection(self._pid, run_id, evidence_data={})
        self.assertIsNotNone(outcome["edge_id"])
        dialog_map = await self._db.get_project_dialog_map(self._pid)
        linking = [edge for edge in dialog_map["edges"] if edge["edge_type"] == "reflects_on"]
        self.assertGreaterEqual(len(linking), 1)

    async def test_reflection_scores_completeness(self):
        """Reflection must compute plan_completeness_score from findings."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        findings = [
            {"name": "a", "status": "pass"},
            {"name": "b", "status": "pass"},
            {"name": "c", "status": "fail"},
            {"name": "d", "status": "pass"},
        ]
        outcome = await self._db.create_run_reflection(
            self._pid, run_id, evidence_data={"findings": findings}
        )
        reflection = outcome["reflection"]
        # 3 of 4 findings pass → completeness 0.75, medium confidence, one risk.
        self.assertAlmostEqual(reflection["plan_completeness_score"], 0.75, places=2)
        self.assertEqual(reflection["confidence"], "medium")
        self.assertEqual(len(reflection["open_risks"]), 1)

    async def test_reflection_creates_risk_tasks(self):
        """Failed findings must auto-create risk tasks."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        findings = [
            {"name": "DB migration", "status": "fail", "detail": "Migration pending"},
            {"name": "Security scan", "status": "error", "message": "CVE-2024-001"},
        ]
        outcome = await self._db.create_run_reflection(
            self._pid, run_id, evidence_data={"findings": findings}
        )
        self.assertGreater(outcome["risk_tasks_created"], 0)
        all_tasks = await self._db.list_tasks(self._pid)
        flagged = [task["title"] for task in all_tasks if "[RISK]" in task["title"]]
        self.assertGreater(len(flagged), 0)

    async def test_reflection_idempotent(self):
        """Reflecting on same run_id twice must not duplicate nodes."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id)
        first = await self._db.create_run_reflection(self._pid, run_id, evidence_data={})
        second = await self._db.create_run_reflection(self._pid, run_id, evidence_data={})
        self.assertEqual(first["node_id"], second["node_id"], "Reflection node must be stable")

    async def test_full_integrity_after_reflection(self):
        """After reflection, graph integrity must still pass."""
        run_id = f"run_{uuid.uuid4().hex[:8]}"
        await self._create_run(run_id, "incident_triage")
        await self._db.create_run_reflection(
            self._pid, run_id,
            evidence_data={"findings": [{"name": "x", "status": "fail", "detail": "Timeout"}]},
        )
        integrity = await self._db.check_graph_integrity(self._pid)
        self.assertTrue(integrity["ok"], f"Integrity must pass after reflection: {integrity['violations']}")
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestStrategicCTOLayer(unittest.IsolatedAsyncioTestCase):
    """Tests for graph_snapshots and graph_signals (Strategic CTO Layer)."""

    async def asyncSetUp(self):
        """Re-initialise DB and create a unique project for this test method."""
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        await self._db.init_db()
        # Use unique project_id per test to avoid UNIQUE conflicts across test methods
        self._pid = f"cto-{uuid.uuid4().hex[:10]}"
        r = await self._db.create_project("CTO Test Project", project_id=self._pid)
        self._pid = r["project_id"]

    async def asyncTearDown(self):
        """Close and drop the shared connection so the next test reopens cleanly."""
        await self._db.close_db()
        self._db._db_conn = None

    # ── Snapshot Tests ────────────────────────────────────────────────────────

    async def test_snapshot_empty_project(self):
        """Snapshot on an empty project must return zero metrics without errors."""
        result = await self._db.compute_graph_snapshot(self._pid, window="7d")
        self.assertTrue(result["ok"])
        m = result["metrics"]
        self.assertEqual(m["tasks_created"], 0)
        self.assertEqual(m["tasks_done"], 0)
        self.assertEqual(m["wip"], 0)
        self.assertEqual(m["risk_tasks_open"], 0)
        self.assertEqual(m["agent_runs_total"], 0)

    async def test_snapshot_with_tasks(self):
        """Snapshot correctly counts tasks_created and wip."""
        await self._db.create_task(self._pid, "Task A", status="backlog")
        await self._db.create_task(self._pid, "Task B", status="in_progress")
        await self._db.create_task(self._pid, "Task C", status="done")
        result = await self._db.compute_graph_snapshot(self._pid, window="7d")
        m = result["metrics"]
        self.assertEqual(m["tasks_created"], 3)
        self.assertGreaterEqual(m["wip"], 1)
        self.assertGreaterEqual(m["tasks_done"], 1)

    async def test_snapshot_risk_tasks_count(self):
        """Snapshot correctly counts open [RISK] tasks."""
        await self._db.create_task(self._pid, "[RISK] Critical vuln A", status="backlog", priority="high")
        await self._db.create_task(self._pid, "[RISK] Critical vuln B", status="done", priority="high")
        await self._db.create_task(self._pid, "Normal task", status="backlog")
        result = await self._db.compute_graph_snapshot(self._pid, window="7d")
        m = result["metrics"]
        self.assertEqual(m["risk_tasks_open"], 1, "Only non-done [RISK] tasks should count")

    async def test_snapshot_idempotent_same_day(self):
        """Two calls on same day produce single snapshot (ON CONFLICT DO UPDATE)."""
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        db = await self._db.get_db()
        async with db.execute(
            "SELECT COUNT(*) FROM graph_snapshots WHERE project_id=? AND window='7d'",
            (self._pid,),
        ) as cur:
            count = (await cur.fetchone())[0]
        self.assertEqual(count, 1, "Should upsert not duplicate snapshot")

    async def test_get_latest_snapshot(self):
        """get_latest_snapshot returns None before first compute, data after."""
        snap = await self._db.get_latest_snapshot(self._pid, window="7d")
        self.assertIsNone(snap)
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        snap = await self._db.get_latest_snapshot(self._pid, window="7d")
        self.assertIsNotNone(snap)
        self.assertIn("metrics", snap)
        self.assertIsInstance(snap["metrics"], dict)

    async def test_snapshot_graph_density(self):
        """graph_density metric equals edges/nodes ratio."""
        await self._db.compute_graph_snapshot(self._pid, window="7d")
        snap = await self._db.get_latest_snapshot(self._pid, window="7d")
        m = snap["metrics"]
        # Density is only defined for non-empty graphs.
        if m["node_count"] > 0:
            expected = round(m["edge_count"] / m["node_count"], 3)
            self.assertAlmostEqual(m["graph_density"], expected, places=2)

    # ── Signals Tests ─────────────────────────────────────────────────────────

    async def test_signals_empty_project_no_signals(self):
        """Empty project generates no signals."""
        result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        self.assertTrue(result["ok"])
        self.assertEqual(result["signals_generated"], 0)
        self.assertEqual(result["signals_upserted"], 0)

    async def test_signal_dry_run_does_not_persist(self):
        """dry_run=True computes signals but does not write to DB."""
        # Create conditions for run_quality_regression rule
        for _ in range(3):
            run_id = f"run_{uuid.uuid4().hex[:8]}"
            await self._db.create_evidence_pack(
                project_id=self._pid, run_id=run_id, graph_name="release_check",
                result_data={"status": "completed", "findings": [{"name": "test", "status": "fail", "detail": "bad"}]},
            )
            await self._db.create_run_reflection(self._pid, run_id, evidence_data={
                "findings": [{"name": "x", "status": "fail", "detail": "Critical fail"}] * 3,
            })
        dry = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=True)
        live = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        # Dry run: no upserts
        self.assertEqual(dry["signals_upserted"], 0)
        # Dry and live should detect same signals_generated count
        self.assertEqual(dry["signals_generated"], live["signals_generated"])

    async def test_signal_idempotency(self):
        """Running signals twice with same conditions must not create new signals."""
        # Create risk tasks for risk_cluster rule
        for i in range(3):
            await self._db.create_task(
                self._pid, f"[RISK] Issue {i}", status="backlog", priority="high",
                labels=["backend", "security"]
            )
        r1 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        r2 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        # Second run must not create new signals (may be skip_cooldown or refresh, but not new)
        new_in_r2 = [d for d in r2["diff"] if d["action"] == "new"]
        self.assertEqual(len(new_in_r2), 0, "Second run must not create new signals")
        non_new = [d for d in r2["diff"] if d["action"] in ("skip_cooldown", "refresh", "exists", "cooldown")]
        self.assertGreater(len(non_new), 0, "Should see non-new actions on second run")

    async def test_signal_risk_cluster_rule(self):
        """risk_cluster signal fires when 3+ [RISK] tasks share a label."""
        for i in range(4):
            await self._db.create_task(
                self._pid, f"[RISK] DB problem {i}", status="backlog", priority="high",
                labels=["database"]
            )
        result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=True)
        types = [d["signal_type"] for d in result["diff"]]
        self.assertIn("risk_cluster", types, "risk_cluster signal must fire for 4 tasks with shared label")

    async def test_signal_stale_goal(self):
        """stale_goal signal fires for goals not updated in 14 days."""
        import datetime
        db = await self._db.get_db()
        # Fix: datetime.utcnow() is deprecated (Python 3.12) and naive; use an
        # aware UTC datetime — strftime output is identical.
        old_date = (
            datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=20)
        ).strftime("%Y-%m-%dT%H:%M:%SZ")
        node_id = str(uuid.uuid4())
        await db.execute(
            """INSERT INTO dialog_nodes(node_id, project_id, node_type, ref_id, title, lifecycle, importance, created_at, updated_at)
               VALUES(?,?,?,?,?,?,?,?,?)""",
            (node_id, self._pid, "goal", node_id, "Old Stale Goal", "active", 0.9, old_date, old_date),
        )
        await db.commit()
        result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=True)
        types = [d["signal_type"] for d in result["diff"]]
        self.assertIn("stale_goal", types, "stale_goal must fire for goal not updated in 20 days")

    async def test_signal_ack_changes_status(self):
        """ack action changes signal status to 'ack'."""
        # Create a signal manually
        db = await self._db.get_db()
        sig_id = str(uuid.uuid4())
        now = self._db._now()
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, "stale_goal", "medium", "Test Signal", "", "{}", "open", "fp123", now, now),
        )
        await db.commit()
        result = await self._db.update_signal_status(sig_id, "ack")
        self.assertIsNotNone(result)
        self.assertEqual(result["status"], "ack")

    async def test_signal_evidence_node_ids_valid(self):
        """risk_cluster signal evidence contains valid task IDs."""
        task_ids = []
        for i in range(3):
            t = await self._db.create_task(
                self._pid, f"[RISK] infra problem {i}", status="backlog", priority="high",
                labels=["infra"]
            )
            task_ids.append(t["task_id"])
        await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        # Load saved signals and verify the cluster's evidence references real tasks.
        signals = await self._db.get_graph_signals(self._pid, status="open")
        cluster = [s for s in signals if s["signal_type"] == "risk_cluster"]
        self.assertTrue(len(cluster) > 0, "risk_cluster signal must exist")
        ev_ids = cluster[0]["evidence"].get("task_ids", [])
        for eid in ev_ids[:3]:
            self.assertIn(eid, task_ids, f"Signal evidence must reference valid task IDs: {eid}")

    async def test_get_signals_by_status(self):
        """get_graph_signals filters correctly by status."""
        db = await self._db.get_db()
        now = self._db._now()
        for status, fp in [("open", "fp1"), ("ack", "fp2"), ("dismissed", "fp3")]:
            await db.execute(
                "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
                (str(uuid.uuid4()), self._pid, "stale_goal", "medium", f"Sig {status}", "", "{}", status, fp, now, now),
            )
        await db.commit()
        open_sigs = await self._db.get_graph_signals(self._pid, status="open")
        self.assertTrue(all(s["status"] == "open" for s in open_sigs))
        all_sigs = await self._db.get_graph_signals(self._pid, status="all")
        self.assertGreaterEqual(len(all_sigs), 3)
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestOpsGraphBridging(unittest.IsolatedAsyncioTestCase):
    """Tests for upsert_ops_run_node (Ops Graph Bridging)."""

    async def asyncSetUp(self):
        """Re-initialise the DB module and create a throwaway project."""
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        await self._db.init_db()
        self._pid = f"ops-{uuid.uuid4().hex[:10]}"
        created = await self._db.create_project("Ops Test", project_id=self._pid)
        self._pid = created["project_id"]

    async def asyncTearDown(self):
        """Drop the shared connection so the next class starts clean."""
        await self._db.close_db()
        self._db._db_conn = None

    async def _fetch_row(self, sql, params):
        """Run a one-shot SELECT against the shared DB and return the first row."""
        db = await self._db.get_db()
        async with db.execute(sql, params) as cur:
            return await cur.fetchone()

    async def test_ops_run_node_created(self):
        """upsert_ops_run_node creates dialog_node with node_type=ops_run."""
        run_id = f"ops-{uuid.uuid4().hex[:8]}"
        outcome = await self._db.upsert_ops_run_node(
            project_id=self._pid,
            ops_run_id=run_id,
            action_id="smoke_gateway",
            node_id="NODA1",
            status="ok",
            elapsed_ms=250,
        )
        self.assertIn("node_id", outcome)
        row = await self._fetch_row(
            "SELECT node_type, title FROM dialog_nodes WHERE node_id=?", (outcome["node_id"],)
        )
        self.assertIsNotNone(row)
        self.assertEqual(row[0], "ops_run")
        self.assertIn("smoke_gateway", row[1])

    async def test_ops_run_node_idempotent(self):
        """Calling upsert_ops_run_node twice with same ops_run_id updates, not duplicates."""
        run_id = f"ops-{uuid.uuid4().hex[:8]}"
        first = await self._db.upsert_ops_run_node(self._pid, run_id, "drift_check", "NODA1", "ok")
        second = await self._db.upsert_ops_run_node(self._pid, run_id, "drift_check", "NODA1", "failed")
        self.assertEqual(first["node_id"], second["node_id"], "Same run_id must return same node_id")
        row = await self._fetch_row(
            "SELECT COUNT(*) FROM dialog_nodes WHERE project_id=? AND node_type='ops_run' AND ref_id=?",
            (self._pid, run_id),
        )
        self.assertEqual(row[0], 1, "No duplicate nodes on upsert")

    async def test_ops_run_failed_higher_importance(self):
        """Failed ops_run nodes have higher importance than successful ones."""
        passing = await self._db.upsert_ops_run_node(
            self._pid, f"ops-ok-{uuid.uuid4().hex[:6]}", "smoke_all", "NODA1", "ok"
        )
        failing = await self._db.upsert_ops_run_node(
            self._pid, f"ops-fail-{uuid.uuid4().hex[:6]}", "smoke_all", "NODA1", "failed"
        )
        imp_ok = (await self._fetch_row(
            "SELECT importance FROM dialog_nodes WHERE node_id=?", (passing["node_id"],)
        ))[0]
        imp_fail = (await self._fetch_row(
            "SELECT importance FROM dialog_nodes WHERE node_id=?", (failing["node_id"],)
        ))[0]
        self.assertGreater(imp_fail, imp_ok, "Failed ops_run must have higher importance")

    async def test_ops_run_links_to_source_agent_run(self):
        """ops_run node gets a produced_by edge from source supervisor run."""
        # Create a source agent_run node first.
        src_run_id = f"run-{uuid.uuid4().hex[:8]}"
        await self._db.create_evidence_pack(
            self._pid, src_run_id, "release_check",
            result_data={"status": "completed"}
        )
        outcome = await self._db.upsert_ops_run_node(
            self._pid, f"ops-{uuid.uuid4().hex[:8]}", "smoke_gateway", "NODA1", "ok",
            source_run_id=src_run_id,
        )
        self.assertIsNotNone(outcome["edge_id"], "Edge must be created when source_run_id is given")
        row = await self._fetch_row(
            "SELECT edge_type FROM dialog_edges WHERE edge_id=?", (outcome["edge_id"],)
        )
        self.assertIsNotNone(row)
        self.assertEqual(row[0], "produced_by")
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
|
|
class TestMitigationPlanner(unittest.IsolatedAsyncioTestCase):
|
|
"""Tests for create_mitigation_plan (Mitigation Planner)."""
|
|
|
|
async def asyncSetUp(self):
|
|
self._db = _db_module
|
|
await self._db.close_db()
|
|
self._db._db_conn = None
|
|
await self._db.init_db()
|
|
self._pid = f"mit-{uuid.uuid4().hex[:10]}"
|
|
r = await self._db.create_project("Mit Test", project_id=self._pid)
|
|
self._pid = r["project_id"]
|
|
# Create a test signal
|
|
db = await self._db.get_db()
|
|
self._sig_id = str(uuid.uuid4())
|
|
now = self._db._now()
|
|
await db.execute(
|
|
"INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) "
|
|
"VALUES(?,?,?,?,?,?,?,?,?,?,?)",
|
|
(self._sig_id, self._pid, "release_blocker", "critical",
|
|
"Test Release Blocker", "Test summary", '{"blocker_count": 2}',
|
|
"open", f"fp-{uuid.uuid4().hex[:8]}", now, now),
|
|
)
|
|
await db.commit()
|
|
|
|
async def asyncTearDown(self):
|
|
await self._db.close_db()
|
|
self._db._db_conn = None
|
|
|
|
async def test_mitigation_creates_plan_node(self):
|
|
"""create_mitigation_plan creates a decision node for the plan."""
|
|
result = await self._db.create_mitigation_plan(self._pid, self._sig_id)
|
|
self.assertTrue(result["ok"])
|
|
self.assertIn("plan_node_id", result)
|
|
db = await self._db.get_db()
|
|
async with db.execute(
|
|
"SELECT node_type, title FROM dialog_nodes WHERE node_id=?",
|
|
(result["plan_node_id"],),
|
|
) as cur:
|
|
row = await cur.fetchone()
|
|
self.assertIsNotNone(row)
|
|
self.assertEqual(row[0], "decision")
|
|
self.assertIn("Mitigation Plan", row[1])
|
|
|
|
async def test_mitigation_creates_tasks_from_templates(self):
|
|
"""Mitigation plan creates tasks matching release_blocker templates."""
|
|
result = await self._db.create_mitigation_plan(self._pid, self._sig_id)
|
|
self.assertGreater(result["task_count"], 0)
|
|
self.assertEqual(len(result["task_ids"]), result["task_count"])
|
|
# Verify tasks exist in DB
|
|
db = await self._db.get_db()
|
|
for tid in result["task_ids"]:
|
|
async with db.execute("SELECT title FROM tasks WHERE task_id=?", (tid,)) as cur:
|
|
row = await cur.fetchone()
|
|
self.assertIsNotNone(row, f"Task {tid} must exist in DB")
|
|
self.assertIn("[Mitigation]", row[0])
|
|
|
|
async def test_mitigation_task_count_by_signal_type(self):
|
|
"""Each signal_type has the expected number of mitigation templates."""
|
|
expected_counts = {
|
|
"release_blocker": 4,
|
|
"ops_instability": 3,
|
|
"stale_goal": 3,
|
|
"risk_cluster": 4,
|
|
"run_quality_regression": 3,
|
|
}
|
|
db = await self._db.get_db()
|
|
now = self._db._now()
|
|
for sig_type, expected in expected_counts.items():
|
|
sid = str(uuid.uuid4())
|
|
await db.execute(
|
|
"INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) "
|
|
"VALUES(?,?,?,?,?,?,?,?,?,?,?)",
|
|
(sid, self._pid, sig_type, "high", f"Test {sig_type}", "", "{}",
|
|
"open", f"fp-{sid[:8]}", now, now),
|
|
)
|
|
await db.commit()
|
|
result = await self._db.create_mitigation_plan(self._pid, sid)
|
|
self.assertEqual(result["task_count"], expected,
|
|
f"{sig_type} should have {expected} tasks, got {result['task_count']}")
|
|
|
|
async def test_mitigation_creates_plan_to_task_edges(self):
|
|
"""Each mitigation task has a derives_task edge from the plan node."""
|
|
result = await self._db.create_mitigation_plan(self._pid, self._sig_id)
|
|
db = await self._db.get_db()
|
|
plan_nid = result["plan_node_id"]
|
|
# Get task node_ids from dialog_nodes
|
|
task_node_ids = []
|
|
for tid in result["task_ids"]:
|
|
async with db.execute(
|
|
"SELECT node_id FROM dialog_nodes WHERE project_id=? AND node_type='task' AND ref_id=?",
|
|
(self._pid, tid),
|
|
) as cur:
|
|
row = await cur.fetchone()
|
|
if row:
|
|
task_node_ids.append(row[0])
|
|
# Check derives_task edges from plan_node
|
|
for tnid in task_node_ids:
|
|
async with db.execute(
|
|
"SELECT COUNT(*) FROM dialog_edges WHERE project_id=? AND from_node_id=? AND to_node_id=? AND edge_type='derives_task'",
|
|
(self._pid, plan_nid, tnid),
|
|
) as cur:
|
|
count = (await cur.fetchone())[0]
|
|
self.assertEqual(count, 1, f"Missing derives_task edge for task node {tnid}")
|
|
|
|
async def test_mitigation_updates_signal_evidence(self):
|
|
"""After mitigation, signal.evidence contains plan_node_id and mitigation_task_ids."""
|
|
result = await self._db.create_mitigation_plan(self._pid, self._sig_id)
|
|
db = await self._db.get_db()
|
|
async with db.execute("SELECT evidence FROM graph_signals WHERE id=?", (self._sig_id,)) as cur:
|
|
row = await cur.fetchone()
|
|
import json as _json
|
|
ev = _json.loads(row[0])
|
|
self.assertIn("plan_node_id", ev)
|
|
self.assertEqual(ev["plan_node_id"], result["plan_node_id"])
|
|
self.assertIn("mitigation_task_ids", ev)
|
|
|
|
async def test_mitigation_idempotent(self):
|
|
"""Running mitigation twice does not duplicate plan node."""
|
|
r1 = await self._db.create_mitigation_plan(self._pid, self._sig_id)
|
|
r2 = await self._db.create_mitigation_plan(self._pid, self._sig_id)
|
|
self.assertEqual(r1["plan_node_id"], r2["plan_node_id"], "Plan node must be stable")
|
|
|
|
    async def test_mitigation_invalid_signal_raises(self):
        """create_mitigation_plan raises ValueError for unknown signal."""
        # The id below matches no row in graph_signals, so the lookup must
        # fail loudly rather than create an orphan plan.
        with self.assertRaises(ValueError):
            await self._db.create_mitigation_plan(self._pid, "nonexistent-signal-id")
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestSignalLifecycle(unittest.IsolatedAsyncioTestCase):
    """Tests for signal merge/reopen/cooldown and auto-resolve."""

    async def asyncSetUp(self):
        # Reset the shared module-level connection so each test gets a fresh DB.
        self._db = _db_module
        await self._db.close_db()
        self._db._db_conn = None
        await self._db.init_db()
        self._pid = f"slc-{uuid.uuid4().hex[:10]}"
        # create_project may normalize the id; keep whatever it returns.
        r = await self._db.create_project("SLC Test", project_id=self._pid)
        self._pid = r["project_id"]

    async def asyncTearDown(self):
        await self._db.close_db()
        self._db._db_conn = None

    async def _make_signal(self, sig_type="stale_goal", severity="medium", status="open", fp=None):
        """Helper: insert a signal directly."""
        db = await self._db.get_db()
        sid = str(uuid.uuid4())
        now = self._db._now()
        # Fingerprint defaults to a random one; evidence carries a 24h cooldown.
        _fp = fp or f"fp-{uuid.uuid4().hex[:8]}"
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,fingerprint,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?,?)",
            (sid, self._pid, sig_type, severity, f"Test {sig_type}", "",
             '{"cooldown_hours": 24}', status, _fp, now, now),
        )
        await db.commit()
        return sid, _fp

    # ── Cooldown / Reopen ─────────────────────────────────────────────────────

    async def test_signal_new_creates_correctly(self):
        """First-time signal is created with last_triggered_at in evidence."""
        # Three [RISK] tasks sharing a label should trigger at least one signal.
        for i in range(3):
            await self._db.create_task(
                self._pid, f"[RISK] Cluster task {i}", status="backlog", priority="high",
                labels=["infra-cluster"]
            )
        result = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        diff_new = [d for d in result["diff"] if d["action"] == "new"]
        self.assertGreater(len(diff_new), 0)
        # Verify evidence has last_triggered_at
        sigs = await self._db.get_graph_signals(self._pid)
        for s in sigs:
            self.assertIn("last_triggered_at", s["evidence"])
            self.assertIn("cooldown_hours", s["evidence"])

    async def test_cooldown_prevents_duplicate(self):
        """Second recompute within cooldown skips already-active signal."""
        for i in range(3):
            await self._db.create_task(
                self._pid, f"[RISK] X {i}", status="backlog", priority="high", labels=["comp-x"]
            )
        r1 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        r2 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        # All r2 diff entries should be skip_cooldown (not new)
        r2_new = [d for d in r2["diff"] if d["action"] == "new"]
        self.assertEqual(len(r2_new), 0, "Second recompute in cooldown must not create new signals")
        skip = [d for d in r2["diff"] if d["action"] == "skip_cooldown"]
        self.assertGreater(len(skip), 0, "skip_cooldown must appear in diff")

    async def test_resolved_signal_reopens_after_cooldown(self):
        """A resolved signal with expired cooldown gets reopened on next recompute."""
        import datetime
        db = await self._db.get_db()
        # Create stale goal to trigger stale_goal rule
        node_id = str(uuid.uuid4())
        # A 20-day-old goal node is stale well beyond the 7d window.
        old_ts = (datetime.datetime.utcnow() - datetime.timedelta(days=20)).strftime("%Y-%m-%dT%H:%M:%SZ")
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,lifecycle,importance,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?)",
            (node_id, self._pid, "goal", node_id, "Stale Test Goal", "active", 0.9, old_ts, old_ts),
        )
        await db.commit()
        # First recompute: creates the signal
        r1 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        new_sigs = [d for d in r1["diff"] if d["action"] == "new" and d["signal_type"] == "stale_goal"]
        self.assertGreater(len(new_sigs), 0, "stale_goal signal must be created")
        # Find and mark it resolved, with old updated_at (cooldown expired)
        sigs = await self._db.get_graph_signals(self._pid)
        stale_sig = next((s for s in sigs if s["signal_type"] == "stale_goal"), None)
        self.assertIsNotNone(stale_sig)
        # Backdate updated_at by 2 days and shrink the cooldown to 1h so the
        # cooldown is definitively expired.
        old_updated = (datetime.datetime.utcnow() - datetime.timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%SZ")
        await db.execute(
            "UPDATE graph_signals SET status='resolved', updated_at=?, evidence=? WHERE id=?",
            (old_updated, '{"cooldown_hours": 1, "last_triggered_at": "' + old_updated + '"}', stale_sig["id"]),
        )
        await db.commit()
        # Second recompute: should reopen (cooldown of 1h expired)
        r2 = await self._db.recompute_graph_signals(self._pid, window="7d", dry_run=False)
        reopen_entries = [d for d in r2["diff"] if d.get("action") == "reopen"]
        self.assertGreater(len(reopen_entries), 0, "Resolved signal must be reopened after cooldown expires")

    # ── Auto-resolve ──────────────────────────────────────────────────────────

    async def test_auto_resolve_dry_run_does_not_change_status(self):
        """auto_resolve dry_run computes but does not change signal status."""
        sid, _ = await self._make_signal("release_blocker", status="open")
        result = await self._db.auto_resolve_signals(self._pid, dry_run=True)
        self.assertTrue(result["ok"])
        self.assertEqual(result["resolved"], 0)
        # Status unchanged
        db = await self._db.get_db()
        async with db.execute("SELECT status FROM graph_signals WHERE id=?", (sid,)) as cur:
            row = await cur.fetchone()
            self.assertEqual(row[0], "open")

    async def test_auto_resolve_release_blocker_when_no_risks(self):
        """release_blocker resolves when no open [RISK] tasks remain."""
        sid, _ = await self._make_signal("release_blocker", status="open")
        # No [RISK] tasks → criteria met
        result = await self._db.auto_resolve_signals(self._pid, dry_run=False)
        resolved = [d for d in result["diff"] if d.get("action") == "resolved" and d["signal_type"] == "release_blocker"]
        self.assertGreater(len(resolved), 0, "release_blocker must resolve when no [RISK] tasks")
        # Verify status in DB
        db = await self._db.get_db()
        async with db.execute("SELECT status, evidence FROM graph_signals WHERE id=?", (sid,)) as cur:
            row = await cur.fetchone()
            self.assertEqual(row[0], "resolved")
            import json as _j
            ev = _j.loads(row[1])
            # Resolution must be auditable: timestamp + reason recorded.
            self.assertIn("resolved_at", ev)
            self.assertIn("resolution_reason", ev)

    async def test_auto_resolve_release_blocker_stays_open_with_risks(self):
        """release_blocker stays open when [RISK] tasks exist."""
        await self._db.create_task(self._pid, "[RISK] Critical blocker", status="backlog", priority="high")
        sid, _ = await self._make_signal("release_blocker", status="open")
        result = await self._db.auto_resolve_signals(self._pid, dry_run=False)
        still_open = [d for d in result["diff"] if d.get("action") == "still_open"]
        self.assertGreater(len(still_open), 0)

    async def test_auto_resolve_run_quality_regression_resolves(self):
        """run_quality_regression resolves when avg completeness >= 75%."""
        sid, _ = await self._make_signal("run_quality_regression", status="open")
        # Insert 3 high-quality reflections
        db = await self._db.get_db()
        now = self._db._now()
        for i in range(3):
            import json as _j
            # 0.85 completeness is above the 0.75 resolve threshold.
            props = _j.dumps({"plan_completeness_score": 0.85, "confidence": "high"})
            await db.execute(
                "INSERT INTO dialog_nodes(node_id,project_id,node_type,ref_id,title,props,lifecycle,importance,created_at,updated_at) "
                "VALUES(?,?,?,?,?,?,?,?,?,?)",
                (str(uuid.uuid4()), self._pid, "decision", f"refl-{i}-{uuid.uuid4().hex[:6]}",
                 f"Reflection: run{i}", props, "active", 0.7, now, now),
            )
        await db.commit()
        result = await self._db.auto_resolve_signals(self._pid, dry_run=False)
        resolved = [d for d in result["diff"] if d.get("action") == "resolved" and d["signal_type"] == "run_quality_regression"]
        self.assertGreater(len(resolved), 0, "run_quality_regression must resolve with good quality")

    async def test_auto_resolve_returns_correct_counts(self):
        """auto_resolve result has accurate checked/resolved counts."""
        # Both open and ack signals count as "checked".
        sid1, _ = await self._make_signal("release_blocker", status="open")
        sid2, _ = await self._make_signal("stale_goal", status="ack")
        result = await self._db.auto_resolve_signals(self._pid, dry_run=True)
        self.assertEqual(result["checked"], 2)
        self.assertEqual(len(result["diff"]), 2)
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestPlaybooks(unittest.IsolatedAsyncioTestCase):
    """Tests for Playbooks v1 (Graph Learning Layer).

    Fixed vs. the previous revision:
    - Added the skipUnless guard used by every sibling suite, so these tests
      skip (instead of erroring) when aiosqlite or the db module is missing.
    - test_upsert_playbook_idempotent: replaced a convoluted, always-confusing
      assertion with a direct ``assertFalse(r2["created"])``.
    """

    async def asyncSetUp(self):
        # Point the shared db module at a fresh in-memory database.
        import uuid as _uuid
        self._pid = f"pb-{_uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None
        _db_module._DB_PATH = ":memory:"
        await _db_module.get_db()
        await _db_module.init_db()
        await _db_module.create_project(name="PB Test", project_id=self._pid)
        self._db = _db_module

    async def asyncTearDown(self):
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None

    async def _make_mitigated_signal(self, signal_type: str = "risk_cluster", label: str = "auth"):
        """Create a signal and run mitigation on it.

        Returns (signal_id, mitigation_result) so callers can promote the
        signal into a playbook.
        """
        import uuid as _uuid
        sig_id = str(_uuid.uuid4())
        now = "2026-02-26T12:00:00"
        evidence = {"label": label, "count": 3}
        db = await self._db.get_db()
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, signal_type, "high", f"Test {signal_type}", "",
             json.dumps(evidence), "open", now, now),
        )
        await db.commit()
        # Create mitigation
        result = await self._db.create_mitigation_plan(self._pid, sig_id)
        return sig_id, result

    def test_context_key_risk_cluster(self):
        """compute_context_key returns label:<label> for risk_cluster."""
        ck = self._db._compute_context_key("risk_cluster", {"label": "auth"})
        self.assertEqual(ck, "label:auth")

    def test_context_key_ops_instability(self):
        """compute_context_key returns ops_action:<action> for ops_instability."""
        ck = self._db._compute_context_key("ops_instability", {"failed_ops_actions": ["smoke_gateway"]})
        self.assertEqual(ck, "ops_action:smoke_gateway")

    def test_context_key_release_blocker(self):
        """compute_context_key returns global for release_blocker."""
        ck = self._db._compute_context_key("release_blocker", {})
        self.assertEqual(ck, "global")

    async def test_upsert_playbook_creates_new(self):
        """upsert_playbook_from_signal creates a new playbook."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "auth")
        result = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        self.assertTrue(result["ok"])
        self.assertTrue(result["created"])
        self.assertIn("playbook_id", result)
        self.assertEqual(result["signal_type"], "risk_cluster")
        self.assertEqual(result["context_key"], "label:auth")

    async def test_upsert_playbook_idempotent(self):
        """Promoting the same signal twice does not create duplicate playbook or examples."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "payments")
        r1 = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        r2 = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        # Both should return the same playbook_id
        self.assertEqual(r1["playbook_id"], r2["playbook_id"])
        # Only the first call may report creation; the second must be an update.
        self.assertTrue(r1["created"])
        self.assertFalse(r2["created"])
        # uses should not be double-counted
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        self.assertEqual(len(pbs), 1)

    async def test_upsert_playbook_requires_mitigation(self):
        """upsert_playbook_from_signal raises ValueError if signal has no plan_node_id."""
        import uuid as _uuid
        sig_id = str(_uuid.uuid4())
        now = "2026-02-26T12:00:00"
        db = await self._db.get_db()
        # Signal without a mitigation plan (evidence == "{}").
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, "stale_goal", "medium", "Stale", "", "{}", "open", now, now),
        )
        await db.commit()
        with self.assertRaises(ValueError):
            await self._db.upsert_playbook_from_signal(self._pid, sig_id)

    async def test_list_playbooks_ordered(self):
        """list_playbooks returns playbooks ordered by success_rate desc."""
        sig1, _ = await self._make_mitigated_signal("risk_cluster", "auth")
        sig2, _ = await self._make_mitigated_signal("risk_cluster", "payments")
        # Promote both with different success rates (manual update after create)
        await self._db.upsert_playbook_from_signal(self._pid, sig1, resolved=True, time_to_resolve_h=5.0)
        await self._db.upsert_playbook_from_signal(self._pid, sig2, resolved=False)
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        self.assertGreaterEqual(len(pbs), 1)
        # Verify order: higher success_rate first (resolved one should be first)
        if len(pbs) >= 2:
            self.assertGreaterEqual(pbs[0]["success_rate"], pbs[1]["success_rate"])

    async def test_apply_playbook_creates_tasks(self):
        """apply_playbook_to_signal creates tasks and links them to plan node."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "db")
        pb_res = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        playbook_id = pb_res["playbook_id"]

        # Create a fresh signal to apply the playbook to
        import uuid as _uuid
        new_sig_id = str(_uuid.uuid4())
        db = await self._db.get_db()
        now = "2026-02-26T13:00:00"
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (new_sig_id, self._pid, "risk_cluster", "high", "New cluster", "", '{"label":"db"}', "open", now, now),
        )
        await db.commit()

        result = await self._db.apply_playbook_to_signal(self._pid, new_sig_id, playbook_id)
        self.assertTrue(result["ok"])
        self.assertIn("plan_node_id", result)
        self.assertIn("task_ids", result)
        # Signal should be ACKed
        async with db.execute("SELECT status FROM graph_signals WHERE id=?", (new_sig_id,)) as cur:
            row = await cur.fetchone()
            self.assertEqual(row[0], "ack")

    async def test_apply_playbook_increments_uses(self):
        """apply_playbook_to_signal increments uses count."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "cache")
        pb_res = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        playbook_id = pb_res["playbook_id"]

        # Apply to a new signal
        import uuid as _uuid
        new_sig_id = str(_uuid.uuid4())
        db = await self._db.get_db()
        now = "2026-02-26T14:00:00"
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (new_sig_id, self._pid, "risk_cluster", "high", "Cache cluster", "", '{"label":"cache"}', "open", now, now),
        )
        await db.commit()
        await self._db.apply_playbook_to_signal(self._pid, new_sig_id, playbook_id)

        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        pb = next((p for p in pbs if p["playbook_id"] == playbook_id), None)
        self.assertIsNotNone(pb)
        self.assertGreaterEqual(pb["uses"], 1)

    async def test_update_playbook_stats_on_resolve(self):
        """update_playbook_stats_on_resolve increments successes for a NEW signal (not yet in examples)."""
        # First create a playbook from one signal
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "api")
        pb_res = await self._db.upsert_playbook_from_signal(self._pid, sig_id)
        playbook_id = pb_res["playbook_id"]

        # Create a second mitigated signal with the same signal_type/label (same playbook)
        sig_id2, _ = await self._make_mitigated_signal("risk_cluster", "api")
        db = await self._db.get_db()
        async with db.execute("SELECT evidence FROM graph_signals WHERE id=?", (sig_id2,)) as cur:
            row = await cur.fetchone()
            evidence = json.loads(row[0]) if row else {}

        # update_playbook_stats_on_resolve with the NEW signal_id (not yet in examples)
        pb_id_updated = await self._db.update_playbook_stats_on_resolve(
            project_id=self._pid,
            signal_id=sig_id2,
            signal_type="risk_cluster",
            evidence=evidence,
            resolved_at="2026-02-26T15:00:00",
            created_at="2026-02-26T12:00:00",
        )
        self.assertEqual(pb_id_updated, playbook_id)

        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        pb = next((p for p in pbs if p["playbook_id"] == playbook_id), None)
        self.assertIsNotNone(pb)
        self.assertGreater(pb["successes"], 0)

    async def test_update_playbook_stats_idempotent(self):
        """update_playbook_stats_on_resolve does not double-count the same signal."""
        sig_id, _ = await self._make_mitigated_signal("risk_cluster", "storage")
        await self._db.upsert_playbook_from_signal(self._pid, sig_id, resolved=True, time_to_resolve_h=2.0)

        db = await self._db.get_db()
        async with db.execute("SELECT evidence FROM graph_signals WHERE id=?", (sig_id,)) as cur:
            row = await cur.fetchone()
            evidence = json.loads(row[0]) if row else {}

        # Call twice with same signal_id
        await self._db.update_playbook_stats_on_resolve(
            self._pid, sig_id, "risk_cluster", evidence,
            "2026-02-26T15:00:00", "2026-02-26T12:00:00",
        )
        await self._db.update_playbook_stats_on_resolve(
            self._pid, sig_id, "risk_cluster", evidence,
            "2026-02-26T15:00:00", "2026-02-26T12:00:00",
        )
        pbs = await self._db.list_playbooks(self._pid, "risk_cluster")
        pb = next((p for p in pbs if p.get("context_key") == "label:storage"), None)
        # successes should not be inflated beyond what was set during upsert
        if pb:
            self.assertLessEqual(pb["successes"], pb["uses"])
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestLessons(unittest.IsolatedAsyncioTestCase):
    """Tests for Lessons v1 (weekly Lessons Learned).

    Fixed vs. the previous revision: removed an unused local ``import re`` in
    test_compute_lesson_bucket (``assertRegex`` takes the pattern string and
    compiles it itself).
    """

    async def asyncSetUp(self):
        # Point the shared db module at a fresh in-memory database.
        import uuid as _uuid
        self._pid = f"ls-{_uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None
        _db_module._DB_PATH = ":memory:"
        await _db_module.get_db()
        await _db_module.init_db()
        await _db_module.create_project(name="Lesson Test", project_id=self._pid)
        self._db = _db_module

    async def asyncTearDown(self):
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None

    def test_compute_lesson_bucket(self):
        """compute_lesson_bucket returns valid ISO week string."""
        bucket = self._db.compute_lesson_bucket()
        self.assertRegex(bucket, r"^\d{4}-W\d{2}$")

    def test_compute_lesson_bucket_specific_date(self):
        """compute_lesson_bucket with a known date returns correct week."""
        bucket = self._db.compute_lesson_bucket("2026-02-28T00:00:00")
        self.assertTrue(bucket.startswith("2026-W"), f"Expected 2026-Www, got {bucket}")

    async def test_dry_run_does_not_write(self):
        """generate with dry_run=True returns report but writes nothing to DB."""
        result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=True)
        self.assertTrue(result["dry_run"])
        self.assertIn("markdown", result)
        self.assertIn("date_bucket", result)
        # Verify nothing written
        db = await self._db.get_db()
        async with db.execute("SELECT COUNT(*) FROM lessons WHERE project_id=?", (self._pid,)) as cur:
            count = (await cur.fetchone())[0]
            self.assertEqual(count, 0)

    async def test_dry_run_returns_planned_tasks(self):
        """dry_run result includes planned_improvement_tasks list."""
        result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=True)
        self.assertIn("planned_improvement_tasks", result)
        self.assertIsInstance(result["planned_improvement_tasks"], list)
        self.assertLessEqual(len(result["planned_improvement_tasks"]), 3)

    async def test_apply_creates_lesson(self):
        """apply (dry_run=False) creates lessons row and decision node."""
        result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        self.assertFalse(result["dry_run"])
        self.assertIn("lesson_id", result)
        self.assertIn("lesson_node_id", result)
        self.assertIn("date_bucket", result)

        db = await self._db.get_db()
        async with db.execute("SELECT COUNT(*) FROM lessons WHERE project_id=?", (self._pid,)) as cur:
            count = (await cur.fetchone())[0]
            self.assertEqual(count, 1)

    async def test_apply_creates_decision_node(self):
        """apply creates a decision dialog_node with lesson title."""
        result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        db = await self._db.get_db()
        async with db.execute(
            "SELECT title, node_type FROM dialog_nodes WHERE node_id=?",
            (result["lesson_node_id"],),
        ) as cur:
            row = await cur.fetchone()
            self.assertIsNotNone(row)
            self.assertEqual(row[1], "decision")
            self.assertIn("Lesson:", row[0])

    async def test_apply_idempotent_same_bucket(self):
        """Applying twice for the same bucket yields the same lesson_id."""
        r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        self.assertEqual(r1["lesson_id"], r2["lesson_id"])
        self.assertEqual(r1["lesson_node_id"], r2["lesson_node_id"])

        db = await self._db.get_db()
        async with db.execute("SELECT COUNT(*) FROM lessons WHERE project_id=?", (self._pid,)) as cur:
            count = (await cur.fetchone())[0]
            self.assertEqual(count, 1)

    async def test_improvement_tasks_capped_at_3(self):
        """Improvement tasks are capped at 3 max."""
        result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        self.assertLessEqual(len(result["created_task_ids"]), 3)

    async def test_improvement_tasks_deduped(self):
        """Calling apply twice does not duplicate improvement tasks."""
        r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        # Task IDs should be the same set (same lesson, same fingerprints)
        self.assertEqual(set(r1["created_task_ids"]), set(r2["created_task_ids"]))

    async def test_triggers_with_signals(self):
        """Improvement triggers fire when signals exist in window."""
        import uuid as _uuid
        now = "2026-02-28T12:00:00"
        db = await self._db.get_db()
        # Add ops_instability signals (high/critical) ≥2 to trigger trigger 1
        for i in range(3):
            await db.execute(
                "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
                "VALUES(?,?,?,?,?,?,?,?,?,?)",
                (str(_uuid.uuid4()), self._pid, "ops_instability", "high",
                 f"OpsInstab {i}", "", "{}", "open", now, now),
            )
        await db.commit()
        result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=True)
        tasks = result["planned_improvement_tasks"]
        # Should have at least 1 task for ops resilience
        ops_tasks = [t for t in tasks if "ops" in t["title"].lower() or "ops" in " ".join(t["labels"])]
        self.assertGreater(len(ops_tasks), 0)

    async def test_list_lessons(self):
        """list_lessons returns lessons in date_bucket desc order."""
        await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        lessons = await self._db.list_lessons(self._pid)
        self.assertGreaterEqual(len(lessons), 1)
        self.assertIn("lesson_id", lessons[0])
        self.assertIn("date_bucket", lessons[0])
        self.assertIn("metrics", lessons[0])

    async def test_get_lesson_detail(self):
        """get_lesson_detail returns markdown and evidence ids."""
        apply_result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        lesson_id = apply_result["lesson_id"]
        detail = await self._db.get_lesson_detail(self._pid, lesson_id)
        self.assertIsNotNone(detail)
        self.assertEqual(detail["lesson_id"], lesson_id)
        self.assertIn("markdown", detail)
        self.assertIn("linked_signal_ids", detail)
        self.assertIn("improvement_task_ids", detail)

    async def test_evidence_signal_links(self):
        """Lesson links to signals via entity_links summarizes."""
        import uuid as _uuid
        now = "2026-02-28T12:00:00"
        db = await self._db.get_db()
        sig_id = str(_uuid.uuid4())
        await db.execute(
            "INSERT INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at) "
            "VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, self._pid, "ops_instability", "medium", "Test sig", "", "{}", "open", now, now),
        )
        await db.commit()
        apply_result = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        detail = await self._db.get_lesson_detail(self._pid, apply_result["lesson_id"])
        # Signal should be in linked_signal_ids
        self.assertIn(sig_id, detail["linked_signal_ids"])
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
|
|
class TestLessonsDelta(unittest.IsolatedAsyncioTestCase):
|
|
"""Tests for Delta Intelligence and Impact Tracking (Lessons v2)."""
|
|
|
|
async def asyncSetUp(self):
|
|
import uuid as _uuid
|
|
self._pid = f"ld-{_uuid.uuid4().hex[:10]}"
|
|
_db_module._db_conn = None
|
|
try:
|
|
await _db_module.close_db()
|
|
except Exception:
|
|
pass
|
|
_db_module._db_conn = None
|
|
_db_module._DB_PATH = ":memory:"
|
|
await _db_module.get_db()
|
|
await _db_module.init_db()
|
|
self._db = _db_module
|
|
# Create project
|
|
pid = self._pid
|
|
db = await _db_module.get_db()
|
|
import time as _time
|
|
now_ts = _db_module._now()
|
|
await db.execute(
|
|
"INSERT INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
|
|
(pid, f"Delta Test {pid}", now_ts, now_ts),
|
|
)
|
|
await db.commit()
|
|
|
|
    async def asyncTearDown(self):
        """Close the shared connection and clear the module-level handle.

        close_db may raise if the connection is already gone; that is fine
        during teardown, so the error is deliberately swallowed.
        """
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None
|
|
|
|
async def test_delta_schema_present_after_two_lessons(self):
|
|
"""Second lesson has delta block with expected keys."""
|
|
# Generate first lesson (W01 sim)
|
|
import datetime
|
|
# Patch compute_lesson_bucket to return fixed buckets
|
|
orig = _db_module.compute_lesson_bucket
|
|
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W01"
|
|
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
self.assertFalse(r1.get("dry_run"))
|
|
|
|
# Generate second lesson
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W02"
|
|
r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = orig
|
|
|
|
# Check metrics_json of second lesson
|
|
detail2 = await self._db.get_lesson_detail(self._pid, r2["lesson_id"])
|
|
metrics = detail2["metrics"]
|
|
self.assertIn("current", metrics)
|
|
self.assertIn("delta", metrics)
|
|
self.assertIn("trend_flags", metrics)
|
|
# previous should reference W01
|
|
prev = metrics.get("previous")
|
|
self.assertIsNotNone(prev)
|
|
self.assertEqual(prev.get("date_bucket"), "2026-W01")
|
|
|
|
async def test_trend_flags_correct_for_good_directions(self):
|
|
"""trend_flags structure is present and risk_regressing=True when risk_open increased."""
|
|
import json
|
|
orig = _db_module.compute_lesson_bucket
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W10"
|
|
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
# Manually set risk_tasks_open to 0 in first lesson current block
|
|
db = await _db_module.get_db()
|
|
m1 = json.loads((await (await db.execute("SELECT metrics_json FROM lessons WHERE lesson_id=?", (r1["lesson_id"],))).fetchone())[0])
|
|
m1.setdefault("current", {})["risk_open"] = 0
|
|
m1["risk_tasks_open"] = 0
|
|
await db.execute("UPDATE lessons SET metrics_json=? WHERE lesson_id=?", (json.dumps(m1), r1["lesson_id"]))
|
|
await db.commit()
|
|
|
|
# Manually inject a [RISK] task so current risk_open > 0
|
|
import uuid as _uuid_m
|
|
t_id = str(_uuid_m.uuid4())
|
|
n_id = str(_uuid_m.uuid4())
|
|
now_ts = _db_module._now()
|
|
await db.execute(
|
|
"INSERT INTO dialog_nodes(node_id,project_id,node_type,title,summary,props,created_by,created_at,updated_at,lifecycle,importance,ref_id)"
|
|
" VALUES(?,?,?,?,?,?,?,?,?,?,?,?)",
|
|
(n_id, self._pid, "task", "[RISK] Critical auth issue", "", "{}", "test", now_ts, now_ts, "active", 0.9, t_id),
|
|
)
|
|
await db.execute(
|
|
"INSERT INTO tasks(task_id,project_id,title,status,priority,description,labels,created_by,created_at,updated_at)"
|
|
" VALUES(?,?,?,?,?,?,?,?,?,?)",
|
|
(t_id, self._pid, "[RISK] Critical auth issue", "backlog", "high", "", '["risk"]', "test", now_ts, now_ts),
|
|
)
|
|
await db.commit()
|
|
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W11"
|
|
r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = orig
|
|
|
|
detail2 = await self._db.get_lesson_detail(self._pid, r2["lesson_id"])
|
|
metrics = detail2["metrics"]
|
|
# trend_flags must have all expected keys
|
|
tf = metrics.get("trend_flags", {})
|
|
for key in ("risk_improving", "risk_regressing", "quality_improving", "quality_regressing"):
|
|
self.assertIn(key, tf, f"Missing trend_flag key: {key}")
|
|
# Previous risk_open=0, current risk_open>=1 → risk_regressing=True
|
|
prev = metrics.get("previous", {})
|
|
curr = metrics.get("current", {})
|
|
if (prev.get("risk_open") or 0) == 0 and (curr.get("risk_open") or 0) >= 1:
|
|
self.assertTrue(tf.get("risk_regressing"), "risk_regressing should be True when risk increased")
|
|
|
|
async def test_delta_first_lesson_has_no_previous(self):
|
|
"""First lesson has delta=None or trend=new for all metrics."""
|
|
orig = _db_module.compute_lesson_bucket
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W50"
|
|
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = orig
|
|
|
|
detail = await self._db.get_lesson_detail(self._pid, r1["lesson_id"])
|
|
metrics = detail["metrics"]
|
|
prev = metrics.get("previous")
|
|
self.assertIsNone(prev, "First lesson should have no previous block")
|
|
# Delta should exist but trend=new for all
|
|
delta = metrics.get("delta", {})
|
|
if delta:
|
|
for k, v in delta.items():
|
|
self.assertEqual(v.get("trend"), "new", f"Expected trend=new for {k} in first lesson")
|
|
|
|
async def test_impact_evaluated_when_next_bucket_exists(self):
|
|
"""evaluate_lesson_impact writes impact_score to prior lesson."""
|
|
import json
|
|
orig = _db_module.compute_lesson_bucket
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W20"
|
|
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W21"
|
|
r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = orig
|
|
|
|
# After W21 lesson, W20 lesson should have impact evaluated
|
|
db = await _db_module.get_db()
|
|
row = await (await db.execute(
|
|
"SELECT impact_score, impact_json FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
|
|
)).fetchone()
|
|
self.assertIsNotNone(row)
|
|
# impact_json should reference evaluated_bucket W21
|
|
ij = json.loads(row[1] or "{}")
|
|
self.assertEqual(ij.get("evaluated_bucket"), "2026-W21")
|
|
|
|
async def test_impact_idempotent(self):
|
|
"""Calling evaluate_lesson_impact twice does not change result."""
|
|
import json
|
|
orig = _db_module.compute_lesson_bucket
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W30"
|
|
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W31"
|
|
r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = orig
|
|
|
|
# First evaluation (auto-called after upsert)
|
|
db = await _db_module.get_db()
|
|
row1 = await (await db.execute(
|
|
"SELECT impact_score FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
|
|
)).fetchone()
|
|
score1 = row1[0] if row1 else None
|
|
|
|
# Second call with force=False (should skip)
|
|
result2 = await self._db.evaluate_lesson_impact(self._pid, "7d", "2026-W31", force=False)
|
|
self.assertIsNone(result2, "Second call without force should return None (idempotent)")
|
|
|
|
row2 = await (await db.execute(
|
|
"SELECT impact_score FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
|
|
)).fetchone()
|
|
self.assertEqual(score1, row2[0] if row2 else None)
|
|
|
|
async def test_dry_run_no_write_for_impact_recompute(self):
|
|
"""evaluate_lesson_impact with force=True writes, then a dry_run check via API endpoint logic."""
|
|
import json
|
|
orig = _db_module.compute_lesson_bucket
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W40"
|
|
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W41"
|
|
r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = orig
|
|
|
|
db = await _db_module.get_db()
|
|
row = await (await db.execute(
|
|
"SELECT impact_json FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
|
|
)).fetchone()
|
|
ij = json.loads((row[0] if row else None) or "{}")
|
|
eval_at_1 = ij.get("evaluated_at")
|
|
|
|
# Force recompute
|
|
res = await self._db.evaluate_lesson_impact(self._pid, "7d", "2026-W41", force=True)
|
|
self.assertIsNotNone(res)
|
|
row2 = await (await db.execute(
|
|
"SELECT impact_json FROM lessons WHERE lesson_id=?", (r1["lesson_id"],)
|
|
)).fetchone()
|
|
ij2 = json.loads((row2[0] if row2 else None) or "{}")
|
|
eval_at_2 = ij2.get("evaluated_at")
|
|
# updated_at should be new (force re-ran)
|
|
self.assertIsNotNone(eval_at_2)
|
|
|
|
async def test_detail_includes_delta_and_impact_fields(self):
|
|
"""get_lesson_detail returns delta, trend_flags, current, previous, impact."""
|
|
orig = _db_module.compute_lesson_bucket
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W60"
|
|
r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = lambda now=None: "2026-W61"
|
|
r2 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
|
|
_db_module.compute_lesson_bucket = orig
|
|
|
|
detail2 = await self._db.get_lesson_detail(self._pid, r2["lesson_id"])
|
|
# Must have these keys
|
|
for key in ("delta", "trend_flags", "current", "previous", "impact"):
|
|
self.assertIn(key, detail2, f"Missing key: {key}")
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestLevelFive(unittest.IsolatedAsyncioTestCase):
    """Tests for Level 5: Streaks, Portfolio Drift Signals, Impact Attribution."""

    async def asyncSetUp(self):
        # Fresh in-memory DB per test; two projects so portfolio-level
        # aggregation has more than one candidate.
        import uuid as _uuid
        self._pid = f"lv5-{_uuid.uuid4().hex[:10]}"
        self._pid2 = f"lv5b-{_uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None
        _db_module._DB_PATH = ":memory:"
        await _db_module.get_db()
        await _db_module.init_db()
        self._db = _db_module
        db = await _db_module.get_db()
        now_ts = _db_module._now()
        for pid in (self._pid, self._pid2):
            await db.execute(
                "INSERT INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
                (pid, f"L5 Test {pid}", now_ts, now_ts),
            )
        await db.commit()

    async def asyncTearDown(self):
        # Best-effort close; never mask the test outcome with teardown errors.
        try:
            await _db_module.close_db()
        except Exception:
            pass
        _db_module._db_conn = None

    async def _make_lesson_with_flags(self, pid, bucket, risk_reg=False, quality_reg=False, ops_reg=False,
                                      risk_imp=False, quality_imp=False, ops_imp=False):
        """Insert a synthetic lesson with given trend_flags."""
        import json
        import uuid as _uuid
        lesson_id = str(_uuid.uuid4())
        tf = {
            "risk_regressing": risk_reg, "risk_improving": risk_imp,
            "quality_regressing": quality_reg, "quality_improving": quality_imp,
            "ops_regressing": ops_reg, "ops_improving": ops_imp,
            "delivery_improving": False, "delivery_regressing": False,
        }
        metrics = {
            "kind": "lesson_metrics", "project_id": pid, "window": "7d", "date_bucket": bucket,
            "current": {"risk_open": 5 if risk_reg else 0, "quality_avg": 0.5, "ops_failure_rate": 0.3},
            "previous": None, "delta": {}, "trend_flags": tf,
            "wip": 3, "tasks_done": 5, "risk_tasks_open": 2, "run_quality_avg": 0.6,
            "ops_failure_rate": 0.2, "agent_runs_in_window": 4, "signals_in_window": 2,
            "improvement_tasks_count": 0,
        }
        fp = _db_module._lesson_fingerprint(pid, "7d", bucket)
        now_ts = _db_module._now()
        db = await _db_module.get_db()
        # Create a dummy decision node for lesson_node_id
        node_id = str(_uuid.uuid4())
        await db.execute(
            "INSERT INTO dialog_nodes(node_id,project_id,node_type,title,summary,props,ref_id,created_by,created_at,updated_at,lifecycle,importance)"
            " VALUES(?,?,?,?,?,?,?,?,?,?,?,?)",
            (node_id, pid, "decision", f"Lesson: {bucket}", "", json.dumps({"markdown": "# Test"}),
             node_id, "test", now_ts, now_ts, "active", 0.9),
        )
        await db.execute(
            "INSERT INTO lessons(lesson_id,project_id,window,date_bucket,fingerprint,status,lesson_node_id,doc_version_id,metrics_json,impact_score,impact_json,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)",
            (lesson_id, pid, "7d", bucket, fp, "published", node_id, "", json.dumps(metrics), 0.0, "{}", now_ts, now_ts),
        )
        await db.commit()
        return lesson_id

    async def test_streak_len2_regressing(self):
        """Two consecutive regressing lessons → streak len=2."""
        await self._make_lesson_with_flags(self._pid, "2026-W01", risk_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W02", risk_reg=True)
        streaks = await self._db.compute_lesson_streaks(self._pid)
        self.assertEqual(streaks["risk"]["dir"], "regressing")
        self.assertEqual(streaks["risk"]["len"], 2)
        self.assertEqual(streaks["risk"]["since_bucket"], "2026-W01")

    async def test_streak_len3_improving(self):
        """Three consecutive improving lessons → streak len=3."""
        await self._make_lesson_with_flags(self._pid, "2026-W10", ops_imp=True)
        await self._make_lesson_with_flags(self._pid, "2026-W11", ops_imp=True)
        await self._make_lesson_with_flags(self._pid, "2026-W12", ops_imp=True)
        streaks = await self._db.compute_lesson_streaks(self._pid)
        self.assertEqual(streaks["ops"]["dir"], "improving")
        self.assertEqual(streaks["ops"]["len"], 3)

    async def test_streak_flat_when_mixed(self):
        """Mixed directions → streak len=0 or len=1 (not a streak)."""
        await self._make_lesson_with_flags(self._pid, "2026-W20", quality_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W21", quality_imp=True)
        streaks = await self._db.compute_lesson_streaks(self._pid)
        self.assertLess(streaks["quality"]["len"], 2, "Mixed directions should not produce len>=2 streak")

    async def test_portfolio_drift_signal_created(self):
        """portfolio_risk_drift signal is created when a project has risk streak>=2."""
        await self._make_lesson_with_flags(self._pid, "2026-W30", risk_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W31", risk_reg=True)
        result = await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
        self.assertFalse(result["dry_run"])
        types = [c["signal_type"] for c in result["changes"]]
        self.assertIn("portfolio_risk_drift", types)

    async def test_portfolio_drift_signal_idempotent(self):
        """Running recompute twice does not create duplicate signals."""
        await self._make_lesson_with_flags(self._pid, "2026-W40", risk_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W41", risk_reg=True)
        await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
        await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
        db = await _db_module.get_db()
        async with db.execute(
            "SELECT COUNT(*) FROM graph_signals WHERE project_id=? AND signal_type=?",
            ("portfolio", "portfolio_risk_drift"),
        ) as cur:
            row = await cur.fetchone()
        self.assertEqual(row[0], 1, "Should have exactly 1 drift signal, not duplicates")

    async def test_portfolio_drift_severity_by_len(self):
        """len=2 → high, len=3 → critical."""
        # len=3 streak for ops
        await self._make_lesson_with_flags(self._pid, "2026-W50", ops_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W51", ops_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W52", ops_reg=True)
        result = await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
        ops_change = next((c for c in result["changes"] if c["signal_type"] == "portfolio_ops_drift"), None)
        self.assertIsNotNone(ops_change)
        self.assertEqual(ops_change["severity"], "critical")

    async def test_portfolio_drift_dry_run_no_write(self):
        """dry_run=True should not write signals to DB."""
        await self._make_lesson_with_flags(self._pid, "2026-W60", risk_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W61", risk_reg=True)
        result = await self._db.recompute_portfolio_signals(window="7d", dry_run=True)
        self.assertTrue(result["dry_run"])
        db = await _db_module.get_db()
        async with db.execute(
            "SELECT COUNT(*) FROM graph_signals WHERE project_id='portfolio'",
        ) as cur:
            row = await cur.fetchone()
        self.assertEqual(row[0], 0, "dry_run should not write any signals")

    async def test_attribution_unknown_when_no_tasks(self):
        """Impact with no improvement tasks → attribution unknown."""
        orig = _db_module.compute_lesson_bucket
        try:
            _db_module.compute_lesson_bucket = lambda now=None: "2026-W70"
            r1 = await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
            # The W71 upsert triggers impact evaluation of W70.
            _db_module.compute_lesson_bucket = lambda now=None: "2026-W71"
            await self._db.upsert_lesson(self._pid, window="7d", dry_run=False)
        finally:
            # Restore the patched bucket function even on failure.
            _db_module.compute_lesson_bucket = orig
        detail = await self._db.get_lesson_detail(self._pid, r1["lesson_id"])
        imp = detail.get("impact") or {}
        attr = imp.get("attribution") or {}
        self.assertEqual(attr.get("level"), "unknown", "No tasks → attribution unknown")

    async def test_list_portfolio_signals(self):
        """list_portfolio_signals returns signals with project_id='portfolio'."""
        await self._make_lesson_with_flags(self._pid, "2026-W80", quality_reg=True)
        await self._make_lesson_with_flags(self._pid, "2026-W81", quality_reg=True)
        await self._db.recompute_portfolio_signals(window="7d", dry_run=False)
        sigs = await self._db.list_portfolio_signals(status="open")
        self.assertGreater(len(sigs), 0, "Should have at least one portfolio drift signal")
        for s in sigs:
            self.assertEqual(s["project_id"], "portfolio")
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestLevel6(unittest.IsolatedAsyncioTestCase):
    """Tests for Level 6: Governance Gates + Auto-plan/Auto-run.

    Unlike TestLevelFive, this suite does not repoint the DB path; it only
    resets the cached connection and seeds its own project row.
    """

    async def asyncSetUp(self):
        import uuid as _uuid
        self._pid = f"lv6-{_uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        db = await _db_module.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id, name, created_at, updated_at) VALUES(?,?,?,?)",
            (self._pid, "LV6 Test", _db_module._now(), _db_module._now()),
        )
        await db.commit()
        self._db = _db_module

    async def asyncTearDown(self):
        try:
            await _db_module.close_db()
        except Exception:
            pass

    # ── Gate: BLOCK_RELEASE ──────────────────────────────────────────────────
    async def test_block_release_gate_triggers(self):
        """BLOCK_RELEASE fires when open high/critical [RISK] tasks exist."""
        db = await self._db.get_db()
        task_id = "t-risk-" + self._pid[:8]
        await db.execute(
            "INSERT OR IGNORE INTO tasks(task_id, project_id, title, status, priority, labels, created_at, updated_at)"
            " VALUES(?,?,?,?,?,?,?,?)",
            (task_id, self._pid, "[RISK] critical auth bypass", "backlog", "urgent", "risk", self._db._now(), self._db._now()),
        )
        # dialog_node required for join
        node_id = "dn-risk-" + self._pid[:8]
        await db.execute(
            "INSERT OR IGNORE INTO dialog_nodes(node_id,project_id,node_type,title,ref_id,created_by,created_at,updated_at,lifecycle,importance)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (node_id, self._pid, "task", "[RISK] critical auth bypass", task_id, "test", self._db._now(), self._db._now(), "active", 0.7),
        )
        await db.commit()
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "BLOCK_RELEASE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "BLOCKED", "Should be BLOCKED with open [RISK] task")

    async def test_block_release_gate_passes_when_no_risks(self):
        """BLOCK_RELEASE passes when no open high/critical [RISK] tasks."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "BLOCK_RELEASE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "PASS")

    # ── Gate: DEGRADE_MODE ───────────────────────────────────────────────────
    async def test_degrade_mode_passes_when_no_snapshot(self):
        """DEGRADE_MODE should PASS (no data = conservative) when no snapshot."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "DEGRADE_MODE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "PASS", "No snapshot → conservative PASS")

    async def test_degrade_mode_triggers_from_snapshot(self):
        """DEGRADE_MODE fires when ops_failure_rate > 0.33 and runs >= 3."""
        snap_id = "snap-lv6-" + self._pid[:8]
        metrics = {"ops_failure_rate": 0.5, "agent_runs_in_window": 5}
        db = await self._db.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO graph_snapshots(id, project_id, scope, window, date_bucket, metrics, created_at)"
            " VALUES(?,?,?,?,?,?,?)",
            # json is imported at module level; __import__("json") was redundant.
            (snap_id, self._pid, "project", "7d", "2026-W99", json.dumps(metrics), self._db._now()),
        )
        await db.commit()
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "DEGRADE_MODE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "DEGRADED")

    # ── Gate: PROMPT_FREEZE ──────────────────────────────────────────────────
    async def test_prompt_freeze_passes_without_streaks(self):
        """PROMPT_FREEZE passes when no quality regression streak."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        gate = next((g for g in result["gates"] if g["name"] == "PROMPT_FREEZE"), None)
        self.assertIsNotNone(gate)
        self.assertEqual(gate["status"], "PASS")

    # ── Gate persistence ─────────────────────────────────────────────────────
    async def test_evaluate_gates_idempotent_apply(self):
        """Applying gates twice for same bucket reuses existing node (upsert)."""
        r1 = await self._db.evaluate_governance_gates(self._pid, dry_run=False)
        node_id_1 = r1.get("gate_node_id")
        r2 = await self._db.evaluate_governance_gates(self._pid, dry_run=False)
        node_id_2 = r2.get("gate_node_id")
        self.assertEqual(node_id_1, node_id_2, "Same bucket → same node_id (upsert)")

    # ── Auto-plan ────────────────────────────────────────────────────────────
    async def test_auto_plan_creates_planned_entries(self):
        """auto_plan_drift_signal creates planned run entries in evidence."""
        db = await self._db.get_db()
        # Ensure portfolio project exists
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            ("portfolio", "portfolio", self._db._now(), self._db._now()),
        )
        ev = {
            "bucket": "2026-W90",
            "metric": "ops",
            "projects": [{"project_id": self._pid, "streak": {"dir": "regressing", "len": 2}}],
        }
        sig_id = "sig-lv6-" + self._pid[:8]
        await db.execute(
            "INSERT OR IGNORE INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, "portfolio", "portfolio_ops_drift", "high", "Ops drift W90", "", json.dumps(ev), "open", self._db._now(), self._db._now()),
        )
        await db.commit()

        result = await self._db.auto_plan_drift_signal(sig_id)
        self.assertNotIn("error", result)
        self.assertGreater(result["total_planned"], 0, "Should plan at least one run")

        # Verify idempotency: planning again adds 0 new entries
        result2 = await self._db.auto_plan_drift_signal(sig_id)
        self.assertEqual(result2["total_planned"], 0, "Idempotent: no new entries on second call")

    async def test_auto_run_dry_run_no_writes(self):
        """auto_run_drift_signal with dry_run=True does not fire real runs."""
        db = await self._db.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            ("portfolio", "portfolio", self._db._now(), self._db._now()),
        )
        ev = {
            "bucket": "2026-W91",
            "metric": "risk",
            "projects": [{"project_id": self._pid, "streak": {"dir": "regressing", "len": 3}}],
        }
        sig_id = "sig-lv6b-" + self._pid[:8]
        await db.execute(
            "INSERT OR IGNORE INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, "portfolio", "portfolio_risk_drift", "critical", "Risk drift W91", "", json.dumps(ev), "open", self._db._now(), self._db._now()),
        )
        await db.commit()

        # supervisor_url points at an unroutable port; dry_run must never hit it.
        result = await self._db.auto_run_drift_signal(sig_id, dry_run=True, supervisor_url="http://localhost:0")
        self.assertTrue(result["dry_run"])
        # All fired entries should be dry_run=True (planned) not real runs
        for f in result.get("fired", []):
            self.assertTrue(f.get("dry_run", True), "dry_run=True → no real fire")

    # ── Gates summary ────────────────────────────────────────────────────────
    async def test_gates_result_has_summary(self):
        """Gate evaluation returns summary with all_clear and blocked list."""
        result = await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        self.assertIn("summary", result)
        self.assertIn("all_clear", result["summary"])
        self.assertIn("blocked", result["summary"])
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestLevel7(unittest.IsolatedAsyncioTestCase):
    """Tests for Level 7: Governance Audit Trail (append_governance_event, list, hooks)."""

    async def asyncSetUp(self):
        import uuid as _uuid
        self._pid = f"lv7-{_uuid.uuid4().hex[:10]}"
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        db = await _db_module.get_db()
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            (self._pid, "LV7 Test", _db_module._now(), _db_module._now()),
        )
        await db.execute(
            "INSERT OR IGNORE INTO projects(project_id,name,created_at,updated_at) VALUES(?,?,?,?)",
            ("portfolio", "portfolio", _db_module._now(), _db_module._now()),
        )
        await db.commit()
        self._db = _db_module

    async def asyncTearDown(self):
        try:
            await _db_module.close_db()
        except Exception:
            pass

    # ── append_governance_event ──────────────────────────────────────────────
    async def test_append_event_basic(self):
        """append_governance_event returns an event_id and is stored."""
        ev_id = await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="test_event",
            idempotency_key=f"test|basic|{self._pid}",
            severity="info", status="ok",
        )
        self.assertIsNotNone(ev_id)
        items = await self._db.list_governance_events(project_id=self._pid)
        self.assertTrue(any(i["event_type"] == "test_event" for i in items))

    async def test_append_event_idempotent(self):
        """Same idempotency_key does not create duplicate event."""
        ikey = f"test|idem|{self._pid}"
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="idem_event", idempotency_key=ikey,
        )
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="idem_event", idempotency_key=ikey,
        )
        items = await self._db.list_governance_events(project_id=self._pid, event_type="idem_event")
        self.assertEqual(len(items), 1, "Should have exactly 1 event despite 2 inserts")

    # ── list_governance_events filters ──────────────────────────────────────
    async def test_list_filter_by_event_type(self):
        """list_governance_events filters by event_type."""
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="system",
            event_type="type_a", idempotency_key=f"fa|{self._pid}",
        )
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="system",
            event_type="type_b", idempotency_key=f"fb|{self._pid}",
        )
        items_a = await self._db.list_governance_events(project_id=self._pid, event_type="type_a")
        items_b = await self._db.list_governance_events(project_id=self._pid, event_type="type_b")
        self.assertEqual(len(items_a), 1)
        self.assertEqual(len(items_b), 1)

    async def test_list_filter_by_status(self):
        """list_governance_events filters by status."""
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="system",
            event_type="err_event", idempotency_key=f"ferr|{self._pid}",
            status="error",
        )
        errors = await self._db.list_governance_events(project_id=self._pid, status="error")
        self.assertTrue(any(i["status"] == "error" for i in errors))

    async def test_list_portfolio_scope(self):
        """Events written to portfolio scope are queryable by scope=portfolio."""
        await self._db.append_governance_event(
            scope="portfolio", project_id="portfolio", actor_type="system",
            event_type="portfolio_event", idempotency_key=f"port|{self._pid}",
        )
        items = await self._db.list_governance_events(scope="portfolio", project_id="portfolio")
        self.assertTrue(any(i["event_type"] == "portfolio_event" for i in items))

    # ── Gate hooks ───────────────────────────────────────────────────────────
    async def test_gate_preview_logs_event(self):
        """evaluate_governance_gates(dry_run=True) logs gate_previewed event."""
        await self._db.evaluate_governance_gates(self._pid, dry_run=True)
        # Fire-and-forget tasks need a moment to run; asyncio is a module-level import.
        await asyncio.sleep(0.05)
        items = await self._db.list_governance_events(project_id=self._pid, event_type="gate_previewed")
        self.assertEqual(len(items), 1, "Should log gate_previewed on preview")

    async def test_gate_persist_logs_event(self):
        """evaluate_governance_gates(dry_run=False) logs gate_evaluated event."""
        await self._db.evaluate_governance_gates(self._pid, dry_run=False)
        # Give the fire-and-forget audit task time to complete.
        await asyncio.sleep(0.05)
        items = await self._db.list_governance_events(project_id=self._pid, event_type="gate_evaluated")
        self.assertEqual(len(items), 1, "Should log gate_evaluated on persist")

    # ── Auto-plan hook ────────────────────────────────────────────────────────
    async def test_drift_plan_logs_event(self):
        """auto_plan_drift_signal logs drift_planned event."""
        db = await self._db.get_db()
        ev = {"bucket": "2026-W95", "metric": "ops",
              "projects": [{"project_id": self._pid, "streak": {"dir": "regressing", "len": 2}}]}
        sig_id = f"sig-lv7-{self._pid[:8]}"
        await db.execute(
            "INSERT OR IGNORE INTO graph_signals(id,project_id,signal_type,severity,title,summary,evidence,status,created_at,updated_at)"
            " VALUES(?,?,?,?,?,?,?,?,?,?)",
            (sig_id, "portfolio", "portfolio_ops_drift", "high", "Ops W95", "", json.dumps(ev), "open", self._db._now(), self._db._now()),
        )
        await db.commit()

        await self._db.auto_plan_drift_signal(sig_id)
        await asyncio.sleep(0.05)
        items = await self._db.list_governance_events(scope="portfolio", project_id="portfolio", event_type="drift_planned")
        self.assertGreater(len(items), 0, "drift_planned event should be logged")

    # ── Evidence JSON schema ──────────────────────────────────────────────────
    async def test_evidence_schema_v1(self):
        """Stored evidence has v=1 schema with required keys."""
        await self._db.append_governance_event(
            scope="project", project_id=self._pid, actor_type="user",
            event_type="schema_check", idempotency_key=f"schema|{self._pid}",
            evidence=self._db._make_evidence("test schema", bucket="2026-W09"),
        )
        items = await self._db.list_governance_events(project_id=self._pid, event_type="schema_check")
        self.assertEqual(len(items), 1)
        ev = items[0]["evidence"]
        self.assertEqual(ev.get("v"), 1)
        self.assertIn("message", ev)
        self.assertIn("links", ev)
        self.assertIn("timings", ev)
|
|
|
|
|
|
@unittest.skipUnless(_AIOSQLITE_AVAILABLE and _DB_AVAILABLE, "aiosqlite or db not available")
class TestAgentOverrides(unittest.IsolatedAsyncioTestCase):
    """Tests for Level 8: Agent Overrides (Projects = Agents)."""

    async def asyncSetUp(self):
        # Reset the cached connection; overrides live in the shared DB.
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        await _db_module.get_db()
        self._db = _db_module

    async def asyncTearDown(self):
        try:
            await _db_module.close_db()
        except Exception:
            pass

    # ── CRUD ─────────────────────────────────────────────────────────────────
    async def test_upsert_creates_override(self):
        """upsert_agent_override creates a new row."""
        ov = await self._db.upsert_agent_override(
            "NODA1", "helion",
            display_name="Helion Test",
            domain="ai, strategy",
        )
        self.assertEqual(ov["node_id"], "NODA1")
        self.assertEqual(ov["agent_id"], "helion")
        self.assertEqual(ov["display_name"], "Helion Test")

    async def test_upsert_updates_override(self):
        """Second upsert updates existing row."""
        await self._db.upsert_agent_override("NODA1", "druid", display_name="Druid v1")
        ov2 = await self._db.upsert_agent_override("NODA1", "druid", display_name="Druid v2")
        self.assertEqual(ov2["display_name"], "Druid v2")

    async def test_get_returns_none_for_missing(self):
        """get_agent_override returns None when no override exists."""
        result = await self._db.get_agent_override("NODA1", "nonexistent_agent")
        self.assertIsNone(result)

    async def test_upsert_partial_update(self):
        """Partial upsert only changes provided fields."""
        await self._db.upsert_agent_override("NODA1", "sofiia",
                                             display_name="Sophia", domain="arch")
        # Update only system_prompt_md, not display_name
        ov2 = await self._db.upsert_agent_override("NODA1", "sofiia",
                                                   system_prompt_md="# You are Sophia")
        self.assertEqual(ov2["display_name"], "Sophia")  # unchanged
        self.assertEqual(ov2["system_prompt_md"], "# You are Sophia")  # updated

    async def test_delete_removes_override(self):
        """delete_agent_override removes the row."""
        await self._db.upsert_agent_override("NODA1", "senpai", display_name="Senpai")
        await self._db.delete_agent_override("NODA1", "senpai")
        result = await self._db.get_agent_override("NODA1", "senpai")
        self.assertIsNone(result)

    async def test_list_agent_overrides_filtered(self):
        """list_agent_overrides filters by node_id."""
        await self._db.upsert_agent_override("NODA1", "agent_a", display_name="A")
        await self._db.upsert_agent_override("NODA2", "agent_b", display_name="B")
        noda1 = await self._db.list_agent_overrides("NODA1")
        noda2 = await self._db.list_agent_overrides("NODA2")
        self.assertTrue(all(o["node_id"] == "NODA1" for o in noda1))
        self.assertTrue(all(o["node_id"] == "NODA2" for o in noda2))

    async def test_is_hidden_flag(self):
        """is_hidden flag is stored and retrieved correctly."""
        await self._db.upsert_agent_override("NODA1", "hidden_agent", is_hidden=True)
        ov = await self._db.get_agent_override("NODA1", "hidden_agent")
        # SQLite may return 0/1; coerce to bool before asserting.
        self.assertTrue(bool(ov["is_hidden"]))

    # ── Registry merge ────────────────────────────────────────────────────────
    async def test_override_display_name_persists(self):
        """display_name override is persisted and retrievable."""
        await self._db.upsert_agent_override("NODA1", "helion", display_name="Helion Custom")
        ov = await self._db.get_agent_override("NODA1", "helion")
        self.assertEqual(ov["display_name"], "Helion Custom")
        self.assertIsNone(ov.get("system_prompt_md"))
|
|
|
|
|
|
class TestAgentsOps(unittest.IsolatedAsyncioTestCase):
|
|
"""Tests for Agents Ops: versioning, safe apply, drift, bulk, audit events."""
|
|
|
|
    async def asyncSetUp(self):
        # Drop any connection cached by a previous test, then reopen lazily.
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            # close_db may fail when nothing is open; that is fine during setup.
            pass
        await _db_module.get_db()
        # Tests use self._db as an alias for the module under test.
        self._db = _db_module
|
|
|
|
    async def asyncTearDown(self):
        # Best-effort close; never let teardown errors mask the test outcome.
        try:
            await _db_module.close_db()
        except Exception:
            pass
|
|
|
|
# ── Version snapshots ──────────────────────────────────────────────────────
|
|
|
|
async def test_upsert_creates_version_snapshot(self):
|
|
"""upsert_agent_override creates a version row automatically."""
|
|
ov = await self._db.upsert_agent_override(
|
|
"NODA1", "helion",
|
|
display_name="Helion", system_prompt_md="# Helion prompt",
|
|
)
|
|
self.assertIn("version_hash", ov)
|
|
self.assertIsNotNone(ov["version_hash"])
|
|
versions = await self._db.list_agent_versions("NODA1", "helion")
|
|
self.assertGreaterEqual(len(versions), 1)
|
|
self.assertEqual(versions[0]["version_hash"], ov["version_hash"])
|
|
|
|
async def test_versions_are_idempotent_by_content(self):
|
|
"""Same content twice → only one version row (idempotent by hash)."""
|
|
await self._db.upsert_agent_override("NODA1", "druid",
|
|
system_prompt_md="# same")
|
|
await self._db.upsert_agent_override("NODA1", "druid",
|
|
system_prompt_md="# same")
|
|
versions = await self._db.list_agent_versions("NODA1", "druid")
|
|
hashes = [v["version_hash"] for v in versions]
|
|
self.assertEqual(len(hashes), len(set(hashes)), "duplicate version hash")
|
|
|
|
async def test_different_content_creates_new_version(self):
|
|
"""Different content → new version row."""
|
|
await self._db.upsert_agent_override("NODA1", "sofiia",
|
|
system_prompt_md="# v1")
|
|
await self._db.upsert_agent_override("NODA1", "sofiia",
|
|
system_prompt_md="# v2")
|
|
versions = await self._db.list_agent_versions("NODA1", "sofiia")
|
|
self.assertGreaterEqual(len(versions), 2)
|
|
|
|
async def test_get_agent_version_by_hash(self):
|
|
"""get_agent_version_by_hash returns correct payload."""
|
|
ov = await self._db.upsert_agent_override(
|
|
"NODA1", "senpai", system_prompt_md="# senpai v1",
|
|
)
|
|
vh = ov["version_hash"]
|
|
version = await self._db.get_agent_version_by_hash("NODA1", "senpai", vh)
|
|
self.assertIsNotNone(version)
|
|
self.assertEqual(version["payload"].get("system_prompt_md"), "# senpai v1")
|
|
|
|
async def test_get_agent_version_returns_none_for_unknown_hash(self):
|
|
"""Unknown version_hash returns None."""
|
|
result = await self._db.get_agent_version_by_hash("NODA1", "nobody", "deadbeef")
|
|
self.assertIsNone(result)
|
|
|
|
# ── Payload hash ──────────────────────────────────────────────────────────
|
|
|
|
async def test_payload_hash_ignores_timestamps(self):
|
|
"""_agent_payload_hash ignores updated_at and last_applied_at."""
|
|
p1 = {"display_name": "X", "domain": "ai", "system_prompt_md": "# hi",
|
|
"updated_at": "2026-01-01", "last_applied_at": "2026-02-01"}
|
|
p2 = {"display_name": "X", "domain": "ai", "system_prompt_md": "# hi",
|
|
"updated_at": "2026-05-05", "last_applied_at": None}
|
|
h1 = self._db._agent_payload_hash(p1)
|
|
h2 = self._db._agent_payload_hash(p2)
|
|
self.assertEqual(h1, h2)
|
|
|
|
async def test_payload_hash_differs_on_content_change(self):
|
|
"""Different content → different hash."""
|
|
h1 = self._db._agent_payload_hash({"system_prompt_md": "v1"})
|
|
h2 = self._db._agent_payload_hash({"system_prompt_md": "v2"})
|
|
self.assertNotEqual(h1, h2)
|
|
|
|
# ── last_applied_hash (mark applied) ──────────────────────────────────────
|
|
|
|
async def test_mark_applied_hash_stored(self):
|
|
"""_mark_applied_hash is stored in last_applied_hash."""
|
|
ov = await self._db.upsert_agent_override(
|
|
"NODA1", "helion_apply", system_prompt_md="# prompt",
|
|
)
|
|
vh = ov["version_hash"]
|
|
# Simulate successful apply
|
|
updated = await self._db.upsert_agent_override(
|
|
"NODA1", "helion_apply", _mark_applied_hash=vh,
|
|
)
|
|
self.assertEqual(updated["last_applied_hash"], vh)
|
|
self.assertIsNotNone(updated.get("last_applied_at"))
|
|
|
|
async def test_drift_detected_when_hash_differs(self):
|
|
"""Drift = last_applied_hash != desired_hash."""
|
|
ov = await self._db.upsert_agent_override(
|
|
"NODA1", "drift_agent", system_prompt_md="# original",
|
|
)
|
|
old_hash = ov["version_hash"]
|
|
# Mark applied at old version
|
|
await self._db.upsert_agent_override("NODA1", "drift_agent",
|
|
_mark_applied_hash=old_hash)
|
|
# Change content → new desired hash
|
|
ov2 = await self._db.upsert_agent_override(
|
|
"NODA1", "drift_agent", system_prompt_md="# changed",
|
|
)
|
|
new_hash = ov2["version_hash"]
|
|
self.assertNotEqual(old_hash, new_hash, "should be different versions")
|
|
fresh = await self._db.get_agent_override("NODA1", "drift_agent")
|
|
self.assertEqual(fresh["last_applied_hash"], old_hash)
|
|
# Drift: desired (new_hash) != last_applied (old_hash)
|
|
desired = {"display_name": fresh.get("display_name"),
|
|
"domain": fresh.get("domain"),
|
|
"system_prompt_md": fresh.get("system_prompt_md")}
|
|
desired_hash = self._db._agent_payload_hash(desired)
|
|
self.assertNotEqual(desired_hash, fresh["last_applied_hash"])
|
|
|
|
# ── Audit integration ─────────────────────────────────────────────────────
|
|
|
|
async def test_audit_agent_override_saved_event(self):
|
|
"""agent_override_saved audit event is written correctly."""
|
|
await self._db.append_governance_event(
|
|
scope="project", project_id="helion",
|
|
actor_type="user",
|
|
event_type="agent_override_saved",
|
|
idempotency_key="test|aos|NODA1|helion|abc123",
|
|
severity="info", status="ok",
|
|
ref_type="agent", ref_id="helion",
|
|
evidence=self._db._make_evidence(
|
|
message="Override saved",
|
|
outputs={"version_hash": "abc123"},
|
|
),
|
|
)
|
|
events = await self._db.list_governance_events(
|
|
scope="project", project_id="helion",
|
|
event_type="agent_override_saved",
|
|
)
|
|
self.assertGreaterEqual(len(events), 1)
|
|
self.assertEqual(events[0]["event_type"], "agent_override_saved")
|
|
|
|
async def test_audit_agent_apply_planned_event(self):
|
|
"""agent_apply_planned audit event written for dry-run."""
|
|
await self._db.append_governance_event(
|
|
scope="project", project_id="druid",
|
|
actor_type="user",
|
|
event_type="agent_apply_planned",
|
|
idempotency_key="test|aap|NODA1|druid|planXXX",
|
|
severity="info", status="ok",
|
|
ref_type="agent", ref_id="druid",
|
|
evidence=self._db._make_evidence(
|
|
message="Apply planned for druid@NODA1",
|
|
outputs={"plan_id": "planXXX", "will_change": True, "diff_lines": 5},
|
|
),
|
|
)
|
|
events = await self._db.list_governance_events(
|
|
scope="project", project_id="druid",
|
|
event_type="agent_apply_planned",
|
|
)
|
|
self.assertGreaterEqual(len(events), 1)
|
|
ev = events[0]
|
|
self.assertEqual(ev["status"], "ok")
|
|
ev_json = ev.get("evidence") or {}
|
|
self.assertIn("plan_id", ev_json.get("outputs", {}))
|
|
|
|
async def test_audit_idempotency_no_duplicate(self):
|
|
"""Same idempotency_key → only one event row."""
|
|
key = "test|unique|ops|agent1|abc"
|
|
for _ in range(3):
|
|
await self._db.append_governance_event(
|
|
scope="project", project_id="agent1",
|
|
actor_type="user",
|
|
event_type="agent_apply_executed",
|
|
idempotency_key=key,
|
|
severity="info", status="ok",
|
|
ref_type="agent", ref_id="agent1",
|
|
evidence={},
|
|
)
|
|
events = await self._db.list_governance_events(
|
|
scope="project", project_id="agent1",
|
|
event_type="agent_apply_executed",
|
|
)
|
|
matching = [e for e in events if e.get("idempotency_key") == key]
|
|
self.assertEqual(len(matching), 1)
|
|
|
|
async def test_audit_rollback_event(self):
|
|
"""agent_rollback_executed event is written."""
|
|
await self._db.append_governance_event(
|
|
scope="project", project_id="senpai",
|
|
actor_type="user",
|
|
event_type="agent_rollback_executed",
|
|
idempotency_key="test|arb|NODA1|senpai|abc|t1",
|
|
severity="warn", status="ok",
|
|
ref_type="agent", ref_id="senpai",
|
|
evidence=self._db._make_evidence(
|
|
message="Rollback to abc",
|
|
outputs={"version_hash": "abc"},
|
|
),
|
|
)
|
|
events = await self._db.list_governance_events(
|
|
scope="project", project_id="senpai",
|
|
event_type="agent_rollback_executed",
|
|
)
|
|
self.assertGreaterEqual(len(events), 1)
|
|
self.assertEqual(events[0]["severity"], "warn")
|
|
|
|
|
|
class TestMultiNodeCanary(unittest.IsolatedAsyncioTestCase):
    """Tests for Multi-node fanout + Canary Apply (Agents Ops v2).

    Covers node policy defaults, deterministic drift-candidate selection,
    bulk-apply audit events, partial-failure isolation and canary limits.
    """

    async def asyncSetUp(self):
        # Drop any cached connection first so every test starts on a fresh DB.
        _db_module._db_conn = None
        try:
            await _db_module.close_db()
        except Exception:
            pass
        await _db_module.get_db()
        self._db = _db_module

    async def asyncTearDown(self):
        # Best-effort close; errors here must not fail the test.
        try:
            await _db_module.close_db()
        except Exception:
            pass

    # ── helpers ────────────────────────────────────────────────────────────────

    @staticmethod
    def _load_get_node_policy():
        """Import get_node_policy from the sofiia-console service package.

        The service directory is not on sys.path by default, so it is
        prepended here before the import.
        """
        import sys
        from pathlib import Path
        sys.path.insert(0, str(
            Path(__file__).resolve().parent.parent / 'services' / 'sofiia-console'))
        from app.config import get_node_policy
        return get_node_policy

    def _drift_agent_ids(self, overrides):
        """Return agent_ids (sorted) whose last applied hash no longer matches
        the hash of their current desired payload.

        Drift formula: last_applied_hash exists AND differs from the desired
        payload hash computed over display_name/domain/system_prompt_md.
        """
        drifted = []
        for o in sorted(overrides, key=lambda x: x["agent_id"]):
            desired = {"display_name": o.get("display_name"),
                       "domain": o.get("domain"),
                       "system_prompt_md": o.get("system_prompt_md")}
            desired_hash = self._db._agent_payload_hash(desired)
            if o.get("last_applied_hash") and o["last_applied_hash"] != desired_hash:
                drifted.append(o["agent_id"])
        return drifted

    # ── node_policy helper ─────────────────────────────────────────────────────

    def test_node_policy_prod_defaults(self):
        """get_node_policy returns prod defaults for NODA1."""
        get_node_policy = self._load_get_node_policy()
        policy = get_node_policy("NODA1")
        self.assertEqual(policy["node_role"], "prod")
        self.assertGreaterEqual(policy["gateway_timeout_ms"], 1000)

    def test_node_policy_dev_shorter_timeout(self):
        """NODA2 (dev) has shorter timeout than NODA1 (prod)."""
        get_node_policy = self._load_get_node_policy()
        prod = get_node_policy("NODA1")
        dev = get_node_policy("NODA2")
        self.assertLessEqual(dev["gateway_timeout_ms"], prod["gateway_timeout_ms"])

    # ── payload hash / drift ───────────────────────────────────────────────────

    async def test_canary_selects_drift_agents_deterministically(self):
        """Canary candidate selection: only drift agents, sorted by agent_id."""
        # Create 3 overrides; mark alpha and gamma as applied at their current
        # hash (no drift). beta is left unapplied for now.
        for aid in ["beta_agent", "alpha_agent", "gamma_agent"]:
            ov = await self._db.upsert_agent_override(
                "NODA1", aid, system_prompt_md=f"# {aid} v1"
            )
            if aid in ("alpha_agent", "gamma_agent"):
                await self._db.upsert_agent_override(
                    "NODA1", aid, _mark_applied_hash=ov["version_hash"]
                )

        # beta_agent has no last_applied_hash yet → by the drift formula
        # (last_applied_hash exists AND != desired hash) none of the three
        # agents is drifted at this point.
        overrides = await self._db.list_agent_overrides("NODA1")
        candidates = [o for o in overrides if not o.get("is_hidden")]
        for aid in ("alpha_agent", "beta_agent", "gamma_agent"):
            self.assertNotIn(aid, self._drift_agent_ids(candidates))

        # Now give beta a stale applied hash and change its content so it
        # becomes the only drift candidate.
        await self._db.upsert_agent_override("NODA1", "beta_agent",
                                             _mark_applied_hash="oldHash")
        await self._db.upsert_agent_override("NODA1", "beta_agent",
                                             system_prompt_md="# beta v2")
        drifted = self._drift_agent_ids(
            await self._db.list_agent_overrides("NODA1"))
        self.assertIn("beta_agent", drifted)
        self.assertNotIn("alpha_agent", drifted)
        self.assertNotIn("gamma_agent", drifted)
        # Helper returns ids sorted, i.e. deterministically.
        self.assertEqual(drifted, sorted(drifted))

    # ── audit events for bulk ──────────────────────────────────────────────────

    async def test_bulk_plan_created_audit_event(self):
        """agent_bulk_plan_created event written correctly."""
        run_id = "testrun001"
        await self._db.append_governance_event(
            scope="portfolio", project_id="portfolio",
            actor_type="user",
            event_type="agent_bulk_plan_created",
            idempotency_key=f"abpc|{run_id}|NODA1|all",
            severity="info", status="ok",
            evidence=self._db._make_evidence(
                message="Bulk apply planned",
                outputs={"mode": "all", "nodes": ["NODA1"], "dry_run": True},
            ),
        )
        events = await self._db.list_governance_events(
            scope="portfolio", project_id="portfolio",
            event_type="agent_bulk_plan_created",
        )
        self.assertGreaterEqual(len(events), 1)

    async def test_canary_stopped_audit_event(self):
        """agent_bulk_canary_stopped event has high severity."""
        run_id = "testrun002"
        await self._db.append_governance_event(
            scope="portfolio", project_id="portfolio",
            actor_type="user",
            event_type="agent_bulk_canary_stopped",
            idempotency_key=f"abcstop|{run_id}|NODA1|helion",
            severity="high", status="error",
            evidence=self._db._make_evidence(
                message="Canary stopped on helion: HTTP 500",
                outputs={"failed_agent": "helion", "error": "HTTP 500"},
            ),
        )
        events = await self._db.list_governance_events(
            scope="portfolio", project_id="portfolio",
            event_type="agent_bulk_canary_stopped",
        )
        self.assertGreaterEqual(len(events), 1)
        self.assertEqual(events[0]["severity"], "high")
        self.assertEqual(events[0]["status"], "error")

    async def test_canary_completed_audit_event(self):
        """agent_bulk_apply_completed event written on success."""
        run_id = "testrun003"
        await self._db.append_governance_event(
            scope="portfolio", project_id="portfolio",
            actor_type="user",
            event_type="agent_bulk_apply_completed",
            idempotency_key=f"abac|{run_id}|NODA1",
            severity="info", status="ok",
            evidence=self._db._make_evidence(
                message="Canary apply completed: 2 agents",
                outputs={"agents_applied": ["sofiia", "helion"]},
            ),
        )
        events = await self._db.list_governance_events(
            scope="portfolio", project_id="portfolio",
            event_type="agent_bulk_apply_completed",
        )
        self.assertGreaterEqual(len(events), 1)
        self.assertEqual(events[0]["status"], "ok")

    async def test_bulk_audit_idempotent(self):
        """Bulk audit events with same key do not duplicate."""
        key = "abpc|DUPTEST|NODA1|canary"
        for _ in range(3):
            await self._db.append_governance_event(
                scope="portfolio", project_id="portfolio",
                actor_type="user",
                event_type="agent_bulk_plan_created",
                idempotency_key=key,
                severity="info", status="ok",
                evidence={},
            )
        events = await self._db.list_governance_events(
            scope="portfolio", project_id="portfolio",
            event_type="agent_bulk_plan_created",
        )
        matching = [e for e in events if e.get("idempotency_key") == key]
        self.assertEqual(len(matching), 1)

    # ── partial failure isolation ──────────────────────────────────────────────

    async def test_partial_failure_does_not_block_other_overrides(self):
        """If one agent has no prompt, others in overrides list are still processed."""
        # Create 2 overrides: one with a prompt, one without.
        await self._db.upsert_agent_override("NODA1", "agent_with_prompt",
                                             system_prompt_md="# valid prompt")
        await self._db.upsert_agent_override("NODA1", "agent_no_prompt",
                                             domain="test-domain")  # no system_prompt_md

        overrides = await self._db.list_agent_overrides("NODA1")
        results = []
        for ov in overrides:
            desired_prompt = (ov.get("system_prompt_md") or "")
            gw_url = ""  # no gateway in test
            if not desired_prompt or not gw_url:
                results.append({"agent_id": ov["agent_id"], "status": "skipped"})
            else:
                results.append({"agent_id": ov["agent_id"], "status": "planned"})
        # Both processed, none throws.
        agent_ids = [r["agent_id"] for r in results]
        self.assertIn("agent_with_prompt", agent_ids)
        self.assertIn("agent_no_prompt", agent_ids)
        # agent_with_prompt has a prompt but no gw_url → skipped (same path as no-gw).
        for r in results:
            self.assertIn(r["status"], ("skipped", "planned"))

    async def test_limit_canary_to_n_agents(self):
        """Canary with limit=2 selects at most 2 drift agents."""
        for i in range(5):
            await self._db.upsert_agent_override(
                "NODA1", f"canary_agent_{i}", system_prompt_md=f"# v{i}")
            # BUGFIX: was the literal "oldhash{i}" (missing f-prefix), giving
            # every agent the same stale hash; each gets a distinct one now.
            await self._db.upsert_agent_override(
                "NODA1", f"canary_agent_{i}", _mark_applied_hash=f"oldhash{i}")
            # Update content to create drift.
            await self._db.upsert_agent_override(
                "NODA1", f"canary_agent_{i}", system_prompt_md=f"# v{i} updated")

        overrides = await self._db.list_agent_overrides("NODA1")
        candidates = [o for o in overrides
                      if not o.get("is_hidden")
                      and o["agent_id"].startswith("canary_agent_")]
        limit = 2
        selected = self._drift_agent_ids(candidates)[:limit]
        self.assertLessEqual(len(selected), limit)
        # Selection is deterministic (sorted by agent_id).
        self.assertEqual(selected, sorted(selected))
|
|
|
|
if __name__ == "__main__":
    # Allow running this test module directly with `python <file>`.
    unittest.main()
|