gateway: add redis-backed city metrics poller and /v1/metrics/dashboard
This commit is contained in:
@@ -35,6 +35,10 @@ services:
|
|||||||
- STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt
|
- STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt
|
||||||
- OCR_SERVICE_URL=http://swapper-service:8890
|
- OCR_SERVICE_URL=http://swapper-service:8890
|
||||||
- WEB_SEARCH_SERVICE_URL=http://swapper-service:8890
|
- WEB_SEARCH_SERVICE_URL=http://swapper-service:8890
|
||||||
|
- REDIS_URL=redis://redis:6379/0
|
||||||
|
- CREWAI_SERVICE_URL=http://dagi-staging-crewai-service:9010
|
||||||
|
- NATURE_ID_URL=http://plant-vision-node1:8085
|
||||||
|
- NATURE_ID_MIN_CONFIDENCE=0.65
|
||||||
- ONEOK_CRM_BASE_URL=http://oneok-crm-adapter:8088
|
- ONEOK_CRM_BASE_URL=http://oneok-crm-adapter:8088
|
||||||
- ONEOK_CALC_BASE_URL=http://oneok-calc-adapter:8089
|
- ONEOK_CALC_BASE_URL=http://oneok-calc-adapter:8089
|
||||||
- ONEOK_DOCS_BASE_URL=http://oneok-docs-adapter:8090
|
- ONEOK_DOCS_BASE_URL=http://oneok-docs-adapter:8090
|
||||||
@@ -106,6 +110,28 @@ services:
|
|||||||
# Image Generation тепер інтегровано в Swapper Service (lazy loading)
|
# Image Generation тепер інтегровано в Swapper Service (lazy loading)
|
||||||
# Endpoint: POST /image/generate на swapper-service:8890
|
# Endpoint: POST /image/generate на swapper-service:8890
|
||||||
|
|
||||||
|
# Plant Vision wrapper (local nature-id CLI -> HTTP)
|
||||||
|
plant-vision-node1:
|
||||||
|
build:
|
||||||
|
context: ./services/plant-vision-node1
|
||||||
|
dockerfile: Dockerfile
|
||||||
|
container_name: plant-vision-node1
|
||||||
|
environment:
|
||||||
|
- NATURE_ID_CMD=${NATURE_ID_CMD:-python /opt/nature-id/nature_id.py -m plants -l -r 5 -s {image_path}}
|
||||||
|
- NATURE_ID_TIMEOUT=40
|
||||||
|
- DOWNLOAD_TIMEOUT=20
|
||||||
|
networks:
|
||||||
|
- dagi-network
|
||||||
|
volumes:
|
||||||
|
- ${DEPLOY_ROOT:-.}/third_party/nature-id:/opt/nature-id:ro
|
||||||
|
restart: unless-stopped
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD-SHELL", "python -c \"import urllib.request; urllib.request.urlopen('http://localhost:8085/health')\""]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 10s
|
||||||
|
retries: 3
|
||||||
|
start_period: 15s
|
||||||
|
|
||||||
# Crawl4AI - Advanced Web Crawler with JavaScript support
|
# Crawl4AI - Advanced Web Crawler with JavaScript support
|
||||||
crawl4ai:
|
crawl4ai:
|
||||||
image: unclecode/crawl4ai@sha256:4d8b065bf185962733cb5f9701f4122d03383fa1ab6b5f6a9873f04fa0416a84
|
image: unclecode/crawl4ai@sha256:4d8b065bf185962733cb5f9701f4122d03383fa1ab6b5f6a9873f04fa0416a84
|
||||||
@@ -191,12 +217,15 @@ services:
|
|||||||
- STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt
|
- STT_SERVICE_UPLOAD_URL=http://swapper-service:8890/stt
|
||||||
- OCR_SERVICE_URL=http://swapper-service:8890
|
- OCR_SERVICE_URL=http://swapper-service:8890
|
||||||
- WEB_SEARCH_SERVICE_URL=http://swapper-service:8890
|
- WEB_SEARCH_SERVICE_URL=http://swapper-service:8890
|
||||||
|
- REDIS_URL=redis://redis:6379/0
|
||||||
|
- CREWAI_SERVICE_URL=http://dagi-staging-crewai-service:9010
|
||||||
volumes:
|
volumes:
|
||||||
- ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro
|
- ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro
|
||||||
- ${DEPLOY_ROOT:-.}/logs:/app/logs
|
- ${DEPLOY_ROOT:-.}/logs:/app/logs
|
||||||
depends_on:
|
depends_on:
|
||||||
- router
|
- router
|
||||||
- memory-service
|
- memory-service
|
||||||
|
- redis
|
||||||
networks:
|
networks:
|
||||||
- dagi-network
|
- dagi-network
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
@@ -207,6 +236,61 @@ services:
|
|||||||
retries: 3
|
retries: 3
|
||||||
start_period: 10s
|
start_period: 10s
|
||||||
|
|
||||||
|
|
||||||
|
gateway-worker:
|
||||||
|
build:
|
||||||
|
context: ./gateway-bot
|
||||||
|
dockerfile: Dockerfile
|
||||||
|
container_name: dagi-gateway-worker-node1
|
||||||
|
command: ["python", "-m", "daarion_facade.worker"]
|
||||||
|
environment:
|
||||||
|
- ROUTER_BASE_URL=http://router:8000
|
||||||
|
- REDIS_URL=redis://redis:6379/0
|
||||||
|
- ROUTER_WORKER_TIMEOUT=60
|
||||||
|
volumes:
|
||||||
|
- ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro
|
||||||
|
- ${DEPLOY_ROOT:-.}/logs:/app/logs
|
||||||
|
depends_on:
|
||||||
|
- router
|
||||||
|
- redis
|
||||||
|
networks:
|
||||||
|
- dagi-network
|
||||||
|
restart: unless-stopped
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "python", "-c", "print(\"ok\")"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 3
|
||||||
|
|
||||||
|
|
||||||
|
metrics-poller-node1:
|
||||||
|
build:
|
||||||
|
context: ./gateway-bot
|
||||||
|
dockerfile: Dockerfile
|
||||||
|
container_name: dagi-metrics-poller-node1
|
||||||
|
command: ["python", "-m", "daarion_facade.metrics_poller"]
|
||||||
|
environment:
|
||||||
|
- REDIS_URL=redis://redis:6379/0
|
||||||
|
- MEMORY_SERVICE_URL=http://memory-service:8000
|
||||||
|
- DAARION_METRICS_POLL_INTERVAL_SECONDS=${DAARION_METRICS_POLL_INTERVAL_SECONDS:-10}
|
||||||
|
- DAARION_METRICS_TTL_SECONDS=${DAARION_METRICS_TTL_SECONDS:-60}
|
||||||
|
- DAARION_METRICS_HTTP_CONNECT_TIMEOUT_SECONDS=${DAARION_METRICS_HTTP_CONNECT_TIMEOUT_SECONDS:-2}
|
||||||
|
- DAARION_METRICS_HTTP_TOTAL_TIMEOUT_SECONDS=${DAARION_METRICS_HTTP_TOTAL_TIMEOUT_SECONDS:-5}
|
||||||
|
- DAARION_NODE_COUNT=${DAARION_NODE_COUNT:-1}
|
||||||
|
volumes:
|
||||||
|
- ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro
|
||||||
|
- ${DEPLOY_ROOT:-.}/logs:/app/logs
|
||||||
|
depends_on:
|
||||||
|
- redis
|
||||||
|
- memory-service
|
||||||
|
networks:
|
||||||
|
- dagi-network
|
||||||
|
restart: unless-stopped
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "python", "-c", "print(\"ok\")"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 3
|
||||||
# CLAN Consent Outbox Worker (Postgres event-store applier; no execute)
|
# CLAN Consent Outbox Worker (Postgres event-store applier; no execute)
|
||||||
clan-consent-outbox-worker:
|
clan-consent-outbox-worker:
|
||||||
build:
|
build:
|
||||||
@@ -736,10 +820,11 @@ services:
|
|||||||
ports:
|
ports:
|
||||||
- "9108:9108"
|
- "9108:9108"
|
||||||
environment:
|
environment:
|
||||||
- GATEWAY_URL=http://172.18.0.18:9300
|
- GATEWAY_URL=http://gateway:9300
|
||||||
- PROBE_INTERVAL=60
|
- PROBE_INTERVAL=60
|
||||||
- PROBE_TIMEOUT=30
|
- PROBE_TIMEOUT=30
|
||||||
- METRICS_PORT=9108
|
- METRICS_PORT=9108
|
||||||
|
- SEMANTIC_AGENTS=clan,sofiia,monitor,helion,agromatrix,senpai
|
||||||
networks:
|
networks:
|
||||||
- dagi-network
|
- dagi-network
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|||||||
287
gateway-bot/daarion_facade/metrics_poller.py
Normal file
287
gateway-bot/daarion_facade/metrics_poller.py
Normal file
@@ -0,0 +1,287 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from redis.asyncio import Redis
|
||||||
|
|
||||||
|
from .registry_api import _load_crewai_roles, _load_district_registry, _load_registry
|
||||||
|
|
||||||
|
# One-line log records with timestamp and level for the poller process.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
logger = logging.getLogger("daarion-metrics-poller")

# All settings are environment-overridable with in-cluster defaults.
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
# How often a full poll cycle runs (seconds).
POLL_INTERVAL_SECONDS = int(os.getenv("DAARION_METRICS_POLL_INTERVAL_SECONDS", "10"))
# TTL on the published Redis keys: if the poller dies, stale dashboards expire.
METRICS_TTL_SECONDS = int(os.getenv("DAARION_METRICS_TTL_SECONDS", "60"))
# Per-request HTTP timeouts used when probing district health endpoints.
HTTP_CONNECT_TIMEOUT_SECONDS = float(os.getenv("DAARION_METRICS_HTTP_CONNECT_TIMEOUT_SECONDS", "2"))
HTTP_TOTAL_TIMEOUT_SECONDS = float(os.getenv("DAARION_METRICS_HTTP_TOTAL_TIMEOUT_SECONDS", "5"))
# Reported verbatim as the dashboard's "global.nodes" counter.
NODES_TOTAL = int(os.getenv("DAARION_NODE_COUNT", "1"))
MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")

# Redis keys this poller writes; the gateway's /v1/metrics/dashboard reads
# the same "daarion:metrics:dashboard" key as its fast path.
DASHBOARD_KEY = "daarion:metrics:dashboard"
DISTRICT_KEY_PREFIX = "daarion:metrics:district"

# Lazily-created shared Redis client (managed by redis_client()/close_redis()).
_redis: Optional[Redis] = None
|
||||||
|
|
||||||
|
|
||||||
|
def _now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_url(value: str) -> str:
|
||||||
|
value = (value or "").strip()
|
||||||
|
if not value:
|
||||||
|
return ""
|
||||||
|
if value.startswith("http://") or value.startswith("https://"):
|
||||||
|
return value
|
||||||
|
return f"https://{value}"
|
||||||
|
|
||||||
|
|
||||||
|
def _health_candidates(district: Dict[str, Any]) -> List[str]:
    """Ordered, de-duplicated health-check URLs for one district.

    An explicit ``health_url`` is tried first; otherwise the well-known
    paths are derived from the district's ``domain``.
    """
    urls: List[str] = []

    explicit_url = str(district.get("health_url") or "").strip()
    if explicit_url:
        urls.append(_ensure_url(explicit_url))

    base_url = _ensure_url(str(district.get("domain") or ""))
    if base_url:
        urls.append(f"{base_url}/.well-known/daarion-health.json")
        urls.append(f"{base_url}/health")
        urls.append(f"{base_url}/v1/health")

    # dict.fromkeys preserves first-seen order while dropping duplicates;
    # the comprehension additionally drops empty strings.
    return [url for url in dict.fromkeys(urls) if url]
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_agents_online(payload: Dict[str, Any], agents_total: int) -> Optional[int]:
|
||||||
|
raw = payload.get("agents_online")
|
||||||
|
if isinstance(raw, bool):
|
||||||
|
return agents_total if raw else 0
|
||||||
|
if isinstance(raw, int):
|
||||||
|
return max(0, min(raw, agents_total))
|
||||||
|
|
||||||
|
agents = payload.get("agents")
|
||||||
|
if isinstance(agents, list):
|
||||||
|
count = 0
|
||||||
|
for agent in agents:
|
||||||
|
if not isinstance(agent, dict):
|
||||||
|
continue
|
||||||
|
status = str(agent.get("status", "")).lower()
|
||||||
|
if status in {"online", "active", "ok"}:
|
||||||
|
count += 1
|
||||||
|
return min(count, agents_total)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def redis_client() -> Redis:
    """Return the shared module-level Redis client, creating it lazily."""
    global _redis
    if _redis is not None:
        return _redis
    _redis = Redis.from_url(REDIS_URL, decode_responses=True)
    return _redis
|
||||||
|
|
||||||
|
|
||||||
|
async def close_redis() -> None:
    """Close and forget the shared Redis client, if one was created."""
    global _redis
    if _redis is None:
        return
    await _redis.close()
    _redis = None
|
||||||
|
|
||||||
|
|
||||||
|
async def _fetch_json_with_latency(
    client: httpx.AsyncClient,
    url: str,
) -> Tuple[bool, Optional[Dict[str, Any]], Optional[float], Optional[str]]:
    """GET *url* and return ``(ok, data, latency_ms, error_message)``.

    ``ok`` is False on transport errors or HTTP >= 400.  Latency is
    measured (ms, 2 decimals) even for failures.  ``data`` is the parsed
    JSON body only when it is a dict; any other/unparseable body yields
    None data with ``ok`` still True.
    """
    t0 = time.perf_counter()
    try:
        response = await client.get(url)
    except Exception as exc:
        elapsed_ms = round((time.perf_counter() - t0) * 1000, 2)
        return False, None, elapsed_ms, str(exc)

    elapsed_ms = round((time.perf_counter() - t0) * 1000, 2)
    if response.status_code >= 400:
        return False, None, elapsed_ms, f"HTTP {response.status_code}"

    body: Optional[Dict[str, Any]] = None
    try:
        decoded = response.json()
    except Exception:
        decoded = None
    if isinstance(decoded, dict):
        body = decoded
    return True, body, elapsed_ms, None
|
||||||
|
|
||||||
|
|
||||||
|
async def _read_memory_vectors(client: httpx.AsyncClient) -> int:
    """Vector count reported by the memory service's /health payload.

    Best-effort: any failure or malformed payload yields 0 so the
    metrics cycle never breaks on this auxiliary counter.
    """
    try:
        ok, payload, _, _ = await _fetch_json_with_latency(client, f"{MEMORY_SERVICE_URL}/health")
        if not ok or not payload:
            return 0
        vectors = payload.get("vector_store", {}).get("memories", {}).get("vectors_count", 0)
        return int(vectors or 0)
    except Exception:
        return 0
|
||||||
|
|
||||||
|
|
||||||
|
async def _registry_snapshot() -> Tuple[List[Dict[str, Any]], Dict[str, List[Dict[str, Any]]], int, int]:
    """Load registry data once per poll cycle.

    Returns ``(districts, agents_by_district, agents_total, subagents_total)``
    where districts are the catalog entries that carry a district_id and
    agents are grouped under their district (default "city-core").
    """
    raw_districts = _load_district_registry().get("districts", [])
    districts = [d for d in raw_districts if isinstance(d, dict) and d.get("district_id")]

    agents_map = _load_registry().get("agents", {})
    role_counts = await _load_crewai_roles()

    agents_by_district: Dict[str, List[Dict[str, Any]]] = {}
    subagents_total = 0
    for agent_id, cfg in agents_map.items():
        if not isinstance(cfg, dict):
            continue
        key = str(agent_id)
        subagents_total += int(role_counts.get(key, 0))
        district_id = str(cfg.get("district_id") or "city-core")
        agents_by_district.setdefault(district_id, []).append(
            {"agent_id": key, "status": str(cfg.get("status", "active"))}
        )

    return districts, agents_by_district, len(agents_map), subagents_total
|
||||||
|
|
||||||
|
|
||||||
|
async def build_dashboard() -> Dict[str, Any]:
    """Probe every registered district and assemble the city dashboard.

    Returns a dict with:
      * ``global``      – aggregate counters (nodes, districts, agents, ...)
      * ``by_district`` – one sample per district (ok/latency/error/agents)
      * ``updated_at``  – ISO-8601 UTC timestamp of this snapshot
    """
    districts, agents_by_district, agents_total, subagents_total = await _registry_snapshot()
    timeout = httpx.Timeout(timeout=HTTP_TOTAL_TIMEOUT_SECONDS, connect=HTTP_CONNECT_TIMEOUT_SECONDS)

    by_district: List[Dict[str, Any]] = []
    districts_online = 0
    agents_online_total = 0

    # One shared HTTP client for the whole sweep; redirects are followed
    # because district domains may redirect (e.g. http -> https).
    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        memory_vectors = await _read_memory_vectors(client)

        for district in districts:
            district_id = str(district.get("district_id"))
            title = district.get("title") or district_id
            domain = str(district.get("domain") or "")
            status = district.get("status") or "active"
            members = agents_by_district.get(district_id, [])
            agents_total_district = len(members)

            # Pessimistic defaults; overwritten below on a successful probe.
            sample = {
                "district_id": district_id,
                "title": title,
                "domain": domain,
                "status": status,
                "ok": False,
                "agents_total": agents_total_district,
                "agents_online": 0,
                "latency_ms": None,
                "last_check_ts": _now_iso(),
                "error": None,
            }

            last_error = "No health endpoint configured"
            # Try candidate URLs in priority order; first success wins.
            for candidate in _health_candidates(district):
                ok, payload, latency_ms, error_message = await _fetch_json_with_latency(client, candidate)
                sample["latency_ms"] = latency_ms
                if ok:
                    sample["ok"] = True
                    sample["error"] = None
                    # When the payload does not report agents_online,
                    # assume all registered agents are up (the district
                    # itself responded).
                    inferred = _extract_agents_online(payload or {}, agents_total_district)
                    sample["agents_online"] = inferred if inferred is not None else agents_total_district
                    break
                last_error = error_message or "health check failed"

            if sample["ok"]:
                districts_online += 1
                agents_online_total += int(sample.get("agents_online") or 0)
            else:
                sample["error"] = {"message": last_error}

            by_district.append(sample)

    return {
        "global": {
            "nodes": NODES_TOTAL,
            "districts": len(districts),
            "agents": agents_total,
            "subagents": subagents_total,
            "memory_vectors": memory_vectors,
            "districts_online": districts_online,
            "agents_online": agents_online_total,
        },
        "by_district": by_district,
        "updated_at": _now_iso(),
    }
|
||||||
|
|
||||||
|
|
||||||
|
async def publish_dashboard(dashboard: Dict[str, Any]) -> None:
    """Write the dashboard, plus one per-district key, to Redis with a TTL."""
    redis = await redis_client()
    await redis.set(
        DASHBOARD_KEY,
        json.dumps(dashboard, ensure_ascii=False),
        ex=METRICS_TTL_SECONDS,
    )

    for district_row in dashboard.get("by_district", []):
        district_id = district_row.get("district_id")
        if not district_id:
            continue
        await redis.set(
            f"{DISTRICT_KEY_PREFIX}:{district_id}",
            json.dumps(district_row, ensure_ascii=False),
            ex=METRICS_TTL_SECONDS,
        )
|
||||||
|
|
||||||
|
|
||||||
|
async def run_once() -> None:
    """Build one dashboard snapshot, publish it, and log a summary line."""
    dashboard = await build_dashboard()
    await publish_dashboard(dashboard)
    totals = dashboard["global"]
    logger.info(
        "dashboard_updated districts=%s districts_online=%s agents=%s agents_online=%s",
        totals.get("districts"),
        totals.get("districts_online"),
        totals.get("agents"),
        totals.get("agents_online"),
    )
|
||||||
|
|
||||||
|
|
||||||
|
async def worker_loop() -> None:
    """Poll forever, refreshing the dashboard every POLL_INTERVAL_SECONDS.

    A failed cycle is logged and swallowed so one bad poll does not kill
    the poller; CancelledError is re-raised so shutdown still works.
    """
    logger.info(
        "metrics_poller_started interval=%ss ttl=%ss redis=%s",
        POLL_INTERVAL_SECONDS,
        METRICS_TTL_SECONDS,
        REDIS_URL,
    )
    while True:
        started = time.perf_counter()
        try:
            await run_once()
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("metrics_poller_cycle_failed")

        # Sleep only for the remainder of the interval, but at least 1s so
        # a slow cycle can never turn this into a busy loop.
        elapsed = time.perf_counter() - started
        sleep_for = max(1.0, POLL_INTERVAL_SECONDS - elapsed)
        await asyncio.sleep(sleep_for)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Run the poll loop until interrupted, then best-effort cleanup.
    try:
        asyncio.run(worker_loop())
    finally:
        # NOTE(review): this second asyncio.run creates a fresh event loop,
        # while the Redis client was bound to the (now closed) loop above —
        # closing it here may fail, hence the broad swallow so the original
        # exit reason is not masked.
        try:
            asyncio.run(close_redis())
        except Exception:
            pass
|
||||||
268
gateway-bot/daarion_facade/registry_api.py
Normal file
268
gateway-bot/daarion_facade/registry_api.py
Normal file
@@ -0,0 +1,268 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from fastapi import APIRouter
|
||||||
|
from redis.asyncio import Redis
|
||||||
|
|
||||||
|
# All facade endpoints are mounted under the /v1 prefix.
router = APIRouter(prefix="/v1", tags=["daarion-facade"])

# In-process cache lifetime (seconds) for registry files and CrewAI roles.
REGISTRY_CACHE_TTL = int(os.getenv("REGISTRY_CACHE_TTL", "30"))
MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")
CREWAI_SERVICE_URL = os.getenv("CREWAI_SERVICE_URL", "http://dagi-staging-crewai-service:9010")
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
# Key written by the metrics poller; read here as the dashboard fast path.
METRICS_DASHBOARD_KEY = "daarion:metrics:dashboard"

# Cache cells shaped {"loaded_at": epoch seconds, "data": parsed payload}.
_REGISTRY_CACHE: Dict[str, Any] = {"loaded_at": 0.0, "data": None}
_DISTRICT_CACHE: Dict[str, Any] = {"loaded_at": 0.0, "data": None}
_CREWAI_CACHE: Dict[str, Any] = {"loaded_at": 0.0, "data": {}}
# Lazily-created shared Redis client (see _redis_client()).
_REDIS: Optional[Redis] = None
|
||||||
|
|
||||||
|
|
||||||
|
def _now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def _registry_paths() -> List[Path]:
|
||||||
|
return [
|
||||||
|
Path("/app/gateway-bot/agent_registry.json"),
|
||||||
|
Path("/opt/microdao-daarion/config/agent_registry.json"),
|
||||||
|
Path(__file__).resolve().parents[1] / "agent_registry.json",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _district_paths() -> List[Path]:
|
||||||
|
return [
|
||||||
|
Path("/app/gateway-bot/district_registry.json"),
|
||||||
|
Path(__file__).resolve().parents[1] / "district_registry.json",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _load_registry() -> Dict[str, Any]:
    """Load agent_registry.json from the first existing candidate path.

    The parsed payload is cached in-process for REGISTRY_CACHE_TTL
    seconds.  When no candidate file exists, an empty ``{"agents": {}}``
    registry is cached and returned instead.
    """
    now = time.time()
    cached = _REGISTRY_CACHE.get("data")
    is_fresh = (now - _REGISTRY_CACHE.get("loaded_at", 0.0)) < REGISTRY_CACHE_TTL
    if cached and is_fresh:
        return cached

    data: Dict[str, Any] = {"agents": {}}
    for candidate in _registry_paths():
        if candidate.exists():
            with candidate.open("r", encoding="utf-8") as fh:
                data = json.load(fh)
            break
    _REGISTRY_CACHE.update({"loaded_at": now, "data": data})
    return data
|
||||||
|
|
||||||
|
|
||||||
|
def _load_district_registry() -> Dict[str, Any]:
    """Load district_registry.json from the first existing candidate path.

    Mirrors ``_load_registry``: cached for REGISTRY_CACHE_TTL seconds,
    with an empty ``{"districts": []}`` catalog as the fallback.
    """
    now = time.time()
    cached = _DISTRICT_CACHE.get("data")
    is_fresh = (now - _DISTRICT_CACHE.get("loaded_at", 0.0)) < REGISTRY_CACHE_TTL
    if cached and is_fresh:
        return cached

    data: Dict[str, Any] = {"districts": []}
    for candidate in _district_paths():
        if candidate.exists():
            with candidate.open("r", encoding="utf-8") as fh:
                data = json.load(fh)
            break
    _DISTRICT_CACHE.update({"loaded_at": now, "data": data})
    return data
|
||||||
|
|
||||||
|
|
||||||
|
async def _redis_client() -> Redis:
    """Lazily create and return the shared module-level Redis client."""
    global _REDIS
    if _REDIS is not None:
        return _REDIS
    _REDIS = Redis.from_url(REDIS_URL, decode_responses=True)
    return _REDIS
|
||||||
|
|
||||||
|
|
||||||
|
async def _load_cached_dashboard() -> Optional[Dict[str, Any]]:
    """Fetch the poller-produced dashboard from Redis.

    Returns None on cache miss, unreachable Redis, or an unparseable
    payload — the caller then falls back to a registry-derived response.
    """
    try:
        client = await _redis_client()
        raw = await client.get(METRICS_DASHBOARD_KEY)
        if not raw:
            return None
        return json.loads(raw)
    except Exception:
        return None
|
||||||
|
|
||||||
|
|
||||||
|
async def _load_crewai_roles() -> Dict[str, int]:
    """Per-agent subagent (role) counts from the CrewAI service.

    Best-effort: any failure yields ``{}``.  The result — success or
    failure — is cached for REGISTRY_CACHE_TTL seconds so a downed
    service is not hammered on every request.
    """
    now = time.time()
    if now - _CREWAI_CACHE.get("loaded_at", 0.0) < REGISTRY_CACHE_TTL:
        return _CREWAI_CACHE.get("data", {})

    role_counts: Dict[str, int] = {}
    try:
        async with httpx.AsyncClient(timeout=8.0) as client:
            resp = await client.get(f"{CREWAI_SERVICE_URL}/crew/agents")
            if resp.status_code == 200:
                for agent_id, info in resp.json().items():
                    roles = info.get("default_roles")
                    role_counts[str(agent_id)] = int(roles) if isinstance(roles, int) else 0
    except Exception:
        role_counts = {}

    _CREWAI_CACHE.update({"loaded_at": now, "data": role_counts})
    return role_counts
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/registry/agents")
async def get_agents() -> Dict[str, Any]:
    """List all registered agents with display metadata and team sizes."""
    registry = _load_registry()
    agents_map = registry.get("agents", {}) if isinstance(registry, dict) else {}
    role_counts = await _load_crewai_roles()

    items: List[Dict[str, Any]] = []
    for agent_id, cfg in agents_map.items():
        # Skip malformed registry entries that are not objects.
        if not isinstance(cfg, dict):
            continue
        domains = cfg.get("domains") or []
        items.append(
            {
                "agent_id": agent_id,
                "title": cfg.get("display_name") or agent_id,
                "role": cfg.get("canonical_role") or "",
                # First domain is primary, the rest are aliases.
                "domain_primary": domains[0] if domains else "general",
                "domain_aliases": domains[1:] if len(domains) > 1 else [],
                "visibility": cfg.get("visibility", "public"),
                "status": cfg.get("status", "active"),
                "team": {"subagents_total": role_counts.get(agent_id, 0)},
                "district_id": cfg.get("district_id") or "city-core",
                "avatar_url": cfg.get("avatar_url"),
                "health_url": cfg.get("health_url"),
            }
        )

    return {"items": items, "total": len(items)}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/registry/districts")
async def get_districts() -> Dict[str, Any]:
    """List all districts, merging the static catalog with districts
    implied by agent registrations.

    A district present in either source is returned; catalog metadata
    (title, domain, status, lead) fills in where available, with
    convention-based fallbacks otherwise.
    """
    agents_payload = await get_agents()
    agents = agents_payload.get("items", [])
    by_district: Dict[str, List[Dict[str, Any]]] = {}
    for a in agents:
        by_district.setdefault(a.get("district_id", "city-core"), []).append(a)

    catalog = _load_district_registry().get("districts", [])
    catalog_by_id: Dict[str, Dict[str, Any]] = {
        str(d.get("district_id")): d for d in catalog if isinstance(d, dict) and d.get("district_id")
    }

    # Union of both sources, sorted for a stable response ordering.
    district_ids = sorted(set(catalog_by_id.keys()) | set(by_district.keys()))
    items: List[Dict[str, Any]] = []

    for district_id in district_ids:
        members = by_district.get(district_id, [])
        base = catalog_by_id.get(district_id, {})
        # Domain fallback convention: city-core lives at the apex domain,
        # every other district at <district_id>.daarion.city.
        domain = base.get("domain") or ("daarion.city" if district_id == "city-core" else f"{district_id}.daarion.city")

        # Lead agent: explicit catalog value wins; otherwise prefer
        # "daarwizz" for city-core, else the first registered member.
        lead_agent_id = base.get("lead_agent_id")
        if not lead_agent_id:
            if district_id == "city-core" and any(m.get("agent_id") == "daarwizz" for m in members):
                lead_agent_id = "daarwizz"
            elif members:
                lead_agent_id = members[0].get("agent_id")
            else:
                lead_agent_id = None

        items.append(
            {
                "district_id": district_id,
                "title": base.get("title") or district_id.replace("-", " ").title(),
                "domain": domain,
                "status": base.get("status", "active"),
                "logo_url": base.get("logo_url"),
                "health_url": base.get("health_url"),
                "well_known": {
                    "manifest": f"https://{domain}/.well-known/daarion-district.json",
                    "health": f"https://{domain}/.well-known/daarion-health.json",
                    "capabilities": f"https://{domain}/.well-known/daarion-capabilities.json",
                },
                "lead_agent_id": lead_agent_id,
                "agents_total": len(members),
            }
        )

    return {"items": items, "total": len(items)}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/metrics")
async def get_metrics() -> Dict[str, Any]:
    """Aggregate city-level counters from the registries and memory service."""
    agents_payload = await get_agents()
    districts_payload = await get_districts()
    agent_items = agents_payload.get("items", [])

    # Best-effort vector count from the memory service; 0 on any failure.
    memory_vectors = 0
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(f"{MEMORY_SERVICE_URL}/health")
        if resp.status_code == 200:
            health = resp.json()
            stats = health.get("vector_store", {}).get("memories", {})
            memory_vectors = int(stats.get("vectors_count", 0))
    except Exception:
        memory_vectors = 0

    return {
        "nodes": 1,
        "districts": districts_payload.get("total", 0),
        "agents": len(agent_items),
        "subagents": sum(int((a.get("team") or {}).get("subagents_total", 0)) for a in agent_items),
        "memory_vectors": memory_vectors,
    }
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/metrics/dashboard")
async def get_metrics_dashboard() -> Dict[str, Any]:
    """Return the city metrics dashboard.

    Fast path: serve the snapshot the metrics poller wrote to Redis.
    Fallback: when the cache is empty or unreachable, synthesize a
    dashboard from the registries with unknown liveness fields and tag
    it ``"source": "fallback_registry"`` so clients can tell it apart.
    """
    cached = await _load_cached_dashboard()
    if cached:
        return cached

    metrics = await get_metrics()
    districts_payload = await get_districts()
    districts = districts_payload.get("items", [])

    by_district = []
    for d in districts:
        by_district.append(
            {
                "district_id": d.get("district_id"),
                "title": d.get("title"),
                "domain": d.get("domain"),
                "status": d.get("status"),
                "ok": None,  # liveness unknown without the poller
                "agents_total": d.get("agents_total", 0),
                "agents_online": None,
                "latency_ms": None,
                "last_check_ts": None,
            }
        )

    return {
        "global": {
            "nodes": metrics.get("nodes", 1),
            "districts": metrics.get("districts", 0),
            "agents": metrics.get("agents", 0),
            "subagents": metrics.get("subagents", 0),
            "memory_vectors": metrics.get("memory_vectors", 0),
            # Online counters require live probes; report 0 in fallback mode.
            "districts_online": 0,
            "agents_online": 0,
        },
        "by_district": by_district,
        "updated_at": _now_iso(),
        "source": "fallback_registry",
    }
|
||||||
Reference in New Issue
Block a user