feat: add router health metrics to node_cache and node-guardian
- Add migration 042_node_cache_router_metrics.sql - Node guardian now collects router health and sends in heartbeat - City-service uses cached router_healthy from node_cache - This allows NODE2 router status to be displayed correctly
This commit is contained in:
71
migrations/042_node_cache_router_metrics.sql
Normal file
71
migrations/042_node_cache_router_metrics.sql
Normal file
@@ -0,0 +1,71 @@
|
||||
-- Migration: Add router health metrics to node_cache
|
||||
-- Purpose: Store router health status collected by node-guardian
|
||||
|
||||
ALTER TABLE node_cache
|
||||
ADD COLUMN IF NOT EXISTS router_healthy boolean DEFAULT false,
|
||||
ADD COLUMN IF NOT EXISTS router_version text;
|
||||
|
||||
COMMENT ON COLUMN node_cache.router_healthy IS 'Whether DAGI Router is healthy on this node';
|
||||
COMMENT ON COLUMN node_cache.router_version IS 'Version of DAGI Router on this node';
|
||||
|
||||
-- Update fn_node_heartbeat to handle router metrics
|
||||
CREATE OR REPLACE FUNCTION fn_node_heartbeat(
|
||||
p_node_id text,
|
||||
p_metrics jsonb DEFAULT '{}'::jsonb
|
||||
) RETURNS jsonb AS $$
|
||||
DECLARE
|
||||
v_result jsonb;
|
||||
v_node_exists boolean;
|
||||
v_swapper_state jsonb;
|
||||
BEGIN
|
||||
-- Check if node exists
|
||||
SELECT EXISTS(SELECT 1 FROM node_cache WHERE node_id = p_node_id) INTO v_node_exists;
|
||||
|
||||
IF NOT v_node_exists THEN
|
||||
RETURN jsonb_build_object(
|
||||
'success', false,
|
||||
'should_self_register', true,
|
||||
'message', 'Node not found in cache'
|
||||
);
|
||||
END IF;
|
||||
|
||||
-- Handle swapper_state - only update if provided and not empty
|
||||
v_swapper_state := CASE
|
||||
WHEN p_metrics ? 'swapper_state' AND p_metrics->'swapper_state' != '{}'::jsonb
|
||||
THEN p_metrics->'swapper_state'
|
||||
ELSE NULL
|
||||
END;
|
||||
|
||||
-- Update node_cache with metrics
|
||||
UPDATE node_cache SET
|
||||
last_heartbeat = NOW(),
|
||||
status = 'online',
|
||||
cpu_usage = COALESCE((p_metrics->>'cpu_usage')::float, cpu_usage),
|
||||
gpu_vram_used = COALESCE((p_metrics->>'gpu_vram_used')::int, gpu_vram_used),
|
||||
ram_used = COALESCE((p_metrics->>'ram_used')::bigint, ram_used),
|
||||
disk_used = COALESCE((p_metrics->>'disk_used')::bigint, disk_used),
|
||||
agent_count_router = COALESCE((p_metrics->>'agent_count_router')::int, agent_count_router),
|
||||
agent_count_system = COALESCE((p_metrics->>'agent_count_system')::int, agent_count_system),
|
||||
dagi_router_url = COALESCE(p_metrics->>'dagi_router_url', dagi_router_url),
|
||||
-- Swapper metrics
|
||||
swapper_healthy = COALESCE((p_metrics->>'swapper_healthy')::boolean, swapper_healthy),
|
||||
swapper_models_loaded = COALESCE((p_metrics->>'swapper_models_loaded')::int, swapper_models_loaded),
|
||||
swapper_models_total = COALESCE((p_metrics->>'swapper_models_total')::int, swapper_models_total),
|
||||
swapper_state = COALESCE(v_swapper_state, swapper_state),
|
||||
-- Router metrics
|
||||
router_healthy = COALESCE((p_metrics->>'router_healthy')::boolean, router_healthy),
|
||||
router_version = COALESCE(p_metrics->>'router_version', router_version),
|
||||
-- Node-specific URLs (update if provided)
|
||||
swapper_url = COALESCE(p_metrics->>'swapper_url', swapper_url),
|
||||
router_url = COALESCE(p_metrics->>'router_url', router_url),
|
||||
updated_at = NOW()
|
||||
WHERE node_id = p_node_id;
|
||||
|
||||
RETURN jsonb_build_object(
|
||||
'success', true,
|
||||
'node_id', p_node_id,
|
||||
'timestamp', NOW()
|
||||
);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
@@ -256,6 +256,20 @@ class NodeGuardian:
|
||||
except Exception as e:
|
||||
logger.warning(f"Swapper metrics collection failed: {e}")
|
||||
|
||||
# Collect Router Metrics using node-specific URL
|
||||
metrics["router_healthy"] = False
|
||||
metrics["router_version"] = None
|
||||
try:
|
||||
r = await self.client.get(f"{self.router_url}/health", timeout=3.0)
|
||||
if r.status_code == 200:
|
||||
data = r.json()
|
||||
status = data.get("status", "").lower()
|
||||
metrics["router_healthy"] = status in ("healthy", "ok")
|
||||
metrics["router_version"] = data.get("version")
|
||||
logger.debug(f"🔀 Router metrics: healthy={metrics['router_healthy']}, version={metrics['router_version']}")
|
||||
except Exception as e:
|
||||
logger.debug(f"Router health check failed: {e}")
|
||||
|
||||
return metrics
|
||||
|
||||
async def run_health_check(self) -> Dict[str, Any]:
|
||||
|
||||
@@ -3373,7 +3373,7 @@ async def get_node_endpoints(node_id: str) -> Dict[str, str]:
|
||||
|
||||
async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Отримати розширені метрики ноди (включаючи Swapper).
|
||||
Отримати розширені метрики ноди (включаючи Swapper та Router).
|
||||
"""
|
||||
pool = await get_pool()
|
||||
|
||||
@@ -3385,7 +3385,9 @@ async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]:
|
||||
swapper_models_total,
|
||||
swapper_state,
|
||||
router_url,
|
||||
swapper_url
|
||||
swapper_url,
|
||||
router_healthy,
|
||||
router_version
|
||||
FROM node_cache
|
||||
WHERE node_id = $1
|
||||
""", node_id)
|
||||
|
||||
@@ -4086,13 +4086,29 @@ async def get_node_swapper_detail(node_id: str):
|
||||
async def get_dagi_router_health(node_id: str):
|
||||
"""
|
||||
Get DAGI Router health status for a node.
|
||||
Always returns 200 with status="down" if router is unavailable.
|
||||
Uses node-specific router_url from node_cache.
|
||||
First checks node_cache for cached router_healthy status (from node-guardian).
|
||||
Falls back to direct health check if node is local (NODE1).
|
||||
"""
|
||||
import httpx
|
||||
import time
|
||||
|
||||
# Get router URL from database (node-specific)
|
||||
# First, try to get cached router health from node_cache
|
||||
# This is populated by node-guardian which has direct access to the router
|
||||
try:
|
||||
metrics = await repo_city.get_node_metrics(node_id)
|
||||
if metrics and metrics.get("router_healthy") is not None:
|
||||
return {
|
||||
"node_id": node_id,
|
||||
"status": "up" if metrics.get("router_healthy") else "down",
|
||||
"version": metrics.get("router_version"),
|
||||
"agent_count": 0, # TODO: get from node_cache
|
||||
"latency_ms": None,
|
||||
"source": "node_cache"
|
||||
}
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to get cached router health for {node_id}: {e}")
|
||||
|
||||
# Fallback: try direct health check (only works for NODE1 which is local to city-service)
|
||||
endpoints = await repo_city.get_node_endpoints(node_id)
|
||||
base_url = endpoints.get("router_url")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user