#!/usr/bin/env python3
"""
DAARION Node Guardian Self-Healing Loop

Periodically checks the node's state and performs self-healing when needed.
Runs as a background process on every node.

Usage:
    python scripts/node-guardian-loop.py
    python scripts/node-guardian-loop.py --node-id node-2-macbook-m4max
    python scripts/node-guardian-loop.py --interval 300  # 5 minutes
    python scripts/node-guardian-loop.py --once          # one check, print JSON, exit 1 if unhealthy

Environment variables:
    CITY_SERVICE_URL  - city-service URL
    NODE_ID           - node ID
    NODE_NAME         - node name (used for self-registration)
    NODE_ENVIRONMENT  - production/development
    NODE_ROLES        - comma-separated roles
    NODE_HOSTNAME     - node hostname sent on self-registration
    GUARDIAN_INTERVAL - seconds between checks in loop mode
    SWAPPER_URL       - Swapper service base URL used for metrics collection
"""

import argparse
import asyncio
import json
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

try:
    import httpx
except ImportError:
    print("❌ httpx not installed. Run: pip install httpx")
    sys.exit(1)


# ==============================================================================
# Configuration
# ==============================================================================

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [NODE-GUARDIAN] %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

DEFAULT_INTERVAL = 60  # seconds
DEFAULT_CITY_URL = "http://localhost:7001"


def _env_int(name: str, default: int) -> int:
    """Read an integer from environment variable *name*.

    Returns *default* when the variable is unset or blank, and also (with a
    warning) when it is not a valid integer, instead of crashing while the
    argument parser is being built.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(f"Ignoring invalid {name}={raw!r}; using {default}")
        return default


# ==============================================================================
# Self-Healing Logic
# ==============================================================================

class NodeGuardian:
    """Node Guardian — self-healing agent for DAARION nodes.

    Talks to city-service's internal node endpoints to check directory
    visibility, self-register, trigger healing and send heartbeats, and
    collects Swapper metrics to attach to the heartbeat.
    """

    def __init__(
        self,
        node_id: str,
        node_name: str,
        city_url: str,
        environment: str = "development",
        roles: Optional[List[str]] = None,
        hostname: Optional[str] = None
    ):
        self.node_id = node_id
        self.node_name = node_name
        self.city_url = city_url.rstrip("/")
        self.environment = environment
        self.roles = roles or []
        self.hostname = hostname
        self.client = httpx.AsyncClient(timeout=10.0)
        # Not updated anywhere in this class; kept for external readers.
        self.healing_attempts = 0
        # UTC time of the last check whose heartbeat was accepted.
        self.last_successful_check: Optional[datetime] = None

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()

    async def check_visibility(self) -> bool:
        """Return whether city-service reports this node as visible in the Node Directory.

        A non-200 response or any request error is treated as "not visible".
        """
        try:
            response = await self.client.get(
                f"{self.city_url}/city/internal/node/{self.node_id}/directory-check"
            )
            if response.status_code == 200:
                data = response.json()
                return data.get("visible_in_directory", False)
            return False
        except Exception as e:
            logger.error(f"Visibility check failed: {e}")
            return False

    async def get_self_healing_status(self) -> Dict[str, Any]:
        """Fetch this node's self-healing status from city-service.

        Returns the server's JSON object on HTTP 200. Otherwise returns a
        fallback dict with ``registered=False`` and ``status`` set to
        ``"unknown"`` (bad response) or ``"error"`` (request failed, with the
        message under ``"error"``).
        """
        try:
            response = await self.client.get(
                f"{self.city_url}/city/internal/node/{self.node_id}/self-healing/status"
            )
            if response.status_code == 200:
                data = response.json()
                # Callers use .get() on the result, so only a JSON object is usable.
                if isinstance(data, dict):
                    return data
                logger.warning(f"Unexpected self-healing status payload: {data!r}")
            return {"registered": False, "status": "unknown"}
        except Exception as e:
            logger.error(f"Status check failed: {e}")
            return {"registered": False, "status": "error", "error": str(e)}

    async def self_register(self) -> bool:
        """Register (or update) this node in city-service.

        Returns True only on HTTP 200 with a truthy ``success`` field in the
        response body. All failures are logged and reported as False.
        """
        try:
            payload = {
                "id": self.node_id,
                "name": self.node_name,
                "hostname": self.hostname,
                "environment": self.environment,
                "roles": self.roles
            }
            response = await self.client.post(
                f"{self.city_url}/city/internal/nodes/register-or-update",
                json=payload
            )
            if response.status_code == 200:
                data = response.json()
                if data.get("success"):
                    logger.info(f"✅ Self-registration successful: {data.get('message')}")
                    return True
                logger.warning(f"Self-registration returned false: {data}")
            else:
                logger.error(f"Self-registration failed: HTTP {response.status_code}")
            return False
        except Exception as e:
            logger.error(f"Self-registration error: {e}")
            return False

    async def send_heartbeat(self, metrics: Optional[Dict[str, Any]] = None) -> bool:
        """Send a heartbeat (with optional metrics) to city-service.

        If the server answers with ``should_self_register``, self-registration
        is performed and its result is returned instead.
        """
        try:
            payload = {"metrics": metrics or {}}
            response = await self.client.post(
                f"{self.city_url}/city/internal/node/{self.node_id}/heartbeat",
                json=payload
            )
            if response.status_code != 200:
                logger.warning(f"Heartbeat failed: HTTP {response.status_code}")
                return False
            data = response.json()
            if data.get("should_self_register"):
                logger.warning("⚠️ Server requests self-registration")
                return await self.self_register()
            return bool(data.get("success"))
        except Exception as e:
            logger.error(f"Heartbeat failed: {e}")
            return False

    async def trigger_healing(self) -> Dict[str, Any]:
        """Ask city-service to run self-healing for this node.

        Returns the server's JSON on HTTP 200, otherwise ``{"error": ...}``.
        """
        try:
            response = await self.client.post(
                f"{self.city_url}/city/internal/node/{self.node_id}/self-healing/trigger"
            )
            if response.status_code == 200:
                return response.json()
            return {"error": f"HTTP {response.status_code}"}
        except Exception as e:
            return {"error": str(e)}

    async def collect_metrics(self) -> Dict[str, Any]:
        """Collect node metrics plus Swapper service health and model stats.

        Node-level values are placeholders (zeros) for now. Swapper is queried
        at ``$SWAPPER_URL`` (default ``http://swapper-service:8890``). Its probes
        are best-effort: failures leave the defaults in place.
        """
        metrics: Dict[str, Any] = {
            "cpu_usage": 0.0,  # TODO: Implement real metrics
            "gpu_vram_used": 0,
            "ram_used": 0,
            "disk_used": 0,
            "agent_count_router": 0,
            "agent_count_system": 0,
            "dagi_router_url": "http://dagi-router:9102",
            # Swapper defaults
            "swapper_healthy": False,
            "swapper_models_loaded": 0,
            "swapper_models_total": 0,
            "swapper_state": {}
        }

        # Strip a trailing slash so the joined paths don't contain "//".
        swapper_url = os.getenv("SWAPPER_URL", "http://swapper-service:8890").rstrip("/")

        # Liveness probe
        try:
            r = await self.client.get(f"{swapper_url}/healthz", timeout=3.0)
            metrics["swapper_healthy"] = (r.status_code == 200)
        except Exception:
            metrics["swapper_healthy"] = False

        # Model inventory
        try:
            r = await self.client.get(f"{swapper_url}/v1/models", timeout=5.0)
            if r.status_code == 200:
                data = r.json()
                models = data.get("models", [])
                metrics["swapper_models_total"] = len(models)
                metrics["swapper_models_loaded"] = sum(1 for m in models if m.get("loaded") is True)
                metrics["swapper_state"] = data
        except Exception as e:
            logger.warning(f"Failed to fetch Swapper models: {e}")

        return metrics

    async def run_health_check(self) -> Dict[str, Any]:
        """
        Run a full node health check, healing what it can along the way.

        Steps:
          1. Check directory visibility; self-register if not visible.
          2. Fetch self-healing status; trigger healing if it reports "error".
          3. Collect metrics and send a heartbeat.

        Returns:
            {
                "healthy": bool,
                "checks": {
                    "visible_in_directory": bool,
                    "visible_in_directory_after_heal": bool,  # only after a successful self-registration
                    "registered": bool,
                    "has_guardian": bool,
                    "has_steward": bool,
                    "agent_count_router": ...,   # from the status response, default 0
                    "agent_count_system": ...,   # from the status response, default 0
                    "heartbeat_sent": bool
                },
                "actions_taken": [{"action": str, "success": bool, ...}],
                "timestamp": str  # ISO-8601, UTC
            }

        "healthy" is False whenever an issue was detected, even if a healing
        action then succeeded. A failed heartbeat alone does not mark the
        node unhealthy.
        """
        result: Dict[str, Any] = {
            "healthy": True,
            "checks": {},
            "actions_taken": [],
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

        # 1. Check visibility in directory
        visible = await self.check_visibility()
        result["checks"]["visible_in_directory"] = visible

        if not visible:
            result["healthy"] = False
            logger.warning("⚠️ Node not visible in directory, attempting self-registration...")
            registered = await self.self_register()
            result["actions_taken"].append({
                "action": "self_register",
                "success": registered
            })
            if registered:
                # Re-check visibility
                visible = await self.check_visibility()
                result["checks"]["visible_in_directory_after_heal"] = visible

        # 2. Get detailed status
        status = await self.get_self_healing_status()
        result["checks"]["registered"] = status.get("registered", False)
        result["checks"]["has_guardian"] = status.get("has_guardian", False)
        result["checks"]["has_steward"] = status.get("has_steward", False)
        result["checks"]["agent_count_router"] = status.get("agent_count_router", 0)
        result["checks"]["agent_count_system"] = status.get("agent_count_system", 0)

        # 3. Check if healing needed based on status
        if status.get("self_healing_status") == "error":
            result["healthy"] = False
            logger.warning("⚠️ Node in error state, triggering healing...")
            heal_result = await self.trigger_healing()
            result["actions_taken"].append({
                "action": "trigger_healing",
                "result": heal_result,
                # trigger_healing() reports failures as {"error": ...}
                "success": "error" not in heal_result
            })

        # 4. Send heartbeat with metrics
        metrics = await self.collect_metrics()
        heartbeat_ok = await self.send_heartbeat(metrics)
        result["checks"]["heartbeat_sent"] = heartbeat_ok

        if heartbeat_ok:
            self.last_successful_check = datetime.now(timezone.utc)

        # Any failed healing action keeps the node marked unhealthy.
        if any(not a.get("success", True) for a in result["actions_taken"]):
            result["healthy"] = False

        return result


# ==============================================================================
# Main Loop
# ==============================================================================

async def run_guardian_loop(
    node_id: str,
    node_name: str,
    city_url: str,
    environment: str,
    roles: List[str],
    hostname: Optional[str],
    interval: int
):
    """Run the Node Guardian self-healing loop until cancelled.

    Performs an initial health check, then one check every ``interval``
    seconds. The HTTP client is always closed on exit. Unexpected errors are
    logged and re-raised, which stops the loop. Cancellation or Ctrl+C is
    logged as a shutdown and propagated.
    """
    guardian = NodeGuardian(
        node_id=node_id,
        node_name=node_name,
        city_url=city_url,
        environment=environment,
        roles=roles,
        hostname=hostname
    )

    logger.info("=" * 60)
    logger.info("DAARION Node Guardian Starting")
    logger.info("=" * 60)
    logger.info(f"  Node ID: {node_id}")
    logger.info(f"  Node Name: {node_name}")
    logger.info(f"  Environment: {environment}")
    logger.info(f"  City Service: {city_url}")
    logger.info(f"  Interval: {interval}s")
    logger.info("=" * 60)

    try:
        # The initial check is inside the try so the client is closed even if it raises.
        logger.info("Running initial health check...")
        result = await guardian.run_health_check()

        if result["healthy"]:
            logger.info("✅ Initial check passed")
        else:
            logger.warning("⚠️ Initial check found issues:")
            for action in result.get("actions_taken", []):
                logger.warning(f"  - {action}")

        while True:
            await asyncio.sleep(interval)

            logger.info("Running periodic health check...")
            result = await guardian.run_health_check()

            if result["healthy"]:
                logger.info("✅ Health check passed")
            else:
                logger.warning("⚠️ Health check found issues")
                for action in result.get("actions_taken", []):
                    logger.info(f"  Action: {action['action']} - {action.get('success', 'done')}")

    except (KeyboardInterrupt, asyncio.CancelledError):
        # On Python 3.11+ asyncio.run() turns Ctrl+C into cancellation of the
        # main task, so both are handled here; re-raise so the caller sees it.
        logger.info("Shutting down Node Guardian...")
        raise
    except Exception as e:
        logger.error(f"Guardian loop error: {e}")
        raise
    finally:
        await guardian.close()


def main():
    """Parse CLI/environment configuration, then run a single check or the loop."""
    parser = argparse.ArgumentParser(
        description="DAARION Node Guardian Self-Healing Loop"
    )
    parser.add_argument(
        "--node-id",
        default=os.getenv("NODE_ID"),
        help="Node ID (default: $NODE_ID)"
    )
    parser.add_argument(
        "--node-name",
        default=os.getenv("NODE_NAME"),
        help="Node name (default: $NODE_NAME)"
    )
    parser.add_argument(
        "--city-url",
        default=os.getenv("CITY_SERVICE_URL", DEFAULT_CITY_URL),
        help="City service URL"
    )
    parser.add_argument(
        "--environment",
        default=os.getenv("NODE_ENVIRONMENT", "development"),
        help="Node environment"
    )
    parser.add_argument(
        "--roles",
        default=os.getenv("NODE_ROLES", "gpu,ai_runtime"),
        help="Node roles (comma-separated)"
    )
    parser.add_argument(
        "--hostname",
        default=os.getenv("NODE_HOSTNAME"),
        help="Node hostname"
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=_env_int("GUARDIAN_INTERVAL", DEFAULT_INTERVAL),
        help=f"Check interval in seconds (default: {DEFAULT_INTERVAL})"
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run single check and exit"
    )

    args = parser.parse_args()

    # Validate required params
    if not args.node_id:
        logger.error("NODE_ID is required (--node-id or $NODE_ID)")
        sys.exit(1)

    if not args.node_name:
        args.node_name = f"Node {args.node_id}"

    roles = [r.strip() for r in args.roles.split(",") if r.strip()]

    # A non-positive interval would turn the loop into a busy loop hammering city-service.
    if not args.once and args.interval <= 0:
        logger.error(f"Interval must be a positive number of seconds, got {args.interval}")
        sys.exit(1)

    try:
        if args.once:
            # Single check mode
            async def single_check() -> int:
                """Run one health check, print it as JSON and return the exit code."""
                guardian = NodeGuardian(
                    node_id=args.node_id,
                    node_name=args.node_name,
                    city_url=args.city_url,
                    environment=args.environment,
                    roles=roles,
                    hostname=args.hostname
                )
                try:
                    result = await guardian.run_health_check()
                finally:
                    await guardian.close()
                print(json.dumps(result, indent=2))
                return 0 if result["healthy"] else 1

            sys.exit(asyncio.run(single_check()))
        else:
            # Loop mode
            asyncio.run(run_guardian_loop(
                node_id=args.node_id,
                node_name=args.node_name,
                city_url=args.city_url,
                environment=args.environment,
                roles=roles,
                hostname=args.hostname,
                interval=args.interval
            ))
    except KeyboardInterrupt:
        logger.info("Interrupted, exiting")
        # Conventional exit status for termination by SIGINT.
        sys.exit(130)


if __name__ == "__main__":
    main()