New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
509 lines
18 KiB
Python
509 lines
18 KiB
Python
"""
|
|
Tests for Cost & Resource Analyzer (FinOps MVP)
|
|
|
|
Covers:
|
|
1. test_audit_persist_nonfatal — broken store does not crash tool_governance
|
|
2. test_cost_report_aggregation — 20 synthetic events → correct totals
|
|
3. test_anomalies_spike_detection — baseline low, window high → anomaly detected
|
|
4. test_anomalies_no_spike — stable traffic → no anomalies
|
|
5. test_release_check_cost_watch — cost_watch gate always passes, adds recs
|
|
6. test_rbac_cost_tool_deny — denied without entitlements
|
|
7. test_weights_loaded — weights read from cost_weights.yml
|
|
8. test_top_report — top returns correct leaders
|
|
9. test_cost_watch_skipped_on_error — broken cost_analyzer → gate passes (skipped)
|
|
10. test_cost_event_cost_units — compute_event_cost correct calculation
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# ─── Path setup ──────────────────────────────────────────────────────────────
|
|
ROUTER_DIR = Path(__file__).parent.parent / "services" / "router"
|
|
REPO_ROOT = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(ROUTER_DIR))
|
|
sys.path.insert(0, str(REPO_ROOT))
|
|
|
|
os.environ.setdefault("REPO_ROOT", str(REPO_ROOT))
|
|
os.environ["AUDIT_BACKEND"] = "memory" # default for all tests
|
|
|
|
# ─── Import modules ───────────────────────────────────────────────────────────
|
|
from audit_store import MemoryAuditStore, JsonlAuditStore, NullAuditStore, set_audit_store
|
|
from cost_analyzer import (
|
|
action_report,
|
|
action_top,
|
|
action_anomalies,
|
|
action_weights,
|
|
compute_event_cost,
|
|
reload_cost_weights,
|
|
analyze_cost_dict,
|
|
)
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _now_iso(delta_minutes: int = 0) -> str:
|
|
dt = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=delta_minutes)
|
|
return dt.isoformat()
|
|
|
|
|
|
def _make_event(
|
|
tool: str = "observability_tool",
|
|
agent_id: str = "sofiia",
|
|
user_id: str = "user_x",
|
|
workspace_id: str = "ws1",
|
|
status: str = "pass",
|
|
duration_ms: int = 200,
|
|
ts: str = None,
|
|
) -> Dict:
|
|
return {
|
|
"ts": ts or _now_iso(),
|
|
"req_id": "req-test-123",
|
|
"workspace_id": workspace_id,
|
|
"user_id": user_id,
|
|
"agent_id": agent_id,
|
|
"tool": tool,
|
|
"action": "query",
|
|
"status": status,
|
|
"duration_ms": duration_ms,
|
|
"in_size": 100,
|
|
"out_size": 500,
|
|
"input_hash": "sha256:abc",
|
|
}
|
|
|
|
|
|
# ─── 1. audit_persist_nonfatal ────────────────────────────────────────────────
|
|
|
|
class BrokenStore:
    """Audit store stub that fails on every operation, emulating a dead backend."""

    @staticmethod
    def _boom() -> None:
        # Single failure point shared by all operations.
        raise RuntimeError("disk full")

    def write(self, event) -> None:
        self._boom()

    def read(self, **kwargs) -> List:
        self._boom()
|
|
|
|
|
|
def test_audit_persist_nonfatal(tmp_path):
    """A failing audit store must never break tool execution.

    _emit_audit is expected to swallow storage errors, so pre_call and
    post_call complete normally even when every store operation raises.
    """
    from tool_governance import ToolGovernance

    set_audit_store(BrokenStore())
    try:
        gov = ToolGovernance(enable_rbac=False, enable_limits=False, enable_allowlist=False)
        decision = gov.pre_call("some_tool", "action", agent_id="agent_cto", input_text="hello")
        assert decision.allowed

        # Must not raise despite the broken store; reaching the end of the
        # try body is the success condition.
        gov.post_call(decision.call_ctx, {"data": "ok"})
    finally:
        # Restore a working in-memory store for subsequent tests.
        set_audit_store(MemoryAuditStore())
|
|
|
|
|
|
# ─── 2. cost_report_aggregation ───────────────────────────────────────────────
|
|
|
|
def test_cost_report_aggregation():
    """20 synthetic events roll up into correct totals and a ranked breakdown."""
    store = MemoryAuditStore()

    # (tool, duration_ms, call count) — 20 events total.
    workload = [
        ("observability_tool", 200, 10),
        ("pr_reviewer_tool", 1000, 5),
        ("memory_search", 50, 5),
    ]
    for tool, ms, count in workload:
        for _ in range(count):
            store.write(_make_event(tool, duration_ms=ms))

    report = action_report(store, group_by=["tool"], top_n=10)

    assert report["totals"]["calls"] == 20
    assert report["totals"]["cost_units"] > 0

    breakdown = report["breakdowns"]["tool"]
    names = [row["tool"] for row in breakdown]
    assert "pr_reviewer_tool" in names
    # pr_reviewer is the priciest (10 per call + cost_per_ms*1000), so it leads.
    assert breakdown[0]["tool"] == "pr_reviewer_tool"
|
|
|
|
|
|
def test_cost_event_cost_units():
    """compute_event_cost applies per-tool weights: 10.0 + 500 * 0.002 == 11.0."""
    reload_cost_weights()
    event = _make_event("pr_reviewer_tool", duration_ms=500)
    assert abs(compute_event_cost(event) - 11.0) < 0.01
|
|
|
|
|
|
def test_cost_event_cost_units_default():
    """A tool absent from cost_weights.yml falls back to the default weights:
    1.0 + 1000 * 0.001 == 2.0."""
    reload_cost_weights()
    event = _make_event("unknown_fancy_tool", duration_ms=1000)
    assert abs(compute_event_cost(event) - 2.0) < 0.01
|
|
|
|
|
|
# ─── 3. anomalies_spike_detection ─────────────────────────────────────────────
|
|
|
|
def test_anomalies_spike_detection():
    """A sparse 24h baseline followed by 80 calls in the last hour is a spike.

    Baseline: 2 calls ~23h ago.  Window (last 60m): 80 calls — well past the
    2x ratio threshold.
    """
    store = MemoryAuditStore()

    baseline_ts = _now_iso(delta_minutes=-(23 * 60))
    for _ in range(2):  # quiet historical traffic
        store.write(_make_event("comfy_generate_image", ts=baseline_ts))
    for _ in range(80):  # burst happening right now
        store.write(_make_event("comfy_generate_image"))

    result = action_anomalies(
        store,
        window_minutes=60,
        baseline_hours=24,
        ratio_threshold=2.0,
        min_calls=5,
    )

    assert result["anomaly_count"] >= 1
    assert "cost_spike" in {a["type"] for a in result["anomalies"]}

    spike = next(a for a in result["anomalies"] if a["type"] == "cost_spike")
    assert spike["tool"] == "comfy_generate_image"
    assert spike["window_calls"] == 80
|
|
|
|
|
|
def test_anomalies_no_spike():
    """A flat 5-calls-per-hour rate across 25 hours raises no anomalies."""
    store = MemoryAuditStore()

    anchor = datetime.datetime.now(datetime.timezone.utc)
    for hours_back in range(25):
        stamp = (anchor - datetime.timedelta(hours=hours_back)).isoformat()
        for _ in range(5):
            store.write(_make_event("observability_tool", ts=stamp))

    result = action_anomalies(
        store,
        window_minutes=60,
        baseline_hours=24,
        ratio_threshold=3.0,
        min_calls=3,
    )

    # Traffic is stable, so nothing should cross the 3x ratio threshold.
    assert result["anomaly_count"] == 0
|
|
|
|
|
|
# ─── 4. top report ────────────────────────────────────────────────────────────
|
|
|
|
def test_top_report():
    """The 'top' action ranks the heaviest tools and agents correctly."""
    store = MemoryAuditStore()

    for _ in range(5):  # expensive video generations
        store.write(_make_event("comfy_generate_video", duration_ms=3000))
    for _ in range(2):  # cheap memory lookups from a second agent
        store.write(_make_event("memory_search", duration_ms=50, agent_id="agent_b"))

    result = action_top(store, window_hours=1, top_n=5)
    assert result["total_calls"] == 7

    assert result["top_tools"][0]["tool"] == "comfy_generate_video"

    agent_names = [row["agent_id"] for row in result["top_agents"]]
    # "sofiia" is the agent_id mapped to role agent_cto.
    assert "sofiia" in agent_names
|
|
|
|
|
|
# ─── 5. release_check cost_watch gate ────────────────────────────────────────
|
|
|
|
def test_release_check_cost_watch_always_passes():
    """cost_watch is advisory: anomalies land in recommendations, but the
    gate itself always reports pass=True."""

    async def drive():
        from release_check_runner import _run_cost_watch

        class FakeToolResult:
            def __init__(self, data):
                self.success = True
                self.result = data
                self.error = None

        spike_payload = {
            "anomalies": [
                {
                    "type": "cost_spike",
                    "tool": "comfy_generate_image",
                    "ratio": 5.0,
                    "window_calls": 100,
                    "baseline_calls": 2,
                    "recommendation": "Cost spike: comfy_generate_image — apply rate limit.",
                }
            ],
            "anomaly_count": 1,
        }

        async def fake_execute(tool_name, args, agent_id=None):
            if tool_name == "cost_analyzer_tool":
                return FakeToolResult(spike_payload)

        manager = MagicMock()
        manager.execute_tool = AsyncMock(side_effect=fake_execute)
        return await _run_cost_watch(manager, "sofiia", ratio_threshold=2.0, min_calls=5)

    passed, gate = asyncio.run(drive())

    assert passed is True, "cost_watch must always return pass=True"
    assert gate["name"] == "cost_watch"
    assert gate["status"] == "pass"
    assert gate["anomalies_count"] >= 1
    recs = gate.get("recommendations", [])
    assert any("comfy" in r or "cost" in r.lower() for r in recs)
|
|
|
|
|
|
def test_cost_watch_gate_in_full_release_check():
    """End-to-end release_check: cost_watch appears among the gates, and a
    cost anomaly alone must not flip overall pass to False."""

    async def drive():
        from release_check_runner import run_release_check

        class FakeTMResult:
            def __init__(self, data, success=True, error=None):
                self.success = success
                self.result = data
                self.error = error

        # Canned per-tool payloads; anything else gets an empty dict.
        canned = {
            "pr_reviewer_tool": {"approved": True, "verdict": "LGTM", "issues": []},
            "config_linter_tool": {"pass": True, "errors": [], "warnings": []},
            "dependency_scanner_tool": {"pass": True, "summary": "No vulns", "vulnerabilities": []},
            "contract_tool": {"pass": True, "breaking_changes": [], "warnings": []},
            "threatmodel_tool": {"risk_level": "low", "threats": []},
            "cost_analyzer_tool": {
                "anomalies": [
                    {"type": "cost_spike", "tool": "observability_tool",
                     "ratio": 4.5, "window_calls": 100, "baseline_calls": 5,
                     "recommendation": "Reduce observability polling frequency."}
                ],
                "anomaly_count": 1,
            },
        }

        async def fake_exec(tool_name, args, agent_id=None):
            return FakeTMResult(canned.get(tool_name, {}))

        tm = MagicMock()
        tm.execute_tool = AsyncMock(side_effect=fake_exec)

        inputs = {
            "diff_text": "small change",
            "run_smoke": False,
            "run_drift": False,
            "run_deps": True,
            "run_cost_watch": True,
            "cost_spike_ratio_threshold": 2.0,
            "cost_min_calls_threshold": 5,
            "cost_watch_window_hours": 24,
            "fail_fast": False,
        }

        return await run_release_check(tm, inputs, agent_id="sofiia")

    report = asyncio.run(drive())

    gate_names = [g["name"] for g in report["gates"]]
    assert "cost_watch" in gate_names

    cost_gate = next(g for g in report["gates"] if g["name"] == "cost_watch")
    assert cost_gate["status"] == "pass"
    assert report["pass"] is True
|
|
|
|
|
|
# ─── 6. RBAC deny ─────────────────────────────────────────────────────────────
|
|
|
|
def test_rbac_cost_tool_deny():
    """An agent without the tools.cost.read entitlement is denied.

    'alateya' maps to role agent_media, which lacks tools.cost.read.
    """
    from tool_governance import ToolGovernance

    gov = ToolGovernance(enable_rbac=True, enable_limits=False, enable_allowlist=False)
    decision = gov.pre_call(
        tool="cost_analyzer_tool",
        action="report",
        agent_id="alateya",  # maps to agent_media (no tools.cost.read)
    )

    assert not decision.allowed
    reason = decision.reason.lower()
    assert "denied" in reason or "entitlement" in reason
|
|
|
|
|
|
def test_rbac_cost_tool_allow():
    """'sofiia' maps to role agent_cto, which holds tools.cost.read → allowed."""
    from tool_governance import ToolGovernance

    gov = ToolGovernance(enable_rbac=True, enable_limits=False, enable_allowlist=False)
    decision = gov.pre_call(
        tool="cost_analyzer_tool",
        action="report",
        agent_id="sofiia",  # maps to agent_cto
    )
    assert decision.allowed
|
|
|
|
|
|
# ─── 7. weights_loaded ────────────────────────────────────────────────────────
|
|
|
|
def test_weights_loaded():
    """cost_weights.yml is parsed into defaults/tools/anomaly sections with
    the expected tool entries."""
    reload_cost_weights()
    weights = action_weights()

    for section in ("defaults", "tools", "anomaly"):
        assert section in weights

    tools = weights["tools"]
    for expected_tool in ("pr_reviewer_tool", "comfy_generate_image", "comfy_generate_video"):
        assert expected_tool in tools

    # Spot-check one configured value.
    assert float(tools["pr_reviewer_tool"]["cost_per_call"]) == 10.0

    defaults = weights["defaults"]
    assert "cost_per_call" in defaults
    assert "cost_per_ms" in defaults
|
|
|
|
|
|
# ─── 8. JSONL store round-trip ────────────────────────────────────────────────
|
|
|
|
def test_jsonl_store_roundtrip():
    """Events written through JsonlAuditStore come back intact on read."""
    with tempfile.TemporaryDirectory() as tmpdir:
        store = JsonlAuditStore(directory=tmpdir)
        for _ in range(10):
            store.write(_make_event("observability_tool"))
        store.close()

        # NOTE(review): read() after close() — presumably it re-reads the
        # on-disk JSONL files rather than an open handle.
        rows = store.read()
        assert len(rows) == 10
        assert all(row["tool"] == "observability_tool" for row in rows)
|
|
|
|
|
|
def test_jsonl_store_filter_by_tool():
    """JsonlAuditStore.read honors the tool= filter."""
    with tempfile.TemporaryDirectory() as tmpdir:
        store = JsonlAuditStore(directory=tmpdir)
        for tool, count in (("observability_tool", 5), ("memory_search", 3)):
            for _ in range(count):
                store.write(_make_event(tool))
        store.close()

        assert len(store.read(tool="memory_search")) == 3
|
|
|
|
|
|
# ─── 9. cost_watch skipped on error ──────────────────────────────────────────
|
|
|
|
def test_cost_watch_skipped_on_tool_error():
    """A failing cost_analyzer_tool makes the gate skipped (pass=True), not an error."""

    async def drive():
        from release_check_runner import _run_cost_watch

        class FailResult:
            success = False
            result = None
            error = "tool unavailable"

        manager = MagicMock()
        manager.execute_tool = AsyncMock(return_value=FailResult())
        return await _run_cost_watch(manager, "sofiia")

    passed, gate = asyncio.run(drive())

    assert passed is True
    assert gate["status"] == "pass"
    assert gate.get("skipped") is True
|
|
|
|
|
|
# ─── 10. analyze_cost_dict dispatch ──────────────────────────────────────────
|
|
|
|
def test_analyze_cost_dict_top():
    """analyze_cost_dict routes the 'top' action to the top report."""
    store = MemoryAuditStore()
    for _ in range(3):
        store.write(_make_event("pr_reviewer_tool", duration_ms=800))

    result = analyze_cost_dict("top", {"window_hours": 1, "top_n": 5}, store=store)

    assert "top_tools" in result
    assert result["top_tools"][0]["tool"] == "pr_reviewer_tool"
|
|
|
|
|
|
def test_analyze_cost_dict_unknown_action():
    """An unrecognized action yields an error dict instead of raising."""
    outcome = analyze_cost_dict("explode", {}, store=MemoryAuditStore())
    assert "error" in outcome
|
|
|
|
|
|
# ─── 11. Error rate spike ─────────────────────────────────────────────────────
|
|
|
|
def test_anomalies_error_rate_spike():
    """20 failures against 5 passes pushes the error rate past the threshold
    and triggers an error_spike anomaly."""
    store = MemoryAuditStore()

    for outcome, count in (("failed", 20), ("pass", 5)):
        for _ in range(count):
            store.write(_make_event("observability_tool", status=outcome))

    result = action_anomalies(
        store,
        window_minutes=60,
        baseline_hours=24,
        ratio_threshold=999.0,  # effectively disables the cost-spike check
        min_calls=5,
    )

    spikes = [a for a in result["anomalies"] if a["type"] == "error_spike"]
    assert spikes
    first = spikes[0]
    assert first["tool"] == "observability_tool"
    assert float(first["error_rate"]) > 0.10
|
|
|
|
|