From 9b9a72ffbd03dc1f843f6731b3cc2a369146e35c Mon Sep 17 00:00:00 2001 From: Apple Date: Mon, 1 Dec 2025 08:01:53 -0800 Subject: [PATCH] feat: full node isolation - use node-specific swapper_url and router_url from DB - Add migration 041_node_local_endpoints.sql - Add get_node_endpoints() to repo_city.py - Update routes_city.py to use DB endpoints instead of hardcoded URLs - Update node-guardian-loop.py to use NODE_SWAPPER_URL/NODE_ROUTER_URL env vars - Update launchd plist for NODE2 with router URL --- migrations/041_node_local_endpoints.sql | 30 +++++++++++++++++ scripts/node-guardian-loop.py | 44 +++++++++++++++++++------ services/city-service/repo_city.py | 34 ++++++++++++++++++- services/city-service/routes_city.py | 32 +++++++++--------- 4 files changed, 113 insertions(+), 27 deletions(-) create mode 100644 migrations/041_node_local_endpoints.sql diff --git a/migrations/041_node_local_endpoints.sql b/migrations/041_node_local_endpoints.sql new file mode 100644 index 00000000..f016518d --- /dev/null +++ b/migrations/041_node_local_endpoints.sql @@ -0,0 +1,30 @@ +-- Migration: Add local endpoint URLs for each node +-- Purpose: Allow each node to have its own Swapper and Router URLs for proper isolation + +-- Add endpoint columns to node_cache +ALTER TABLE node_cache + ADD COLUMN IF NOT EXISTS router_url text, + ADD COLUMN IF NOT EXISTS swapper_url text; + +COMMENT ON COLUMN node_cache.router_url IS 'Full URL of DAGI Router for this node'; +COMMENT ON COLUMN node_cache.swapper_url IS 'Full URL of Swapper Service for this node'; + +-- Set default values for NODE1 (Docker-based) +UPDATE node_cache +SET + router_url = 'http://dagi-router:9102', + swapper_url = 'http://swapper-service:8890' +WHERE node_id = 'node-1-hetzner-gex44' + AND router_url IS NULL; + +-- Set default values for NODE2 (localhost-based) +UPDATE node_cache +SET + router_url = 'http://localhost:9102', + swapper_url = 'http://localhost:8890' +WHERE node_id = 'node-2-macbook-m4max' + AND router_url IS NULL; + +-- Create index for faster lookups +CREATE INDEX IF NOT EXISTS idx_node_cache_endpoints ON node_cache (node_id) WHERE router_url IS NOT NULL; + diff --git a/scripts/node-guardian-loop.py b/scripts/node-guardian-loop.py index d746b581..e5ad4745 100755 --- a/scripts/node-guardian-loop.py +++ b/scripts/node-guardian-loop.py @@ -16,6 +16,9 @@ Environment variables: NODE_NAME - Назва ноди (для self-registration) NODE_ENVIRONMENT - production/development NODE_ROLES - Ролі через кому + NODE_SWAPPER_URL - URL Swapper Service (node-specific) + NODE_ROUTER_URL - URL DAGI Router (node-specific) + SWAPPER_URL - Legacy alias for NODE_SWAPPER_URL """ import argparse @@ -63,7 +66,9 @@ class NodeGuardian: city_url: str, environment: str = "development", roles: list = None, - hostname: str = None + hostname: str = None, + swapper_url: str = None, + router_url: str = None ): self.node_id = node_id self.node_name = node_name @@ -72,6 +77,20 @@ class NodeGuardian: self.roles = roles or [] self.hostname = hostname + # Node-specific service URLs + # Priority: explicit param > NODE_*_URL env > SWAPPER_URL env > defaults + self.swapper_url = ( + swapper_url or + os.getenv("NODE_SWAPPER_URL") or + os.getenv("SWAPPER_URL") or + "http://swapper-service:8890" + ) + self.router_url = ( + router_url or + os.getenv("NODE_ROUTER_URL") or + "http://dagi-router:9102" + ) + self.client = httpx.AsyncClient(timeout=10.0) self.healing_attempts = 0 self.last_successful_check = None @@ -144,11 +163,15 @@ class NodeGuardian: async def send_heartbeat(self, metrics: Dict = None) -> bool: """Відправити heartbeat""" try: - payload = {"metrics": metrics or {}} + # Add node-specific URLs to metrics for storage in node_cache + metrics_with_urls = metrics or {} + metrics_with_urls["swapper_url"] = self.swapper_url + metrics_with_urls["router_url"] = self.router_url + + payload = {"metrics": metrics_with_urls} # Log key info for debugging node isolation - swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890") - logger.info(f"📤 Sending heartbeat: node_id={self.node_id}, swapper_url={swapper_url}") + logger.info(f"📤 Sending heartbeat: node_id={self.node_id}, swapper_url={self.swapper_url}, router_url={self.router_url}") response = await self.client.post( f"{self.city_url}/city/internal/node/{self.node_id}/heartbeat", @@ -193,7 +216,7 @@ class NodeGuardian: "disk_used": 0, "agent_count_router": 0, "agent_count_system": 0, - "dagi_router_url": "http://dagi-router:9102", + "dagi_router_url": self.router_url, # Use node-specific URL # Swapper defaults "swapper_healthy": False, "swapper_models_loaded": 0, @@ -201,12 +224,11 @@ class NodeGuardian: "swapper_state": {} } - # Collect Swapper Metrics - swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890") + # Collect Swapper Metrics using node-specific URL try: # Check health (Swapper uses /health, not /healthz) try: - r = await self.client.get(f"{swapper_url}/health", timeout=3.0) + r = await self.client.get(f"{self.swapper_url}/health", timeout=3.0) if r.status_code == 200: health_data = r.json() # Swapper can return "status": "healthy" or "ok" @@ -219,7 +241,7 @@ class NodeGuardian: # Check models (Swapper uses /models, not /v1/models) try: - r = await self.client.get(f"{swapper_url}/models", timeout=5.0) + r = await self.client.get(f"{self.swapper_url}/models", timeout=5.0) if r.status_code == 200: data = r.json() models = data.get("models", []) @@ -229,7 +251,7 @@ class NodeGuardian: metrics["swapper_state"] = data logger.debug(f"🧠 Swapper metrics: healthy={metrics['swapper_healthy']}, loaded={metrics['swapper_models_loaded']}/{metrics['swapper_models_total']}") except Exception as e: - logger.warning(f"Failed to fetch Swapper models from {swapper_url}: {e}") + logger.warning(f"Failed to fetch Swapper models from {self.swapper_url}: {e}") except Exception as e: logger.warning(f"Swapper metrics collection failed: {e}") @@ -347,6 +369,8 @@ async def run_guardian_loop( logger.info(f" Node Name: {node_name}") logger.info(f" Environment: {environment}") logger.info(f" City Service: {city_url}") + logger.info(f" Swapper URL: {guardian.swapper_url}") + logger.info(f" Router URL: {guardian.router_url}") logger.info(f" Interval: {interval}s") logger.info("=" * 60) diff --git a/services/city-service/repo_city.py b/services/city-service/repo_city.py index e1d66179..f68821ae 100644 --- a/services/city-service/repo_city.py +++ b/services/city-service/repo_city.py @@ -3287,6 +3287,8 @@ async def get_node_metrics_current(node_id: str) -> Optional[Dict[str, Any]]: COALESCE(swapper_healthy, false) as swapper_healthy, COALESCE(swapper_models_loaded, 0) as swapper_models_loaded, COALESCE(swapper_models_total, 0) as swapper_models_total, + router_url, + swapper_url, updated_at FROM node_cache WHERE node_id = $1 @@ -3341,6 +3343,34 @@ async def get_node_metrics_current(node_id: str) -> Optional[Dict[str, Any]]: return result +async def get_node_endpoints(node_id: str) -> Dict[str, str]: + """ + Отримати URL endpoints для конкретної ноди. + Якщо в БД немає значень — підставляє дефолти для NODE1. + """ + pool = await get_pool() + + row = await pool.fetchrow(""" + SELECT router_url, swapper_url + FROM node_cache + WHERE node_id = $1 + """, node_id) + + # Default values (NODE1 Docker-based) + defaults = { + "router_url": "http://dagi-router:9102", + "swapper_url": "http://swapper-service:8890" + } + + if not row: + return defaults + + return { + "router_url": row["router_url"] or defaults["router_url"], + "swapper_url": row["swapper_url"] or defaults["swapper_url"] + } + + async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]: """ Отримати розширені метрики ноди (включаючи Swapper). @@ -3353,7 +3383,9 @@ async def get_node_metrics(node_id: str) -> Optional[Dict[str, Any]]: swapper_healthy, swapper_models_loaded, swapper_models_total, - swapper_state + swapper_state, + router_url, + swapper_url FROM node_cache WHERE node_id = $1 """, node_id) diff --git a/services/city-service/routes_city.py b/services/city-service/routes_city.py index efd08608..4cd75ae9 100644 --- a/services/city-service/routes_city.py +++ b/services/city-service/routes_city.py @@ -4087,17 +4087,14 @@ async def get_dagi_router_health(node_id: str): """ Get DAGI Router health status for a node. Always returns 200 with status="down" if router is unavailable. + Uses node-specific router_url from node_cache. """ import httpx import time - # Node-specific router URLs - NODE_ROUTER_URLS = { - "node-1-hetzner-gex44": "http://dagi-router:9102", - "node-2-macbook-m4max": "http://localhost:9102", # Local router on NODE2 - } - - base_url = NODE_ROUTER_URLS.get(node_id) + # Get router URL from database (node-specific) + endpoints = await repo_city.get_node_endpoints(node_id) + base_url = endpoints.get("router_url") if not base_url: return { @@ -4125,9 +4122,12 @@ async def get_dagi_router_health(node_id: str): } data = resp.json() + # Router can return "healthy" or "ok" + status_val = data.get("status", "").lower() + is_healthy = status_val in ("healthy", "ok") return { "node_id": node_id, - "status": "up" if data.get("status") == "healthy" else "degraded", + "status": "up" if is_healthy else "degraded", "version": data.get("version"), "agent_count": data.get("agent_count", 0), "latency_ms": round(latency_ms, 2) @@ -4149,15 +4149,13 @@ async def get_dagi_router_agents(node_id: str): Get list of agents for a node. Since DAGI Router doesn't expose /agents endpoint, we use DB agents and check router health to determine status. + Uses node-specific router_url from node_cache. """ import httpx - NODE_ROUTER_URLS = { - "node-1-hetzner-gex44": "http://dagi-router:9102", - "node-2-macbook-m4max": "http://localhost:9102", - } - - base_url = NODE_ROUTER_URLS.get(node_id) + # Get router URL from database (node-specific) + endpoints = await repo_city.get_node_endpoints(node_id) + base_url = endpoints.get("router_url") router_healthy = False # Check if router is healthy @@ -4167,9 +4165,11 @@ async def get_dagi_router_agents(node_id: str): resp = await client.get(f"{base_url}/health") if resp.status_code == 200: data = resp.json() - router_healthy = data.get("status") == "healthy" + # Router can return "healthy" or "ok" + status = data.get("status", "").lower() + router_healthy = status in ("healthy", "ok") except Exception as e: - logger.warning(f"Failed to check router health for {node_id}: {e}") + logger.warning(f"Failed to check router health for {node_id} at {base_url}: {e}") # Get agents from DB for this node try: