Files
microdao-daarion/tests/test_alert_state_machine.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

300 lines
11 KiB
Python

"""
Tests for Alert State Machine — MemoryAlertStore state transitions.
Covers:
- claim moves new→processing and locks
- second claim does not re-claim locked alerts
- lock expiry allows re-claim (stale processing requeue)
- mark_failed sets failed + retry lock
- mark_acked sets acked
- priority ordering (P0 before P1)
- requeue_expired_processing
- dashboard_counts
- top_signatures
- SignatureStateStore cooldown
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parent.parent
ROUTER = ROOT / "services" / "router"
if str(ROUTER) not in sys.path:
sys.path.insert(0, str(ROUTER))
def _make_alert(service="gw", severity="P1", kind="slo_breach", fp="fp1", ref=None):
from alert_ingest import ingest_alert
return {
"source": "monitor@node1",
"service": service,
"env": "prod",
"severity": severity,
"kind": kind,
"title": f"{service} {kind}",
"summary": f"{service} issue",
"started_at": datetime.utcnow().isoformat(),
"labels": {"fingerprint": fp},
"metrics": {},
"evidence": {},
}
def _store_with_alert(alert_data=None):
from alert_store import MemoryAlertStore, set_alert_store
from alert_ingest import ingest_alert
store = MemoryAlertStore()
set_alert_store(store)
if alert_data is None:
alert_data = _make_alert()
result = ingest_alert(store, alert_data)
return store, result["alert_ref"]
class TestStateMachineClaim:
def teardown_method(self):
from alert_store import set_alert_store
set_alert_store(None)
def test_claim_new_alert(self):
store, ref = _store_with_alert()
claimed = store.claim_next_alerts(limit=5, owner="test_owner")
assert len(claimed) == 1
assert claimed[0]["alert_ref"] == ref
assert claimed[0]["status"] == "processing"
assert claimed[0]["processing_owner"] == "test_owner"
def test_claim_sets_lock(self):
store, ref = _store_with_alert()
store.claim_next_alerts(limit=5, owner="loop1", lock_ttl_seconds=600)
rec = store.get_alert(ref)
assert rec["processing_lock_until"] is not None
# Lock should be in the future
from datetime import datetime
lock = rec["processing_lock_until"]
assert lock > datetime.utcnow().isoformat()
def test_second_claim_skips_locked(self):
store, ref = _store_with_alert()
store.claim_next_alerts(limit=5, owner="loop1", lock_ttl_seconds=600)
# Second claim should not get the same alert
claimed2 = store.claim_next_alerts(limit=5, owner="loop2", lock_ttl_seconds=600)
assert len(claimed2) == 0
def test_expired_lock_allows_reclaim(self):
store, ref = _store_with_alert()
store.claim_next_alerts(limit=5, owner="loop1", lock_ttl_seconds=600)
# Manually expire the lock
with store._lock:
store._alerts[ref]["processing_lock_until"] = (
(datetime.utcnow() - timedelta(seconds=10)).isoformat()
)
claimed2 = store.claim_next_alerts(limit=5, owner="loop2", lock_ttl_seconds=600)
assert len(claimed2) == 1
assert claimed2[0]["processing_owner"] == "loop2"
def test_acked_alert_not_claimed(self):
store, ref = _store_with_alert()
store.mark_acked(ref, "test")
claimed = store.claim_next_alerts(limit=5)
assert len(claimed) == 0
def test_failed_alert_retried_after_lock_expires(self):
store, ref = _store_with_alert()
store.mark_failed(ref, "processing error", retry_after_seconds=300)
# Immediately after mark_failed, lock is in future → not claimable
claimed = store.claim_next_alerts(limit=5)
assert len(claimed) == 0
# Expire the retry lock
with store._lock:
store._alerts[ref]["processing_lock_until"] = (
(datetime.utcnow() - timedelta(seconds=10)).isoformat()
)
claimed2 = store.claim_next_alerts(limit=5)
assert len(claimed2) == 1
class TestStateMachineTransitions:
def teardown_method(self):
from alert_store import set_alert_store
set_alert_store(None)
def test_mark_acked(self):
store, ref = _store_with_alert()
store.claim_next_alerts(limit=5, owner="loop")
result = store.mark_acked(ref, "sofiia", note="incident:inc_001")
assert result["status"] == "acked"
rec = store.get_alert(ref)
assert rec["status"] == "acked"
assert rec["acked_at"] is not None
assert rec["processing_lock_until"] is None
def test_mark_failed(self):
store, ref = _store_with_alert()
store.claim_next_alerts(limit=5)
result = store.mark_failed(ref, "gateway timeout", retry_after_seconds=300)
assert result["status"] == "failed"
assert "retry_at" in result
rec = store.get_alert(ref)
assert rec["status"] == "failed"
assert rec["last_error"] == "gateway timeout"
def test_requeue_expired_processing(self):
store, ref = _store_with_alert()
store.claim_next_alerts(limit=5, lock_ttl_seconds=600)
# Expire the lock manually
with store._lock:
store._alerts[ref]["processing_lock_until"] = (
(datetime.utcnow() - timedelta(seconds=5)).isoformat()
)
count = store.requeue_expired_processing()
assert count == 1
rec = store.get_alert(ref)
assert rec["status"] == "new"
assert rec["processing_lock_until"] is None
def test_secret_redacted_in_last_error(self):
store, ref = _store_with_alert()
store.mark_failed(ref, "token=sk-secret123 failed processing")
rec = store.get_alert(ref)
assert "sk-secret123" not in rec["last_error"]
assert "***" in rec["last_error"]
class TestStateMachineDashboard:
def teardown_method(self):
from alert_store import set_alert_store
set_alert_store(None)
def test_dashboard_counts(self):
from alert_store import MemoryAlertStore, set_alert_store
from alert_ingest import ingest_alert
store = MemoryAlertStore()
set_alert_store(store)
a1 = ingest_alert(store, _make_alert(fp="fp1", ref="a1"))
a2 = ingest_alert(store, _make_alert(fp="fp2", ref="a2"))
a3 = ingest_alert(store, _make_alert(fp="fp3", ref="a3"))
store.claim_next_alerts(limit=1, owner="loop")
store.mark_acked(a2["alert_ref"], "test")
counts = store.dashboard_counts()
assert counts["new"] >= 1
assert counts["processing"] >= 1
assert counts["acked"] >= 1
def test_top_signatures(self):
from alert_store import MemoryAlertStore, set_alert_store
from alert_ingest import ingest_alert
store = MemoryAlertStore()
set_alert_store(store)
# Same signature: 3 occurrences
for i in range(3):
ingest_alert(store, _make_alert(fp="samefp"))
# Different signature: 1 occurrence
ingest_alert(store, _make_alert(fp="otherfp"))
top = store.top_signatures()
assert len(top) >= 1
assert top[0]["occurrences"] >= 3 # most common first
def test_list_alerts_status_filter(self):
from alert_store import MemoryAlertStore, set_alert_store
from alert_ingest import ingest_alert
store = MemoryAlertStore()
set_alert_store(store)
r1 = ingest_alert(store, _make_alert(fp="fp1"))
r2 = ingest_alert(store, _make_alert(fp="fp2"))
store.mark_acked(r2["alert_ref"], "test")
new_only = store.list_alerts({"status_in": ["new"]})
assert all(a["status"] == "new" for a in new_only)
acked_only = store.list_alerts({"status_in": ["acked"]})
assert all(a["status"] == "acked" for a in acked_only)
class TestSignatureStateStore:
def setup_method(self):
from signature_state_store import MemorySignatureStateStore, set_signature_state_store
self.store = MemorySignatureStateStore()
set_signature_state_store(self.store)
def teardown_method(self):
from signature_state_store import set_signature_state_store
set_signature_state_store(None)
def test_first_call_should_triage(self):
assert self.store.should_run_triage("sig_abc", cooldown_minutes=15) is True
def test_after_mark_cooldown_active(self):
self.store.mark_triage_run("sig_abc")
assert self.store.should_run_triage("sig_abc", cooldown_minutes=15) is False
def test_after_cooldown_passes_ok(self):
self.store.mark_triage_run("sig_abc")
# Manually back-date last_triage_at
with self.store._lock:
self.store._states["sig_abc"]["last_triage_at"] = (
(datetime.utcnow() - timedelta(minutes=20)).isoformat()
)
assert self.store.should_run_triage("sig_abc", cooldown_minutes=15) is True
def test_mark_alert_seen_creates_state(self):
self.store.mark_alert_seen("sig_xyz")
state = self.store.get_state("sig_xyz")
assert state is not None
assert state["last_alert_at"] is not None
assert state["last_triage_at"] is None
def test_triage_count_increments(self):
for _ in range(3):
self.store.mark_triage_run("sig_count")
state = self.store.get_state("sig_count")
assert state["triage_count_24h"] == 3
def test_different_signatures_independent(self):
self.store.mark_triage_run("sig_a")
assert self.store.should_run_triage("sig_b", cooldown_minutes=15) is True
class TestAlertStoreFactory:
def test_default_is_memory(self):
from alert_store import _create_alert_store, MemoryAlertStore
with patch.dict(os.environ, {"ALERT_BACKEND": "memory"}, clear=False):
store = _create_alert_store()
assert isinstance(store, MemoryAlertStore)
def test_auto_with_dsn_is_auto(self):
from alert_store import _create_alert_store, AutoAlertStore
env = {"ALERT_BACKEND": "auto", "DATABASE_URL": "postgresql://x:x@localhost/test"}
with patch.dict(os.environ, env, clear=False):
store = _create_alert_store()
assert isinstance(store, AutoAlertStore)
class TestClaimDedupeAndPriority:
def teardown_method(self):
from alert_store import set_alert_store
set_alert_store(None)
def test_multiple_new_alerts_claimed_in_order(self):
from alert_store import MemoryAlertStore, set_alert_store
from alert_ingest import ingest_alert
store = MemoryAlertStore()
set_alert_store(store)
ingest_alert(store, _make_alert(fp="fp1"))
ingest_alert(store, _make_alert(fp="fp2"))
ingest_alert(store, _make_alert(fp="fp3"))
claimed = store.claim_next_alerts(limit=2)
assert len(claimed) == 2
remaining = store.claim_next_alerts(limit=10)
assert len(remaining) == 1 # only one left