# microdao-daarion/gateway-bot/daarion_facade/metrics_poller.py
#
# Background poller that probes district health endpoints and publishes
# aggregated dashboard metrics to Redis.
import asyncio
import json
import logging
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
import httpx
from redis.asyncio import Redis
from .registry_api import _load_crewai_roles, _load_district_registry, _load_registry
# Process-wide logging setup (runs at import time).
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("daarion-metrics-poller")

# Runtime configuration — every knob is overridable via environment variables.
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
POLL_INTERVAL_SECONDS = int(os.getenv("DAARION_METRICS_POLL_INTERVAL_SECONDS", "10"))  # seconds between poll cycles
METRICS_TTL_SECONDS = int(os.getenv("DAARION_METRICS_TTL_SECONDS", "60"))  # expiry for published Redis keys
HTTP_CONNECT_TIMEOUT_SECONDS = float(os.getenv("DAARION_METRICS_HTTP_CONNECT_TIMEOUT_SECONDS", "2"))
HTTP_TOTAL_TIMEOUT_SECONDS = float(os.getenv("DAARION_METRICS_HTTP_TOTAL_TIMEOUT_SECONDS", "5"))
NODES_TOTAL = int(os.getenv("DAARION_NODE_COUNT", "1"))  # reported verbatim in the dashboard "global" section
MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")

# Redis keys written by the poller: one aggregate dashboard, one key per district.
DASHBOARD_KEY = "daarion:metrics:dashboard"
DISTRICT_KEY_PREFIX = "daarion:metrics:district"

# Lazily-created, process-wide Redis client; managed by redis_client()/close_redis().
_redis: Optional[Redis] = None
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _ensure_url(value: str) -> str:
value = (value or "").strip()
if not value:
return ""
if value.startswith("http://") or value.startswith("https://"):
return value
return f"https://{value}"
def _health_candidates(district: Dict[str, Any]) -> List[str]:
    """Build the ordered, de-duplicated list of health-check URLs for a district.

    An explicit ``health_url`` entry is tried first, followed by the
    well-known health paths under the district's ``domain`` (if set).
    """
    urls: List[str] = []

    explicit = str(district.get("health_url") or "").strip()
    if explicit:
        urls.append(_ensure_url(explicit))

    base = _ensure_url(str(district.get("domain") or ""))
    if base:
        urls += [
            f"{base}/.well-known/daarion-health.json",
            f"{base}/health",
            f"{base}/v1/health",
        ]

    # Drop empties and duplicates while keeping first-seen order.
    return list(dict.fromkeys(u for u in urls if u))
def _extract_agents_online(payload: Dict[str, Any], agents_total: int) -> Optional[int]:
raw = payload.get("agents_online")
if isinstance(raw, bool):
return agents_total if raw else 0
if isinstance(raw, int):
return max(0, min(raw, agents_total))
agents = payload.get("agents")
if isinstance(agents, list):
count = 0
for agent in agents:
if not isinstance(agent, dict):
continue
status = str(agent.get("status", "")).lower()
if status in {"online", "active", "ok"}:
count += 1
return min(count, agents_total)
return None
async def redis_client() -> Redis:
    """Return the process-wide Redis client, connecting lazily on first call."""
    global _redis
    if _redis is not None:
        return _redis
    _redis = Redis.from_url(REDIS_URL, decode_responses=True)
    return _redis
async def close_redis() -> None:
    """Close the shared Redis client, if one exists, and clear the cache.

    The cached reference is only dropped after a successful close, so a
    failed close leaves the client available for a retry.
    """
    global _redis
    if _redis is None:
        return
    await _redis.close()
    _redis = None
async def _fetch_json_with_latency(
    client: httpx.AsyncClient,
    url: str,
) -> Tuple[bool, Optional[Dict[str, Any]], Optional[float], Optional[str]]:
    """GET ``url`` and return ``(ok, body, latency_ms, error_message)``.

    ``ok`` is False for transport errors and HTTP status >= 400. ``body``
    is the parsed JSON only when it is a dict, otherwise None. Never
    raises; latency is measured in milliseconds for every outcome.
    """
    start = time.perf_counter()

    def _elapsed_ms() -> float:
        return round((time.perf_counter() - start) * 1000, 2)

    try:
        response = await client.get(url)
    except Exception as exc:
        return False, None, _elapsed_ms(), str(exc)

    latency = _elapsed_ms()
    if response.status_code >= 400:
        return False, None, latency, f"HTTP {response.status_code}"

    body: Optional[Dict[str, Any]] = None
    try:
        parsed = response.json()
    except Exception:
        parsed = None
    if isinstance(parsed, dict):
        body = parsed
    return True, body, latency, None
async def _read_memory_vectors(client: httpx.AsyncClient) -> int:
    """Return the vector count reported by the memory service's /health endpoint.

    Best-effort: any fetch failure, missing key, unexpected nesting, or
    non-numeric value yields 0.
    """
    try:
        ok, payload, _, _ = await _fetch_json_with_latency(client, f"{MEMORY_SERVICE_URL}/health")
        if not ok or not payload:
            return 0
        count = payload.get("vector_store", {}).get("memories", {}).get("vectors_count", 0)
        return int(count or 0)
    except Exception:
        # Covers non-dict nesting (AttributeError) and bad numeric values.
        return 0
async def _registry_snapshot() -> Tuple[List[Dict[str, Any]], Dict[str, List[Dict[str, Any]]], int, int]:
    """Load the registries and summarize them for dashboard assembly.

    Returns ``(districts, agents_by_district, agents_total, subagents_total)``
    where districts are the valid dict entries carrying a ``district_id``.
    """
    raw = _load_district_registry().get("districts", [])
    districts = [
        entry for entry in raw
        if isinstance(entry, dict) and entry.get("district_id")
    ]

    agents_map = _load_registry().get("agents", {})
    role_counts = await _load_crewai_roles()

    by_district: Dict[str, List[Dict[str, Any]]] = {}
    subagents_total = 0
    for agent_id, config in agents_map.items():
        if not isinstance(config, dict):
            continue
        agent_key = str(agent_id)
        subagents_total += int(role_counts.get(agent_key, 0))
        # Agents without an explicit district fall back to "city-core".
        home = str(config.get("district_id") or "city-core")
        by_district.setdefault(home, []).append(
            {"agent_id": agent_key, "status": str(config.get("status", "active"))}
        )
    return districts, by_district, len(agents_map), subagents_total
async def build_dashboard() -> Dict[str, Any]:
    """Probe every registered district's health endpoint and assemble the
    dashboard payload that gets published to Redis.

    Returns a dict with:
      - "global": aggregate counters (nodes, districts, agents, subagents,
        memory vectors, and online districts/agents),
      - "by_district": one status sample per district,
      - "updated_at": ISO-8601 UTC timestamp of this build.
    """
    districts, agents_by_district, agents_total, subagents_total = await _registry_snapshot()
    timeout = httpx.Timeout(timeout=HTTP_TOTAL_TIMEOUT_SECONDS, connect=HTTP_CONNECT_TIMEOUT_SECONDS)
    by_district: List[Dict[str, Any]] = []
    districts_online = 0
    agents_online_total = 0
    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        memory_vectors = await _read_memory_vectors(client)
        for district in districts:
            district_id = str(district.get("district_id"))
            title = district.get("title") or district_id
            domain = str(district.get("domain") or "")
            status = district.get("status") or "active"
            members = agents_by_district.get(district_id, [])
            agents_total_district = len(members)
            # Pessimistic default sample; fields are overwritten below when a
            # health probe succeeds.
            sample = {
                "district_id": district_id,
                "title": title,
                "domain": domain,
                "status": status,
                "ok": False,
                "agents_total": agents_total_district,
                "agents_online": 0,
                "latency_ms": None,
                "last_check_ts": _now_iso(),
                "error": None,
            }
            last_error = "No health endpoint configured"
            # Try candidate URLs in priority order; stop at the first success.
            for candidate in _health_candidates(district):
                ok, payload, latency_ms, error_message = await _fetch_json_with_latency(client, candidate)
                # Latency always reflects the most recent attempt, even failures.
                sample["latency_ms"] = latency_ms
                if ok:
                    sample["ok"] = True
                    sample["error"] = None
                    # If the payload doesn't report an online count, fall back
                    # to the registry's member count for this district.
                    inferred = _extract_agents_online(payload or {}, agents_total_district)
                    sample["agents_online"] = inferred if inferred is not None else agents_total_district
                    break
                last_error = error_message or "health check failed"
            if sample["ok"]:
                districts_online += 1
                agents_online_total += int(sample.get("agents_online") or 0)
            else:
                # Only the last failure reason is surfaced.
                sample["error"] = {"message": last_error}
            by_district.append(sample)
    return {
        "global": {
            "nodes": NODES_TOTAL,
            "districts": len(districts),
            "agents": agents_total,
            "subagents": subagents_total,
            "memory_vectors": memory_vectors,
            "districts_online": districts_online,
            "agents_online": agents_online_total,
        },
        "by_district": by_district,
        "updated_at": _now_iso(),
    }
async def publish_dashboard(dashboard: Dict[str, Any]) -> None:
    """Publish the dashboard to Redis: one aggregate key plus one key per
    district, each with the configured TTL."""
    redis = await redis_client()
    await redis.set(
        DASHBOARD_KEY,
        json.dumps(dashboard, ensure_ascii=False),
        ex=METRICS_TTL_SECONDS,
    )
    for sample in dashboard.get("by_district", []):
        district_id = sample.get("district_id")
        if not district_id:
            continue
        await redis.set(
            f"{DISTRICT_KEY_PREFIX}:{district_id}",
            json.dumps(sample, ensure_ascii=False),
            ex=METRICS_TTL_SECONDS,
        )
async def run_once() -> None:
    """Execute a single poll cycle: build, publish, and log headline counts."""
    dashboard = await build_dashboard()
    await publish_dashboard(dashboard)
    totals = dashboard["global"]
    logger.info(
        "dashboard_updated districts=%s districts_online=%s agents=%s agents_online=%s",
        totals.get("districts"),
        totals.get("districts_online"),
        totals.get("agents"),
        totals.get("agents_online"),
    )
async def worker_loop() -> None:
    """Poll forever, publishing a fresh dashboard every POLL_INTERVAL_SECONDS.

    Failures of a single cycle are logged and swallowed so the loop keeps
    running; task cancellation is re-raised so shutdown stays clean.
    """
    logger.info(
        "metrics_poller_started interval=%ss ttl=%ss redis=%s",
        POLL_INTERVAL_SECONDS,
        METRICS_TTL_SECONDS,
        REDIS_URL,
    )
    while True:
        cycle_start = time.perf_counter()
        try:
            await run_once()
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("metrics_poller_cycle_failed")
        # Sleep out the remainder of the interval, but at least one second
        # so a slow cycle can never spin the loop.
        elapsed = time.perf_counter() - cycle_start
        await asyncio.sleep(max(1.0, POLL_INTERVAL_SECONDS - elapsed))
async def _main() -> None:
    """Process entry point: run the poller and close Redis on the SAME loop.

    Fix: the previous shutdown path called a second ``asyncio.run(close_redis())``,
    which creates a brand-new event loop and closes the Redis client on a
    different loop than the one its connections were created on — unreliable
    for loop-bound asyncio resources. Closing inside this coroutine keeps the
    client's whole lifetime on one event loop.
    """
    try:
        await worker_loop()
    finally:
        # Best-effort cleanup; never mask the original exception.
        try:
            await close_redis()
        except Exception:
            pass

if __name__ == "__main__":
    asyncio.run(_main())