Files
microdao-daarion/tools/agents
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

431 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
DAGI Agent Registry CLI Tool
Usage:
./tools/agents list - List all agents
./tools/agents validate - Validate registry consistency
./tools/agents generate - Generate configs from registry
./tools/agents check - CI check: validate, regenerate configs, verify no drift
./tools/agents smoke --id <agent_id> - Run smoke test for agent
"""
import argparse
import hashlib
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
try:
import yaml
except ImportError:
print("ERROR: PyYAML required. Install: pip install pyyaml")
sys.exit(1)
# Paths
# Every location below is derived from BASE_DIR, which is two levels up from
# this script (the directory that contains tools/ when run as ./tools/agents).
BASE_DIR = Path(__file__).parent.parent
# YAML registry that every command loads.
REGISTRY_PATH = BASE_DIR / "config" / "agent_registry.yml"
# Gateway directory: agent prompt files are resolved against it and the
# generated agent_registry.json is written into it.
GATEWAY_DIR = BASE_DIR / "gateway-bot"
# Router configuration file (not referenced by the commands visible here).
ROUTER_CONFIG = BASE_DIR / "services" / "router" / "router-config.yml"
# CrewAI service application directory (not referenced by the commands visible here).
CREWAI_DIR = BASE_DIR / "services" / "crewai-service" / "app"
class Colors:
    """ANSI escape sequences used to colorize the CLI's terminal output."""

    # Bright foreground colors.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"

    # Text attributes: BOLD switches emphasis on, RESET clears all styling.
    BOLD = "\033[1m"
    RESET = "\033[0m"
def load_registry() -> Dict[str, Any]:
    """Load the agent registry from ``REGISTRY_PATH``.

    The file is always decoded as UTF-8 so the result does not depend on the
    locale of the machine running the tool.

    Returns:
        The parsed registry mapping. An empty registry file yields ``{}`` so
        callers can use ``.get`` unconditionally.

    Exits with status 1 (after printing an error) when the file is missing,
    is not valid YAML, or its top level is not a mapping.
    """
    if not REGISTRY_PATH.exists():
        print(f"{Colors.RED}ERROR: Registry not found: {REGISTRY_PATH}{Colors.RESET}")
        sys.exit(1)
    try:
        with open(REGISTRY_PATH, encoding="utf-8") as f:
            registry = yaml.safe_load(f)
    except yaml.YAMLError as exc:
        print(f"{Colors.RED}ERROR: Invalid YAML in registry {REGISTRY_PATH}: {exc}{Colors.RESET}")
        sys.exit(1)
    if registry is None:
        # safe_load returns None for an empty document.
        return {}
    if not isinstance(registry, dict):
        print(f"{Colors.RED}ERROR: Registry must be a YAML mapping: {REGISTRY_PATH}{Colors.RESET}")
        sys.exit(1)
    return registry
def _list_text(value: Any) -> str:
    """Return *value* as display text, mapping a null value to ''."""
    return "" if value is None else str(value)


def _list_role(agent: Dict[str, Any], limit: int) -> str:
    """Return the agent's canonical role cut to at most *limit* characters.

    An ellipsis is appended only when the role was actually shortened.
    """
    role = _list_text(agent.get("canonical_role"))
    if len(role) > limit:
        return role[:limit] + "..."
    return role


def cmd_list(args):
    """Print the registry version, the agent count and one table per class.

    Agents with class ``top_level`` and ``internal`` are listed in separate
    tables. Null field values are rendered as empty cells instead of raising.

    Args:
        args: Parsed command-line arguments (not used by this command).
    """
    registry = load_registry()
    # ``agents:`` with no value parses to None; treat it as an empty list.
    agents = registry.get("agents") or []
    print(f"\n{Colors.BOLD}DAGI Agent Registry{Colors.RESET}")
    ver = registry.get('version', 'unknown')
    print(f"Version: {ver}")
    print(f"Total agents: {len(agents)}\n")
    top_level = [a for a in agents if a.get("class") == "top_level"]
    internal = [a for a in agents if a.get("class") == "internal"]

    print(f"{Colors.CYAN}=== TOP-LEVEL AGENTS ({len(top_level)}) ==={Colors.RESET}")
    header = f"{'ID':<15} {'Display':<20} {'Visibility':<10} {'Telegram':<10} Role"
    print(header)
    print("-" * 100)
    for a in top_level:
        aid = _list_text(a.get("id", "UNKNOWN"))
        dname = _list_text(a.get("display_name"))
        vis = _list_text(a.get("visibility"))
        tg = _list_text(a.get("telegram_mode"))
        role = _list_role(a, 45)
        vis_color = Colors.GREEN if vis == "public" else Colors.YELLOW
        print(f"{aid:<15} {dname:<20} {vis_color}{vis:<10}{Colors.RESET} {tg:<10} {role}")

    print(f"\n{Colors.CYAN}=== INTERNAL AGENTS ({len(internal)}) ==={Colors.RESET}")
    header2 = f"{'ID':<15} {'Display':<20} {'Scope':<12} Role"
    print(header2)
    print("-" * 80)
    for a in internal:
        aid = _list_text(a.get("id", "UNKNOWN"))
        dname = _list_text(a.get("display_name"))
        scope = _list_text(a.get("scope"))
        role = _list_role(a, 40)
        print(f"{aid:<15} {dname:<20} {scope:<12} {role}")
    print()
def _validate_agent(
    agent: Dict[str, Any],
    agent_id: Any,
    llm_profiles: Any,
    errors: List[str],
    warnings: List[str],
) -> None:
    """Append the field-level findings for one agent to *errors* / *warnings*."""
    agent_class = agent.get("class")
    if agent_class not in ["top_level", "internal"]:
        errors.append(f"{agent_id}: Invalid class '{agent_class}'")
    vis = agent.get("visibility")
    if vis not in ["public", "private", "internal"]:
        errors.append(f"{agent_id}: Invalid visibility '{vis}'")
    tg_mode = agent.get("telegram_mode")
    if tg_mode not in ["public", "whitelist", "off"]:
        errors.append(f"{agent_id}: Invalid telegram_mode '{tg_mode}'")

    llm_profile = agent.get("llm_profile")
    if llm_profile and llm_profile not in llm_profiles:
        warnings.append(f"{agent_id}: LLM profile '{llm_profile}' not defined in registry")

    # Prompt files are only checked for top-level agents reachable via Telegram.
    if agent_class == "top_level" and tg_mode != "off":
        prompt_file = agent.get("prompt_file")
        if prompt_file and not (GATEWAY_DIR / prompt_file).exists():
            warnings.append(f"{agent_id}: Prompt file not found: {prompt_file}")

    # A key present with no value parses to None; report wrong types instead
    # of crashing on ``.get``.
    routing = agent.get("routing") or {}
    if not isinstance(routing, dict):
        errors.append(f"{agent_id}: 'routing' must be a mapping")
        routing = {}
    keywords = routing.get("keywords") or []
    if not keywords:
        warnings.append(f"{agent_id}: No routing keywords defined")
    if "handoff_contract" not in agent:
        warnings.append(f"{agent_id}: No handoff_contract defined")

    # Visibility enforcement checks
    if vis == "private" and tg_mode == "public":
        errors.append(f"{agent_id}: visibility=private but telegram_mode=public (must be whitelist/off)")
    if agent.get("scope") == "node_local" and "node_binding" not in agent:
        errors.append(f"{agent_id}: scope=node_local but missing node_binding")

    if agent_class == "top_level":
        # Keywords minimum check for top_level
        kw_count = len(keywords)
        if kw_count < 3:
            warnings.append(f"{agent_id}: Only {kw_count} routing keywords (recommend >= 3)")
        # top_level must be CrewAI orchestrator
        crewai = agent.get("crewai") or {}
        if not isinstance(crewai, dict):
            errors.append(f"{agent_id}: 'crewai' must be a mapping")
            crewai = {}
        if not crewai.get("orchestrator", False):
            errors.append(f"{agent_id}: top_level agent must have crewai.orchestrator=true")


def cmd_validate(args):
    """Validate every agent in the registry and print errors and warnings.

    Checks required fields, duplicate IDs, allowed values for class,
    visibility and telegram_mode, LLM profile references, prompt files,
    routing keywords, handoff contracts and the top-level orchestrator rule.
    Malformed entries (non-mapping agents, null sections) are reported as
    findings rather than raising.

    Args:
        args: Parsed command-line arguments (not used by this command).

    Exits with status 1 when at least one error was found; warnings alone do
    not change the exit status.
    """
    registry = load_registry()
    agents = registry.get("agents") or []
    llm_profiles = registry.get("llm_profiles") or {}
    errors: List[str] = []
    warnings: List[str] = []
    print(f"\n{Colors.BOLD}Validating Agent Registry...{Colors.RESET}\n")
    required_fields = ["id", "display_name", "class", "visibility", "scope",
                       "canonical_role", "domains", "routing", "llm_profile"]
    ids_seen = set()
    for index, agent in enumerate(agents):
        if not isinstance(agent, dict):
            errors.append(f"agents[{index}]: Entry must be a mapping")
            continue
        agent_id = agent.get("id", "UNKNOWN")
        for field in required_fields:
            if field not in agent:
                errors.append(f"{agent_id}: Missing required field '{field}'")
        # Only real IDs take part in the duplicate check; agents without an
        # ID are already reported above and must not collide on "UNKNOWN".
        if "id" in agent:
            if agent_id in ids_seen:
                errors.append(f"{agent_id}: Duplicate agent ID")
            ids_seen.add(agent_id)
        _validate_agent(agent, agent_id, llm_profiles, errors, warnings)

    if errors:
        print(f"{Colors.RED}ERRORS ({len(errors)}):{Colors.RESET}")
        for e in errors:
            print(f" X {e}")
    if warnings:
        print(f"\n{Colors.YELLOW}WARNINGS ({len(warnings)}):{Colors.RESET}")
        for w in warnings:
            print(f" ! {w}")
    if not errors and not warnings:
        print(f"{Colors.GREEN}All validations passed!{Colors.RESET}")
    elif not errors:
        print(f"\n{Colors.GREEN}No errors (but {len(warnings)} warnings){Colors.RESET}")
    else:
        print(f"\n{Colors.RED}Validation failed with {len(errors)} errors{Colors.RESET}")
        sys.exit(1)
    print()
def _git_short_commit() -> str:
    """Return the short HEAD commit hash of BASE_DIR, or "unknown".

    "unknown" is returned when git cannot be started, exits with a non-zero
    status (for example outside a repository) or prints nothing.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True, text=True, cwd=BASE_DIR
        )
    except (OSError, subprocess.SubprocessError):
        return "unknown"
    commit = result.stdout.strip()
    if result.returncode != 0 or not commit:
        return "unknown"
    return commit


def _write_generated_json(path: Path, payload: Dict[str, Any], generated_files: List[str]) -> None:
    """Write *payload* to *path* as UTF-8 JSON, record the path and report it."""
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be pinned instead of inherited from the locale.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)
    generated_files.append(str(path))
    print(f" {Colors.GREEN}OK{Colors.RESET} {path}")


def cmd_generate(args):
    """Generate the gateway, router and CrewAI JSON configs from the registry.

    Each output is controlled by a ``feature_flags`` entry in the registry
    (``generate_prompts``, ``generate_router_config``,
    ``generate_crewai_config``); a missing flag counts as enabled.

    Files written:
        * ``GATEWAY_DIR/agent_registry.json`` - agents whose telegram_mode is
          not "off", plus generation metadata.
        * ``BASE_DIR/config/router_agents.json`` - routing data for all agents.
        * ``BASE_DIR/config/crewai_agents.json`` - CrewAI-enabled agents split
          into orchestrators and workers, plus their teams.

    Args:
        args: Parsed command-line arguments (not used by this command).
    """
    registry = load_registry()
    # Keys present without a value parse to None; fall back to empty containers.
    agents = registry.get("agents") or []
    flags = registry.get("feature_flags") or {}
    print(f"\n{Colors.BOLD}Generating configs from registry...{Colors.RESET}\n")

    # Generate metadata
    git_commit = _git_short_commit()
    # Fingerprint of the parsed registry; the input representation is left
    # unchanged so fingerprints already written to generated files stay stable.
    registry_hash = hashlib.sha256(str(registry).encode()).hexdigest()[:16]
    generated_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    generated_files: List[str] = []

    if flags.get("generate_prompts", True):
        gateway_registry = {
            "schema_version": registry.get("schema_version", 1),
            "version": registry.get("version"),
            "generated_at": generated_at,
            "git_commit": git_commit,
            "registry_fingerprint": registry_hash,
            "agents": {}
        }
        for agent in agents:
            if agent.get("telegram_mode") != "off":
                gateway_registry["agents"][agent["id"]] = {
                    "display_name": agent.get("display_name"),
                    "canonical_role": agent.get("canonical_role"),
                    "prompt_file": agent.get("prompt_file"),
                    "telegram_mode": agent.get("telegram_mode"),
                    "visibility": agent.get("visibility"),
                    "domains": agent.get("domains", []),
                    "mentor": agent.get("mentor"),
                }
        _write_generated_json(GATEWAY_DIR / "agent_registry.json", gateway_registry, generated_files)

    if flags.get("generate_router_config", True):
        router_agents = {}
        for agent in agents:
            dname = agent.get('display_name', '')
            crole = (agent.get('canonical_role') or '')[:50]
            routing = agent.get("routing") or {}
            router_agents[agent["id"]] = {
                "description": f"{dname} - {crole}",
                "default_llm": agent.get("llm_profile", "fast"),
                "routing_priority": routing.get("priority", 50),
                "keywords": routing.get("keywords", []),
                "domains": agent.get("domains", []),
                "class": agent.get("class"),
                "visibility": agent.get("visibility"),
            }
        _write_generated_json(BASE_DIR / "config" / "router_agents.json", router_agents, generated_files)

    if flags.get("generate_crewai_config", True):
        crewai_config = {
            "orchestrators": [],
            "workers": [],
            "teams": {}
        }
        for agent in agents:
            crewai = agent.get("crewai") or {}
            if not crewai.get("enabled", False):
                continue
            agent_entry = {
                "id": agent["id"],
                "display_name": agent.get("display_name"),
                "role": agent.get("canonical_role"),
                "can_orchestrate": crewai.get("orchestrator", False),
                "domains": agent.get("domains", []),
            }
            if crewai.get("orchestrator"):
                crewai_config["orchestrators"].append(agent_entry)
            else:
                crewai_config["workers"].append(agent_entry)
            # Teams are only emitted for CrewAI-enabled agents.
            if crewai.get("team"):
                dname = agent.get('display_name', '')
                crewai_config["teams"][agent["id"]] = {
                    "team_name": f"{dname} Team",
                    "members": crewai["team"]
                }
        _write_generated_json(BASE_DIR / "config" / "crewai_agents.json", crewai_config, generated_files)

    print(f"\n{Colors.GREEN}Generated {len(generated_files)} files{Colors.RESET}\n")
def cmd_smoke(args):
    """Run static smoke checks for the agent named by ``args.id``.

    Three checks are performed and counted: prompt file presence, routing
    keywords and CrewAI enablement. Missing keywords and disabled CrewAI are
    reported as warnings; a missing or unconfigured prompt file is a failure.

    Args:
        args: Parsed command-line arguments; ``args.id`` is the agent ID.

    Exits with status 1 when the agent is not in the registry or when a check
    reported FAIL, so scripts can rely on the exit status the same way they
    can with the validate command.
    """
    registry = load_registry()
    agents = {a["id"]: a for a in registry.get("agents") or []}
    agent_id = args.id
    if agent_id not in agents:
        print(f"{Colors.RED}ERROR: Agent '{agent_id}' not found in registry{Colors.RESET}")
        print(f"Available: {list(agents.keys())}")
        sys.exit(1)
    agent = agents[agent_id]
    print(f"\n{Colors.BOLD}Smoke Test: {agent_id}{Colors.RESET}")
    print(f"Role: {agent.get('canonical_role')}")
    print(f"Visibility: {agent.get('visibility')}")
    print(f"Telegram: {agent.get('telegram_mode')}\n")
    tests_passed = 0
    tests_total = 0
    failed = False

    # Check 1: prompt file (not needed when Telegram is switched off).
    tests_total += 1
    prompt_file = agent.get("prompt_file")
    if prompt_file:
        prompt_path = GATEWAY_DIR / prompt_file
        if prompt_path.exists():
            print(f" {Colors.GREEN}OK{Colors.RESET} Prompt file exists: {prompt_file}")
            tests_passed += 1
        else:
            print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file missing: {prompt_file}")
            failed = True
    elif agent.get("telegram_mode") == "off":
        print(f" {Colors.GREEN}OK{Colors.RESET} No prompt file (telegram_mode=off)")
        tests_passed += 1
    else:
        print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file not configured")
        failed = True

    # Check 2: routing keywords (warning only).
    tests_total += 1
    keywords = (agent.get("routing") or {}).get("keywords") or []
    if keywords:
        print(f" {Colors.GREEN}OK{Colors.RESET} Routing keywords: {len(keywords)} defined")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} No routing keywords")

    # Check 3: CrewAI enablement (warning only).
    tests_total += 1
    crewai = agent.get("crewai") or {}
    if crewai.get("enabled"):
        # ``team:`` with no value parses to None; count it as an empty team.
        team_size = len(crewai.get("team") or [])
        print(f" {Colors.GREEN}OK{Colors.RESET} CrewAI enabled (team: {team_size} members)")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} CrewAI disabled")

    print(f"\n{Colors.BOLD}Result: {tests_passed}/{tests_total} tests passed{Colors.RESET}\n")
    if failed:
        sys.exit(1)
def cmd_check(args):
    """CI check: validate + generate + verify no drift.

    Runs the validate command, regenerates the config files, then asks
    ``git diff`` whether the generated files differ from the committed ones.
    Lines mentioning ``generated_at`` or ``git_commit`` are ignored because
    they change on every run.

    Args:
        args: Parsed command-line arguments, passed through to the validate
            and generate commands.

    Exits with status 1 when validation fails or structural drift is found.
    When git cannot be run or reports an error, a warning is printed and the
    drift step is skipped (best effort).
    """
    print(f"\n{Colors.BOLD}CI Check: validate + no-drift test{Colors.RESET}\n")

    # Step 1: Validate
    print(f"{Colors.CYAN}[1/3] Validating registry...{Colors.RESET}")
    try:
        cmd_validate(args)
    except SystemExit as e:
        if e.code != 0:
            print(f"{Colors.RED}CI Check FAILED: validation errors{Colors.RESET}")
            sys.exit(1)

    # Step 2: Generate
    print(f"{Colors.CYAN}[2/3] Generating configs...{Colors.RESET}")
    cmd_generate(args)

    # Step 3: Check for structural drift (ignore timestamp/git_commit)
    print(f"{Colors.CYAN}[3/3] Checking for structural drift...{Colors.RESET}")
    try:
        # Use git diff but ignore generated_at and git_commit lines
        result = subprocess.run(
            ["git", "diff",
             "-I", "generated_at",
             "-I", "git_commit",
             "--exit-code",
             "gateway-bot/agent_registry.json",
             "config/router_agents.json",
             "config/crewai_agents.json"],
            capture_output=True, text=True, cwd=BASE_DIR
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"{Colors.YELLOW}Warning: Could not check git diff: {e}{Colors.RESET}")
    else:
        if result.returncode == 0:
            print(f" {Colors.GREEN}OK{Colors.RESET} No drift detected")
        elif result.returncode == 1:
            # Status 1 means "differences found". Filter the diff again by
            # hand in case the metadata-only hunks were still reported.
            changed = [line for line in result.stdout.split('\n')
                       if line.startswith(('+', '-'))
                       and not line.startswith(('+++', '---'))]
            structural_changes = [line for line in changed
                                  if 'generated_at' not in line and 'git_commit' not in line]
            if structural_changes:
                print(f"\n{Colors.RED}CI Check FAILED: Structural changes detected!{Colors.RESET}")
                print("Run 'python3 tools/agents generate' and commit the changes.")
                print("\nStructural diff:\n" + '\n'.join(structural_changes[:20]))
                sys.exit(1)
            print(f" {Colors.GREEN}OK{Colors.RESET} No structural drift (only timestamp/commit metadata)")
        else:
            # Any other status means git itself failed (e.g. not a repository
            # or an unsupported option); an empty diff must not be read as
            # "no drift" in that case.
            detail = result.stderr.strip() or f"git exited with status {result.returncode}"
            print(f"{Colors.YELLOW}Warning: Could not check git diff: {detail}{Colors.RESET}")
    print(f"\n{Colors.GREEN}CI Check PASSED{Colors.RESET}\n")
def main():
    """Parse the command line and run the selected subcommand.

    When no subcommand is given the help text is printed instead.
    """
    parser = argparse.ArgumentParser(description="DAGI Agent Registry CLI")
    commands = parser.add_subparsers(dest="command", help="Commands")

    # Subcommands that take no arguments of their own.
    simple_commands = (
        ("list", "List all agents"),
        ("validate", "Validate registry"),
        ("generate", "Generate configs from registry"),
        ("check", "CI check: validate + no-drift test"),
    )
    for command_name, help_text in simple_commands:
        commands.add_parser(command_name, help=help_text)

    smoke = commands.add_parser("smoke", help="Run smoke test")
    smoke.add_argument("--id", required=True, help="Agent ID")

    args = parser.parse_args()

    # Map each subcommand name to the function implementing it.
    handlers = {
        "list": cmd_list,
        "validate": cmd_validate,
        "generate": cmd_generate,
        "check": cmd_check,
        "smoke": cmd_smoke,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
        return
    handler(args)


if __name__ == "__main__":
    main()