Files
microdao-daarion/scripts/check-invariants.py

759 lines
26 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
DAARION Infrastructure Invariants Check
Перевіряє критичні інваріанти системи після кожного деплою.
Якщо хоч один інваріант порушено — скрипт повертає exit code 1.
Використання:
python scripts/check-invariants.py
python scripts/check-invariants.py --base-url http://localhost:7001
python scripts/check-invariants.py --node node-1-hetzner-gex44
Інваріанти перевіряються:
1. Ноди (NODE1, NODE2): metrics, heartbeat, agent counts
2. Node Agents: Guardian + Steward з core prompts
3. Core Agents: DAARWIZZ, MicroDAO Orchestrator, DevTools, SOUL, GREENFOOD, Helion, DRUID, NUTRA, Monitor
4. DAGI Router: агенти, audit статус
5. System Prompts: наявність core для критичних агентів
"""
import argparse
import sys
import json
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
try:
import requests
except ImportError:
print("❌ requests not installed. Run: pip install requests")
sys.exit(1)
# ==============================================================================
# Configuration
# ==============================================================================
class Severity(Enum):
    """Severity level attached to an invariant check result."""

    # A CRITICAL entry in CheckResults.failed makes the run exit non-zero.
    CRITICAL = "CRITICAL"
    # Reported to the operator but does not fail the run.
    WARNING = "WARNING"
    # Informational notice (used for phantom/stale agent counts within limits).
    INFO = "INFO"
@dataclass
class InvariantError:
    """Description of a violated invariant.

    NOTE(review): not referenced anywhere else in the visible part of this
    file; the checks record their outcomes as InvariantResult instead.
    """

    # Name of the invariant that was violated.
    invariant: str
    # Human-readable description of the violation.
    message: str
    # How serious the violation is; defaults to CRITICAL.
    severity: Severity = Severity.CRITICAL
    # Optional extra context.
    details: Optional[Dict] = None
@dataclass
class InvariantResult:
    """Outcome of a single invariant check."""

    # Display name of the check, e.g. "Node <id>: heartbeat fresh".
    name: str
    # True when the invariant held.
    passed: bool
    # Human-readable explanation printed in the report.
    message: str
    # Severity of the outcome; defaults to CRITICAL when not given.
    severity: Severity = Severity.CRITICAL
    # Optional extra context; no check in this file populates it.
    details: Optional[Dict] = None
@dataclass
class CheckResults:
    """Accumulator for the outcomes of every invariant check in a run.

    Checks append InvariantResult objects to one of three buckets:
    ``passed``, ``failed`` or ``warnings``.
    """

    passed: List[InvariantResult] = field(default_factory=list)
    failed: List[InvariantResult] = field(default_factory=list)
    warnings: List[InvariantResult] = field(default_factory=list)

    @property
    def has_critical_failures(self) -> bool:
        """Return True when at least one failed result is CRITICAL."""
        for outcome in self.failed:
            if outcome.severity == Severity.CRITICAL:
                return True
        return False

    @property
    def total_checks(self) -> int:
        """Return the number of recorded results across all three buckets."""
        buckets = (self.passed, self.failed, self.warnings)
        return sum(map(len, buckets))
# Node IDs
NODE1_ID = "node-1-hetzner-gex44"
NODE2_ID = "node-2-macbook-m4max"

# Core agents that MUST exist with prompts.
# Each agent is registered under two slugs: the bare one and the same slug
# with an "agent-" prefix. The table holds (bare slug, display name) pairs and
# CORE_AGENTS expands every pair into both entries, bare slug first.
_CORE_AGENT_IDENTITIES = (
    ("daarwizz", "DAARWIZZ"),
    ("microdao-orchestrator", "MicroDAO Orchestrator"),
    ("devtools", "DevTools"),
    ("soul", "SOUL"),
    ("greenfood", "GREENFOOD"),
    ("helion", "Helion"),
    ("druid", "DRUID"),
    ("nutra", "NUTRA"),
    ("monitor", "Monitor"),
)
CORE_AGENTS = [
    {"slug": slug, "name": display_name, "required_prompts": ["core"]}
    for base_slug, display_name in _CORE_AGENT_IDENTITIES
    for slug in (base_slug, f"agent-{base_slug}")
]

# Node agents that MUST exist
NODE_AGENTS = [
    {
        "node_id": NODE1_ID,
        "slug": "monitor-node1",
        "kind": "node_guardian",
        "name": "Node Guardian NODE1",
    },
    {
        "node_id": NODE1_ID,
        "slug": "node-steward-node1",
        "kind": "node_steward",
        "name": "Node Steward NODE1",
    },
    {
        "node_id": NODE2_ID,
        "slug": "monitor-node2",
        "kind": "node_guardian",
        "name": "Node Guardian NODE2",
    },
    {
        "node_id": NODE2_ID,
        "slug": "node-steward-node2",
        "kind": "node_steward",
        "name": "Node Steward NODE2",
    },
]

# Thresholds
MAX_HEARTBEAT_AGE_MINUTES = 10  # older heartbeats are reported as stale
MAX_PHANTOM_AGENTS = 20  # phantom-agent count above this triggers a warning
MAX_STALE_AGENTS = 20  # stale-agent count above this triggers a warning
# ==============================================================================
# API Client
# ==============================================================================
class APIClient:
    """Small HTTP client for the city-service API.

    Both request helpers report problems through their return value instead
    of raising: they return ``(data, None)`` on success and ``(None, error)``
    with a human-readable error string otherwise.
    """

    def __init__(self, base_url: str, timeout: int = 10):
        """Store the timeout and the base URL with trailing slashes removed."""
        self.timeout = timeout
        self.base_url = base_url.rstrip("/")

    def get(self, path: str) -> Tuple[Optional[Dict], Optional[str]]:
        """Issue a GET for ``path`` and return ``(data, error)``.

        Only HTTP 200 counts as success; 404 gets a dedicated "Not found"
        message and every other status is reported with the first 200
        characters of the response body.
        """
        url = f"{self.base_url}{path}"
        try:
            resp = requests.get(url, timeout=self.timeout)
            code = resp.status_code
            if code == 404:
                return None, f"Not found: {path}"
            if code != 200:
                return None, f"HTTP {code}: {resp.text[:200]}"
            return resp.json(), None
        # ConnectionError is matched before Timeout on purpose: a connect
        # timeout belongs to both families and is reported as a connection error.
        except requests.exceptions.ConnectionError:
            return None, f"Connection error: {url}"
        except requests.exceptions.Timeout:
            return None, f"Timeout: {url}"
        except Exception as exc:
            # Anything else (e.g. a body that is not valid JSON) is reported
            # using the exception text.
            return None, str(exc)

    def post(self, path: str, data: Dict) -> Tuple[Optional[Dict], Optional[str]]:
        """POST ``data`` as JSON to ``path`` and return ``(data, error)``.

        Only HTTP 200 counts as success; any exception is reported using the
        exception text.
        """
        url = f"{self.base_url}{path}"
        try:
            resp = requests.post(url, json=data, timeout=self.timeout)
            if resp.status_code != 200:
                return None, f"HTTP {resp.status_code}: {resp.text[:200]}"
            return resp.json(), None
        except Exception as exc:
            return None, str(exc)
# ==============================================================================
# Invariant Checks
# ==============================================================================
def check_node_exists(client: APIClient, node_id: str, results: CheckResults):
    """Verify that the node is known to the system and fetch its metrics.

    Requests the node's current metrics. A fetch error or an empty payload is
    recorded as a CRITICAL failure and ``None`` is returned; otherwise a pass
    is recorded and the metrics payload is returned for follow-up checks.
    """
    check_name = f"Node exists: {node_id}"
    payload, fetch_error = client.get(f"/city/internal/node/{node_id}/metrics/current")

    # Work out what (if anything) went wrong, then report it in one place.
    if fetch_error:
        problem = f"Cannot fetch node metrics: {fetch_error}"
    elif not payload:
        problem = f"Node {node_id} not found in system"
    else:
        problem = None

    if problem is not None:
        results.failed.append(
            InvariantResult(
                name=check_name,
                passed=False,
                message=problem,
                severity=Severity.CRITICAL,
            )
        )
        return None

    results.passed.append(
        InvariantResult(
            name=check_name,
            passed=True,
            message=f"Node exists: {payload.get('node_name', node_id)}",
        )
    )
    return payload
def check_node_metrics(client: APIClient, node_id: str, metrics: Dict, results: CheckResults):
    """Validate a node's agent counts, GPU configuration and heartbeat age.

    Args:
        client: Not used by this check; kept so existing callers keep working.
        node_id: ID of the node the metrics belong to.
        metrics: Metrics payload previously fetched for the node.
        results: Accumulator that receives one result per sub-check.

    Recorded results:
        * ``agent_count_router`` / ``agent_count_system`` below 1 -> CRITICAL failure.
        * GPU model/memory missing (checked for NODE1 only) -> warning.
        * Heartbeat older than MAX_HEARTBEAT_AGE_MINUTES or unparseable -> warning.
          A payload without ``last_heartbeat`` records nothing for the heartbeat.
    """
    # Agent counts. A key that is present but null is treated as 0 so the
    # invariant fails cleanly instead of raising TypeError on the comparison.
    for key, label in (("agent_count_router", "Router"), ("agent_count_system", "System")):
        count = metrics.get(key) or 0
        inv_name = f"Node {node_id}: {key}"
        if count >= 1:
            results.passed.append(InvariantResult(
                name=inv_name,
                passed=True,
                message=f"{label} has {count} agents",
            ))
        else:
            results.failed.append(InvariantResult(
                name=inv_name,
                passed=False,
                message=f"{label} has 0 agents (expected >= 1)",
                severity=Severity.CRITICAL,
            ))

    # Check GPU for NODE1 (production)
    if node_id == NODE1_ID:
        gpu_model = metrics.get("gpu_model")
        # Null memory is treated as "no GPU" rather than crashing the comparison.
        gpu_memory = metrics.get("gpu_memory_total") or 0
        inv_name = f"Node {node_id}: GPU configured"
        if gpu_model and gpu_memory > 0:
            results.passed.append(InvariantResult(
                name=inv_name,
                passed=True,
                message=f"GPU: {gpu_model}, VRAM: {gpu_memory}MB",
            ))
        else:
            results.warnings.append(InvariantResult(
                name=inv_name,
                passed=False,
                message="GPU not configured (may be expected for this node)",
                severity=Severity.WARNING,
            ))

    # Check heartbeat
    last_heartbeat = metrics.get("last_heartbeat")
    if not last_heartbeat:
        return

    inv_name = f"Node {node_id}: heartbeat fresh"
    try:
        # fromisoformat() on older Pythons rejects a trailing "Z", so
        # normalise it to an explicit UTC offset first.
        hb_time = datetime.fromisoformat(last_heartbeat.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError) as e:
        results.warnings.append(InvariantResult(
            name=inv_name,
            passed=False,
            message=f"Cannot parse heartbeat: {e}",
            severity=Severity.WARNING,
        ))
        return

    if hb_time.tzinfo is None:
        # A timestamp without an offset parses as naive and cannot be
        # subtracted from an aware "now". It is assumed to be UTC here.
        # NOTE(review): confirm the service emits UTC for offset-less values.
        hb_time = hb_time.replace(tzinfo=timezone.utc)

    age_minutes = (datetime.now(timezone.utc) - hb_time).total_seconds() / 60
    if age_minutes <= MAX_HEARTBEAT_AGE_MINUTES:
        results.passed.append(InvariantResult(
            name=inv_name,
            passed=True,
            message=f"Last heartbeat: {age_minutes:.1f} minutes ago",
        ))
    else:
        results.warnings.append(InvariantResult(
            name=inv_name,
            passed=False,
            message=f"Heartbeat stale: {age_minutes:.1f} minutes ago (max: {MAX_HEARTBEAT_AGE_MINUTES})",
            severity=Severity.WARNING,
        ))
def check_node_agents(client: APIClient, node_id: str, results: CheckResults):
    """Verify that the node has a Guardian, a Steward and at least one agent.

    Fetches the node's agent listing. A fetch error is recorded as a single
    CRITICAL failure and the check stops; otherwise each missing piece is
    recorded as its own CRITICAL failure.
    """
    listing, fetch_error = client.get(f"/city/internal/node/{node_id}/agents")
    if fetch_error:
        results.failed.append(InvariantResult(
            name=f"Node {node_id}: fetch agents",
            passed=False,
            message=f"Cannot fetch node agents: {fetch_error}",
            severity=Severity.CRITICAL,
        ))
        return

    # Guardian and Steward are validated the same way:
    # (payload key, short label for the pass message, full role name).
    required_roles = (
        ("guardian", "Guardian", "Node Guardian"),
        ("steward", "Steward", "Node Steward"),
    )
    for payload_key, short_label, role_name in required_roles:
        role_agent = listing.get(payload_key)
        check_name = f"Node {node_id}: {role_name} exists"
        if not role_agent:
            results.failed.append(InvariantResult(
                name=check_name,
                passed=False,
                message=f"{role_name} not found",
                severity=Severity.CRITICAL,
            ))
            continue
        # Prefer the agent's name; fall back to its id when no name is present.
        shown_as = role_agent.get('name', role_agent.get('id'))
        results.passed.append(InvariantResult(
            name=check_name,
            passed=True,
            message=f"{short_label}: {shown_as}",
        ))

    # Total number of agents on the node
    agent_total = listing.get("total", 0)
    check_name = f"Node {node_id}: has agents"
    if agent_total >= 1:
        outcome = InvariantResult(
            name=check_name,
            passed=True,
            message=f"Node has {agent_total} agents",
        )
        results.passed.append(outcome)
    else:
        outcome = InvariantResult(
            name=check_name,
            passed=False,
            message="Node has 0 agents",
            severity=Severity.CRITICAL,
        )
        results.failed.append(outcome)
def check_dagi_router(client: APIClient, node_id: str, results: CheckResults):
    """Inspect the DAGI Router summary for the node.

    None of the findings here are fatal: an unreachable endpoint, an empty
    router, or too many phantom/stale agents are all recorded as warnings.
    A non-zero phantom/stale count within the limit is recorded in the
    warnings bucket as a passed INFO result.
    """
    payload, fetch_error = client.get(f"/city/internal/node/{node_id}/dagi-router/agents")
    if fetch_error:
        results.warnings.append(InvariantResult(
            name=f"Node {node_id}: DAGI Router check",
            passed=False,
            message=f"Cannot fetch DAGI Router agents: {fetch_error}",
            severity=Severity.WARNING,
        ))
        return

    summary = payload.get("summary", {})

    # Router must know about at least one agent
    router_total = summary.get("router_total", 0)
    check_name = f"Node {node_id}: DAGI Router has agents"
    if router_total >= 1:
        results.passed.append(InvariantResult(
            name=check_name,
            passed=True,
            message=f"Router has {router_total} agents configured",
        ))
    else:
        results.warnings.append(InvariantResult(
            name=check_name,
            passed=False,
            message="DAGI Router has 0 agents",
            severity=Severity.WARNING,
        ))

    # Phantom and stale agents follow the same three-way rule:
    # (summary key, allowed maximum, capitalised label, suggested action).
    limit_rules = (
        ("phantom", MAX_PHANTOM_AGENTS, "Phantom", "consider syncing"),
        ("stale", MAX_STALE_AGENTS, "Stale", "consider cleanup"),
    )
    for summary_key, allowed_max, title, advice in limit_rules:
        count = summary.get(summary_key, 0)
        check_name = f"Node {node_id}: {summary_key} agents limit"
        if count <= allowed_max:
            if count > 0:
                # Within the limit but not zero: informational notice only.
                results.warnings.append(InvariantResult(
                    name=check_name,
                    passed=True,
                    message=f"{title} agents: {count} ({advice})",
                    severity=Severity.INFO,
                ))
            else:
                results.passed.append(InvariantResult(
                    name=check_name,
                    passed=True,
                    message=f"No {summary_key} agents",
                ))
        else:
            results.warnings.append(InvariantResult(
                name=check_name,
                passed=False,
                message=f"Too many {summary_key} agents: {count} (max: {allowed_max})",
                severity=Severity.WARNING,
            ))
def check_core_agents_prompts(client: APIClient, results: CheckResults):
    """Verify that every core agent has system prompts.

    Sends all CORE_AGENTS slugs to the prompts-status endpoint in one batch
    and records one result per distinct agent name. An agent passes when the
    status is truthy for its slug or for the alternative spelling of that
    slug (with or without the "agent-" prefix). A missing prompt is only a
    warning, because prompts may not be migrated yet.

    The ``required_prompts`` field of CORE_AGENTS is not consulted here; the
    per-agent status value is used as a plain boolean.
    """
    prefix = "agent-"

    # Collect all agent IDs we need to check
    agent_ids = [a["slug"] for a in CORE_AGENTS]

    # Batch check prompts status
    data, error = client.post("/city/internal/agents/prompts/status", {"agent_ids": agent_ids})
    if error:
        results.warnings.append(InvariantResult(
            name="Core agents: prompts status",
            passed=False,
            message=f"Cannot check prompts status: {error}",
            severity=Severity.WARNING,
        ))
        return

    # A null body or a null "status" field is treated as "nothing has
    # prompts", so each agent is reported instead of the script crashing.
    status = (data or {}).get("status") or {}

    # Check each core agent (group by name to avoid duplicate checks)
    checked_names = set()
    for agent in CORE_AGENTS:
        name = agent["name"]
        if name in checked_names:
            continue
        checked_names.add(name)

        slug = agent["slug"]
        # Strip only the leading prefix; str.replace() would also remove
        # "agent-" from the middle of a slug.
        if slug.startswith(prefix):
            alt_slug = slug[len(prefix):]
        else:
            alt_slug = f"{prefix}{slug}"

        inv_name = f"Core agent: {name} has prompts"
        if status.get(slug, False):
            results.passed.append(InvariantResult(
                name=inv_name,
                passed=True,
                message=f"Agent {name} has system prompts",
            ))
        elif status.get(alt_slug, False):
            results.passed.append(InvariantResult(
                name=inv_name,
                passed=True,
                message=f"Agent {name} has system prompts (alt slug)",
            ))
        else:
            # Don't fail, just warn - prompts may not be migrated yet
            results.warnings.append(InvariantResult(
                name=inv_name,
                passed=False,
                message=f"Agent {name} missing system prompts",
                severity=Severity.WARNING,
            ))
def check_healthz(client: APIClient, results: CheckResults):
    """Probe the city-service ``/health`` endpoint.

    The service counts as healthy only when the reported ``status`` is "ok"
    or "healthy". A request error, an empty body or any other status is
    recorded as a CRITICAL failure.
    """
    payload, fetch_error = client.get("/health")
    check_name = "City service: /health"

    if fetch_error:
        results.failed.append(InvariantResult(
            name=check_name,
            passed=False,
            message=f"Health check failed: {fetch_error}",
            severity=Severity.CRITICAL,
        ))
        return

    # An empty body is reported the same way as an unknown status.
    if payload:
        status = payload.get("status", "unknown")
    else:
        status = "unknown"

    if status in ("ok", "healthy"):
        outcome = InvariantResult(
            name=check_name,
            passed=True,
            message=f"City service healthy: {status}",
        )
        results.passed.append(outcome)
    else:
        outcome = InvariantResult(
            name=check_name,
            passed=False,
            message=f"City service unhealthy: {status}",
            severity=Severity.CRITICAL,
        )
        results.failed.append(outcome)
def check_node_self_healing(client: APIClient, node_id: str, results: CheckResults):
    """Check the node's registry entry and its self-healing status.

    Nothing here is fatal: an unreachable endpoint, an unregistered node and
    a self-healing status of "error" are recorded as warnings. Any other
    self-healing status (including a missing one, shown as "unknown") passes.
    """
    payload, fetch_error = client.get(f"/city/internal/node/{node_id}/self-healing/status")
    if fetch_error:
        results.warnings.append(InvariantResult(
            name=f"Node {node_id}: self-healing status",
            passed=False,
            message=f"Cannot fetch self-healing status: {fetch_error}",
            severity=Severity.WARNING,
        ))
        return

    # Registration in node_registry
    registry_check = f"Node {node_id}: registered in node_registry"
    if not payload.get("registered"):
        results.warnings.append(InvariantResult(
            name=registry_check,
            passed=False,
            message="Node not in node_registry (run migration 039)",
            severity=Severity.WARNING,
        ))
    else:
        results.passed.append(InvariantResult(
            name=registry_check,
            passed=True,
            message=f"Node registered: {payload.get('name', node_id)}",
        ))

    # Self-healing status: only "error" is a problem; every other value
    # is reported verbatim as a pass.
    healing_state = payload.get("self_healing_status", "unknown")
    status_check = f"Node {node_id}: self-healing status"
    if healing_state == "error":
        results.warnings.append(InvariantResult(
            name=status_check,
            passed=False,
            message="Self-healing status: error",
            severity=Severity.WARNING,
        ))
    else:
        results.passed.append(InvariantResult(
            name=status_check,
            passed=True,
            message=f"Self-healing status: {healing_state}",
        ))
def check_nodes_needing_healing(client: APIClient, results: CheckResults):
    """Report whether any node is currently flagged as needing healing.

    A fetch error or a non-zero number of affected nodes is recorded as a
    warning. The warning message lists the healing reasons of at most the
    first three nodes.
    """
    inv_name = "System: nodes needing healing"
    data, error = client.get("/city/internal/nodes/needing-healing")
    if error:
        results.warnings.append(InvariantResult(
            name=inv_name,
            passed=False,
            message=f"Cannot check: {error}",
            severity=Severity.WARNING,
        ))
        return

    # A null "nodes" value is treated as an empty list so slicing is safe.
    nodes = data.get("nodes") or []
    # If the service omits "total", fall back to the length of the node list
    # so that listed nodes are never reported as "No nodes need healing".
    total = data.get("total")
    if total is None:
        total = len(nodes)

    if total == 0:
        results.passed.append(InvariantResult(
            name=inv_name,
            passed=True,
            message="No nodes need healing",
        ))
        return

    # A null reason would make str.join() raise, so coerce it to "unknown".
    reasons = [str(n.get("healing_reason") or "unknown") for n in nodes[:3]]
    results.warnings.append(InvariantResult(
        name=inv_name,
        passed=False,
        message=f"{total} node(s) need healing: {', '.join(reasons)}",
        severity=Severity.WARNING,
    ))
# ==============================================================================
# Main
# ==============================================================================
def run_all_checks(base_url: str, node_filter: Optional[str] = None) -> CheckResults:
    """Run every invariant check against ``base_url`` and collect the results.

    When ``node_filter`` is given only that node is checked; otherwise both
    NODE1 and NODE2 are. Progress is printed to stdout as the checks run.
    """
    client = APIClient(base_url)
    results = CheckResults()

    rule = "=" * 60
    print(f"\n{rule}")
    print("DAARION Infrastructure Invariants Check")
    print(rule)
    print(f"Base URL: {base_url}")
    print(f"Time: {datetime.now().isoformat()}")
    print(f"{rule}\n")

    # Service health comes first
    print("🔍 Checking city-service health...")
    check_healthz(client, results)

    # Which nodes are in scope for this run
    nodes_to_check = [node_filter] if node_filter else [NODE1_ID, NODE2_ID]

    for node_id in nodes_to_check:
        print(f"\n🔍 Checking node: {node_id}")

        # Metric checks only run when the node's metrics could be fetched;
        # the remaining per-node checks run regardless.
        node_metrics = check_node_exists(client, node_id, results)
        if node_metrics:
            check_node_metrics(client, node_id, node_metrics, results)

        check_node_agents(client, node_id, results)        # Guardian / Steward
        check_dagi_router(client, node_id, results)        # DAGI Router
        check_node_self_healing(client, node_id, results)  # Self-healing status

    # Core agents prompts
    print("\n🔍 Checking core agents prompts...")
    check_core_agents_prompts(client, results)

    # System-wide checks
    print("\n🔍 Checking system-wide self-healing...")
    check_nodes_needing_healing(client, results)

    return results
def print_results(results: "CheckResults") -> int:
    """Print a human-readable report and return the process exit code.

    Args:
        results: Collected outcomes of all checks.

    Returns:
        1 when at least one CRITICAL failure was recorded, otherwise 0.
        Warnings and non-critical failures never fail the deploy.
    """
    rule = "=" * 60

    print(f"\n{rule}")
    print("RESULTS")
    print(f"{rule}\n")

    # Passed
    if results.passed:
        print(f"✅ PASSED ({len(results.passed)}):")
        for r in results.passed:
            print(f"{r.name}: {r.message}")

    # Warnings
    if results.warnings:
        print(f"\n⚠️ WARNINGS ({len(results.warnings)}):")
        for r in results.warnings:
            print(f"{r.name}: {r.message}")

    # Failed
    if results.failed:
        print(f"\n❌ FAILED ({len(results.failed)}):")
        for r in results.failed:
            severity = f"[{r.severity.value}]" if r.severity else ""
            print(f"{severity} {r.name}: {r.message}")

    # Summary
    print(f"\n{rule}")
    print("SUMMARY")
    print(rule)
    print(f" Total checks: {results.total_checks}")
    print(f" Passed: {len(results.passed)}")
    print(f" Warnings: {len(results.warnings)}")
    print(f" Failed: {len(results.failed)}")

    if results.has_critical_failures:
        print("\n❌ INVARIANT CHECK FAILED - Critical issues found!")
        return 1

    # Every check in this file records its non-fatal findings in
    # `warnings`, so that bucket must be considered too; otherwise a run
    # with warnings would be summarised as "ALL INVARIANTS PASSED".
    if results.failed or results.warnings:
        print("\n⚠️ INVARIANT CHECK PASSED with warnings")
        return 0  # Non-critical findings don't fail the deploy

    print("\n✅ ALL INVARIANTS PASSED")
    return 0
def main():
    """Parse command-line arguments, run all checks and exit with the result.

    Exit code is 1 when any CRITICAL failure was recorded, otherwise 0.
    With ``--json`` the results are written to stdout as a single JSON
    document and the progress output goes to stderr.
    """
    # Local import: only needed to re-route progress output in --json mode.
    import contextlib

    parser = argparse.ArgumentParser(
        description="DAARION Infrastructure Invariants Check"
    )
    parser.add_argument(
        "--base-url",
        default="http://daarion-city-service:7001",
        help="Base URL of city-service API"
    )
    parser.add_argument(
        "--node",
        help="Check only specific node (e.g., node-1-hetzner-gex44)"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )
    args = parser.parse_args()

    if args.json:
        # run_all_checks() prints a banner and progress lines. Send them to
        # stderr so that stdout contains nothing but the JSON document and
        # can be piped straight into a JSON parser.
        with contextlib.redirect_stdout(sys.stderr):
            results = run_all_checks(args.base_url, args.node)
        output = {
            "passed": [{"name": r.name, "message": r.message} for r in results.passed],
            "warnings": [{"name": r.name, "message": r.message} for r in results.warnings],
            "failed": [{"name": r.name, "message": r.message, "severity": r.severity.value} for r in results.failed],
            "success": not results.has_critical_failures
        }
        print(json.dumps(output, indent=2))
        sys.exit(1 if results.has_critical_failures else 0)

    # Human-readable mode: progress and report both go to stdout.
    results = run_all_checks(args.base_url, args.node)
    sys.exit(print_results(results))


if __name__ == "__main__":
    main()