New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
216 lines
8.3 KiB
Python
216 lines
8.3 KiB
Python
"""
|
|
Tests for create_autofollowups (auto follow-up creation for high-recurrence buckets).
|
|
"""
|
|
import sys, os, datetime
|
|
import pytest
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "router"))
|
|
|
|
|
|
def _ts(days_ago: float = 0.0) -> str:
    """Return a naive-UTC ISO-8601 timestamp for ``days_ago`` days before now.

    Args:
        days_ago: How far in the past the timestamp lies, in (fractional) days.

    Returns:
        ``datetime.isoformat()`` text with no UTC offset suffix.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop the tzinfo so the emitted string keeps the same naive
    # format that the previous implementation produced.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return (now_utc - datetime.timedelta(days=days_ago)).isoformat()
|
|
|
|
|
|
def _make_inc(store, service, kind_tag, sig=None, days_ago=0.0, severity="P1"):
    """Create a prod incident through ``store.create_incident`` and return its result.

    Args:
        store: Object exposing ``create_incident(dict)``.
        service: Value stored under the ``service`` key.
        kind_tag: Used in the title; stored as ``meta["kind"]`` when truthy.
        sig: Stored as ``meta["incident_signature"]`` when truthy.
        days_ago: Age of ``started_at`` in days, relative to now.
        severity: Value stored under the ``severity`` key.
    """
    # Only truthy values make it into meta; signature goes in before kind.
    candidate_meta = (("incident_signature", sig), ("kind", kind_tag))
    incident_meta = {name: value for name, value in candidate_meta if value}

    payload = {
        "service": service,
        "env": "prod",
        "severity": severity,
        "title": f"{kind_tag} on {service}",
        "started_at": _ts(days_ago),
        "created_by": "test",
        "meta": incident_meta,
    }
    return store.create_incident(payload)
|
|
|
|
|
|
@pytest.fixture
def store():
    """Provide a newly constructed ``MemoryIncidentStore`` to the requesting test."""
    # Imported lazily, at fixture time; the module is located through the
    # ``services/router`` entry pushed onto ``sys.path`` at the top of this file.
    import incident_store

    return incident_store.MemoryIncidentStore()
|
|
|
|
|
|
@pytest.fixture
def base_policy():
    """Return a policy dict with auto-follow-ups enabled.

    As a side effect, ``incident_intelligence._POLICY_CACHE`` is set to ``None``
    before the dict is built.
    """
    import incident_intelligence as intel

    # Reset the module-level cache (presumably so a previously loaded policy
    # cannot leak into this test — confirm against incident_intelligence).
    intel._POLICY_CACHE = None

    recurrence_cfg = {
        "thresholds": {
            "signature": {"warn": 2, "high": 4},
            "kind": {"warn": 3, "high": 6},
        },
        "top_n": 15,
    }
    buckets_cfg = {
        "mode": "service_kind",
        "signature_prefix_len": 12,
        "top_n": 10,
        "min_count": {"7": 2, "30": 3},
    }
    autofollowups_cfg = {
        "enabled": True,
        "only_when_high": True,
        "owner": "oncall",
        "priority": "P1",
        "due_days": 7,
        "max_followups_per_bucket_per_week": 1,
        "dedupe_key_prefix": "intel_recur",
    }
    return {
        "recurrence": recurrence_cfg,
        "buckets": buckets_cfg,
        "autofollowups": autofollowups_cfg,
    }
|
|
|
|
|
|
def _make_high_bucket(bucket_key="gateway|error_rate", count_7d=5, incident_id="inc_test_001"):
    """Build a bucket dict with one sample incident and ``count_7d`` recent hits.

    Args:
        bucket_key: Stored verbatim under ``bucket_key``.
        count_7d: 7-day count; the 30-day count is ``count_7d + 2``.
        incident_id: ``id`` of the single entry in ``sample_incidents``.

    Note:
        ``services`` and ``kinds`` are fixed to gateway / error_rate and do not
        follow ``bucket_key``.
    """
    bucket = {"bucket_key": bucket_key}
    bucket["counts"] = {"7d": count_7d, "30d": count_7d + 2, "open": 1}
    bucket["last_seen"] = _ts(0.5)
    bucket["services"] = ["gateway"]
    bucket["kinds"] = {"error_rate"}
    bucket["top_signatures"] = [{"signature": "aabbccdd", "count": count_7d}]
    bucket["severity_mix"] = {"P1": count_7d}

    sample = {
        "id": incident_id,
        "started_at": _ts(0.5),
        "status": "open",
        "title": "error",
    }
    bucket["sample_incidents"] = [sample]
    bucket["recommendations"] = ["Fix error mapping"]
    return bucket
|
|
|
|
|
|
class TestAutoFollowups:
    """Tests for ``incident_intelligence.create_autofollowups``."""

    @staticmethod
    def _high_kind_rec(service="gateway"):
        """Build a hand-written 7-day recurrence summary with one high-recurrence kind.

        The summary lists ``error_rate`` (count 5) for ``service`` under
        ``high_recurrence`` and nothing under ``warn_recurrence``.
        """
        return {
            "high_recurrence": {
                "signatures": [],
                "kinds": [{"kind": "error_rate", "count": 5, "services": [service]}],
            },
            "warn_recurrence": {"signatures": [], "kinds": []},
        }

    def test_creates_followup_on_high_bucket(self, store, base_policy):
        """A high bucket yields one follow-up, recorded as an event on the sample incident."""
        from incident_intelligence import create_autofollowups

        # One incident that the bucket will reference, plus four more:
        # five gateway/error_rate incidents in total within the last 7 days.
        inc = _make_inc(store, "gateway", "error_rate", days_ago=0.5)
        for i in range(4):
            _make_inc(store, "gateway", "error_rate", days_ago=float(i) * 0.3 + 0.1)

        # The recurrence summary is hand-built rather than computed from the store.
        rec_7d = self._high_kind_rec("gateway")
        bucket = _make_high_bucket(
            bucket_key="gateway|error_rate", count_7d=5, incident_id=inc["id"]
        )

        result = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store
        )
        assert len(result["created"]) == 1
        assert result["created"][0]["bucket_key"] == "gateway|error_rate"
        assert result["created"][0]["incident_id"] == inc["id"]

        # Verify the event was appended
        events = store.get_events(inc["id"])
        followup_events = [e for e in events if e.get("type") == "followup"]
        assert len(followup_events) == 1
        assert "intel_recur" in followup_events[0].get("meta", {}).get("dedupe_key", "")

    def test_dedupe_prevents_duplicate_in_same_week(self, store, base_policy):
        """A second call with the same ``week_str`` creates nothing and reports ``already_exists``."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "gateway", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        rec_7d = self._high_kind_rec("gateway")

        # First call → creates
        r1 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W08",
        )
        assert len(r1["created"]) == 1

        # Second call same week → dedupe
        r2 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W08",
        )
        assert len(r2["created"]) == 0
        assert any(s["reason"] == "already_exists" for s in r2["skipped"])

    def test_different_week_allows_new_followup(self, store, base_policy):
        """Calls with different ``week_str`` values each create a follow-up."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "gateway", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        rec_7d = self._high_kind_rec("gateway")

        r1 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W07",
        )
        r2 = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W08",
        )
        assert len(r1["created"]) == 1
        assert len(r2["created"]) == 1

    def test_not_high_bucket_skipped(self, store, base_policy):
        """A bucket absent from ``high_recurrence`` is skipped with reason ``not_high``."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "svc", "latency")
        # low count — not in high_recurrence
        bucket = {
            "bucket_key": "svc|latency",
            "counts": {"7d": 2, "30d": 3, "open": 1},
            "last_seen": _ts(1),
            "services": ["svc"],
            "kinds": {"latency"},
            "top_signatures": [],
            "severity_mix": {},
            "sample_incidents": [
                {"id": inc["id"], "started_at": _ts(1), "status": "open", "title": "t"},
            ],
            "recommendations": [],
        }
        rec_7d = {
            "high_recurrence": {"signatures": [], "kinds": []},
            "warn_recurrence": {"signatures": [], "kinds": []},
        }

        result = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store
        )
        assert len(result["created"]) == 0
        assert any(s["reason"] == "not_high" for s in result["skipped"])

    def test_disabled_skips_all(self, store, base_policy):
        """With ``autofollowups.enabled`` false nothing is created; first skip reason is ``disabled``."""
        from incident_intelligence import create_autofollowups

        # The fixture builds a new dict on every call, so this mutation stays
        # local to this test.
        base_policy["autofollowups"]["enabled"] = False
        inc = _make_inc(store, "svc", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        rec_7d = self._high_kind_rec("svc")

        result = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store
        )
        assert len(result["created"]) == 0
        assert result["skipped"][0]["reason"] == "disabled"

    def test_followup_event_has_correct_meta(self, store, base_policy):
        """The follow-up event's meta carries the policy's priority/owner plus bookkeeping keys."""
        from incident_intelligence import create_autofollowups

        inc = _make_inc(store, "gateway", "error_rate")
        bucket = _make_high_bucket(count_7d=5, incident_id=inc["id"])
        rec_7d = self._high_kind_rec("gateway")

        result = create_autofollowups(
            buckets=[bucket], rec_7d=rec_7d, policy=base_policy, store=store,
            week_str="2026-W09",
        )
        assert result["created"]
        events = store.get_events(inc["id"])
        fu = next(e for e in events if e.get("type") == "followup")
        meta = fu.get("meta", {})
        assert meta.get("priority") == "P1"
        assert meta.get("owner") == "oncall"
        assert meta.get("auto_created") is True
        assert "due_date" in meta
        assert "dedupe_key" in meta
|