""" Cost & Resource Analyzer (FinOps MVP) Reads audit events from AuditStore and computes: - Aggregated cost_units by tool/agent/workspace/status - Top spenders (tools, agents, users) - Anomalies (cost spikes, error rate spikes) - Cost model weights "cost_units" = cost_per_call(tool) + duration_ms * cost_per_ms(tool) These are relative units, not real dollars. No payload access — all inputs are aggregation parameters only. """ from __future__ import annotations import datetime import logging import os from collections import defaultdict from pathlib import Path from typing import Any, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) # ─── Config loader ──────────────────────────────────────────────────────────── _weights_cache: Optional[Dict] = None _WEIGHTS_PATH = os.path.join( os.getenv("REPO_ROOT", str(Path(__file__).parent.parent.parent)), "config", "cost_weights.yml", ) def _load_weights() -> Dict: global _weights_cache if _weights_cache is not None: return _weights_cache try: import yaml with open(_WEIGHTS_PATH, "r") as f: _weights_cache = yaml.safe_load(f) or {} except Exception as e: logger.warning("cost_weights.yml not loaded: %s", e) _weights_cache = {} return _weights_cache def reload_cost_weights() -> None: """Force reload weights (for tests).""" global _weights_cache _weights_cache = None def get_weights_for_tool(tool: str) -> Tuple[float, float]: """Return (cost_per_call, cost_per_ms) for a tool.""" cfg = _load_weights() defaults = cfg.get("defaults", {}) tool_cfg = (cfg.get("tools") or {}).get(tool, {}) cpc = float(tool_cfg.get("cost_per_call", defaults.get("cost_per_call", 1.0))) cpm = float(tool_cfg.get("cost_per_ms", defaults.get("cost_per_ms", 0.001))) return cpc, cpm def compute_event_cost(event: Dict) -> float: """Compute cost_units for a single audit event.""" tool = event.get("tool", "") duration_ms = float(event.get("duration_ms", 0)) cpc, cpm = get_weights_for_tool(tool) return round(cpc + duration_ms * cpm, 4) # ─── Time 
helpers ───────────────────────────────────────────────────────────── def _now_utc() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _iso(dt: datetime.datetime) -> str: return dt.isoformat() def _parse_iso(s: str) -> datetime.datetime: s = s.replace("Z", "+00:00") try: return datetime.datetime.fromisoformat(s) except Exception: return _now_utc() def _bucket_hour(ts: str) -> str: """Truncate ISO ts to hour: '2026-02-23T10:00:00+00:00'.""" return ts[:13] + ":00" # ─── Aggregation helpers ────────────────────────────────────────────────────── def _aggregate( events: List[Dict], group_keys: List[str], ) -> Dict[str, Dict]: """ Aggregate events by composite key (e.g. ["tool"] or ["agent_id", "tool"]). Returns {key_str: {count, cost_units, duration_sum, failed_count, ...}}. """ result: Dict[str, Dict] = defaultdict(lambda: { "count": 0, "cost_units": 0.0, "duration_ms_sum": 0.0, "failed_count": 0, "denied_count": 0, "in_size_sum": 0, "out_size_sum": 0, }) for ev in events: parts = [str(ev.get(k, "unknown")) for k in group_keys] key = ":".join(parts) cost = compute_event_cost(ev) status = ev.get("status", "pass") r = result[key] r["count"] += 1 r["cost_units"] = round(r["cost_units"] + cost, 4) r["duration_ms_sum"] = round(r["duration_ms_sum"] + float(ev.get("duration_ms", 0)), 2) r["in_size_sum"] += int(ev.get("in_size", 0)) r["out_size_sum"] += int(ev.get("out_size", 0)) if status in ("failed", "error"): r["failed_count"] += 1 elif status == "denied": r["denied_count"] += 1 # Enrich with averages for key, r in result.items(): n = r["count"] or 1 r["avg_duration_ms"] = round(r["duration_ms_sum"] / n, 1) r["avg_cost_units"] = round(r["cost_units"] / n, 4) r["error_rate"] = round(r["failed_count"] / (r["count"] or 1), 4) return dict(result) def _top_n(aggregated: Dict[str, Dict], key_field: str, n: int, sort_by: str = "cost_units") -> List[Dict]: """Sort aggregated dict by sort_by and return top N.""" items = [ {"key": k, key_field: k, **v} 
for k, v in aggregated.items() ] items.sort(key=lambda x: x.get(sort_by, 0), reverse=True) return items[:n] # ─── Actions ────────────────────────────────────────────────────────────────── def action_report( store, time_range: Optional[Dict[str, str]] = None, group_by: Optional[List[str]] = None, top_n: int = 10, include_failed: bool = True, include_hourly: bool = False, ) -> Dict[str, Any]: """ Generate aggregated cost report for a time range. Returns: totals, breakdowns by group_by keys, top spenders, optional hourly trend. """ now = _now_utc() tr = time_range or {} from_ts = tr.get("from") or _iso(now - datetime.timedelta(days=7)) to_ts = tr.get("to") or _iso(now) events = store.read(from_ts=from_ts, to_ts=to_ts, limit=200_000) if not include_failed: events = [e for e in events if e.get("status", "pass") not in ("failed", "error")] # Totals total_cost = sum(compute_event_cost(e) for e in events) total_calls = len(events) total_failed = sum(1 for e in events if e.get("status") in ("failed", "error")) total_denied = sum(1 for e in events if e.get("status") == "denied") # Breakdowns by_key = group_by or ["tool"] breakdowns: Dict[str, List[Dict]] = {} for gk in by_key: agg = _aggregate(events, [gk]) breakdowns[gk] = _top_n(agg, gk, top_n) # Hourly trend (optional, for last 7d max) hourly: List[Dict] = [] if include_hourly and events: hourly_agg: Dict[str, Dict] = defaultdict(lambda: {"count": 0, "cost_units": 0.0}) for ev in events: bucket = _bucket_hour(ev.get("ts", "")) hourly_agg[bucket]["count"] += 1 hourly_agg[bucket]["cost_units"] = round( hourly_agg[bucket]["cost_units"] + compute_event_cost(ev), 4 ) hourly = [{"hour": k, **v} for k, v in sorted(hourly_agg.items())] return { "time_range": {"from": from_ts, "to": to_ts}, "totals": { "calls": total_calls, "cost_units": round(total_cost, 2), "failed": total_failed, "denied": total_denied, "error_rate": round(total_failed / (total_calls or 1), 4), }, "breakdowns": breakdowns, **({"hourly": hourly} if 
include_hourly else {}), } def action_top( store, window_hours: int = 24, top_n: int = 10, ) -> Dict[str, Any]: """ Quick top-N report for tools, agents, and users over window_hours. """ now = _now_utc() from_ts = _iso(now - datetime.timedelta(hours=window_hours)) to_ts = _iso(now) events = store.read(from_ts=from_ts, to_ts=to_ts, limit=100_000) top_tools = _top_n(_aggregate(events, ["tool"]), "tool", top_n) top_agents = _top_n(_aggregate(events, ["agent_id"]), "agent_id", top_n) top_users = _top_n(_aggregate(events, ["user_id"]), "user_id", top_n) top_workspaces = _top_n(_aggregate(events, ["workspace_id"]), "workspace_id", top_n) return { "window_hours": window_hours, "time_range": {"from": from_ts, "to": to_ts}, "total_calls": len(events), "top_tools": top_tools, "top_agents": top_agents, "top_users": top_users, "top_workspaces": top_workspaces, } def action_anomalies( store, window_minutes: int = 60, baseline_hours: int = 24, ratio_threshold: Optional[float] = None, min_calls: Optional[int] = None, tools_filter: Optional[List[str]] = None, ) -> Dict[str, Any]: """ Detect cost/call spikes and elevated error rates. Algorithm: 1. Compute per-tool metrics for window [now-window_minutes, now] 2. Compute per-tool metrics for baseline [now-baseline_hours, now-window_minutes] 3. Spike = window_rate / baseline_rate >= ratio_threshold AND calls >= min_calls 4. 
Error spike = failed_rate > 10% AND calls >= min_calls """ cfg = _load_weights() anomaly_cfg = cfg.get("anomaly", {}) if ratio_threshold is None: ratio_threshold = float(anomaly_cfg.get("spike_ratio_threshold", 3.0)) if min_calls is None: min_calls = int(anomaly_cfg.get("min_calls_threshold", 10)) now = _now_utc() window_from = _iso(now - datetime.timedelta(minutes=window_minutes)) baseline_from = _iso(now - datetime.timedelta(hours=baseline_hours)) baseline_to = window_from # non-overlapping # Fetch both windows window_events = store.read(from_ts=window_from, to_ts=_iso(now), limit=50_000) baseline_events = store.read(from_ts=baseline_from, to_ts=baseline_to, limit=200_000) if tools_filter: window_events = [e for e in window_events if e.get("tool") in tools_filter] baseline_events = [e for e in baseline_events if e.get("tool") in tools_filter] # Aggregate by tool window_by_tool = _aggregate(window_events, ["tool"]) baseline_by_tool = _aggregate(baseline_events, ["tool"]) # Normalise baseline to per-minute rate baseline_minutes = (baseline_hours * 60) - window_minutes baseline_minutes = max(baseline_minutes, 1) window_minutes_actual = float(window_minutes) anomalies = [] all_tools = set(window_by_tool.keys()) | set(baseline_by_tool.keys()) for tool_key in sorted(all_tools): w = window_by_tool.get(tool_key, {}) b = baseline_by_tool.get(tool_key, {}) w_calls = w.get("count", 0) b_calls = b.get("count", 0) if w_calls < min_calls: continue # Not enough traffic for meaningful anomaly # Per-minute rates w_rate = w_calls / window_minutes_actual b_rate = b_calls / baseline_minutes if b_calls > 0 else 0.0 # Cost spike w_cost_pm = w.get("cost_units", 0) / window_minutes_actual b_cost_pm = b.get("cost_units", 0) / baseline_minutes if b_calls > 0 else 0.0 call_ratio = (w_rate / b_rate) if b_rate > 0 else float("inf") cost_ratio = (w_cost_pm / b_cost_pm) if b_cost_pm > 0 else float("inf") if call_ratio >= ratio_threshold or cost_ratio >= ratio_threshold: ratio_display = 
round(max(call_ratio, cost_ratio), 2) if ratio_display == float("inf"): ratio_display = "∞ (no baseline)" w_cost = w.get("cost_units", 0) b_cost = b.get("cost_units", 0) anomalies.append({ "type": "cost_spike", "key": f"tool:{tool_key}", "tool": tool_key, "window": f"last_{window_minutes}m", "baseline": f"prev_{baseline_hours}h", "window_calls": w_calls, "baseline_calls": b_calls, "window_cost_units": round(w_cost, 2), "baseline_cost_units": round(b_cost, 2), "ratio": ratio_display, "recommendation": _spike_recommendation(tool_key, ratio_display, w_calls), }) # Error rate spike w_err_rate = w.get("error_rate", 0) if w_err_rate > 0.10 and w_calls >= min_calls: anomalies.append({ "type": "error_spike", "key": f"tool:{tool_key}", "tool": tool_key, "window": f"last_{window_minutes}m", "failed_calls": w.get("failed_count", 0), "total_calls": w_calls, "error_rate": round(w_err_rate, 4), "recommendation": f"Investigate failures for '{tool_key}': {w.get('failed_count',0)} failed / {w_calls} calls ({round(w_err_rate*100,1)}% error rate).", }) # De-duplicate tool+type combos (error_spike already separate) seen = set() unique_anomalies = [] for a in anomalies: key = (a["type"], a.get("tool", "")) if key not in seen: unique_anomalies.append(a) seen.add(key) return { "anomalies": unique_anomalies, "anomaly_count": len(unique_anomalies), "window_minutes": window_minutes, "baseline_hours": baseline_hours, "ratio_threshold": ratio_threshold, "min_calls": min_calls, "stats": { "window_calls": len(window_events), "baseline_calls": len(baseline_events), }, } def action_weights(repo_root: Optional[str] = None) -> Dict[str, Any]: """Return current cost weights configuration.""" global _weights_cache _weights_cache = None # Force reload cfg = _load_weights() return { "defaults": cfg.get("defaults", {}), "tools": cfg.get("tools", {}), "anomaly": cfg.get("anomaly", {}), "config_path": _WEIGHTS_PATH, } # ─── Recommendation templates ───────────────────────────────────────────────── def 
def _spike_recommendation(tool: str, ratio: Any, calls: int) -> str:
    """Return a category-specific remediation hint for a cost spike.

    `ratio` may be a float or the "∞ (no baseline)" display string.
    Category comes from the tool's entry in cost_weights.yml.
    """
    cfg = _load_weights()
    tool_cfg = (cfg.get("tools") or {}).get(tool, {})
    category = tool_cfg.get("category", "")
    if category == "media":
        return (
            f"'{tool}' cost spike (ratio={ratio}, {calls} calls). "
            "Consider: rate-limit per workspace, queue with priority, review calling agents."
        )
    if category == "release":
        return (
            f"'{tool}' called more frequently than baseline (ratio={ratio}). "
            "Review if release_check is looping or being triggered too often."
        )
    if category == "web":
        return (
            f"'{tool}' spike (ratio={ratio}). Consider: result caching, dedup identical queries."
        )
    return (
        f"'{tool}' cost spike (ratio={ratio}, {calls} calls in window). "
        "Review caller agents and apply rate limits if needed."
    )


# ─── backend=auto store resolver ─────────────────────────────────────────────


def _resolve_store(backend: str = "auto"):
    """
    Return an AuditStore based on backend param.

    backend='auto' (default): uses the globally configured store
      (which may be AutoAuditStore, Postgres, or JSONL).
    backend='jsonl': forces JsonlAuditStore (7-day window max recommended).
    backend='memory': MemoryAuditStore (testing).
    Any other value falls back to the globally configured store.
    """
    # Local import: audit_store may itself import this module.
    from audit_store import get_audit_store, JsonlAuditStore, MemoryAuditStore

    if backend in ("auto", None, ""):
        return get_audit_store()
    if backend == "jsonl":
        # os and Path are already imported at module level.
        audit_dir = os.getenv(
            "AUDIT_JSONL_DIR",
            str(Path(os.getenv("REPO_ROOT", ".")) / "ops" / "audit"),
        )
        return JsonlAuditStore(audit_dir)
    if backend == "memory":
        return MemoryAuditStore()
    return get_audit_store()


# ─── Digest action ────────────────────────────────────────────────────────────


def action_digest(
    store,
    window_hours: int = 24,
    baseline_hours: int = 168,  # 7 days
    top_n: int = 10,
    max_markdown_chars: int = 3800,
) -> Dict:
    """
    Daily/weekly cost digest: top tools/agents + anomalies + recommendations.
    Returns both structured JSON and a Telegram/markdown-friendly `markdown` field.
    """
    now = _now_utc()
    window_from = _iso(now - datetime.timedelta(hours=window_hours))
    window_to = _iso(now)
    baseline_from = _iso(now - datetime.timedelta(hours=baseline_hours))

    # ── Top ──────────────────────────────────────────────────────────────────
    top_data = action_top(store, window_hours=window_hours, top_n=top_n)
    top_tools = top_data.get("top_tools") or []
    top_agents = top_data.get("top_agents") or []
    total_calls = top_data.get("total_calls", 0)

    # ── Anomalies ────────────────────────────────────────────────────────────
    # Anomaly window = a quarter of the digest window, in minutes.
    anomaly_data = action_anomalies(
        store,
        window_minutes=int(window_hours * 60 / 4),
        baseline_hours=baseline_hours,
        min_calls=5,
    )
    anomalies = anomaly_data.get("anomalies") or []

    # ── Total cost ───────────────────────────────────────────────────────────
    events = store.read(from_ts=window_from, to_ts=window_to, limit=200_000)
    total_cost = sum(compute_event_cost(e) for e in events)
    failed = sum(1 for e in events if e.get("status") in ("failed", "error"))
    error_rate = round(failed / max(len(events), 1), 4)

    # ── Recommendations ──────────────────────────────────────────────────────
    recs = []
    for a in anomalies[:5]:
        r = a.get("recommendation", "")
        if r:
            recs.append(r)
    if error_rate > 0.05:
        recs.append(f"High error rate {round(error_rate*100,1)}% — investigate failing tools.")
    if top_tools and top_tools[0].get("cost_units", 0) > 500:
        tool_name = top_tools[0].get("tool", "?")
        recs.append(f"Top spender '{tool_name}' used {top_tools[0]['cost_units']:.0f} cost units — review frequency.")
    # Order-preserving de-dup, capped at 8.
    recs = list(dict.fromkeys(recs))[:8]

    # ── Markdown ─────────────────────────────────────────────────────────────
    period_label = f"Last {window_hours}h" if window_hours <= 48 else f"Last {window_hours//24}d"
    lines = [
        f"📊 **Cost Digest** ({period_label})",
        f"Total calls: {total_calls} | Cost units: {total_cost:.0f} | Errors: {round(error_rate*100,1)}%",
        "",
        "**Top Tools:**",
    ]
    for t in top_tools[:5]:
        lines.append(f" • `{t.get('tool','?')}` — {t.get('cost_units',0):.1f}u, {t.get('count',0)} calls")
    lines.append("")
    lines.append("**Top Agents:**")
    for a in top_agents[:3]:
        lines.append(f" • `{a.get('agent_id','?')}` — {a.get('cost_units',0):.1f}u, {a.get('count',0)} calls")
    if anomalies:
        lines.append("")
        lines.append(f"⚠️ **{len(anomalies)} Anomaly(ies):**")
        for anm in anomalies[:3]:
            lines.append(f" • [{anm.get('type','?')}] `{anm.get('tool','?')}` ratio={anm.get('ratio','?')}")
    if recs:
        lines.append("")
        lines.append("💡 **Recommendations:**")
        for r in recs[:5]:
            lines.append(f" {r[:200]}")
    markdown = "\n".join(lines)
    if len(markdown) > max_markdown_chars:
        markdown = markdown[:max_markdown_chars] + "\n…[truncated]"

    return {
        "period": period_label,
        "window_hours": window_hours,
        "time_range": {"from": window_from, "to": window_to},
        "totals": {
            "calls": total_calls,
            "cost_units": round(total_cost, 2),
            "failed": failed,
            "error_rate": error_rate,
        },
        "top_tools": top_tools[:top_n],
        "top_agents": top_agents[:top_n],
        "anomalies": anomalies[:10],
        "anomaly_count": len(anomalies),
        "recommendations": recs,
        "markdown": markdown,
    }


# ─── Main entrypoint ─────────────────────────────────────────────────────────


def analyze_cost_dict(action: str, params: Optional[Dict] = None, store=None) -> Dict:
    """
    Wrapper called by tool_manager handler. Returns plain dict for ToolResult.

    Dispatches on `action` (digest | report | top | anomalies | weights);
    unknown actions return an {"error": ...} dict instead of raising.
    """
    params = params or {}
    if store is None:
        backend = params.get("backend", "auto")
        store = _resolve_store(backend)
    if action == "digest":
        return action_digest(
            store,
            window_hours=int(params.get("window_hours", 24)),
            baseline_hours=int(params.get("baseline_hours", 168)),
            top_n=int(params.get("top_n", 10)),
            max_markdown_chars=int(params.get("max_markdown_chars", 3800)),
        )
    if action == "report":
        return action_report(
            store,
            time_range=params.get("time_range"),
            group_by=params.get("group_by", ["tool"]),
            top_n=int(params.get("top_n", 10)),
            include_failed=bool(params.get("include_failed", True)),
            include_hourly=bool(params.get("include_hourly", False)),
        )
    if action == "top":
        return action_top(
            store,
            window_hours=int(params.get("window_hours", 24)),
            top_n=int(params.get("top_n", 10)),
        )
    if action == "anomalies":
        return action_anomalies(
            store,
            window_minutes=int(params.get("window_minutes", 60)),
            baseline_hours=int(params.get("baseline_hours", 24)),
            ratio_threshold=params.get("ratio_threshold"),
            min_calls=params.get("min_calls"),
            tools_filter=params.get("tools_filter"),
        )
    if action == "weights":
        return action_weights()
    return {"error": f"Unknown action '{action}'. Valid: digest, report, top, anomalies, weights"}