#!/usr/bin/env python3
"""
DAGI Agent Registry CLI Tool

Usage:
    ./tools/agents list                - List all agents
    ./tools/agents validate            - Validate registry consistency
    ./tools/agents generate            - Generate configs from registry
    ./tools/agents check               - CI check: validate + generate + no-drift test
    ./tools/agents smoke --id AGENT_ID - Run smoke test for agent
"""

import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

try:
    import yaml
except ImportError:
    print("ERROR: PyYAML required. Install: pip install pyyaml")
    sys.exit(1)

# Paths
BASE_DIR = Path(__file__).parent.parent
REGISTRY_PATH = BASE_DIR / "config" / "agent_registry.yml"
GATEWAY_DIR = BASE_DIR / "gateway-bot"
ROUTER_CONFIG = BASE_DIR / "services" / "router" / "router-config.yml"
CREWAI_DIR = BASE_DIR / "services" / "crewai-service" / "app"
CREWAI_TEAMS_GENERATED = BASE_DIR / "config" / "crewai_teams.generated.yml"

# Values accepted for `orchestration.mode`; CREW_MODES are the ones that need a crew.
ORCHESTRATION_MODES = ("llm_only", "crew_only", "hybrid")
CREW_MODES = ("crew_only", "hybrid")

# Keys every registry agent entry must define (checked by `validate`).
REQUIRED_AGENT_FIELDS = (
    "id",
    "display_name",
    "class",
    "visibility",
    "scope",
    "canonical_role",
    "domains",
    "routing",
    "llm_profile",
)


class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    GREEN = "\033[92m"
    RED = "\033[91m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"
    RESET = "\033[0m"
    BOLD = "\033[1m"


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict.

    YAML keys written as `key:` (null) or with a scalar value would otherwise
    blow up on the first `.get()` call.
    """
    return value if isinstance(value, dict) else {}


def load_registry() -> Dict[str, Any]:
    """Load and return the agent registry YAML as a dict.

    Exits the process with status 1 when the registry file is missing or its
    top-level document is not a mapping (for example, an empty file).
    """
    if not REGISTRY_PATH.exists():
        print(f"{Colors.RED}ERROR: Registry not found: {REGISTRY_PATH}{Colors.RESET}")
        sys.exit(1)
    # Explicit UTF-8: the default encoding is locale-dependent.
    with open(REGISTRY_PATH, encoding="utf-8") as f:
        registry = yaml.safe_load(f)
    if not isinstance(registry, dict):
        print(
            f"{Colors.RED}ERROR: Registry is empty or not a mapping: "
            f"{REGISTRY_PATH}{Colors.RESET}"
        )
        sys.exit(1)
    return registry


def _slugify(value: str) -> str:
    """Turn *value* into a lowercase `[a-z0-9_]` identifier; "role" if nothing is left."""
    s = (value or "").strip().lower()
    s = re.sub(r"[^a-z0-9]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s or "role"


def _legacy_crewai_to_orchestration(agent: Dict[str, Any]) -> Dict[str, Any]:
    """
    Backward-compatible adapter from legacy `crewai` block to new `orchestration`.

    The crew is considered active only when the legacy block is both `enabled`
    and marked `orchestrator`; in that case the mode is "hybrid" (with or
    without a team), otherwise "llm_only".
    """
    legacy = _as_dict(agent.get("crewai"))
    enabled = bool(legacy.get("enabled", False))
    orchestrator = bool(legacy.get("orchestrator", False))
    team = legacy.get("team", []) or []
    can_delegate = bool(legacy.get("can_delegate_to_all", False))
    crew_active = enabled and orchestrator

    label = agent.get("display_name", agent.get("id", "agent"))
    default_profile = {
        "team_name": f"{label} Team",
        "parallel_roles": True,
        "max_concurrency": 3,
        "synthesis": {
            "role_context": f"{label} Orchestrator",
            "llm_profile": agent.get("llm_profile", "reasoning"),
        },
        "team": [],
        "delegation": {
            "enabled": can_delegate,
            "forbid_self": True,
            "max_hops": 2,
            "allow_top_level_agents": [],
        },
    }
    for member in team:
        # Legacy members are either {"role": ...} dicts or bare role strings.
        role_name = member.get("role", "") if isinstance(member, dict) else str(member)
        default_profile["team"].append(
            {
                "id": _slugify(role_name),
                "role_context": role_name,
                "llm_profile": "reasoning",
            }
        )

    return {
        "mode": "hybrid" if crew_active else "llm_only",
        "crew": {
            "enabled": crew_active,
            "default_profile": "default",
            "profiles": {"default": default_profile},
        },
        "a2a": {
            "enabled": can_delegate,
            "allow_top_level_agents": ["all_top_level"] if can_delegate else [],
            "max_hops": 2,
            "forbid_self": True,
        },
        "response_contract": {
            "user_visible_speaker": "self",
            "crew_roles_user_visible": False,
        },
    }


def get_orchestration(agent: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return normalized orchestration object.
    Prefers `orchestration`, falls back to legacy `crewai`.
    """
    if isinstance(agent.get("orchestration"), dict):
        return agent["orchestration"]
    return _legacy_crewai_to_orchestration(agent)


def get_default_profile_config(orchestration: Dict[str, Any]) -> Dict[str, Any]:
    """Return the crew profile named by `crew.default_profile`.

    Falls back to the profile called "default", then to an empty dict. The
    result is always a dict, even when the registry holds a null or scalar
    where a mapping is expected.
    """
    crew = _as_dict(_as_dict(orchestration).get("crew"))
    profiles = _as_dict(crew.get("profiles"))
    default_profile = crew.get("default_profile", "default")
    if default_profile in profiles:
        return _as_dict(profiles[default_profile])
    if "default" in profiles:
        return _as_dict(profiles["default"])
    return {}


def _crew_is_active(orchestration: Dict[str, Any]) -> bool:
    """True when `crew.enabled` is set and the mode is one that uses a crew."""
    crew = _as_dict(orchestration.get("crew"))
    return bool(crew.get("enabled", False)) and orchestration.get("mode", "llm_only") in CREW_MODES


def cmd_list(args: argparse.Namespace) -> None:
    """Print the registry version and one table each for top-level and internal agents."""
    registry = load_registry()
    agents = registry.get("agents", []) or []

    print(f"\n{Colors.BOLD}DAGI Agent Registry{Colors.RESET}")
    ver = registry.get('version', 'unknown')
    print(f"Version: {ver}")
    print(f"Total agents: {len(agents)}\n")

    top_level = [a for a in agents if a.get("class") == "top_level"]
    internal = [a for a in agents if a.get("class") == "internal"]

    print(f"{Colors.CYAN}=== TOP-LEVEL AGENTS ({len(top_level)}) ==={Colors.RESET}")
    print(f"{'ID':<15} {'Display':<20} {'Visibility':<10} {'Telegram':<10} Role")
    print("-" * 100)
    for a in top_level:
        vis_color = Colors.GREEN if a.get("visibility") == "public" else Colors.YELLOW
        role = a.get('canonical_role', '')[:45]
        aid = a['id']
        dname = a.get('display_name', '')
        vis = a.get('visibility', '')
        tg = a.get('telegram_mode', '')
        print(f"{aid:<15} {dname:<20} {vis_color}{vis:<10}{Colors.RESET} {tg:<10} {role}...")

    print(f"\n{Colors.CYAN}=== INTERNAL AGENTS ({len(internal)}) ==={Colors.RESET}")
    print(f"{'ID':<15} {'Display':<20} {'Scope':<12} Role")
    print("-" * 80)
    for a in internal:
        role = a.get('canonical_role', '')[:40]
        aid = a['id']
        dname = a.get('display_name', '')
        scope = a.get('scope', '')
        print(f"{aid:<15} {dname:<20} {scope:<12} {role}...")
    print()


def _validate_agent(
    agent: Dict[str, Any],
    llm_profiles: Dict[str, Any],
    is_duplicate: bool,
) -> Tuple[List[str], List[str]]:
    """Run all per-agent checks and return `(errors, warnings)` message lists.

    `is_duplicate` tells the helper that this agent's id was already seen
    earlier in the registry; the caller owns that bookkeeping.
    """
    errors: List[str] = []
    warnings: List[str] = []
    agent_id = agent.get("id", "UNKNOWN")

    for field in REQUIRED_AGENT_FIELDS:
        if field not in agent:
            errors.append(f"{agent_id}: Missing required field '{field}'")

    if is_duplicate:
        errors.append(f"{agent_id}: Duplicate agent ID")

    agent_class = agent.get("class")
    if agent_class not in ("top_level", "internal"):
        errors.append(f"{agent_id}: Invalid class '{agent_class}'")

    vis = agent.get("visibility")
    if vis not in ("public", "private", "internal"):
        errors.append(f"{agent_id}: Invalid visibility '{vis}'")

    tg_mode = agent.get("telegram_mode")
    if tg_mode not in ("public", "whitelist", "off"):
        errors.append(f"{agent_id}: Invalid telegram_mode '{tg_mode}'")

    llm_profile = agent.get("llm_profile")
    if llm_profile and llm_profile not in llm_profiles:
        warnings.append(f"{agent_id}: LLM profile '{llm_profile}' not defined in registry")

    # Only Telegram-facing top-level agents need their prompt file on disk.
    if agent_class == "top_level" and tg_mode != "off":
        prompt_file = agent.get("prompt_file")
        if prompt_file and not (GATEWAY_DIR / prompt_file).exists():
            warnings.append(f"{agent_id}: Prompt file not found: {prompt_file}")

    routing = _as_dict(agent.get("routing"))
    keywords = routing.get("keywords") or []
    if not keywords:
        warnings.append(f"{agent_id}: No routing keywords defined")

    if "handoff_contract" not in agent:
        warnings.append(f"{agent_id}: No handoff_contract defined")

    # Visibility enforcement checks
    if vis == "private" and tg_mode == "public":
        errors.append(f"{agent_id}: visibility=private but telegram_mode=public (must be whitelist/off)")
    if agent.get("scope") == "node_local" and "node_binding" not in agent:
        errors.append(f"{agent_id}: scope=node_local but missing node_binding")

    # Keywords minimum check for top_level
    if agent_class == "top_level":
        kw_count = len(keywords)
        if kw_count < 3:
            warnings.append(f"{agent_id}: Only {kw_count} routing keywords (recommend >= 3)")

    orchestration = get_orchestration(agent)
    mode = orchestration.get("mode", "llm_only")
    crew = _as_dict(orchestration.get("crew"))
    crew_enabled = bool(crew.get("enabled", False))
    profiles = crew.get("profiles", {})
    default_profile = crew.get("default_profile", "default")

    if mode not in ORCHESTRATION_MODES:
        errors.append(f"{agent_id}: Invalid orchestration.mode '{mode}'")
    if mode in CREW_MODES and not crew_enabled:
        errors.append(f"{agent_id}: mode={mode} requires orchestration.crew.enabled=true")

    if crew_enabled:
        if not isinstance(profiles, dict) or not profiles:
            errors.append(f"{agent_id}: crew.enabled=true but no crew.profiles defined")
        elif default_profile not in profiles:
            errors.append(f"{agent_id}: default_profile '{default_profile}' missing in crew.profiles")
        else:
            profile = _as_dict(profiles.get(default_profile))
            team = profile.get("team", [])
            delegation = _as_dict(profile.get("delegation"))
            # allow delegation-only orchestrators, but otherwise team must exist
            if not team and not delegation.get("enabled", False):
                errors.append(
                    f"{agent_id}: default crew profile has empty team and delegation disabled "
                    f"(nothing to orchestrate)"
                )

    rc = _as_dict(orchestration.get("response_contract"))
    if rc.get("crew_roles_user_visible", False):
        errors.append(f"{agent_id}: response_contract.crew_roles_user_visible must be false")

    return errors, warnings


def cmd_validate(args: argparse.Namespace) -> None:
    """Validate every agent in the registry and print errors and warnings.

    Exits with status 1 when at least one error is found; warnings alone do
    not fail the run.
    """
    registry = load_registry()
    agents = registry.get("agents", []) or []
    llm_profiles = registry.get("llm_profiles", {}) or {}
    errors: List[str] = []
    warnings: List[str] = []

    print(f"\n{Colors.BOLD}Validating Agent Registry...{Colors.RESET}\n")

    ids_seen = set()
    for agent in agents:
        agent_id = agent.get("id", "UNKNOWN")
        agent_errors, agent_warnings = _validate_agent(agent, llm_profiles, agent_id in ids_seen)
        ids_seen.add(agent_id)
        errors.extend(agent_errors)
        warnings.extend(agent_warnings)

    if errors:
        print(f"{Colors.RED}ERRORS ({len(errors)}):{Colors.RESET}")
        for e in errors:
            print(f" X {e}")
    if warnings:
        print(f"\n{Colors.YELLOW}WARNINGS ({len(warnings)}):{Colors.RESET}")
        for w in warnings:
            print(f" ! {w}")

    if not errors and not warnings:
        print(f"{Colors.GREEN}All validations passed!{Colors.RESET}")
    elif not errors:
        print(f"\n{Colors.GREEN}No errors (but {len(warnings)} warnings){Colors.RESET}")
    else:
        print(f"\n{Colors.RED}Validation failed with {len(errors)} errors{Colors.RESET}")
        sys.exit(1)
    print()


def _git_short_commit() -> str:
    """Return the short HEAD commit hash, or "unknown" when git cannot provide one."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True,
            text=True,
            cwd=BASE_DIR,
        )
    except (OSError, subprocess.SubprocessError):
        return "unknown"
    commit = result.stdout.strip()
    # A failing git (e.g. not a repository) exits non-zero with empty stdout.
    if result.returncode != 0 or not commit:
        return "unknown"
    return commit


def _write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented, non-ASCII-escaped UTF-8 JSON."""
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be pinned instead of left to the locale default.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)


def _build_gateway_registry(
    registry: Dict[str, Any],
    agents: List[Dict[str, Any]],
    generated_at: str,
    git_commit: str,
    fingerprint: str,
) -> Dict[str, Any]:
    """Build the gateway registry document: metadata plus every agent whose Telegram mode is not "off"."""
    gateway_registry: Dict[str, Any] = {
        "schema_version": registry.get("schema_version", 1),
        "version": registry.get("version"),
        "generated_at": generated_at,
        "git_commit": git_commit,
        "registry_fingerprint": fingerprint,
        "agents": {},
    }
    for agent in agents:
        if agent.get("telegram_mode") == "off":
            continue
        gateway_registry["agents"][agent["id"]] = {
            "display_name": agent.get("display_name"),
            "canonical_role": agent.get("canonical_role"),
            "prompt_file": agent.get("prompt_file"),
            "telegram_mode": agent.get("telegram_mode"),
            "visibility": agent.get("visibility"),
            "domains": agent.get("domains", []),
            "mentor": agent.get("mentor"),
        }
    return gateway_registry


def _build_router_agents(agents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the router mapping of agent id to description, LLM, priority, keywords and domains."""
    router_agents: Dict[str, Any] = {}
    for agent in agents:
        dname = agent.get('display_name', '')
        crole = (agent.get('canonical_role') or '')[:50]
        routing = _as_dict(agent.get("routing"))
        router_agents[agent["id"]] = {
            "description": f"{dname} - {crole}",
            "default_llm": agent.get("llm_profile", "fast"),
            "routing_priority": routing.get("priority", 50),
            "keywords": routing.get("keywords", []),
            "domains": agent.get("domains", []),
            "class": agent.get("class"),
            "visibility": agent.get("visibility"),
        }
    return router_agents


def _load_existing_crewai_teams(path: Path) -> Dict[str, Any]:
    """Return the `teams` mapping of a previously generated CrewAI config, or {} if unusable."""
    if not path.exists():
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            existing = json.load(f)
    except (OSError, ValueError):
        # Best effort: an unreadable or corrupt previous file just means
        # there is nothing to preserve. JSON and Unicode decode errors are
        # both ValueError subclasses.
        return {}
    return _as_dict(_as_dict(existing).get("teams"))


def _summarize_team_member(member: Any) -> Dict[str, Any]:
    """Reduce a crew profile member to the lightweight `{role, skills}` shape."""
    if isinstance(member, dict):
        return {
            "role": member.get("role_context", member.get("id", "role")),
            "skills": member.get("skills", []),
        }
    return {"role": str(member), "skills": []}


def _build_crewai_config(
    agents: List[Dict[str, Any]],
    existing_teams: Dict[str, Any],
) -> Dict[str, Any]:
    """Build the CrewAI config with orchestrators, workers and per-agent teams.

    Agents with an explicit `orchestration` block are classified from it;
    all others follow the legacy `crewai` block unchanged.
    """
    crewai_config: Dict[str, Any] = {"orchestrators": [], "workers": [], "teams": {}}

    for agent in agents:
        has_explicit_orchestration = isinstance(agent.get("orchestration"), dict)
        legacy = _as_dict(agent.get("crewai"))

        if has_explicit_orchestration:
            orchestration = get_orchestration(agent)
            crew_enabled = _crew_is_active(orchestration)
            can_orchestrate = orchestration.get("mode", "llm_only") != "llm_only"
            is_orchestrator = agent.get("class") == "top_level"
        else:
            # Strict backward compatibility for legacy registry entries.
            orchestration = {}
            crew_enabled = bool(legacy.get("enabled", False))
            can_orchestrate = bool(legacy.get("orchestrator", False))
            is_orchestrator = can_orchestrate

        if not crew_enabled:
            continue

        agent_entry = {
            "id": agent["id"],
            "display_name": agent.get("display_name"),
            "role": agent.get("canonical_role"),
            "can_orchestrate": can_orchestrate,
            "domains": agent.get("domains", []),
        }
        bucket = "orchestrators" if is_orchestrator else "workers"
        crewai_config[bucket].append(agent_entry)

        default_team_name = f"{agent.get('display_name', agent['id'])} Team"
        if has_explicit_orchestration:
            profile_cfg = get_default_profile_config(orchestration)
            team_members = profile_cfg.get("team", [])
            if team_members:
                # Router needs lightweight list; keep role names for compatibility.
                crewai_config["teams"][agent["id"]] = {
                    "team_name": profile_cfg.get("team_name", default_team_name),
                    "members": [_summarize_team_member(m) for m in team_members],
                }
        elif existing_teams.get(agent["id"]):
            # Keep pre-existing generated team shape to avoid accidental shrinking.
            crewai_config["teams"][agent["id"]] = existing_teams[agent["id"]]
        elif legacy.get("team", []):
            # Preserve legacy team payload (including skills) if present.
            crewai_config["teams"][agent["id"]] = {
                "team_name": default_team_name,
                "members": legacy.get("team", []),
            }

    return crewai_config


def _build_crewai_teams_doc(
    registry: Dict[str, Any],
    agents: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Build the generated crew teams document for top-level agents with an active crew."""
    teams_doc: Dict[str, Any] = {
        "schema_version": 1,
        "version": registry.get("version", "generated"),
        "description": "Generated from config/agent_registry.yml (orchestration.crew.*)",
    }
    for agent in agents:
        if agent.get("class") != "top_level":
            continue
        # Canary-safe generation: only agents with explicit orchestration block
        # are emitted to generated teams file.
        if not isinstance(agent.get("orchestration"), dict):
            continue
        orchestration = get_orchestration(agent)
        if not _crew_is_active(orchestration):
            continue
        crew = _as_dict(orchestration.get("crew"))
        profiles = crew.get("profiles", {})
        if not isinstance(profiles, dict) or not profiles:
            continue
        teams_doc[agent["id"]] = {
            "profiles": profiles,
            "default_profile": crew.get("default_profile", "default"),
        }
        hints = crew.get("profile_hints")
        if hints:
            teams_doc[agent["id"]]["profile_hints"] = hints
    return teams_doc


def cmd_generate(args: argparse.Namespace) -> None:
    """Generate the downstream config files enabled by the registry's `feature_flags`.

    Gateway, router and CrewAI JSON files are generated unless their flag is
    set to false; the crew teams YAML is generated only when
    `generate_crewai_teams` is set.
    """
    registry = load_registry()
    agents = registry.get("agents", []) or []
    flags = _as_dict(registry.get("feature_flags"))

    print(f"\n{Colors.BOLD}Generating configs from registry...{Colors.RESET}\n")

    # Generate metadata
    git_commit = _git_short_commit()
    registry_hash = hashlib.sha256(str(registry).encode()).hexdigest()[:16]
    generated_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    generated_files: List[str] = []

    def _report(path: Path) -> None:
        generated_files.append(str(path))
        print(f" {Colors.GREEN}OK{Colors.RESET} {path}")

    if flags.get("generate_prompts", True):
        gateway_json = GATEWAY_DIR / "agent_registry.json"
        _write_json(
            gateway_json,
            _build_gateway_registry(registry, agents, generated_at, git_commit, registry_hash),
        )
        _report(gateway_json)

    if flags.get("generate_router_config", True):
        router_json = BASE_DIR / "config" / "router_agents.json"
        _write_json(router_json, _build_router_agents(agents))
        _report(router_json)

    if flags.get("generate_crewai_config", True):
        crewai_json = BASE_DIR / "config" / "crewai_agents.json"
        # Read the previous output before overwriting it: legacy agents keep
        # their previously generated team entry.
        existing_teams = _load_existing_crewai_teams(crewai_json)
        _write_json(crewai_json, _build_crewai_config(agents, existing_teams))
        _report(crewai_json)

    if flags.get("generate_crewai_teams", False):
        teams_doc = _build_crewai_teams_doc(registry, agents)
        with open(CREWAI_TEAMS_GENERATED, "w", encoding="utf-8") as f:
            yaml.safe_dump(teams_doc, f, sort_keys=False, allow_unicode=True)
        _report(CREWAI_TEAMS_GENERATED)

    print(f"\n{Colors.GREEN}Generated {len(generated_files)} files{Colors.RESET}\n")


def _crew_status(agent: Dict[str, Any]) -> Tuple[bool, int]:
    """Return `(crew_enabled, team_size)` for the smoke test.

    Uses the same explicit-vs-legacy split as config generation, so an agent
    with an `orchestration` block is judged by it rather than by a `crewai`
    block it may no longer have.
    """
    if isinstance(agent.get("orchestration"), dict):
        orchestration = agent["orchestration"]
        team = get_default_profile_config(orchestration).get("team") or []
        return _crew_is_active(orchestration), len(team)
    legacy = _as_dict(agent.get("crewai"))
    return bool(legacy.get("enabled")), len(legacy.get("team") or [])


def cmd_smoke(args: argparse.Namespace) -> None:
    """Run three static checks (prompt file, routing keywords, crew) for agent `args.id`.

    Exits with status 1 when the agent id is not in the registry. A check
    that only warns still counts as not passed in the final tally.
    """
    registry = load_registry()
    agents = {a["id"]: a for a in registry.get("agents", []) or []}

    agent_id = args.id
    if agent_id not in agents:
        print(f"{Colors.RED}ERROR: Agent '{agent_id}' not found in registry{Colors.RESET}")
        print(f"Available: {list(agents.keys())}")
        sys.exit(1)

    agent = agents[agent_id]
    print(f"\n{Colors.BOLD}Smoke Test: {agent_id}{Colors.RESET}")
    print(f"Role: {agent.get('canonical_role')}")
    print(f"Visibility: {agent.get('visibility')}")
    print(f"Telegram: {agent.get('telegram_mode')}\n")

    tests_passed = 0
    tests_total = 0

    tests_total += 1
    prompt_file = agent.get("prompt_file")
    if prompt_file:
        if (GATEWAY_DIR / prompt_file).exists():
            print(f" {Colors.GREEN}OK{Colors.RESET} Prompt file exists: {prompt_file}")
            tests_passed += 1
        else:
            print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file missing: {prompt_file}")
    elif agent.get("telegram_mode") == "off":
        print(f" {Colors.GREEN}OK{Colors.RESET} No prompt file (telegram_mode=off)")
        tests_passed += 1
    else:
        print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file not configured")

    tests_total += 1
    keywords = _as_dict(agent.get("routing")).get("keywords") or []
    if keywords:
        print(f" {Colors.GREEN}OK{Colors.RESET} Routing keywords: {len(keywords)} defined")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} No routing keywords")

    tests_total += 1
    crew_enabled, team_size = _crew_status(agent)
    if crew_enabled:
        print(f" {Colors.GREEN}OK{Colors.RESET} CrewAI enabled (team: {team_size} members)")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} CrewAI disabled")

    print(f"\n{Colors.BOLD}Result: {tests_passed}/{tests_total} tests passed{Colors.RESET}\n")


def _structural_diff_lines(diff_text: str) -> List[str]:
    """Return added/removed diff lines, excluding file headers and metadata-only lines."""
    structural = []
    for line in diff_text.split('\n'):
        if not line.startswith(('+', '-')):
            continue
        if line.startswith(('+++', '---')):
            continue
        if 'generated_at' in line or 'git_commit' in line:
            continue
        structural.append(line)
    return structural


def cmd_check(args: argparse.Namespace) -> None:
    """CI check: validate + generate + verify no drift.

    Exits with status 1 on validation errors or when regenerating changes the
    committed config files beyond timestamp/commit metadata. If git itself
    cannot produce a diff, a warning is printed and the check still passes.
    """
    print(f"\n{Colors.BOLD}CI Check: validate + no-drift test{Colors.RESET}\n")

    # Step 1: Validate
    print(f"{Colors.CYAN}[1/3] Validating registry...{Colors.RESET}")
    try:
        cmd_validate(args)
    except SystemExit as e:
        if e.code != 0:
            print(f"{Colors.RED}CI Check FAILED: validation errors{Colors.RESET}")
            sys.exit(1)

    # Step 2: Generate
    print(f"{Colors.CYAN}[2/3] Generating configs...{Colors.RESET}")
    cmd_generate(args)

    # Step 3: Check for structural drift (ignore timestamp/git_commit)
    print(f"{Colors.CYAN}[3/3] Checking for structural drift...{Colors.RESET}")
    try:
        # Use git diff but ignore generated_at and git_commit lines
        result = subprocess.run(
            [
                "git", "diff",
                "-I", "generated_at",
                "-I", "git_commit",
                "--exit-code",
                "gateway-bot/agent_registry.json",
                "config/router_agents.json",
                "config/crewai_agents.json",
            ],
            capture_output=True,
            text=True,
            cwd=BASE_DIR,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"{Colors.YELLOW}Warning: Could not check git diff: {e}{Colors.RESET}")
    else:
        if result.returncode == 0:
            print(f" {Colors.GREEN}OK{Colors.RESET} No drift detected")
        elif result.returncode == 1:
            # Status 1 means differences exist; check if only timestamp/commit changed.
            structural_changes = _structural_diff_lines(result.stdout)
            if structural_changes:
                print(f"\n{Colors.RED}CI Check FAILED: Structural changes detected!{Colors.RESET}")
                print("Run 'python3 tools/agents generate' and commit the changes.")
                print("\nStructural diff:\n" + '\n'.join(structural_changes[:20]))
                sys.exit(1)
            print(f" {Colors.GREEN}OK{Colors.RESET} No structural drift (only timestamp/commit metadata)")
        else:
            # Any other status is a git failure, not a diff result; do not
            # report it as "no drift".
            detail = result.stderr.strip() or f"git exited with status {result.returncode}"
            print(f"{Colors.YELLOW}Warning: Could not check git diff: {detail}{Colors.RESET}")

    print(f"\n{Colors.GREEN}CI Check PASSED{Colors.RESET}\n")


def main() -> None:
    """Parse the command line and dispatch to the matching `cmd_*` handler."""
    parser = argparse.ArgumentParser(description="DAGI Agent Registry CLI")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    subparsers.add_parser("list", help="List all agents")
    subparsers.add_parser("validate", help="Validate registry")
    subparsers.add_parser("generate", help="Generate configs from registry")
    subparsers.add_parser("check", help="CI check: validate + no-drift test")

    smoke_parser = subparsers.add_parser("smoke", help="Run smoke test")
    smoke_parser.add_argument("--id", required=True, help="Agent ID")

    args = parser.parse_args()

    handlers = {
        "list": cmd_list,
        "validate": cmd_validate,
        "generate": cmd_generate,
        "check": cmd_check,
        "smoke": cmd_smoke,
    }
    handler = handlers.get(args.command)
    if handler is None:
        # No subcommand given.
        parser.print_help()
    else:
        handler(args)


if __name__ == "__main__":
    main()