diff --git a/migrations/042_node_cache_router_metrics.sql b/migrations/042_node_cache_router_metrics.sql new file mode 100644 index 00000000..ee539bc9 --- /dev/null +++ b/migrations/042_node_cache_router_metrics.sql @@ -0,0 +1,71 @@ +-- Migration: Add router health metrics to node_cache +-- Purpose: Store router health status collected by node-guardian + +ALTER TABLE node_cache + ADD COLUMN IF NOT EXISTS router_healthy boolean DEFAULT false, + ADD COLUMN IF NOT EXISTS router_version text; + +COMMENT ON COLUMN node_cache.router_healthy IS 'Whether DAGI Router is healthy on this node'; +COMMENT ON COLUMN node_cache.router_version IS 'Version of DAGI Router on this node'; + +-- Update fn_node_heartbeat to handle router metrics +CREATE OR REPLACE FUNCTION fn_node_heartbeat( + p_node_id text, + p_metrics jsonb DEFAULT '{}'::jsonb +) RETURNS jsonb AS $$ +DECLARE + v_result jsonb; + v_node_exists boolean; + v_swapper_state jsonb; +BEGIN + -- Check if node exists + SELECT EXISTS(SELECT 1 FROM node_cache WHERE node_id = p_node_id) INTO v_node_exists; + + IF NOT v_node_exists THEN + RETURN jsonb_build_object( + 'success', false, + 'should_self_register', true, + 'message', 'Node not found in cache' + ); + END IF; + + -- Handle swapper_state - only update if provided and not empty + v_swapper_state := CASE + WHEN p_metrics ? 'swapper_state' AND p_metrics->'swapper_state' != '{}'::jsonb + THEN p_metrics->'swapper_state' + ELSE NULL + END; + + -- Update node_cache with metrics + UPDATE node_cache SET + last_heartbeat = NOW(), + status = 'online', + cpu_usage = COALESCE((p_metrics->>'cpu_usage')::float, cpu_usage), + gpu_vram_used = COALESCE((p_metrics->>'gpu_vram_used')::int, gpu_vram_used), + ram_used = COALESCE((p_metrics->>'ram_used')::bigint, ram_used), + disk_used = COALESCE((p_metrics->>'disk_used')::bigint, disk_used), + agent_count_router = COALESCE((p_metrics->>'agent_count_router')::int, agent_count_router), + agent_count_system = COALESCE((p_metrics->>'agent_count_system')::int, agent_count_system), + dagi_router_url = COALESCE(p_metrics->>'dagi_router_url', dagi_router_url), + -- Swapper metrics + swapper_healthy = COALESCE((p_metrics->>'swapper_healthy')::boolean, swapper_healthy), + swapper_models_loaded = COALESCE((p_metrics->>'swapper_models_loaded')::int, swapper_models_loaded), + swapper_models_total = COALESCE((p_metrics->>'swapper_models_total')::int, swapper_models_total), + swapper_state = COALESCE(v_swapper_state, swapper_state), + -- Router metrics + router_healthy = COALESCE((p_metrics->>'router_healthy')::boolean, router_healthy), + router_version = COALESCE(p_metrics->>'router_version', router_version), + -- Node-specific URLs (update if provided) + swapper_url = COALESCE(p_metrics->>'swapper_url', swapper_url), + router_url = COALESCE(p_metrics->>'router_url', router_url), + updated_at = NOW() + WHERE node_id = p_node_id; + + RETURN jsonb_build_object( + 'success', true, + 'node_id', p_node_id, + 'timestamp', NOW() + ); +END; +$$ LANGUAGE plpgsql; + diff --git a/scripts/node-guardian-loop.py b/scripts/node-guardian-loop.py index e5ad4745..0a8549d6 100755 --- a/scripts/node-guardian-loop.py +++ b/scripts/node-guardian-loop.py @@ -255,6 +255,20 @@ class NodeGuardian: except Exception as e: logger.warning(f"Swapper metrics collection failed: {e}") + + # Collect Router Metrics using node-specific URL + metrics["router_healthy"] = False + metrics["router_version"] = None + try: + r = await self.client.get(f"{self.router_url}/health", timeout=3.0) + if r.status_code == 200: + data = r.json() + status = data.get("status", "").lower() + metrics["router_healthy"] = status in ("healthy", "ok") + metrics["router_version"] = data.get("version") + logger.debug(f"🔀 Router metrics: healthy={metrics['router_healthy']}, version={metrics['router_version']}") + except Exception as e: + logger.debug(f"Router health check failed: {e}") return metrics diff --git a/services/city-service/repo_city.py b/services/city-service/repo_city.py index f68821ae..000c5841 100644 --- a/services/city-service/repo_city.py +++ b/services/city-service/repo_city.py @@ -3373,7 +3373,7 @@ async def get_node_endpoints(node_id: str) -> Dict[str, str]: async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]: """ - Отримати розширені метрики ноди (включаючи Swapper). + Отримати розширені метрики ноди (включаючи Swapper та Router). """ pool = await get_pool() @@ -3385,7 +3385,9 @@ async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]: swapper_models_total, swapper_state, router_url, - swapper_url + swapper_url, + router_healthy, + router_version FROM node_cache WHERE node_id = $1 """, node_id) diff --git a/services/city-service/routes_city.py b/services/city-service/routes_city.py index 4cd75ae9..8359caf4 100644 --- a/services/city-service/routes_city.py +++ b/services/city-service/routes_city.py @@ -4086,13 +4086,29 @@ async def get_node_swapper_detail(node_id: str): async def get_dagi_router_health(node_id: str): """ Get DAGI Router health status for a node. - Always returns 200 with status="down" if router is unavailable. - Uses node-specific router_url from node_cache. + First checks node_cache for cached router_healthy status (from node-guardian). + Falls back to direct health check if node is local (NODE1). """ import httpx import time - # Get router URL from database (node-specific) + # First, try to get cached router health from node_cache + # This is populated by node-guardian which has direct access to the router + try: + metrics = await repo_city.get_node_metrics(node_id) + if metrics and metrics.get("router_healthy") is not None: + return { + "node_id": node_id, + "status": "up" if metrics.get("router_healthy") else "down", + "version": metrics.get("router_version"), + "agent_count": 0, # TODO: get from node_cache + "latency_ms": None, + "source": "node_cache" + } + except Exception as e: + logger.debug(f"Failed to get cached router health for {node_id}: {e}") + + # Fallback: try direct health check (only works for NODE1 which is local to city-service) endpoints = await repo_city.get_node_endpoints(node_id) base_url = endpoints.get("router_url")