Files
microdao-daarion/scripts/dagi_agent_audit.py
Apple bca81dc719 feat: Node Self-Healing, DAGI Audit, Agent Prompts, Infra Invariants
### Backend (city-service)
- Node Registry + Self-Healing API (migration 039)
- Improved get_all_nodes() with robust fallback for node_registry/node_cache
- Agent Prompts Runtime API for DAGI Router integration
- DAGI Router Audit endpoints (phantom/stale detection)
- Node Agents API (Guardian/Steward)
- Node metrics extended (CPU/GPU/RAM/Disk)

### Frontend (apps/web)
- Node Directory with improved error handling
- Node Cabinet with metrics cards
- DAGI Router Card component
- Node Metrics Card component
- useDAGIAudit hook

### Scripts
- check-invariants.py - deploy verification
- node-bootstrap.sh - node self-registration
- node-guardian-loop.py - continuous self-healing
- dagi_agent_audit.py - DAGI audit utility

### Migrations
- 034: Agent prompts seed
- 035: Agent DAGI audit
- 036: Node metrics extended
- 037: Node agents complete
- 038: Agent prompts full coverage
- 039: Node registry self-healing

### Tests
- test_infra_smoke.py
- test_agent_prompts_runtime.py
- test_dagi_router_api.py

### Documentation
- DEPLOY_CHECKLIST_2024_11_30.md
- Multiple TASK_PHASE docs
2025-11-30 13:52:01 -08:00

483 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
DAGI Agent Audit Script
Порівнює агентів з DAGI Router config та БД microdao.
Виявляє:
- Active: агенти є в обох системах
- Phantom: агенти є в Router, але немає в БД
- Stale: агенти є в БД, але немає в Router
Використання:
python scripts/dagi_agent_audit.py --node node1
python scripts/dagi_agent_audit.py --node node2
python scripts/dagi_agent_audit.py --all
"""
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Any, Optional
import yaml
# Put the project root on sys.path. The root is taken to be the directory
# two levels up from this file (the parent of the directory holding the script).
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
try:
import asyncpg
except ImportError:
print("❌ asyncpg not installed. Run: pip install asyncpg")
sys.exit(1)
# ==============================================================================
# Configuration
# ==============================================================================
# Per-node audit settings, keyed by the short node name accepted by --node.
#   id            - node identifier handed to get_db_agents()
#   router_config - path of the Router YAML file parsed for agent definitions
#   router_url    - Router base URL (not read anywhere in the code visible here)
#   description   - human-readable label used in console output and reports
# NOTE: both nodes currently point at the same router-config.yml file.
NODE_CONFIG = {
    "node1": {
        "id": "node-1-hetzner-gex44",
        "router_config": PROJECT_ROOT / "router-config.yml",
        "router_url": "http://localhost:9102",  # On NODE1
        "description": "Production Server (Hetzner)"
    },
    "node2": {
        "id": "node-2-macbook-m4max",
        "router_config": PROJECT_ROOT / "router-config.yml",  # Local config
        "router_url": "http://localhost:9102",  # On NODE2
        "description": "Development Node (MacBook)"
    }
}
# Connection string used when --database-url is not passed; the DATABASE_URL
# environment variable takes precedence over the built-in local default.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion")
# ==============================================================================
# Router Config Parser
# ==============================================================================
def parse_router_config(config_path: Path) -> Dict[str, Any]:
    """Load the Router YAML config and return its top-level mapping.

    Args:
        config_path: Path to the Router config file (``router-config.yml``).

    Returns:
        The parsed config dictionary. ``{"agents": {}}`` is returned when the
        file does not exist, is empty, or its top-level value is not a
        mapping, so callers can always use ``.get`` on the result.

    Raises:
        yaml.YAMLError: If the file exists but is not valid YAML.
    """
    if not config_path.exists():
        print(f"⚠️ Router config not found: {config_path}")
        return {"agents": {}}

    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # safe_load() yields None for an empty document and may yield a list or a
    # scalar for unexpected content; only a mapping is usable downstream.
    if not isinstance(config, dict):
        print(f"⚠️ Router config is empty or not a mapping: {config_path}")
        return {"agents": {}}
    return config
def get_router_agents(config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract the list of agents from a parsed Router config.

    Args:
        config: Parsed Router config; agents are read from its ``"agents"``
            mapping (agent id -> agent settings).

    Returns:
        One dictionary per agent with the keys ``id``, ``name``,
        ``description``, ``default_llm``, ``tools`` (list of tool ids) and
        ``source`` (always ``"router_config"``). A missing or null ``agents``
        section yields an empty list.
    """
    # "agents:" with no value parses to None, so fall back to an empty mapping.
    agents_config = config.get("agents") or {}
    agents = []
    for agent_id, agent_data in agents_config.items():
        # An agent declared without a body ("name:") parses to None.
        agent_data = agent_data or {}
        tools = agent_data.get("tools") or []
        agents.append({
            "id": agent_id,
            "name": agent_id,  # In the config the name is the mapping key
            "description": agent_data.get("description", ""),
            "default_llm": agent_data.get("default_llm", ""),
            # Tools are normally mappings with an "id"; a bare string entry is
            # taken to be the tool id itself instead of raising.
            "tools": [t.get("id") if isinstance(t, dict) else t for t in tools],
            "source": "router_config"
        })
    return agents
# ==============================================================================
# Database Access
# ==============================================================================
async def get_db_agents(node_id: str, database_url: str) -> List[Dict[str, Any]]:
    """Fetch agent rows from the database as plain dictionaries.

    Rows flagged as archived or test, and rows with ``deleted_at`` set, are
    excluded; the result is ordered by name. Timestamps are converted to
    ISO-8601 strings (``None`` when the column is empty) and every entry
    carries ``"source": "database"``.

    Note: ``node_id`` is accepted but the query does not filter on it, so
    agents of every node are returned.

    Args:
        node_id: Node identifier (currently unused by the query).
        database_url: Connection string passed to ``asyncpg.connect``.

    Returns:
        A list of agent dictionaries.
    """
    def _iso(moment):
        # Empty timestamp columns stay None instead of being formatted.
        return moment.isoformat() if moment else None

    sql = (
        "\n"
        "SELECT\n"
        "id::text,\n"
        "external_id,\n"
        "COALESCE(name, display_name) as name,\n"
        "kind,\n"
        "node_id,\n"
        "status,\n"
        "COALESCE(is_active, true) as is_active,\n"
        "created_at,\n"
        "updated_at\n"
        "FROM agents\n"
        "WHERE COALESCE(is_archived, false) = false\n"
        "AND COALESCE(is_test, false) = false\n"
        "AND deleted_at IS NULL\n"
        "ORDER BY name\n"
    )

    connection = await asyncpg.connect(database_url)
    try:
        records = await connection.fetch(sql)
        return [
            {
                "id": rec["id"],
                "external_id": rec["external_id"],
                "name": rec["name"],
                "kind": rec["kind"],
                "node_id": rec["node_id"],
                "status": rec["status"],
                "is_active": rec["is_active"],
                "created_at": _iso(rec["created_at"]),
                "updated_at": _iso(rec["updated_at"]),
                "source": "database"
            }
            for rec in records
        ]
    finally:
        # Always release the connection, even when the query fails.
        await connection.close()
async def update_agent_last_seen(agent_ids: List[str], database_url: str):
    """Stamp ``last_seen_at = NOW()`` on the given agents.

    Does nothing (and opens no connection) when ``agent_ids`` is empty. If
    the ``agents`` table has no ``last_seen_at`` column, a warning is printed
    and no update is attempted.

    Args:
        agent_ids: Agent ids; the statement casts them to ``uuid[]``.
        database_url: Connection string passed to ``asyncpg.connect``.
    """
    if not agent_ids:
        return

    column_exists_sql = (
        "\n"
        "SELECT EXISTS (\n"
        "SELECT 1 FROM information_schema.columns\n"
        "WHERE table_name = 'agents' AND column_name = 'last_seen_at'\n"
        ")\n"
    )
    touch_sql = (
        "\n"
        "UPDATE agents\n"
        "SET last_seen_at = NOW()\n"
        "WHERE id = ANY($1::uuid[])\n"
    )

    connection = await asyncpg.connect(database_url)
    try:
        # The column is only present once the corresponding migration ran.
        has_column = await connection.fetchval(column_exists_sql)
        if not has_column:
            print("⚠️ Column last_seen_at doesn't exist yet (migration needed)")
            return
        await connection.execute(touch_sql, agent_ids)
        print(f"✅ Updated last_seen_at for {len(agent_ids)} agents")
    finally:
        await connection.close()
# ==============================================================================
# Audit Logic
# ==============================================================================
def normalize_agent_name(name: Optional[str]) -> str:
    """Normalize an agent name for comparison.

    Lower-cases the name and removes spaces, hyphens and underscores, so
    ``"Daar-Wizz Bot"`` and ``"daarwizz_bot"`` compare equal.

    Args:
        name: Agent name; ``None`` or an empty value is accepted.

    Returns:
        The normalized name, or ``""`` when no name was given.
    """
    if not name:
        return ""
    return str(name).lower().replace(" ", "").replace("-", "").replace("_", "")


def match_agents(router_agents: List[Dict], db_agents: List[Dict]) -> Dict[str, Any]:
    """Match Router agents against DB agents and classify them.

    A Router agent is matched to a DB agent first by ``external_id`` (only
    the part after the last ``:`` is compared, case-insensitively, e.g.
    ``agent:daarwizz`` -> ``daarwizz``) and otherwise by normalized name.

    Args:
        router_agents: Agents from the Router config (need ``id`` and ``name``).
        db_agents: Agents from the database (need ``id``; ``name`` and
            ``external_id`` may be ``None``).

    Returns:
        A dictionary with the lists ``active`` (found in both), ``phantom``
        (Router only), ``stale`` (DB only) and a ``summary`` of counts.
    """
    # Build lookup indexes over the DB agents. On duplicate keys the later
    # row wins.
    db_by_name = {}
    db_by_external_id = {}
    for a in db_agents:
        # A row without a name cannot be matched by name; skipping it here
        # keeps a NULL name from crashing the whole audit.
        if a.get("name"):
            db_by_name[normalize_agent_name(a["name"])] = a
        external_id = a.get("external_id")
        if external_id:
            # Keep only the last segment of the external id.
            db_by_external_id[external_id.split(":")[-1].lower()] = a

    active = []
    phantom = []
    matched_db_ids = set()

    # Classify every Router agent.
    for r_agent in router_agents:
        r_id_norm = str(r_agent["id"]).lower()
        r_name_norm = normalize_agent_name(r_agent["name"])

        # external_id has priority over the name.
        db_match = db_by_external_id.get(r_id_norm)
        if db_match is None:
            db_match = db_by_name.get(r_name_norm)

        if db_match is not None:
            active.append({
                "router": r_agent,
                "db": db_match,
                "status": "active"
            })
            matched_db_ids.add(db_match["id"])
        else:
            phantom.append({
                "router": r_agent,
                "db": None,
                "status": "phantom",
                "reason": "In Router config but not in DB"
            })

    # Every DB agent that no Router agent matched is stale. Some of these
    # may be system agents that are intentionally absent from the Router.
    stale = [
        {
            "router": None,
            "db": db_agent,
            "status": "stale",
            "reason": "In DB but not in Router config"
        }
        for db_agent in db_agents
        if db_agent["id"] not in matched_db_ids
    ]

    return {
        "active": active,
        "phantom": phantom,
        "stale": stale,
        "summary": {
            "router_total": len(router_agents),
            "db_total": len(db_agents),
            "active_count": len(active),
            "phantom_count": len(phantom),
            "stale_count": len(stale)
        }
    }
# ==============================================================================
# Report Generation
# ==============================================================================
def generate_report(
    node_id: str,
    node_config: Dict[str, Any],
    audit_result: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the JSON-serializable audit report for one node.

    Args:
        node_id: Node name stored in the report as ``node_id``.
        node_config: Node settings; only ``description`` is read.
        audit_result: Result of ``match_agents`` (``active``, ``phantom``,
            ``stale`` and ``summary``).

    Returns:
        A dictionary with ``node_id``, ``node_description``, a UTC
        ``timestamp`` (ISO-8601 with a trailing ``Z``), the ``summary`` and
        flattened ``active_agents``, ``phantom_agents`` and ``stale_agents``.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so isoformat() has no "+00:00" before the "Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    report = {
        "node_id": node_id,
        "node_description": node_config.get("description", ""),
        "timestamp": now_utc.isoformat() + "Z",
        "summary": audit_result["summary"],
        "active_agents": [
            {
                "router_id": a["router"]["id"],
                "router_name": a["router"]["name"],
                "db_id": a["db"]["id"],
                "db_name": a["db"]["name"],
                "db_external_id": a["db"].get("external_id"),
                "kind": a["db"].get("kind"),
                "status": a["db"].get("status", "unknown")
            }
            for a in audit_result["active"]
        ],
        "phantom_agents": [
            {
                "router_id": a["router"]["id"],
                "router_name": a["router"]["name"],
                "description": a["router"].get("description", ""),
                "reason": a["reason"]
            }
            for a in audit_result["phantom"]
        ],
        "stale_agents": [
            {
                "db_id": a["db"]["id"],
                "db_name": a["db"]["name"],
                "db_external_id": a["db"].get("external_id"),
                "kind": a["db"].get("kind"),
                "reason": a["reason"]
            }
            for a in audit_result["stale"]
        ]
    }
    return report
def print_report(report: Dict[str, Any], verbose: bool = False):
    """Print a human-readable version of an audit report to stdout.

    Shows the header, the summary counters and then one section per
    non-empty category. Active and stale lists are cut off after ten
    entries (followed by an "... and N more" line); phantom agents are
    always listed in full.

    Args:
        report: Report dictionary as produced by ``generate_report``.
        verbose: When true, also print the description of phantom agents
            that have one.
    """
    divider = "=" * 70
    shown_limit = 10  # maximum entries listed for active / stale agents

    def _print_overflow(entries):
        # Tell the reader how many entries were left out of the listing.
        hidden = len(entries) - shown_limit
        if hidden > 0:
            print(f" ... and {hidden} more")

    print("\n" + divider)
    print("🔍 DAGI AGENT AUDIT REPORT")
    print(f" Node: {report['node_id']}")
    print(f" Time: {report['timestamp']}")
    print(divider)

    counts = report["summary"]
    print("\n📊 Summary:")
    print(f" Router agents: {counts['router_total']}")
    print(f" DB agents: {counts['db_total']}")
    print(f" ✅ Active: {counts['active_count']}")
    print(f" 👻 Phantom: {counts['phantom_count']}")
    print(f" 📦 Stale: {counts['stale_count']}")

    active_entries = report["active_agents"]
    if active_entries:
        print(f"\n✅ ACTIVE AGENTS ({len(active_entries)}):")
        for entry in active_entries[:shown_limit]:
            print(f"{entry['router_name']} ({entry['kind'] or 'unknown'}) - {entry['status']}")
        _print_overflow(active_entries)

    phantom_entries = report["phantom_agents"]
    if phantom_entries:
        print(f"\n👻 PHANTOM AGENTS (in Router, not in DB) ({len(phantom_entries)}):")
        for entry in phantom_entries:
            print(f" ⚠️ {entry['router_name']} - {entry['reason']}")
            if verbose and entry.get('description'):
                print(f" Description: {entry['description']}")

    stale_entries = report["stale_agents"]
    if stale_entries:
        print(f"\n📦 STALE AGENTS (in DB, not in Router) ({len(stale_entries)}):")
        for entry in stale_entries[:shown_limit]:
            print(f" 📌 {entry['db_name']} ({entry['kind'] or 'unknown'}) - {entry['reason']}")
        _print_overflow(stale_entries)

    print("\n" + divider)
def save_report(report: Dict[str, Any], output_dir: Path):
    """Write the report as JSON into ``output_dir``.

    Two files are written: a timestamped one
    (``dagi-audit-<node_id>-<YYYYmmdd-HHMMSS>.json``, local time) and
    ``dagi-audit-<node_id>-latest.json``, which is overwritten on every run.
    The directory is created if it does not exist.

    Args:
        report: Report dictionary; ``node_id`` is used in the file names.
        output_dir: Target directory.
    """
    def _write_json(target: Path) -> None:
        # Keep non-ASCII text readable in the file rather than \u-escaped.
        with target.open('w', encoding='utf-8') as handle:
            json.dump(report, handle, indent=2, ensure_ascii=False)

    output_dir.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    filepath = output_dir / f"dagi-audit-{report['node_id']}-{stamp}.json"
    _write_json(filepath)
    print(f"\n📄 Report saved to: {filepath}")

    # Also refresh the "latest" copy.
    _write_json(output_dir / f"dagi-audit-{report['node_id']}-latest.json")
# ==============================================================================
# Main
# ==============================================================================
async def audit_node(node_key: str, config: Dict[str, Any], args) -> Dict[str, Any]:
    """Run the audit for a single node.

    Steps: read agents from the Router config, read agents from the
    database, match them, build the report, print it, save it and, when
    ``--update-seen`` was given, stamp ``last_seen_at`` on the active agents.

    A database failure does not abort the audit: the DB agent list is
    treated as empty. Because every Router agent then appears as phantom,
    the error text is stored in the report under ``db_error`` and a warning
    is printed, so the result is not mistaken for a real finding.

    Args:
        node_key: Short node name (key of ``NODE_CONFIG``).
        config: Settings of that node (``id``, ``router_config``,
            ``description``).
        args: Parsed command-line arguments (``database_url``, ``verbose``,
            ``output``, ``update_seen``).

    Returns:
        The generated report dictionary.
    """
    print(f"\n🔍 Auditing {node_key}: {config['description']}...")

    # 1. Agents from the Router config
    router_config = parse_router_config(config["router_config"])
    router_agents = get_router_agents(router_config)
    print(f" 📋 Found {len(router_agents)} agents in router-config.yml")

    # 2. Agents from the database (best effort)
    db_url = args.database_url or DATABASE_URL
    db_error = None
    try:
        db_agents = await get_db_agents(config["id"], db_url)
    except Exception as e:
        # Deliberately broad: a DB problem on one node must not stop the
        # audit of the remaining nodes.
        db_error = str(e)
        db_agents = []
        print(f" ❌ Database error: {e}")
    else:
        print(f" 📋 Found {len(db_agents)} agents in database")

    # 3. Match both sides
    audit_result = match_agents(router_agents, db_agents)

    # 4. Build the report and flag a degraded run
    report = generate_report(node_key, config, audit_result)
    if db_error is not None:
        report["db_error"] = db_error
        print(" ⚠️ Database could not be read: phantom/stale results are not reliable")

    # 5. Print the report
    print_report(report, verbose=args.verbose)

    # 6. Save the report (default location: <project root>/logs/audit)
    output_dir = Path(args.output) if args.output else PROJECT_ROOT / "logs" / "audit"
    save_report(report, output_dir)

    # 7. Update last_seen_at for the active agents
    if args.update_seen and audit_result["active"]:
        active_ids = [a["db"]["id"] for a in audit_result["active"]]
        await update_agent_last_seen(active_ids, db_url)

    return report
async def main():
    """Command-line entry point: parse arguments and audit the chosen nodes.

    Audits either one node (``--node node1``) or every node in
    ``NODE_CONFIG`` (``--node all``, the default) and prints a closing
    summary line per node.

    With ``--json`` the list of reports is printed to stdout as JSON. In
    that mode all human-readable output (progress, per-node report,
    summary) is sent to stderr instead, so stdout holds only the JSON
    document and can be piped into other tools.
    """
    import contextlib

    parser = argparse.ArgumentParser(description="DAGI Agent Audit")
    parser.add_argument(
        "--node",
        # Derived from NODE_CONFIG so a newly configured node is accepted
        # without touching the parser.
        choices=[*NODE_CONFIG, "all"],
        default="all",
        help="Node to audit (default: all)"
    )
    parser.add_argument(
        "--database-url",
        help=f"Database URL (default: {DATABASE_URL})"
    )
    parser.add_argument(
        "--output", "-o",
        help="Output directory for reports (default: logs/audit)"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Verbose output"
    )
    parser.add_argument(
        "--update-seen",
        action="store_true",
        help="Update last_seen_at for active agents"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output only JSON (no console colors)"
    )
    args = parser.parse_args()

    node_keys = list(NODE_CONFIG) if args.node == "all" else [args.node]

    # Keep stdout clean for the JSON document when --json is requested.
    human_out = sys.stderr if args.json else sys.stdout

    reports = []
    with contextlib.redirect_stdout(human_out):
        for node_key in node_keys:
            config = NODE_CONFIG.get(node_key)
            if not config:
                print(f"❌ Unknown node: {args.node}")
                sys.exit(1)
            reports.append(await audit_node(node_key, config, args))

    # Emit the JSON document if requested
    if args.json:
        print(json.dumps(reports, indent=2))

    # Closing summary
    print("\n" + "=" * 70, file=human_out)
    print("🎯 AUDIT COMPLETE", file=human_out)
    for r in reports:
        s = r["summary"]
        status = "" if s["phantom_count"] == 0 and s["stale_count"] == 0 else "⚠️"
        print(
            f" {status} {r['node_id']}: {s['active_count']} active, "
            f"{s['phantom_count']} phantom, {s['stale_count']} stale",
            file=human_out
        )
    print("=" * 70 + "\n", file=human_out)


if __name__ == "__main__":
    asyncio.run(main())