Files
microdao-daarion/tests/test_intel_autofollowups.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

216 lines
8.3 KiB
Python

"""
Tests for create_autofollowups (auto follow-up creation for high-recurrence buckets).
"""
import sys, os, datetime
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router"))
def _ts(days_ago: float = 0.0) -> str:
    """Return an ISO-8601 UTC timestamp ``days_ago`` days before now.

    Args:
        days_ago: How far back from the current moment, in (fractional) days.

    Returns:
        ``datetime.isoformat()`` text for a naive datetime, i.e. without a
        UTC-offset suffix.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12.  Take an aware
    # "now" in UTC and drop the tzinfo so the emitted string keeps the same
    # offset-free shape as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return (now_utc - datetime.timedelta(days=days_ago)).isoformat()
def _make_inc(store, service, kind_tag, sig=None, days_ago=0.0, severity="P1"):
    """Create a ``prod`` incident in ``store`` and return ``create_incident``'s result.

    ``sig`` and ``kind_tag`` are copied into the incident's ``meta`` (under
    ``incident_signature`` and ``kind`` respectively) only when they are truthy.
    ``days_ago`` controls how far in the past ``started_at`` is placed.
    """
    tags = {}
    for meta_key, meta_value in (("incident_signature", sig), ("kind", kind_tag)):
        if meta_value:
            tags[meta_key] = meta_value

    payload = {
        "service": service,
        "env": "prod",
        "severity": severity,
        "title": f"{kind_tag} on {service}",
        "started_at": _ts(days_ago),
        "created_by": "test",
        "meta": tags,
    }
    return store.create_incident(payload)
@pytest.fixture
def store():
    """Provide a newly constructed ``MemoryIncidentStore``."""
    # Imported inside the fixture so the module is resolved only when the
    # fixture runs, after the sys.path tweak at the top of this file.
    import incident_store

    return incident_store.MemoryIncidentStore()
@pytest.fixture
def base_policy():
    """Provide a policy dict with auto-followups enabled.

    Before building the dict, ``incident_intelligence._POLICY_CACHE`` is reset
    to ``None``.
    """
    import incident_intelligence as intel

    intel._POLICY_CACHE = None

    recurrence_section = {
        "thresholds": {
            "signature": {"warn": 2, "high": 4},
            "kind": {"warn": 3, "high": 6},
        },
        "top_n": 15,
    }
    buckets_section = {
        "mode": "service_kind",
        "signature_prefix_len": 12,
        "top_n": 10,
        "min_count": {"7": 2, "30": 3},
    }
    autofollowups_section = {
        "enabled": True,
        "only_when_high": True,
        "owner": "oncall",
        "priority": "P1",
        "due_days": 7,
        "max_followups_per_bucket_per_week": 1,
        "dedupe_key_prefix": "intel_recur",
    }
    return {
        "recurrence": recurrence_section,
        "buckets": buckets_section,
        "autofollowups": autofollowups_section,
    }
def _make_high_bucket(bucket_key="gateway|error_rate", count_7d=5, incident_id="inc_test_001"):
    """Build a bucket dict for the ``gateway`` service and ``error_rate`` kind.

    ``count_7d`` feeds the 7-day count, the 30-day count (``count_7d + 2``),
    the single top-signature count and the ``P1`` severity mix.
    ``incident_id`` becomes the id of the only sample incident.
    """
    bucket = {"bucket_key": bucket_key}
    bucket["counts"] = {"7d": count_7d, "30d": count_7d + 2, "open": 1}
    bucket["last_seen"] = _ts(0.5)
    bucket["services"] = ["gateway"]
    bucket["kinds"] = {"error_rate"}  # a set, not a list
    bucket["top_signatures"] = [{"signature": "aabbccdd", "count": count_7d}]
    bucket["severity_mix"] = {"P1": count_7d}
    sample = {
        "id": incident_id,
        "started_at": _ts(0.5),
        "status": "open",
        "title": "error",
    }
    bucket["sample_incidents"] = [sample]
    bucket["recommendations"] = ["Fix error mapping"]
    return bucket
class TestAutoFollowups:
    """Tests for ``incident_intelligence.create_autofollowups``."""

    @staticmethod
    def _high_rec_7d(service="gateway", kind="error_rate", count=5):
        """Return a hand-built 7-day recurrence summary with one high-recurrence kind.

        The leading underscore keeps pytest from collecting this helper as a test.
        """
        return {
            "high_recurrence": {
                "signatures": [],
                "kinds": [{"kind": kind, "count": count, "services": [service]}],
            },
            "warn_recurrence": {"signatures": [], "kinds": []},
        }

    def test_creates_followup_on_high_bucket(self, store, base_policy):
        """A high-recurrence bucket produces one follow-up on its sample incident."""
        from incident_intelligence import create_autofollowups

        # Seed five incidents inside the last 7 days.  The recurrence summary
        # passed below is hand-built rather than derived from these incidents.
        inc = _make_inc(store, "gateway", "error_rate", days_ago=0.5)
        for i in range(4):
            _make_inc(store, "gateway", "error_rate", days_ago=float(i) * 0.3 + 0.1)

        bucket = _make_high_bucket(
            bucket_key="gateway|error_rate", count_7d=5, incident_id=inc["id"]
        )
        result = create_autofollowups(
            buckets=[bucket], rec_7d=self._high_rec_7d(), policy=base_policy, store=store
        )

        assert len(result["created"]) == 1
        assert result["created"][0]["bucket_key"] == "gateway|error_rate"
        assert result["created"][0]["incident_id"] == inc["id"]

        # Verify the follow-up event was appended to the incident.
        events = store.get_events(inc["id"])
        followup_events = [e for e in events if e.get("type") == "followup"]
        assert len(followup_events) == 1
        assert "intel_recur" in followup_events[0].get("meta", {}).get("dedupe_key", "")

    def test_dedupe_prevents_duplicate_in_same_week(self, store, base_policy):
        """A second call with the same ``week_str`` creates nothing and reports ``already_exists``."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "gateway", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        rec_7d = self._high_rec_7d()

        # First call: creates the follow-up.
        r1 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W08",
        )
        assert len(r1["created"]) == 1

        # Second call, same week: deduplicated.
        r2 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W08",
        )
        assert len(r2["created"]) == 0
        assert any(s["reason"] == "already_exists" for s in r2["skipped"])

    def test_different_week_allows_new_followup(self, store, base_policy):
        """Calls with different ``week_str`` values each create a follow-up."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "gateway", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        rec_7d = self._high_rec_7d()

        r1 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W07",
        )
        r2 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W08",
        )
        assert len(r1["created"]) == 1
        assert len(r2["created"]) == 1

    def test_not_high_bucket_skipped(self, store, base_policy):
        """A bucket absent from ``high_recurrence`` is skipped with reason ``not_high``."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "svc", "latency")
        # Low count, and nothing listed under high_recurrence below.
        bucket = {
            "bucket_key": "svc|latency",
            "counts": {"7d": 2, "30d": 3, "open": 1},
            "last_seen": _ts(1),
            "services": ["svc"],
            "kinds": {"latency"},
            "top_signatures": [],
            "severity_mix": {},
            "sample_incidents": [
                {"id": inc["id"], "started_at": _ts(1), "status": "open", "title": "t"},
            ],
            "recommendations": [],
        }
        rec_7d = {
            "high_recurrence": {"signatures": [], "kinds": []},
            "warn_recurrence": {"signatures": [], "kinds": []},
        }
        result = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store
        )
        assert len(result["created"]) == 0
        assert any(s["reason"] == "not_high" for s in result["skipped"])

    def test_disabled_skips_all(self, store, base_policy):
        """With ``autofollowups.enabled`` false, nothing is created and the first skip reason is ``disabled``."""
        from incident_intelligence import create_autofollowups

        base_policy["autofollowups"]["enabled"] = False
        inc = _make_inc(store, "svc", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        result = create_autofollowups(
            buckets=[bucket],
            rec_7d=self._high_rec_7d(service="svc"),
            policy=base_policy,
            store=store,
        )
        assert len(result["created"]) == 0
        assert result["skipped"][0]["reason"] == "disabled"

    def test_followup_event_has_correct_meta(self, store, base_policy):
        """The follow-up event's ``meta`` carries the policy's priority and owner plus bookkeeping keys."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "gateway", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        result = create_autofollowups(
            buckets=[bucket], rec_7d=self._high_rec_7d(), policy=base_policy, store=store,
            week_str="2026-W09",
        )
        assert result["created"]

        events = store.get_events(inc["id"])
        fu = next(e for e in events if e.get("type") == "followup")
        meta = fu.get("meta", {})
        assert meta.get("priority") == "P1"
        assert meta.get("owner") == "oncall"
        assert meta.get("auto_created") is True
        assert "due_date" in meta
        assert "dedupe_key" in meta