Files

644 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
"""
DAGI Agent Registry CLI Tool

Usage:
    ./tools/agents list                    - List all agents
    ./tools/agents validate                - Validate registry consistency
    ./tools/agents generate                - Generate configs from registry
    ./tools/agents check                   - CI check: validate, regenerate, verify no drift
    ./tools/agents smoke --id <agent_id>   - Run smoke test for agent
"""
import argparse
import hashlib
import json
import os
import subprocess
import sys
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
try:
import yaml
except ImportError:
print("ERROR: PyYAML required. Install: pip install pyyaml")
sys.exit(1)
# Paths
# Everything is resolved relative to the directory two levels above this file.
BASE_DIR = Path(__file__).parent.parent
_CONFIG_DIR = BASE_DIR / "config"
_SERVICES_DIR = BASE_DIR / "services"

# Source of truth read by every command.
REGISTRY_PATH = _CONFIG_DIR / "agent_registry.yml"
# Directory holding prompt files and the generated gateway registry JSON.
GATEWAY_DIR = BASE_DIR / "gateway-bot"
ROUTER_CONFIG = _SERVICES_DIR / "router" / "router-config.yml"
CREWAI_DIR = _SERVICES_DIR / "crewai-service" / "app"
# Output of the optional `generate_crewai_teams` feature flag.
CREWAI_TEAMS_GENERATED = _CONFIG_DIR / "crewai_teams.generated.yml"
class Colors:
    """ANSI escape sequences used to colorize this tool's terminal output."""

    # Attribute controls: RESET ends any active styling, BOLD starts bold text.
    RESET = "\033[0m"
    BOLD = "\033[1m"

    # Foreground colors.
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"
def load_registry() -> Dict[str, Any]:
    """Load and parse the agent registry YAML at ``REGISTRY_PATH``.

    Returns:
        The parsed registry mapping. An empty file yields ``{}`` so callers
        can use ``registry.get(...)`` unconditionally.

    Exits:
        With status 1 (after printing an error) when the file does not exist
        or when its top-level YAML node is not a mapping.
    """
    if not REGISTRY_PATH.exists():
        print(f"{Colors.RED}ERROR: Registry not found: {REGISTRY_PATH}{Colors.RESET}")
        sys.exit(1)

    # Decode as UTF-8 explicitly rather than relying on the platform locale;
    # the generators in this tool emit non-ASCII text from the registry.
    with open(REGISTRY_PATH, encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # safe_load returns None for an empty document.
    if data is None:
        return {}
    if not isinstance(data, dict):
        print(
            f"{Colors.RED}ERROR: Registry root must be a mapping: "
            f"{REGISTRY_PATH}{Colors.RESET}"
        )
        sys.exit(1)
    return data
def _slugify(value: str) -> str:
s = (value or "").strip().lower()
s = re.sub(r"[^a-z0-9]+", "_", s)
s = re.sub(r"_+", "_", s).strip("_")
return s or "role"
def _legacy_crewai_to_orchestration(agent: Dict[str, Any]) -> Dict[str, Any]:
"""
Backward-compatible adapter from legacy `crewai` block to new `orchestration`.
"""
legacy = agent.get("crewai", {}) or {}
enabled = bool(legacy.get("enabled", False))
orchestrator = bool(legacy.get("orchestrator", False))
team = legacy.get("team", []) or []
if not enabled or not orchestrator:
mode = "llm_only"
elif team:
mode = "hybrid"
else:
# legacy "enabled but no team" usually means orchestration via A2A only
mode = "hybrid"
default_profile = {
"team_name": f"{agent.get('display_name', agent.get('id', 'agent'))} Team",
"parallel_roles": True,
"max_concurrency": 3,
"synthesis": {
"role_context": f"{agent.get('display_name', agent.get('id', 'agent'))} Orchestrator",
"llm_profile": agent.get("llm_profile", "reasoning"),
},
"team": [],
"delegation": {
"enabled": bool(legacy.get("can_delegate_to_all", False)),
"forbid_self": True,
"max_hops": 2,
"allow_top_level_agents": [],
},
}
for member in team:
role_name = member.get("role", "") if isinstance(member, dict) else str(member)
default_profile["team"].append(
{
"id": _slugify(role_name),
"role_context": role_name,
"llm_profile": "reasoning",
}
)
return {
"mode": mode,
"crew": {
"enabled": enabled and orchestrator,
"default_profile": "default",
"profiles": {"default": default_profile},
},
"a2a": {
"enabled": bool(legacy.get("can_delegate_to_all", False)),
"allow_top_level_agents": ["all_top_level"] if legacy.get("can_delegate_to_all", False) else [],
"max_hops": 2,
"forbid_self": True,
},
"response_contract": {
"user_visible_speaker": "self",
"crew_roles_user_visible": False,
},
}
def get_orchestration(agent: Dict[str, Any]) -> Dict[str, Any]:
    """
    Resolve the orchestration settings for an agent.

    An `orchestration` value that is a dict is returned as-is (the same
    object, not a copy). Otherwise the settings are derived from the legacy
    `crewai` block.
    """
    explicit = agent.get("orchestration")
    if not isinstance(explicit, dict):
        return _legacy_crewai_to_orchestration(agent)
    return explicit
def get_default_profile_config(orchestration: Dict[str, Any]) -> Dict[str, Any]:
    """Return the crew profile selected by ``crew.default_profile``.

    Lookup order: the profile named by ``crew.default_profile`` (which itself
    defaults to ``"default"``), then the profile literally named
    ``"default"``. The first name present in ``crew.profiles`` wins, even if
    its value is empty.

    Returns:
        The profile mapping, or ``{}`` when ``orchestration``, its ``crew``
        section, ``crew.profiles`` or the selected profile is missing, null,
        or not a mapping. The result is therefore always safe to ``.get``.
    """
    crew = orchestration.get("crew") if isinstance(orchestration, dict) else None
    # `crew: null` (or any non-mapping) must not reach `.get` below.
    if not isinstance(crew, dict):
        return {}

    profiles = crew.get("profiles")
    if not isinstance(profiles, dict):
        return {}

    for name in (crew.get("default_profile", "default"), "default"):
        if name in profiles:
            profile = profiles[name]
            return profile if isinstance(profile, dict) else {}
    return {}
def cmd_list(args):
    """Print the registry version, the agent count, and one table per class.

    Agents whose ``class`` is ``top_level`` or ``internal`` are listed in
    their own table; agents of any other class are counted in the total but
    appear in neither table. ``args`` is accepted for uniformity with the
    other commands and is not read.
    """
    registry = load_registry()
    # Skip null or non-mapping entries instead of crashing on `.get`.
    agents = [a for a in (registry.get("agents") or []) if isinstance(a, dict)]

    print(f"\n{Colors.BOLD}DAGI Agent Registry{Colors.RESET}")
    ver = registry.get('version', 'unknown')
    print(f"Version: {ver}")
    print(f"Total agents: {len(agents)}\n")

    top_level = [a for a in agents if a.get("class") == "top_level"]
    internal = [a for a in agents if a.get("class") == "internal"]

    print(f"{Colors.CYAN}=== TOP-LEVEL AGENTS ({len(top_level)}) ==={Colors.RESET}")
    header = f"{'ID':<15} {'Display':<20} {'Visibility':<10} {'Telegram':<10} Role"
    print(header)
    print("-" * 100)
    for a in top_level:
        aid = _list_cell(a, "id")
        dname = _list_cell(a, "display_name")
        vis = _list_cell(a, "visibility")
        tg = _list_cell(a, "telegram_mode")
        role = _list_ellipsize(_list_cell(a, "canonical_role"), 45)
        vis_color = Colors.GREEN if vis == "public" else Colors.YELLOW
        print(f"{aid:<15} {dname:<20} {vis_color}{vis:<10}{Colors.RESET} {tg:<10} {role}")

    print(f"\n{Colors.CYAN}=== INTERNAL AGENTS ({len(internal)}) ==={Colors.RESET}")
    header2 = f"{'ID':<15} {'Display':<20} {'Scope':<12} Role"
    print(header2)
    print("-" * 80)
    for a in internal:
        aid = _list_cell(a, "id")
        dname = _list_cell(a, "display_name")
        scope = _list_cell(a, "scope")
        role = _list_ellipsize(_list_cell(a, "canonical_role"), 40)
        print(f"{aid:<15} {dname:<20} {scope:<12} {role}")
    print()


def _list_cell(agent, key):
    """Return ``agent[key]`` as text; a missing or null value becomes ''."""
    value = agent.get(key)
    return "" if value is None else str(value)


def _list_ellipsize(text, limit):
    """Cut ``text`` to ``limit`` characters, adding '...' only if it was cut."""
    if len(text) <= limit:
        return text
    return text[:limit] + "..."
def cmd_validate(args):
    """Validate the agent registry and print every error and warning found.

    Malformed entries (null or non-mapping agents, ``routing``, ``crew``,
    ``delegation`` or ``response_contract`` sections) are reported or treated
    as empty rather than raising. ``args`` is not read.

    Exits:
        With status 1 when at least one error was found. Warnings alone do
        not change the exit status.
    """
    registry = load_registry()
    agents = registry.get("agents") or []
    # Looked up once; it does not change between agents.
    known_llm_profiles = registry.get("llm_profiles") or {}
    errors: List[str] = []
    warnings: List[str] = []
    ids_seen = set()

    print(f"\n{Colors.BOLD}Validating Agent Registry...{Colors.RESET}\n")

    for index, agent in enumerate(agents):
        if not isinstance(agent, dict):
            errors.append(f"agents[{index}]: Agent entry must be a mapping")
            continue
        _validate_agent_fields(agent, known_llm_profiles, ids_seen, errors, warnings)
        _validate_agent_orchestration(agent, errors)

    if errors:
        print(f"{Colors.RED}ERRORS ({len(errors)}):{Colors.RESET}")
        for e in errors:
            print(f" X {e}")
    if warnings:
        print(f"\n{Colors.YELLOW}WARNINGS ({len(warnings)}):{Colors.RESET}")
        for w in warnings:
            print(f" ! {w}")

    if not errors and not warnings:
        print(f"{Colors.GREEN}All validations passed!{Colors.RESET}")
    elif not errors:
        print(f"\n{Colors.GREEN}No errors (but {len(warnings)} warnings){Colors.RESET}")
    else:
        print(f"\n{Colors.RED}Validation failed with {len(errors)} errors{Colors.RESET}")
        sys.exit(1)
    print()


def _validate_agent_fields(agent, known_llm_profiles, ids_seen, errors, warnings):
    """Check one agent's identity, enum, routing and visibility fields."""
    required_fields = ["id", "display_name", "class", "visibility", "scope",
                       "canonical_role", "domains", "routing", "llm_profile"]
    agent_id = agent.get("id", "UNKNOWN")

    for field in required_fields:
        if field not in agent:
            errors.append(f"{agent_id}: Missing required field '{field}'")

    if agent_id in ids_seen:
        errors.append(f"{agent_id}: Duplicate agent ID")
    ids_seen.add(agent_id)

    agent_class = agent.get("class")
    if agent_class not in ["top_level", "internal"]:
        errors.append(f"{agent_id}: Invalid class '{agent_class}'")

    vis = agent.get("visibility")
    if vis not in ["public", "private", "internal"]:
        errors.append(f"{agent_id}: Invalid visibility '{vis}'")

    tg_mode = agent.get("telegram_mode")
    if tg_mode not in ["public", "whitelist", "off"]:
        errors.append(f"{agent_id}: Invalid telegram_mode '{tg_mode}'")

    llm_profile = agent.get("llm_profile")
    if llm_profile and llm_profile not in known_llm_profiles:
        warnings.append(f"{agent_id}: LLM profile '{llm_profile}' not defined in registry")

    # Only agents reachable through Telegram need their prompt file on disk.
    if agent_class == "top_level" and tg_mode != "off":
        prompt_file = agent.get("prompt_file")
        if prompt_file:
            prompt_path = GATEWAY_DIR / prompt_file
            if not prompt_path.exists():
                warnings.append(f"{agent_id}: Prompt file not found: {prompt_file}")

    # Bound for every agent so the keyword-count check below never sees a
    # missing or stale value; a null/non-mapping section counts as empty.
    routing = agent.get("routing")
    if not isinstance(routing, dict):
        routing = {}
    keywords = routing.get("keywords") or []
    if not keywords:
        warnings.append(f"{agent_id}: No routing keywords defined")

    if "handoff_contract" not in agent:
        warnings.append(f"{agent_id}: No handoff_contract defined")

    # Visibility enforcement checks
    if vis == "private" and tg_mode == "public":
        errors.append(f"{agent_id}: visibility=private but telegram_mode=public (must be whitelist/off)")
    if agent.get("scope") == "node_local" and "node_binding" not in agent:
        errors.append(f"{agent_id}: scope=node_local but missing node_binding")

    # Keywords minimum check for top_level
    if agent_class == "top_level":
        kw_count = len(keywords)
        if kw_count < 3:
            warnings.append(f"{agent_id}: Only {kw_count} routing keywords (recommend >= 3)")


def _validate_agent_orchestration(agent, errors):
    """Check one agent's orchestration mode, crew profiles and response contract."""
    agent_id = agent.get("id", "UNKNOWN")
    orchestration = get_orchestration(agent)
    mode = orchestration.get("mode", "llm_only")

    raw_crew = orchestration.get("crew")
    if raw_crew is not None and not isinstance(raw_crew, dict):
        errors.append(f"{agent_id}: orchestration.crew must be a mapping")
    crew = raw_crew if isinstance(raw_crew, dict) else {}
    crew_enabled = bool(crew.get("enabled", False))
    profiles = crew.get("profiles", {})
    default_profile = crew.get("default_profile", "default")

    if mode not in ["llm_only", "crew_only", "hybrid"]:
        errors.append(f"{agent_id}: Invalid orchestration.mode '{mode}'")
    if mode in ["crew_only", "hybrid"] and not crew_enabled:
        errors.append(f"{agent_id}: mode={mode} requires orchestration.crew.enabled=true")

    if crew_enabled:
        if not isinstance(profiles, dict) or not profiles:
            errors.append(f"{agent_id}: crew.enabled=true but no crew.profiles defined")
        elif default_profile not in profiles:
            errors.append(f"{agent_id}: default_profile '{default_profile}' missing in crew.profiles")
        else:
            profile = profiles.get(default_profile)
            if not isinstance(profile, dict):
                profile = {}
            team = profile.get("team", [])
            delegation = profile.get("delegation")
            delegation_enabled = isinstance(delegation, dict) and bool(
                delegation.get("enabled", False)
            )
            # allow delegation-only orchestrators, but otherwise team must exist
            if not team and not delegation_enabled:
                errors.append(
                    f"{agent_id}: default crew profile has empty team and delegation disabled "
                    f"(nothing to orchestrate)"
                )

    rc = orchestration.get("response_contract")
    if isinstance(rc, dict) and rc.get("crew_roles_user_visible", False):
        errors.append(f"{agent_id}: response_contract.crew_roles_user_visible must be false")
def cmd_generate(args):
    """Generate derived config files from the agent registry.

    Each artifact is gated by a key in the registry's ``feature_flags``:

    - ``generate_prompts`` (default on): ``GATEWAY_DIR/agent_registry.json``
    - ``generate_router_config`` (default on): ``config/router_agents.json``
    - ``generate_crewai_config`` (default on): ``config/crewai_agents.json``
    - ``generate_crewai_teams`` (default off): ``CREWAI_TEAMS_GENERATED``

    All files are written as UTF-8 because they are serialized with
    non-ASCII characters left unescaped. ``args`` is not read.
    """
    registry = load_registry()
    # Null or non-mapping entries are skipped instead of crashing on `.get`.
    agents = [a for a in (registry.get("agents") or []) if isinstance(a, dict)]
    flags = registry.get("feature_flags") or {}
    print(f"\n{Colors.BOLD}Generating configs from registry...{Colors.RESET}\n")

    # Generate metadata
    git_commit = _generate_git_commit()
    registry_hash = hashlib.sha256(str(registry).encode()).hexdigest()[:16]
    generated_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    generated_files = []

    def _record(path):
        generated_files.append(str(path))
        print(f" {Colors.GREEN}OK{Colors.RESET} {path}")

    if flags.get("generate_prompts", True):
        gateway_json = GATEWAY_DIR / "agent_registry.json"
        _generate_write_json(
            gateway_json,
            _generate_gateway_registry(registry, agents, generated_at, git_commit, registry_hash),
        )
        _record(gateway_json)

    if flags.get("generate_router_config", True):
        router_json = BASE_DIR / "config" / "router_agents.json"
        _generate_write_json(router_json, _generate_router_agents(agents))
        _record(router_json)

    if flags.get("generate_crewai_config", True):
        crewai_json = BASE_DIR / "config" / "crewai_agents.json"
        # Read the previous output first: legacy teams are carried over from it.
        existing_crewai = _generate_read_json(crewai_json)
        _generate_write_json(crewai_json, _generate_crewai_config(agents, existing_crewai))
        _record(crewai_json)

    if flags.get("generate_crewai_teams", False):
        teams_doc = _generate_crewai_teams(registry, agents)
        with open(CREWAI_TEAMS_GENERATED, "w", encoding="utf-8") as f:
            yaml.safe_dump(teams_doc, f, sort_keys=False, allow_unicode=True)
        _record(CREWAI_TEAMS_GENERATED)

    print(f"\n{Colors.GREEN}Generated {len(generated_files)} files{Colors.RESET}\n")


def _generate_git_commit():
    """Return the short HEAD commit hash, or "unknown" if git cannot supply one."""
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True, text=True, cwd=BASE_DIR
        )
    except (OSError, subprocess.SubprocessError):
        return "unknown"
    commit = result.stdout.strip()
    # A failing git (e.g. not a repository) exits non-zero with empty stdout.
    if result.returncode != 0 or not commit:
        return "unknown"
    return commit


def _generate_write_json(path, payload):
    """Write ``payload`` to ``path`` as indented UTF-8 JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)


def _generate_read_json(path):
    """Return the JSON object stored at ``path``, or {} if absent or unreadable."""
    if not path.exists():
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # Best effort: a corrupt previous output must not block regeneration.
        return {}
    return data if isinstance(data, dict) else {}


def _generate_gateway_registry(registry, agents, generated_at, git_commit, registry_hash):
    """Build the gateway registry: metadata plus every agent with Telegram not "off"."""
    gateway_registry = {
        "schema_version": registry.get("schema_version", 1),
        "version": registry.get("version"),
        "generated_at": generated_at,
        "git_commit": git_commit,
        "registry_fingerprint": registry_hash,
        "agents": {}
    }
    for agent in agents:
        if agent.get("telegram_mode") == "off":
            continue
        gateway_registry["agents"][agent["id"]] = {
            "display_name": agent.get("display_name"),
            "canonical_role": agent.get("canonical_role"),
            "prompt_file": agent.get("prompt_file"),
            "telegram_mode": agent.get("telegram_mode"),
            "visibility": agent.get("visibility"),
            "domains": agent.get("domains", []),
            "mentor": agent.get("mentor"),
        }
    return gateway_registry


def _generate_router_agents(agents):
    """Build the router's per-agent description, LLM, priority and keyword table."""
    router_agents = {}
    for agent in agents:
        dname = agent.get('display_name', '')
        crole = (agent.get('canonical_role') or '')[:50]
        routing = agent.get("routing")
        if not isinstance(routing, dict):
            routing = {}
        router_agents[agent["id"]] = {
            "description": f"{dname} - {crole}",
            "default_llm": agent.get("llm_profile", "fast"),
            "routing_priority": routing.get("priority", 50),
            "keywords": routing.get("keywords", []),
            "domains": agent.get("domains", []),
            "class": agent.get("class"),
            "visibility": agent.get("visibility"),
        }
    return router_agents


def _generate_member_summary(member):
    """Reduce one team member to the lightweight {role, skills} shape."""
    if isinstance(member, dict):
        return {
            "role": member.get("role_context", member.get("id", "role")),
            "skills": member.get("skills", []),
        }
    return {"role": str(member), "skills": []}


def _generate_crewai_config(agents, existing_crewai):
    """Build the orchestrators / workers / teams document for crew-enabled agents."""
    crewai_config = {
        "orchestrators": [],
        "workers": [],
        "teams": {}
    }
    existing_teams = existing_crewai.get("teams") if isinstance(existing_crewai, dict) else None
    if not isinstance(existing_teams, dict):
        existing_teams = {}

    for agent in agents:
        has_explicit_orchestration = isinstance(agent.get("orchestration"), dict)
        legacy = agent.get("crewai", {}) or {}
        orchestration = {}

        if has_explicit_orchestration:
            orchestration = get_orchestration(agent)
            mode = orchestration.get("mode", "llm_only")
            crew = orchestration.get("crew")
            if not isinstance(crew, dict):
                crew = {}
            crew_enabled = bool(crew.get("enabled", False)) and mode in ["crew_only", "hybrid"]
            can_orchestrate = mode != "llm_only"
            is_orchestrator = agent.get("class") == "top_level"
        else:
            # Strict backward compatibility for legacy registry entries.
            crew_enabled = bool(legacy.get("enabled", False))
            can_orchestrate = bool(legacy.get("orchestrator", False))
            is_orchestrator = can_orchestrate

        if not crew_enabled:
            continue

        agent_id = agent["id"]
        agent_entry = {
            "id": agent_id,
            "display_name": agent.get("display_name"),
            "role": agent.get("canonical_role"),
            "can_orchestrate": can_orchestrate,
            "domains": agent.get("domains", []),
        }
        bucket = "orchestrators" if is_orchestrator else "workers"
        crewai_config[bucket].append(agent_entry)

        fallback_team_name = f"{agent.get('display_name', agent_id)} Team"
        if has_explicit_orchestration:
            profile_cfg = get_default_profile_config(orchestration)
            if not isinstance(profile_cfg, dict):
                profile_cfg = {}
            team_members = profile_cfg.get("team", [])
            if team_members:
                # Router needs lightweight list; keep role names for compatibility.
                crewai_config["teams"][agent_id] = {
                    "team_name": profile_cfg.get("team_name", fallback_team_name),
                    "members": [_generate_member_summary(m) for m in team_members],
                }
        elif existing_teams.get(agent_id):
            # Keep pre-existing generated team shape to avoid accidental shrinking.
            crewai_config["teams"][agent_id] = existing_teams[agent_id]
        else:
            # Preserve legacy team payload (including skills) if present.
            legacy_team = legacy.get("team", [])
            if legacy_team:
                crewai_config["teams"][agent_id] = {
                    "team_name": fallback_team_name,
                    "members": legacy_team,
                }
    return crewai_config


def _generate_crewai_teams(registry, agents):
    """Build the generated teams document from explicit orchestration blocks."""
    teams_doc = {
        "schema_version": 1,
        "version": registry.get("version", "generated"),
        "description": "Generated from config/agent_registry.yml (orchestration.crew.*)",
    }
    for agent in agents:
        if agent.get("class") != "top_level":
            continue
        # Canary-safe generation: only agents with explicit orchestration block
        # are emitted to generated teams file.
        if not isinstance(agent.get("orchestration"), dict):
            continue
        orchestration = get_orchestration(agent)
        mode = orchestration.get("mode", "llm_only")
        crew = orchestration.get("crew")
        if not isinstance(crew, dict):
            crew = {}
        crew_enabled = bool(crew.get("enabled", False)) and mode in ["crew_only", "hybrid"]
        if not crew_enabled:
            continue
        profiles = crew.get("profiles", {})
        if not isinstance(profiles, dict) or not profiles:
            continue
        entry = {
            "profiles": profiles,
            "default_profile": crew.get("default_profile", "default"),
        }
        hints = crew.get("profile_hints")
        if hints:
            entry["profile_hints"] = hints
        teams_doc[agent["id"]] = entry
    return teams_doc
def cmd_smoke(args):
    """Run three static checks against the agent named by ``args.id``.

    Checks, each counted once in the final ``passed/total`` line:

    1. Prompt file: configured and present under ``GATEWAY_DIR``, or not
       configured while ``telegram_mode`` is "off".
    2. Routing keywords: at least one is defined (otherwise a warning).
    3. CrewAI: enabled (otherwise a warning). Agents with an explicit
       ``orchestration`` block are judged by it, using the same
       enabled-and-mode rule as config generation; other agents are judged
       by the legacy ``crewai`` block.

    Exits:
        With status 1 when the agent id is not in the registry. Failed or
        warned checks are only reported and do not change the exit status.
    """
    registry = load_registry()
    agents = {
        a["id"]: a
        for a in (registry.get("agents") or [])
        if isinstance(a, dict) and "id" in a
    }
    agent_id = args.id
    if agent_id not in agents:
        print(f"{Colors.RED}ERROR: Agent '{agent_id}' not found in registry{Colors.RESET}")
        print(f"Available: {list(agents.keys())}")
        sys.exit(1)

    agent = agents[agent_id]
    print(f"\n{Colors.BOLD}Smoke Test: {agent_id}{Colors.RESET}")
    print(f"Role: {agent.get('canonical_role')}")
    print(f"Visibility: {agent.get('visibility')}")
    print(f"Telegram: {agent.get('telegram_mode')}\n")

    tests_passed = 0
    tests_total = 0

    # Check 1: prompt file
    tests_total += 1
    prompt_file = agent.get("prompt_file")
    if prompt_file:
        prompt_path = GATEWAY_DIR / prompt_file
        if prompt_path.exists():
            print(f" {Colors.GREEN}OK{Colors.RESET} Prompt file exists: {prompt_file}")
            tests_passed += 1
        else:
            print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file missing: {prompt_file}")
    elif agent.get("telegram_mode") == "off":
        print(f" {Colors.GREEN}OK{Colors.RESET} No prompt file (telegram_mode=off)")
        tests_passed += 1
    else:
        print(f" {Colors.RED}FAIL{Colors.RESET} Prompt file not configured")

    # Check 2: routing keywords
    tests_total += 1
    routing = agent.get("routing")
    keywords = routing.get("keywords") if isinstance(routing, dict) else None
    if keywords:
        print(f" {Colors.GREEN}OK{Colors.RESET} Routing keywords: {len(keywords)} defined")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} No routing keywords")

    # Check 3: crew orchestration
    tests_total += 1
    if isinstance(agent.get("orchestration"), dict):
        orchestration = get_orchestration(agent)
        crew = orchestration.get("crew")
        if not isinstance(crew, dict):
            crew = {}
        mode = orchestration.get("mode", "llm_only")
        crew_on = bool(crew.get("enabled", False)) and mode in ["crew_only", "hybrid"]
        profile = get_default_profile_config(orchestration)
        team = profile.get("team") if isinstance(profile, dict) else None
    else:
        legacy = agent.get("crewai") or {}
        crew_on = bool(legacy.get("enabled"))
        team = legacy.get("team")
    if crew_on:
        team_size = len(team or [])
        print(f" {Colors.GREEN}OK{Colors.RESET} CrewAI enabled (team: {team_size} members)")
        tests_passed += 1
    else:
        print(f" {Colors.YELLOW}WARN{Colors.RESET} CrewAI disabled")

    print(f"\n{Colors.BOLD}Result: {tests_passed}/{tests_total} tests passed{Colors.RESET}\n")
def cmd_check(args):
    """CI check: validate + generate + verify no drift.

    Runs ``cmd_validate``, then ``cmd_generate``, then ``git diff`` over the
    three generated JSON files, ignoring ``generated_at`` / ``git_commit``
    lines.

    Exits:
        With status 1 when validation fails or when the regenerated files
        differ structurally from what git has. If ``git diff`` itself cannot
        run or reports an error, a warning is printed and the drift step is
        skipped rather than reported as clean.
    """
    print(f"\n{Colors.BOLD}CI Check: validate + no-drift test{Colors.RESET}\n")

    # Step 1: Validate
    print(f"{Colors.CYAN}[1/3] Validating registry...{Colors.RESET}")
    try:
        cmd_validate(args)
    except SystemExit as e:
        if e.code not in (0, None):
            print(f"{Colors.RED}CI Check FAILED: validation errors{Colors.RESET}")
            sys.exit(1)

    # Step 2: Generate
    print(f"{Colors.CYAN}[2/3] Generating configs...{Colors.RESET}")
    cmd_generate(args)

    # Step 3: Check for structural drift (ignore timestamp/git_commit)
    print(f"{Colors.CYAN}[3/3] Checking for structural drift...{Colors.RESET}")
    tracked_outputs = [
        "gateway-bot/agent_registry.json",
        "config/router_agents.json",
        "config/crewai_agents.json",
    ]
    try:
        # Use git diff but ignore generated_at and git_commit lines
        result = subprocess.run(
            ["git", "diff",
             "-I", "generated_at",
             "-I", "git_commit",
             "--exit-code",
             *tracked_outputs],
            capture_output=True, encoding="utf-8", errors="replace", cwd=BASE_DIR
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"{Colors.YELLOW}Warning: Could not check git diff: {e}{Colors.RESET}")
    else:
        if result.returncode == 0:
            print(f" {Colors.GREEN}OK{Colors.RESET} No drift detected")
        elif result.returncode == 1:
            # Exit status 1 means differences exist; check whether anything
            # other than timestamp/commit metadata changed.
            structural_changes = [
                line for line in result.stdout.split('\n')
                if line.startswith(('+', '-'))
                and not line.startswith(('+++', '---'))
                and 'generated_at' not in line
                and 'git_commit' not in line
            ]
            if structural_changes:
                print(f"\n{Colors.RED}CI Check FAILED: Structural changes detected!{Colors.RESET}")
                print("Run 'python3 tools/agents generate' and commit the changes.")
                print("\nStructural diff:\n" + '\n'.join(structural_changes[:20]))
                sys.exit(1)
            print(f" {Colors.GREEN}OK{Colors.RESET} No structural drift (only timestamp/commit metadata)")
        else:
            # Any other status is a git error (e.g. not a repository); stdout
            # is empty then, which must not be mistaken for "no drift".
            detail = result.stderr.strip() or f"git diff exited with status {result.returncode}"
            print(f"{Colors.YELLOW}Warning: Could not check git diff: {detail}{Colors.RESET}")

    print(f"\n{Colors.GREEN}CI Check PASSED{Colors.RESET}\n")
def main():
    """Parse the command line and run the selected sub-command.

    With no sub-command given, the parser's help text is printed instead.
    """
    parser = argparse.ArgumentParser(description="DAGI Agent Registry CLI")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # Sub-commands that take no options of their own, in help-listing order.
    simple_commands = (
        ("list", "List all agents"),
        ("validate", "Validate registry"),
        ("generate", "Generate configs from registry"),
        ("check", "CI check: validate + no-drift test"),
    )
    for command_name, help_text in simple_commands:
        subparsers.add_parser(command_name, help=help_text)

    smoke_parser = subparsers.add_parser("smoke", help="Run smoke test")
    smoke_parser.add_argument("--id", required=True, help="Agent ID")

    args = parser.parse_args()

    handlers = {
        "list": cmd_list,
        "validate": cmd_validate,
        "generate": cmd_generate,
        "check": cmd_check,
        "smoke": cmd_smoke,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
        return
    handler(args)


if __name__ == "__main__":
    main()