#!/usr/bin/env python3
"""
E2E Agent Prober - verifies that agents respond.

Exports Prometheus metrics on :9108/metrics (port set by METRICS_PORT).
"""

import asyncio
import logging
import os
import time
from datetime import datetime, timezone

import httpx
from prometheus_client import Counter, Gauge, Histogram, start_http_server

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)

# Configuration (all values are read once from the environment at import time)
GATEWAY_URL = os.getenv("GATEWAY_URL", "http://gateway:9300")
ROUTER_URL = os.getenv("ROUTER_URL", "http://router:8000")
PROBE_INTERVAL = int(os.getenv("PROBE_INTERVAL", "60"))  # seconds
PROBE_TIMEOUT = int(os.getenv("PROBE_TIMEOUT", "30"))  # seconds
SEMANTIC_TIMEOUT = int(os.getenv("SEMANTIC_TIMEOUT", "45"))  # seconds
METRICS_PORT = int(os.getenv("METRICS_PORT", "9108"))
SEMANTIC_PROBE_ENABLED = os.getenv("SEMANTIC_PROBE_ENABLED", "true").lower() == "true"
SEMANTIC_AGENTS = [
    a.strip()
    for a in os.getenv("SEMANTIC_AGENTS", "clan,sofiia,monitor").split(",")
    if a.strip()
]
SEMANTIC_PROMPT = os.getenv("SEMANTIC_PROMPT", "Коротко: хто такий DAARWIZZ?")
SEMANTIC_EXPECT_KEYWORD = os.getenv("SEMANTIC_EXPECT_KEYWORD", "daarwizz").lower()
MONITOR_EXPECT_LOCAL = os.getenv("MONITOR_EXPECT_LOCAL", "true").lower() == "true"

# Prometheus metrics
agent_e2e_success = Gauge('agent_e2e_success', 'Whether last E2E probe succeeded', ['target'])
agent_e2e_latency = Gauge('agent_e2e_latency_seconds', 'Latency of last E2E probe', ['target'])
agent_e2e_failures_total = Counter(
    'agent_e2e_failures_total', 'Total E2E probe failures', ['target', 'reason']
)
agent_e2e_runs_total = Counter('agent_e2e_runs_total', 'Total E2E probe runs', ['target'])

# Histogram for latency distribution
agent_e2e_latency_histogram = Histogram(
    'agent_e2e_latency_histogram_seconds',
    'E2E probe latency distribution',
    ['target'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)


def _failure_reason(exc: Exception) -> str:
    """Map an exception raised during a probe to its failure-reason string.

    httpx timeouts become ``"timeout"``; anything else becomes ``"error: "``
    followed by the first 50 characters of the exception message.
    """
    if isinstance(exc, httpx.TimeoutException):
        return "timeout"
    return f"error: {str(exc)[:50]}"


async def probe_gateway_health() -> tuple[bool, float, str]:
    """Probe the gateway ``/health`` endpoint.

    Returns:
        ``(success, latency_seconds, reason)``. ``reason`` is empty on
        success. The probe succeeds only on HTTP 200 with a JSON body whose
        ``status`` field equals ``"healthy"``.
    """
    # perf_counter is monotonic, so the measured latency cannot go negative
    # or jump when the system wall clock is adjusted.
    start = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=PROBE_TIMEOUT) as client:
            resp = await client.get(f"{GATEWAY_URL}/health")
            latency = time.perf_counter() - start

            if resp.status_code != 200:
                return False, latency, f"http_{resp.status_code}"

            data = resp.json()
            if data.get("status") == "healthy":
                return True, latency, ""
            return False, latency, f"unhealthy: {data.get('status')}"
    except Exception as exc:
        return False, time.perf_counter() - start, _failure_reason(exc)


async def probe_agent_ping() -> tuple[bool, float, str]:
    """Probe the gateway ``/debug/agent_ping`` endpoint (E2E through router).

    Returns:
        ``(success, latency_seconds, reason)``. The probe succeeds on HTTP 200
        with a truthy ``success`` field in the JSON body; otherwise ``reason``
        is the body's ``error`` field (or ``"unknown"`` when it is missing or
        empty), or ``http_<status>`` for a non-200 response.
    """
    start = time.perf_counter()
    try:
        # datetime.utcnow() is deprecated; build the same naive-UTC ISO string
        # from an aware "now" so the payload format stays unchanged.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        async with httpx.AsyncClient(timeout=PROBE_TIMEOUT) as client:
            resp = await client.post(
                f"{GATEWAY_URL}/debug/agent_ping",
                json={"probe": True, "timestamp": timestamp}
            )
            latency = time.perf_counter() - start

            if resp.status_code != 200:
                return False, latency, f"http_{resp.status_code}"

            data = resp.json()
            if data.get("success"):
                return True, latency, ""
            # Coerce to str: the reason is used as a metric label value and
            # the gateway may send null or a non-string error.
            return False, latency, str(data.get("error") or "unknown")
    except Exception as exc:
        return False, time.perf_counter() - start, _failure_reason(exc)


async def probe_webhook_echo() -> tuple[bool, float, str]:
    """
    Probe via webhook endpoint with minimal test message.

    This tests the full path: gateway -> router -> completion

    Returns:
        ``(success, latency_seconds, reason)``. Only the HTTP status is
        checked: 200 is success, anything else yields ``http_<status>``.
    """
    start = time.perf_counter()
    try:
        # Wall-clock time is intentional here: these fields are epoch
        # timestamps in the payload, not latency measurements.
        now_epoch = int(time.time())
        # Minimal Telegram-like update that gateway can process
        test_update = {
            "update_id": now_epoch,
            "message": {
                "message_id": 1,
                "from": {"id": 0, "is_bot": True, "first_name": "E2EProber"},
                "chat": {"id": 0, "type": "private"},
                "date": now_epoch,
                "text": "/health"  # Simple health check command
            }
        }

        async with httpx.AsyncClient(timeout=PROBE_TIMEOUT) as client:
            # Use helion webhook as it's the most tested
            resp = await client.post(
                f"{GATEWAY_URL}/helion/telegram/webhook",
                json=test_update
            )
            latency = time.perf_counter() - start

            if resp.status_code == 200:
                return True, latency, ""
            return False, latency, f"http_{resp.status_code}"
    except Exception as exc:
        return False, time.perf_counter() - start, _failure_reason(exc)


async def probe_agent_semantic(agent_id: str) -> tuple[bool, float, str]:
    """Probe semantic response via router infer and assert DAARWIZZ awareness.

    Args:
        agent_id: Agent to query; used in the router URL path and in the
            request metadata.

    Returns:
        ``(success, latency_seconds, reason)``. The probe fails when the
        lower-cased answer contains neither ``SEMANTIC_EXPECT_KEYWORD`` nor
        ``"даар"``. For the ``monitor`` agent with ``MONITOR_EXPECT_LOCAL``
        set, it also fails unless the reported backend contains ``"ollama"``
        or the model name starts with ``"qwen"``.
    """
    start = time.perf_counter()
    try:
        payload = {
            "prompt": SEMANTIC_PROMPT,
            "max_tokens": 180,
            "temperature": 0.1,
            "metadata": {
                "agent_id": agent_id,
                "user_id": "tg:0",
                "chat_id": "0",
                "username": "e2e-prober",
                "raw_user_text": SEMANTIC_PROMPT,
            },
        }
        async with httpx.AsyncClient(timeout=SEMANTIC_TIMEOUT) as client:
            resp = await client.post(f"{ROUTER_URL}/v1/agents/{agent_id}/infer", json=payload)
            latency = time.perf_counter() - start

            if resp.status_code != 200:
                return False, latency, f"http_{resp.status_code}"

            data = resp.json()
            answer = str(data.get("response") or "")
            backend = str(data.get("backend") or "")
            model = str(data.get("model") or "")

            answer_lc = answer.lower()
            if SEMANTIC_EXPECT_KEYWORD not in answer_lc and "даар" not in answer_lc:
                return False, latency, "no_daarwizz_in_answer"

            if MONITOR_EXPECT_LOCAL and agent_id == "monitor":
                local_ok = ("ollama" in backend.lower()) or model.lower().startswith("qwen")
                if not local_ok:
                    return False, latency, f"monitor_nonlocal_backend:{backend}:{model}"

            return True, latency, ""
    except Exception as exc:
        return False, time.perf_counter() - start, _failure_reason(exc)


def record_probe(target: str, success: bool, latency: float, reason: str) -> None:
    """Record probe metrics and log line.

    Args:
        target: Value of the ``target`` label on every metric.
        success: Probe outcome; stored in the success gauge as 1 or 0.
        latency: Probe latency in seconds (gauge and histogram).
        reason: Failure reason; used as the ``reason`` label on the failure
            counter, which is only incremented when ``success`` is false.
    """
    agent_e2e_runs_total.labels(target=target).inc()
    agent_e2e_success.labels(target=target).set(1 if success else 0)
    agent_e2e_latency.labels(target=target).set(latency)
    agent_e2e_latency_histogram.labels(target=target).observe(latency)
    if not success:
        # Each distinct reason string creates its own counter time series.
        agent_e2e_failures_total.labels(target=target, reason=reason).inc()
    logger.info("%s: success=%s, latency=%.3fs, reason=%s", target, success, latency, reason)


async def run_probes() -> None:
    """Run all probes and update metrics.

    The gateway health, agent ping and webhook probes run sequentially; the
    semantic probes (when enabled) run concurrently, one per configured agent.
    """
    # Probe 1: Gateway health
    success, latency, reason = await probe_gateway_health()
    record_probe("gateway_health", success, latency, reason)

    # Probe 2: Agent ping (if endpoint exists)
    success, latency, reason = await probe_agent_ping()
    record_probe("agent_ping", success, latency, reason)

    # Probe 3: Webhook E2E (full path test)
    success, latency, reason = await probe_webhook_echo()
    record_probe("webhook_e2e", success, latency, reason)

    # Probe 4+: semantic checks for selected agents (parallel)
    if SEMANTIC_PROBE_ENABLED and SEMANTIC_AGENTS:
        results = await asyncio.gather(
            *(probe_agent_semantic(agent_id) for agent_id in SEMANTIC_AGENTS)
        )
        matrix = []
        for agent_id, (success, latency, reason) in zip(SEMANTIC_AGENTS, results):
            record_probe(f"semantic_{agent_id}", success, latency, reason)
            matrix.append(f"{agent_id}:{'PASS' if success else 'FAIL'}")
        logger.info("semantic_matrix: " + " | ".join(matrix))


async def main() -> None:
    """Log the configuration, start the metrics server and probe forever.

    A probe cycle runs immediately and then every ``PROBE_INTERVAL`` seconds.
    An exception escaping a cycle is logged with its traceback and does not
    stop the loop.
    """
    logger.info("Starting E2E Agent Prober")
    logger.info(" GATEWAY_URL: %s", GATEWAY_URL)
    logger.info(" ROUTER_URL: %s", ROUTER_URL)
    logger.info(" PROBE_INTERVAL: %ss", PROBE_INTERVAL)
    logger.info(" PROBE_TIMEOUT: %ss", PROBE_TIMEOUT)
    logger.info(" METRICS_PORT: %s", METRICS_PORT)
    logger.info(" SEMANTIC_TIMEOUT: %ss", SEMANTIC_TIMEOUT)
    logger.info(" SEMANTIC_PROBE_ENABLED: %s", SEMANTIC_PROBE_ENABLED)
    logger.info(" SEMANTIC_AGENTS: %s", ','.join(SEMANTIC_AGENTS))

    # Start Prometheus metrics server
    start_http_server(METRICS_PORT)
    logger.info("Prometheus metrics available at :%s/metrics", METRICS_PORT)

    # Continuous probing. The first cycle runs immediately and is guarded the
    # same way as later ones, so a failure on startup does not kill the
    # process before the metrics endpoint can be scraped.
    while True:
        try:
            await run_probes()
        except Exception as exc:
            # logger.exception keeps the traceback, which str(exc) alone loses.
            logger.exception("Probe error: %s", exc)
        await asyncio.sleep(PROBE_INTERVAL)


if __name__ == "__main__":
    asyncio.run(main())