#!/usr/bin/env python3
"""
DAARION Infrastructure Invariants Check

Verifies the critical system invariants after every deploy.
If at least one CRITICAL invariant is violated, the script exits with code 1.
Warnings are reported but do not fail the run.

Usage:
    python scripts/check-invariants.py
    python scripts/check-invariants.py --base-url http://localhost:7001
    python scripts/check-invariants.py --node node-1-hetzner-gex44
    python scripts/check-invariants.py --json

Invariants checked:
    1. Nodes (NODE1, NODE2): metrics, heartbeat, agent counts
    2. Node Agents: Guardian + Steward are present
    3. Core Agents: every agent listed in CORE_AGENTS has system prompts
    4. DAGI Router: configured agents, phantom/stale agent counts
    5. Self-healing: per-node status and nodes needing healing
"""

import argparse
import contextlib
import sys
import json
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum

try:
    import requests
except ImportError:
    # Reported from main() so that merely importing this module never exits
    # the interpreter.
    requests = None


# ==============================================================================
# Configuration
# ==============================================================================

class Severity(Enum):
    """Severity attached to an invariant result."""

    CRITICAL = "CRITICAL"
    WARNING = "WARNING"
    INFO = "INFO"


@dataclass
class InvariantError:
    """An invariant violation."""

    invariant: str
    message: str
    severity: Severity = Severity.CRITICAL
    details: Optional[Dict] = None


@dataclass
class InvariantResult:
    """Outcome of a single invariant check."""

    name: str
    passed: bool
    message: str
    severity: Severity = Severity.CRITICAL
    details: Optional[Dict] = None


@dataclass
class CheckResults:
    """Aggregated results of a whole run, bucketed by outcome."""

    passed: List[InvariantResult] = field(default_factory=list)
    failed: List[InvariantResult] = field(default_factory=list)
    warnings: List[InvariantResult] = field(default_factory=list)

    @property
    def has_critical_failures(self) -> bool:
        """True when any entry in ``failed`` has CRITICAL severity."""
        return any(r.severity == Severity.CRITICAL for r in self.failed)

    @property
    def total_checks(self) -> int:
        """Number of recorded results across all three buckets."""
        return len(self.passed) + len(self.failed) + len(self.warnings)


# Node IDs
NODE1_ID = "node-1-hetzner-gex44"
NODE2_ID = "node-2-macbook-m4max"

# Core agents that MUST exist with prompts.
# Each agent appears under both its bare slug and its "agent-" prefixed slug.
CORE_AGENTS = [
    {"slug": "daarwizz", "name": "DAARWIZZ", "required_prompts": ["core"]},
    {"slug": "agent-daarwizz", "name": "DAARWIZZ", "required_prompts": ["core"]},
    {"slug": "microdao-orchestrator", "name": "MicroDAO Orchestrator", "required_prompts": ["core"]},
    {"slug": "agent-microdao-orchestrator", "name": "MicroDAO Orchestrator", "required_prompts": ["core"]},
    {"slug": "devtools", "name": "DevTools", "required_prompts": ["core"]},
    {"slug": "agent-devtools", "name": "DevTools", "required_prompts": ["core"]},
    {"slug": "soul", "name": "SOUL", "required_prompts": ["core"]},
    {"slug": "agent-soul", "name": "SOUL", "required_prompts": ["core"]},
    {"slug": "greenfood", "name": "GREENFOOD", "required_prompts": ["core"]},
    {"slug": "agent-greenfood", "name": "GREENFOOD", "required_prompts": ["core"]},
    {"slug": "helion", "name": "Helion", "required_prompts": ["core"]},
    {"slug": "agent-helion", "name": "Helion", "required_prompts": ["core"]},
    {"slug": "druid", "name": "DRUID", "required_prompts": ["core"]},
    {"slug": "agent-druid", "name": "DRUID", "required_prompts": ["core"]},
    {"slug": "nutra", "name": "NUTRA", "required_prompts": ["core"]},
    {"slug": "agent-nutra", "name": "NUTRA", "required_prompts": ["core"]},
    {"slug": "monitor", "name": "Monitor", "required_prompts": ["core"]},
    {"slug": "agent-monitor", "name": "Monitor", "required_prompts": ["core"]},
]

# Node agents that MUST exist
NODE_AGENTS = [
    {"node_id": NODE1_ID, "slug": "monitor-node1", "kind": "node_guardian", "name": "Node Guardian NODE1"},
    {"node_id": NODE1_ID, "slug": "node-steward-node1", "kind": "node_steward", "name": "Node Steward NODE1"},
    {"node_id": NODE2_ID, "slug": "monitor-node2", "kind": "node_guardian", "name": "Node Guardian NODE2"},
    {"node_id": NODE2_ID, "slug": "node-steward-node2", "kind": "node_steward", "name": "Node Steward NODE2"},
]

# Thresholds
MAX_HEARTBEAT_AGE_MINUTES = 10
MAX_PHANTOM_AGENTS = 20
MAX_STALE_AGENTS = 20

# Prefix used by the alternative form of a core-agent slug.
_AGENT_SLUG_PREFIX = "agent-"


# ==============================================================================
# API Client
# ==============================================================================

class APIClient:
    """HTTP client for city-service API."""

    def __init__(self, base_url: str, timeout: int = 10):
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def _request(self, method: str, path: str, **kwargs: Any) -> Tuple[Optional[Dict], Optional[str]]:
        """Send one request and normalise the outcome to ``(data, error)``.

        Exactly one element of the returned pair is meaningful: ``error`` is
        ``None`` on HTTP 200 (``data`` is the decoded JSON body, which may
        itself be ``None`` for a ``null`` body), otherwise ``data`` is ``None``
        and ``error`` is a short description. Never raises for transport or
        decoding problems.
        """
        url = f"{self.base_url}{path}"
        if requests is None:
            return None, "requests not installed"
        try:
            response = requests.request(method, url, timeout=self.timeout, **kwargs)
            if response.status_code == 200:
                return response.json(), None
            if response.status_code == 404:
                return None, f"Not found: {path}"
            return None, f"HTTP {response.status_code}: {response.text[:200]}"
        except requests.exceptions.ConnectionError:
            return None, f"Connection error: {url}"
        except requests.exceptions.Timeout:
            return None, f"Timeout: {url}"
        except Exception as e:
            # Boundary of the checker: any other failure (e.g. an invalid JSON
            # body) is reported as a check error instead of aborting the run.
            return None, str(e)

    def get(self, path: str) -> Tuple[Optional[Dict], Optional[str]]:
        """GET request, returns (data, error)."""
        return self._request("GET", path)

    def post(self, path: str, data: Dict) -> Tuple[Optional[Dict], Optional[str]]:
        """POST request with a JSON body, returns (data, error)."""
        return self._request("POST", path, json=data)


# ==============================================================================
# Result helpers
# ==============================================================================

def _record_pass(results: CheckResults, name: str, message: str) -> None:
    """Append a passed result."""
    results.passed.append(InvariantResult(name=name, passed=True, message=message))


def _record_failure(results: CheckResults, name: str, message: str) -> None:
    """Append a CRITICAL failure."""
    results.failed.append(InvariantResult(
        name=name,
        passed=False,
        message=message,
        severity=Severity.CRITICAL,
    ))


def _record_warning(
    results: CheckResults,
    name: str,
    message: str,
    *,
    passed: bool = False,
    severity: Severity = Severity.WARNING,
) -> None:
    """Append a non-fatal result to the warnings bucket."""
    results.warnings.append(InvariantResult(
        name=name,
        passed=passed,
        message=message,
        severity=severity,
    ))


def _response_error(data: Any, error: Optional[str]) -> Optional[str]:
    """Return an error message if the response is unusable, else ``None``.

    A 200 response whose body is ``null`` or not a JSON object comes back from
    the client as ``error is None`` with non-dict ``data``; the checks call
    ``data.get(...)`` and would crash on it.
    """
    if error:
        return error
    if not isinstance(data, dict):
        return "Empty or non-object JSON response"
    return None


# ==============================================================================
# Invariant Checks
# ==============================================================================

def check_node_exists(client: APIClient, node_id: str, results: CheckResults):
    """Check that the node exists and exposes current metrics.

    Returns the metrics dict on success, ``None`` otherwise (a CRITICAL failure
    has been recorded in that case).
    """
    inv_name = f"Node exists: {node_id}"
    data, error = client.get(f"/city/internal/node/{node_id}/metrics/current")

    if error:
        _record_failure(results, inv_name, f"Cannot fetch node metrics: {error}")
        return None

    if not data or not isinstance(data, dict):
        _record_failure(results, inv_name, f"Node {node_id} not found in system")
        return None

    _record_pass(results, inv_name, f"Node exists: {data.get('node_name', node_id)}")
    return data


def check_node_metrics(client: APIClient, node_id: str, metrics: Dict, results: CheckResults):
    """Check the node's metrics: agent counts, GPU (NODE1 only) and heartbeat.

    ``client`` is not used; it is kept for a uniform check signature.
    """
    # ``or 0`` also covers keys that are present but null.
    agent_count_router = metrics.get("agent_count_router") or 0
    agent_count_system = metrics.get("agent_count_system") or 0

    inv_name = f"Node {node_id}: agent_count_router"
    if agent_count_router >= 1:
        _record_pass(results, inv_name, f"Router has {agent_count_router} agents")
    else:
        _record_failure(results, inv_name, "Router has 0 agents (expected >= 1)")

    inv_name = f"Node {node_id}: agent_count_system"
    if agent_count_system >= 1:
        _record_pass(results, inv_name, f"System has {agent_count_system} agents")
    else:
        _record_failure(results, inv_name, "System has 0 agents (expected >= 1)")

    # Check GPU for NODE1 (production)
    if node_id == NODE1_ID:
        gpu_model = metrics.get("gpu_model")
        gpu_memory = metrics.get("gpu_memory_total") or 0
        inv_name = f"Node {node_id}: GPU configured"
        if gpu_model and gpu_memory > 0:
            _record_pass(results, inv_name, f"GPU: {gpu_model}, VRAM: {gpu_memory}MB")
        else:
            _record_warning(
                results, inv_name,
                "GPU not configured (may be expected for this node)",
            )

    # Check heartbeat
    inv_name = f"Node {node_id}: heartbeat fresh"
    last_heartbeat = metrics.get("last_heartbeat")
    if not last_heartbeat:
        # Without this a node that never reported would skip the check silently.
        _record_warning(results, inv_name, "No heartbeat reported")
        return

    try:
        hb_time = datetime.fromisoformat(last_heartbeat.replace("Z", "+00:00"))
        if hb_time.tzinfo is None:
            # NOTE(review): a timestamp without an offset is assumed to be UTC,
            # in line with the "Z" handling above -- confirm against the API.
            hb_time = hb_time.replace(tzinfo=timezone.utc)
        age_minutes = (datetime.now(timezone.utc) - hb_time).total_seconds() / 60
    except (AttributeError, TypeError, ValueError) as e:
        _record_warning(results, inv_name, f"Cannot parse heartbeat: {e}")
        return

    if age_minutes <= MAX_HEARTBEAT_AGE_MINUTES:
        _record_pass(results, inv_name, f"Last heartbeat: {age_minutes:.1f} minutes ago")
    else:
        _record_warning(
            results, inv_name,
            f"Heartbeat stale: {age_minutes:.1f} minutes ago (max: {MAX_HEARTBEAT_AGE_MINUTES})",
        )


def check_node_agents(client: APIClient, node_id: str, results: CheckResults):
    """Check that the node has a Guardian, a Steward and at least one agent."""
    data, error = client.get(f"/city/internal/node/{node_id}/agents")

    problem = _response_error(data, error)
    if problem:
        _record_failure(
            results,
            f"Node {node_id}: fetch agents",
            f"Cannot fetch node agents: {problem}",
        )
        return

    # Check Guardian
    guardian = data.get("guardian")
    inv_name = f"Node {node_id}: Node Guardian exists"
    if guardian:
        _record_pass(results, inv_name, f"Guardian: {guardian.get('name', guardian.get('id'))}")
    else:
        _record_failure(results, inv_name, "Node Guardian not found")

    # Check Steward
    steward = data.get("steward")
    inv_name = f"Node {node_id}: Node Steward exists"
    if steward:
        _record_pass(results, inv_name, f"Steward: {steward.get('name', steward.get('id'))}")
    else:
        _record_failure(results, inv_name, "Node Steward not found")

    # Check total agents
    total = data.get("total") or 0
    inv_name = f"Node {node_id}: has agents"
    if total >= 1:
        _record_pass(results, inv_name, f"Node has {total} agents")
    else:
        _record_failure(results, inv_name, "Node has 0 agents")


def _check_agent_limit(
    results: CheckResults,
    inv_name: str,
    kind: str,
    count: int,
    limit: int,
    hint: str,
) -> None:
    """Record the result of a "no more than ``limit`` ``kind`` agents" check."""
    if count > limit:
        _record_warning(results, inv_name, f"Too many {kind} agents: {count} (max: {limit})")
    elif count > 0:
        # Within the limit but non-zero: informational, still counted as passed.
        _record_warning(
            results, inv_name,
            f"{kind.capitalize()} agents: {count} (consider {hint})",
            passed=True,
            severity=Severity.INFO,
        )
    else:
        _record_pass(results, inv_name, f"No {kind} agents")


def check_dagi_router(client: APIClient, node_id: str, results: CheckResults):
    """Check the DAGI Router state: configured, phantom and stale agents."""
    data, error = client.get(f"/city/internal/node/{node_id}/dagi-router/agents")

    problem = _response_error(data, error)
    if problem:
        _record_warning(
            results,
            f"Node {node_id}: DAGI Router check",
            f"Cannot fetch DAGI Router agents: {problem}",
        )
        return

    summary = data.get("summary") or {}

    # Check router has agents
    router_total = summary.get("router_total") or 0
    inv_name = f"Node {node_id}: DAGI Router has agents"
    if router_total >= 1:
        _record_pass(results, inv_name, f"Router has {router_total} agents configured")
    else:
        _record_warning(results, inv_name, "DAGI Router has 0 agents")

    # Check phantom agents
    _check_agent_limit(
        results,
        f"Node {node_id}: phantom agents limit",
        "phantom",
        summary.get("phantom") or 0,
        MAX_PHANTOM_AGENTS,
        "syncing",
    )

    # Check stale agents
    _check_agent_limit(
        results,
        f"Node {node_id}: stale agents limit",
        "stale",
        summary.get("stale") or 0,
        MAX_STALE_AGENTS,
        "cleanup",
    )


def check_core_agents_prompts(client: APIClient, results: CheckResults):
    """Check that every core agent has system prompts.

    All slugs from CORE_AGENTS are sent in one batch request. Each agent name
    is then reported once: it passes if either its slug or the alternative
    slug (with/without the "agent-" prefix) is truthy in the returned status
    map. The ``required_prompts`` entries are not inspected here.
    """
    agent_ids = [a["slug"] for a in CORE_AGENTS]

    # Batch check prompts status
    data, error = client.post("/city/internal/agents/prompts/status", {"agent_ids": agent_ids})

    problem = _response_error(data, error)
    if problem:
        _record_warning(
            results,
            "Core agents: prompts status",
            f"Cannot check prompts status: {problem}",
        )
        return

    status = data.get("status") or {}

    # Group by name to avoid duplicate checks.
    checked_names = set()
    for agent in CORE_AGENTS:
        name = agent["name"]
        if name in checked_names:
            continue
        checked_names.add(name)

        slug = agent["slug"]
        inv_name = f"Core agent: {name} has prompts"

        if status.get(slug, False):
            _record_pass(results, inv_name, f"Agent {name} has system prompts")
            continue

        # Try alternative slug. Only the leading prefix is removed, so a slug
        # that contains "agent-" further in is left intact.
        if slug.startswith(_AGENT_SLUG_PREFIX):
            alt_slug = slug[len(_AGENT_SLUG_PREFIX):]
        else:
            alt_slug = f"{_AGENT_SLUG_PREFIX}{slug}"

        if status.get(alt_slug, False):
            _record_pass(results, inv_name, f"Agent {name} has system prompts (alt slug)")
        else:
            # Don't fail, just warn - prompts may not be migrated yet
            _record_warning(results, inv_name, f"Agent {name} missing system prompts")


def check_healthz(client: APIClient, results: CheckResults):
    """Check the /health endpoint of city-service."""
    data, error = client.get("/health")
    inv_name = "City service: /health"

    if error:
        _record_failure(results, inv_name, f"Health check failed: {error}")
        return

    status = data.get("status", "unknown") if isinstance(data, dict) else "unknown"
    if status in ("ok", "healthy"):
        _record_pass(results, inv_name, f"City service healthy: {status}")
    else:
        _record_failure(results, inv_name, f"City service unhealthy: {status}")


def check_node_self_healing(client: APIClient, node_id: str, results: CheckResults):
    """Check the node's registration and self-healing status."""
    data, error = client.get(f"/city/internal/node/{node_id}/self-healing/status")

    problem = _response_error(data, error)
    if problem:
        _record_warning(
            results,
            f"Node {node_id}: self-healing status",
            f"Cannot fetch self-healing status: {problem}",
        )
        return

    # Check if registered
    inv_name = f"Node {node_id}: registered in node_registry"
    if data.get("registered"):
        _record_pass(results, inv_name, f"Node registered: {data.get('name', node_id)}")
    else:
        _record_warning(results, inv_name, "Node not in node_registry (run migration 039)")

    # Check self-healing status: only "error" is reported as a warning, any
    # other value (including an unknown one) counts as passed.
    sh_status = data.get("self_healing_status", "unknown")
    inv_name = f"Node {node_id}: self-healing status"
    if sh_status == "error":
        _record_warning(results, inv_name, "Self-healing status: error")
    elif sh_status == "healthy":
        _record_pass(results, inv_name, "Self-healing status: healthy")
    else:
        _record_pass(results, inv_name, f"Self-healing status: {sh_status}")


def check_nodes_needing_healing(client: APIClient, results: CheckResults):
    """Check whether any nodes are reported as needing healing."""
    data, error = client.get("/city/internal/nodes/needing-healing")
    inv_name = "System: nodes needing healing"

    problem = _response_error(data, error)
    if problem:
        _record_warning(results, inv_name, f"Cannot check: {problem}")
        return

    nodes = data.get("nodes") or []
    total = data.get("total") or 0

    if total == 0:
        _record_pass(results, inv_name, "No nodes need healing")
        return

    # Show at most the first three reasons to keep the message short.
    reasons = [str(n.get("healing_reason", "unknown")) for n in nodes[:3]]
    _record_warning(
        results, inv_name,
        f"{total} node(s) need healing: {', '.join(reasons)}",
    )


# ==============================================================================
# Main
# ==============================================================================

def run_all_checks(base_url: str, node_filter: Optional[str] = None) -> CheckResults:
    """Run every invariant check and return the collected results.

    Args:
        base_url: Base URL of the city-service API.
        node_filter: When given, only this node is checked; otherwise both
            NODE1_ID and NODE2_ID are.

    Progress is printed to stdout as the checks run.
    """
    client = APIClient(base_url)
    results = CheckResults()

    print(f"\n{'='*60}")
    print("DAARION Infrastructure Invariants Check")
    print(f"{'='*60}")
    print(f"Base URL: {base_url}")
    print(f"Time: {datetime.now().isoformat()}")
    print(f"{'='*60}\n")

    # Health check first
    print("🔍 Checking city-service health...")
    check_healthz(client, results)

    # Determine which nodes to check
    nodes_to_check = [node_filter] if node_filter else [NODE1_ID, NODE2_ID]

    # Check each node
    for node_id in nodes_to_check:
        print(f"\n🔍 Checking node: {node_id}")

        # Node exists and metrics
        metrics = check_node_exists(client, node_id, results)
        if metrics:
            check_node_metrics(client, node_id, metrics, results)

        # Node agents (Guardian/Steward)
        check_node_agents(client, node_id, results)

        # DAGI Router
        check_dagi_router(client, node_id, results)

        # Self-healing status
        check_node_self_healing(client, node_id, results)

    # Core agents prompts
    print("\n🔍 Checking core agents prompts...")
    check_core_agents_prompts(client, results)

    # System-wide checks
    print("\n🔍 Checking system-wide self-healing...")
    check_nodes_needing_healing(client, results)

    return results


def print_results(results: CheckResults) -> int:
    """Print a human-readable report and return the process exit code.

    Returns 1 when there is at least one CRITICAL failure, 0 otherwise.
    """
    print(f"\n{'='*60}")
    print("RESULTS")
    print(f"{'='*60}\n")

    # Passed
    if results.passed:
        print(f"✅ PASSED ({len(results.passed)}):")
        for r in results.passed:
            print(f" ✓ {r.name}: {r.message}")

    # Warnings
    if results.warnings:
        print(f"\n⚠️ WARNINGS ({len(results.warnings)}):")
        for r in results.warnings:
            print(f" ⚠ {r.name}: {r.message}")

    # Failed
    if results.failed:
        print(f"\n❌ FAILED ({len(results.failed)}):")
        for r in results.failed:
            severity = f"[{r.severity.value}]" if r.severity else ""
            print(f" ✗ {severity} {r.name}: {r.message}")

    # Summary
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    print(f" Total checks: {results.total_checks}")
    print(f" Passed: {len(results.passed)}")
    print(f" Warnings: {len(results.warnings)}")
    print(f" Failed: {len(results.failed)}")

    if results.has_critical_failures:
        print("\n❌ INVARIANT CHECK FAILED - Critical issues found!")
        return 1
    if results.failed or results.warnings:
        # Warnings live in their own bucket, so they must be tested explicitly;
        # otherwise a run with warnings would claim that everything passed.
        print("\n⚠️ INVARIANT CHECK PASSED with warnings")
        return 0  # Non-critical failures don't fail the deploy
    print("\n✅ ALL INVARIANTS PASSED")
    return 0


def main():
    """CLI entry point: parse arguments, run the checks, exit with 0 or 1."""
    if requests is None:
        print("❌ requests not installed. Run: pip install requests")
        sys.exit(1)

    parser = argparse.ArgumentParser(
        description="DAARION Infrastructure Invariants Check"
    )
    parser.add_argument(
        "--base-url",
        default="http://daarion-city-service:7001",
        help="Base URL of city-service API"
    )
    parser.add_argument(
        "--node",
        help="Check only specific node (e.g., node-1-hetzner-gex44)"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )

    args = parser.parse_args()

    if args.json:
        # Keep stdout machine-readable: progress output goes to stderr so that
        # stdout carries nothing but the JSON document.
        with contextlib.redirect_stdout(sys.stderr):
            results = run_all_checks(args.base_url, args.node)
        output = {
            "passed": [{"name": r.name, "message": r.message} for r in results.passed],
            "warnings": [{"name": r.name, "message": r.message} for r in results.warnings],
            "failed": [
                {"name": r.name, "message": r.message, "severity": r.severity.value}
                for r in results.failed
            ],
            "success": not results.has_critical_failures,
        }
        print(json.dumps(output, indent=2))
        sys.exit(0 if not results.has_critical_failures else 1)

    results = run_all_checks(args.base_url, args.node)
    sys.exit(print_results(results))


if __name__ == "__main__":
    main()