#!/usr/bin/env python3
"""
DAGI Agent Registry CLI Tool

Usage:
    ./tools/agents list                  - List all agents
    ./tools/agents validate              - Validate registry consistency
    ./tools/agents generate              - Generate configs from registry
    ./tools/agents smoke --id <agent_id> - Run smoke test for agent
    ./tools/agents check                 - CI check: validate + no-drift test
"""

import argparse
import hashlib
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

try:
    import yaml
except ImportError:
    # Reported by load_registry() so that --help and the pure helper
    # functions stay usable without PyYAML installed.
    yaml = None

# Paths
BASE_DIR = Path(__file__).parent.parent
REGISTRY_PATH = BASE_DIR / "config" / "agent_registry.yml"
GATEWAY_DIR = BASE_DIR / "gateway-bot"
ROUTER_CONFIG = BASE_DIR / "services" / "router" / "router-config.yml"
CREWAI_DIR = BASE_DIR / "services" / "crewai-service" / "app"

# Fields every agent entry must define (checked by `validate`).
REQUIRED_FIELDS = ("id", "display_name", "class", "visibility", "scope",
                   "canonical_role", "domains", "routing", "llm_profile")

# Generated files tracked by the drift check, relative to BASE_DIR.
GENERATED_FILES = ("gateway-bot/agent_registry.json",
                   "config/router_agents.json",
                   "config/crewai_agents.json")


class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    GREEN = "\033[92m"
    RED = "\033[91m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"
    RESET = "\033[0m"
    BOLD = "\033[1m"


def load_registry() -> Dict[str, Any]:
    """Load the agent registry YAML from REGISTRY_PATH.

    Returns:
        The parsed registry mapping; an empty file yields ``{}``.

    Exits with status 1 when PyYAML is not installed, the registry file
    does not exist, or its top-level value is not a mapping.
    """
    if yaml is None:
        print("ERROR: PyYAML required. Install: pip install pyyaml")
        sys.exit(1)
    if not REGISTRY_PATH.exists():
        print(f"{Colors.RED}ERROR: Registry not found: {REGISTRY_PATH}{Colors.RESET}")
        sys.exit(1)
    with open(REGISTRY_PATH, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        # safe_load returns None for an empty document.
        return {}
    if not isinstance(data, dict):
        print(f"{Colors.RED}ERROR: Registry must be a YAML mapping: {REGISTRY_PATH}{Colors.RESET}")
        sys.exit(1)
    return data


def _registry_agents(registry: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the registry's agent list, treating a missing/null key as empty."""
    return registry.get("agents") or []


def _field_text(agent: Dict[str, Any], key: str, default: str = "") -> str:
    """Return ``agent[key]`` as text, or *default* when missing or null."""
    value = agent.get(key)
    return default if value is None else str(value)


def _write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON.

    UTF-8 is forced because ensure_ascii=False emits raw non-ASCII
    characters, which a non-UTF-8 locale default encoding cannot encode.
    """
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)


def cmd_list(args):
    """Print top-level and internal agents as two tables."""
    registry = load_registry()
    agents = _registry_agents(registry)

    print(f"\n{Colors.BOLD}DAGI Agent Registry{Colors.RESET}")
    ver = registry.get('version', 'unknown')
    print(f"Version: {ver}")
    print(f"Total agents: {len(agents)}\n")

    top_level = [a for a in agents if a.get("class") == "top_level"]
    internal = [a for a in agents if a.get("class") == "internal"]

    print(f"{Colors.CYAN}=== TOP-LEVEL AGENTS ({len(top_level)}) ==={Colors.RESET}")
    header = f"{'ID':<15} {'Display':<20} {'Visibility':<10} {'Telegram':<10} Role"
    print(header)
    print("-" * 100)
    for a in top_level:
        vis_color = Colors.GREEN if a.get("visibility") == "public" else Colors.YELLOW
        role = _field_text(a, 'canonical_role')[:45]
        aid = _field_text(a, 'id', 'UNKNOWN')
        dname = _field_text(a, 'display_name')
        vis = _field_text(a, 'visibility')
        tg = _field_text(a, 'telegram_mode')
        print(f"{aid:<15} {dname:<20} {vis_color}{vis:<10}{Colors.RESET} {tg:<10} {role}...")

    print(f"\n{Colors.CYAN}=== INTERNAL AGENTS ({len(internal)}) ==={Colors.RESET}")
    header2 = f"{'ID':<15} {'Display':<20} {'Scope':<12} Role"
    print(header2)
    print("-" * 80)
    for a in internal:
        role = _field_text(a, 'canonical_role')[:40]
        aid = _field_text(a, 'id', 'UNKNOWN')
        dname = _field_text(a, 'display_name')
        scope = _field_text(a, 'scope')
        print(f"{aid:<15} {dname:<20} {scope:<12} {role}...")
    print()


def collect_validation_issues(registry: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Check every agent entry of *registry* for consistency.

    Args:
        registry: Parsed registry mapping (see load_registry()).

    Returns:
        ``(errors, warnings)`` as two lists of human-readable messages,
        in the order the problems were found.
    """
    errors: List[str] = []
    warnings: List[str] = []
    llm_profiles = registry.get("llm_profiles") or {}
    ids_seen = set()

    for index, agent in enumerate(_registry_agents(registry)):
        if not isinstance(agent, dict):
            errors.append(f"agents[{index}]: Entry is not a mapping")
            continue

        agent_id = agent.get("id", "UNKNOWN")
        for field in REQUIRED_FIELDS:
            if field not in agent:
                errors.append(f"{agent_id}: Missing required field '{field}'")

        if agent_id in ids_seen:
            errors.append(f"{agent_id}: Duplicate agent ID")
        ids_seen.add(agent_id)

        agent_class = agent.get("class")
        if agent_class not in ["top_level", "internal"]:
            errors.append(f"{agent_id}: Invalid class '{agent_class}'")

        vis = agent.get("visibility")
        if vis not in ["public", "private", "internal"]:
            errors.append(f"{agent_id}: Invalid visibility '{vis}'")

        tg_mode = agent.get("telegram_mode")
        if tg_mode not in ["public", "whitelist", "off"]:
            errors.append(f"{agent_id}: Invalid telegram_mode '{tg_mode}'")

        llm_profile = agent.get("llm_profile")
        if llm_profile and llm_profile not in llm_profiles:
            warnings.append(f"{agent_id}: LLM profile '{llm_profile}' not defined in registry")

        if agent_class == "top_level" and tg_mode != "off":
            prompt_file = agent.get("prompt_file")
            if prompt_file:
                prompt_path = GATEWAY_DIR / prompt_file
                if not prompt_path.exists():
                    warnings.append(f"{agent_id}: Prompt file not found: {prompt_file}")

        # NOTE(review): the routing and handoff checks are applied to every
        # agent; `routing` is reused by the top_level keyword count below,
        # so it has to be bound on each iteration. Confirm intended scope.
        routing = agent.get("routing") or {}
        if not routing.get("keywords"):
            warnings.append(f"{agent_id}: No routing keywords defined")

        if "handoff_contract" not in agent:
            warnings.append(f"{agent_id}: No handoff_contract defined")

        # Visibility enforcement checks
        if vis == "private" and tg_mode == "public":
            errors.append(f"{agent_id}: visibility=private but telegram_mode=public (must be whitelist/off)")

        if agent.get("scope") == "node_local" and "node_binding" not in agent:
            errors.append(f"{agent_id}: scope=node_local but missing node_binding")

        if agent_class == "top_level":
            # Keywords minimum check for top_level
            kw_count = len(routing.get("keywords") or [])
            if kw_count < 3:
                warnings.append(f"{agent_id}: Only {kw_count} routing keywords (recommend >= 3)")

            # top_level must be CrewAI orchestrator
            crewai = agent.get("crewai") or {}
            if not crewai.get("orchestrator", False):
                errors.append(f"{agent_id}: top_level agent must have crewai.orchestrator=true")

    return errors, warnings


def cmd_validate(args):
    """Validate the registry and print a report.

    Exits with status 1 when at least one error is found; warnings alone
    do not fail the command.
    """
    registry = load_registry()

    print(f"\n{Colors.BOLD}Validating Agent Registry...{Colors.RESET}\n")
    errors, warnings = collect_validation_issues(registry)

    if errors:
        print(f"{Colors.RED}ERRORS ({len(errors)}):{Colors.RESET}")
        for e in errors:
            print(f" X {e}")

    if warnings:
        print(f"\n{Colors.YELLOW}WARNINGS ({len(warnings)}):{Colors.RESET}")
        for w in warnings:
            print(f" ! {w}")

    if not errors and not warnings:
        print(f"{Colors.GREEN}All validations passed!{Colors.RESET}")
    elif not errors:
        print(f"\n{Colors.GREEN}No errors (but {len(warnings)} warnings){Colors.RESET}")
    else:
        print(f"\n{Colors.RED}Validation failed with {len(errors)} errors{Colors.RESET}")
        sys.exit(1)
    print()


def _current_git_commit() -> str:
    """Return the short HEAD commit of BASE_DIR, or "unknown" if unavailable."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True, text=True, cwd=BASE_DIR
        )
    except (OSError, subprocess.SubprocessError):
        return "unknown"
    commit = result.stdout.strip()
    # A non-zero exit (e.g. not a git repository) leaves stdout empty.
    if result.returncode != 0 or not commit:
        return "unknown"
    return commit


def build_gateway_registry(registry: Dict[str, Any], generated_at: str,
                           git_commit: str, registry_hash: str) -> Dict[str, Any]:
    """Build the gateway registry document.

    Only agents whose ``telegram_mode`` is not ``"off"`` are included.
    """
    gateway_registry = {
        "schema_version": registry.get("schema_version", 1),
        "version": registry.get("version"),
        "generated_at": generated_at,
        "git_commit": git_commit,
        "registry_fingerprint": registry_hash,
        "agents": {}
    }
    for agent in _registry_agents(registry):
        if agent.get("telegram_mode") != "off":
            gateway_registry["agents"][agent["id"]] = {
                "display_name": agent.get("display_name"),
                "canonical_role": agent.get("canonical_role"),
                "prompt_file": agent.get("prompt_file"),
                "telegram_mode": agent.get("telegram_mode"),
                "visibility": agent.get("visibility"),
                "domains": agent.get("domains", []),
                "mentor": agent.get("mentor"),
            }
    return gateway_registry


def build_router_agents(agents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the router config mapping, keyed by agent id."""
    router_agents = {}
    for agent in agents:
        routing = agent.get("routing") or {}
        dname = agent.get('display_name', '')
        crole = (agent.get('canonical_role') or '')[:50]
        router_agents[agent["id"]] = {
            "description": f"{dname} - {crole}",
            "default_llm": agent.get("llm_profile", "fast"),
            "routing_priority": routing.get("priority", 50),
            "keywords": routing.get("keywords", []),
            "domains": agent.get("domains", []),
            "class": agent.get("class"),
            "visibility": agent.get("visibility"),
        }
    return router_agents


def build_crewai_config(agents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the CrewAI config from agents that have ``crewai.enabled`` set.

    Enabled agents are split into orchestrators and workers; an enabled
    agent with a ``crewai.team`` additionally gets a team entry.
    """
    crewai_config = {
        "orchestrators": [],
        "workers": [],
        "teams": {}
    }
    for agent in agents:
        crewai = agent.get("crewai") or {}
        if not crewai.get("enabled", False):
            continue

        agent_entry = {
            "id": agent["id"],
            "display_name": agent.get("display_name"),
            "role": agent.get("canonical_role"),
            "can_orchestrate": crewai.get("orchestrator", False),
            "domains": agent.get("domains", []),
        }
        if crewai.get("orchestrator"):
            crewai_config["orchestrators"].append(agent_entry)
        else:
            crewai_config["workers"].append(agent_entry)

        if crewai.get("team"):
            dname = agent.get('display_name', '')
            crewai_config["teams"][agent["id"]] = {
                "team_name": f"{dname} Team",
                "members": crewai["team"]
            }
    return crewai_config


def cmd_generate(args):
    """Generate the gateway, router and CrewAI JSON configs from the registry.

    Each output can be disabled through the registry's ``feature_flags``
    (``generate_prompts``, ``generate_router_config``,
    ``generate_crewai_config``); all default to enabled.
    """
    registry = load_registry()
    agents = _registry_agents(registry)
    flags = registry.get("feature_flags") or {}

    print(f"\n{Colors.BOLD}Generating configs from registry...{Colors.RESET}\n")

    # Generate metadata
    git_commit = _current_git_commit()
    registry_hash = hashlib.sha256(str(registry).encode()).hexdigest()[:16]
    generated_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    generated_files = []

    if flags.get("generate_prompts", True):
        gateway_json = GATEWAY_DIR / "agent_registry.json"
        _write_json(gateway_json, build_gateway_registry(
            registry, generated_at, git_commit, registry_hash))
        generated_files.append(str(gateway_json))
        print(f" {Colors.GREEN}OK{Colors.RESET} {gateway_json}")

    if flags.get("generate_router_config", True):
        router_json = BASE_DIR / "config" / "router_agents.json"
        _write_json(router_json, build_router_agents(agents))
        generated_files.append(str(router_json))
        print(f" {Colors.GREEN}OK{Colors.RESET} {router_json}")

    if flags.get("generate_crewai_config", True):
        crewai_json = BASE_DIR / "config" / "crewai_agents.json"
        _write_json(crewai_json, build_crewai_config(agents))
        generated_files.append(str(crewai_json))
        print(f" {Colors.GREEN}OK{Colors.RESET} {crewai_json}")

    print(f"\n{Colors.GREEN}Generated {len(generated_files)} files{Colors.RESET}\n")


def cmd_smoke(args):
    """Run three quick checks (prompt file, routing keywords, CrewAI) for one agent.

    Exits with status 1 when ``args.id`` is not in the registry. The
    individual check results are only printed; they do not affect the
    exit status.
    """
    registry = load_registry()
    agents = {a["id"]: a for a in _registry_agents(registry)}

    agent_id = args.id
    if agent_id not in agents:
        print(f"{Colors.RED}ERROR: Agent '{agent_id}' not found in registry{Colors.RESET}")
        print(f"Available: {list(agents.keys())}")
        sys.exit(1)

    agent = agents[agent_id]
    print(f"\n{Colors.BOLD}Smoke Test: {agent_id}{Colors.RESET}")
    print(f"Role: {agent.get('canonical_role')}")
    print(f"Visibility: {agent.get('visibility')}")
    print(f"Telegram: {agent.get('telegram_mode')}\n")

    tests_passed = 0
    tests_total = 0

    # Check 1: prompt file exists (or is legitimately absent).
    tests_total += 1
    prompt_file = agent.get("prompt_file")
    if prompt_file:
        prompt_path = GATEWAY_DIR / prompt_file
        if prompt_path.exists():
            print(f" {Colors.GREEN}OK{Colors.RESET} Prompt file exists: {prompt_file}")
            tests_passed += 1
        else:
            print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file missing: {prompt_file}")
    else:
        if agent.get("telegram_mode") == "off":
            print(f" {Colors.GREEN}OK{Colors.RESET} No prompt file (telegram_mode=off)")
            tests_passed += 1
        else:
            print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file not configured")

    # Check 2: routing keywords are defined.
    tests_total += 1
    keywords = (agent.get("routing") or {}).get("keywords") or []
    if keywords:
        print(f" {Colors.GREEN}OK{Colors.RESET} Routing keywords: {len(keywords)} defined")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} No routing keywords")

    # Check 3: CrewAI is enabled.
    tests_total += 1
    crewai = agent.get("crewai") or {}
    if crewai.get("enabled"):
        team_size = len(crewai.get("team") or [])
        print(f" {Colors.GREEN}OK{Colors.RESET} CrewAI enabled (team: {team_size} members)")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} CrewAI disabled")

    print(f"\n{Colors.BOLD}Result: {tests_passed}/{tests_total} tests passed{Colors.RESET}\n")


def filter_structural_changes(diff_text: str) -> List[str]:
    """Return the added/removed lines of a unified diff that are structural.

    File header lines (``+++``/``---``) and lines mentioning
    ``generated_at`` or ``git_commit`` are dropped, because that metadata
    changes on every generation.
    """
    changed = [line for line in diff_text.split('\n')
               if line.startswith(('+', '-'))]
    return [line for line in changed
            if 'generated_at' not in line
            and 'git_commit' not in line
            and not line.startswith(('+++', '---'))]


def cmd_check(args):
    """CI check: validate + generate + verify no drift."""
    print(f"\n{Colors.BOLD}CI Check: validate + no-drift test{Colors.RESET}\n")

    # Step 1: Validate
    print(f"{Colors.CYAN}[1/3] Validating registry...{Colors.RESET}")
    try:
        cmd_validate(args)
    except SystemExit as e:
        if e.code != 0:
            print(f"{Colors.RED}CI Check FAILED: validation errors{Colors.RESET}")
            sys.exit(1)

    # Step 2: Generate
    print(f"{Colors.CYAN}[2/3] Generating configs...{Colors.RESET}")
    cmd_generate(args)

    # Step 3: Check for structural drift (ignore timestamp/git_commit)
    print(f"{Colors.CYAN}[3/3] Checking for structural drift...{Colors.RESET}")
    try:
        # Use git diff but ignore generated_at and git_commit lines
        result = subprocess.run(
            ["git", "diff", "-I", "generated_at", "-I", "git_commit",
             "--exit-code", *GENERATED_FILES],
            capture_output=True, text=True, cwd=BASE_DIR
        )
    except (OSError, subprocess.SubprocessError) as e:
        # Best effort: a missing git binary must not fail the check.
        print(f"{Colors.YELLOW}Warning: Could not check git diff: {e}{Colors.RESET}")
    else:
        if result.returncode == 0:
            print(f" {Colors.GREEN}OK{Colors.RESET} No drift detected")
        elif result.returncode == 1:
            # Exit status 1 means differences exist; check whether anything
            # other than timestamp/commit metadata changed.
            structural_changes = filter_structural_changes(result.stdout)
            if structural_changes:
                print(f"\n{Colors.RED}CI Check FAILED: Structural changes detected!{Colors.RESET}")
                print("Run 'python3 tools/agents generate' and commit the changes.")
                print("\nStructural diff:\n" + '\n'.join(structural_changes[:20]))
                sys.exit(1)
            else:
                print(f" {Colors.GREEN}OK{Colors.RESET} No structural drift (only timestamp/commit metadata)")
        else:
            # Any other status is a git error (unsupported option, not a
            # repository, ...): stdout is empty, so it must not be read
            # as "no drift".
            reason = result.stderr.strip() or f"git exited with status {result.returncode}"
            print(f"{Colors.YELLOW}Warning: Could not check git diff: {reason}{Colors.RESET}")

    print(f"\n{Colors.GREEN}CI Check PASSED{Colors.RESET}\n")


def main():
    """Parse command-line arguments and dispatch to the selected command."""
    parser = argparse.ArgumentParser(description="DAGI Agent Registry CLI")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    subparsers.add_parser("list", help="List all agents")
    subparsers.add_parser("validate", help="Validate registry")
    subparsers.add_parser("generate", help="Generate configs from registry")
    subparsers.add_parser("check", help="CI check: validate + no-drift test")

    smoke_parser = subparsers.add_parser("smoke", help="Run smoke test")
    smoke_parser.add_argument("--id", required=True, help="Agent ID")

    args = parser.parse_args()

    handlers = {
        "list": cmd_list,
        "validate": cmd_validate,
        "generate": cmd_generate,
        "check": cmd_check,
        "smoke": cmd_smoke,
    }
    handler = handlers.get(args.command)
    if handler is None:
        # No sub-command given.
        parser.print_help()
        return
    handler(args)


if __name__ == "__main__":
    main()