"""
CrewAI Registry Loader - Variant A (Profiles per Agent)

Loads team configurations from crewai_teams.yml with profile support.

The teams file is expected to be a mapping whose keys are agent ids (plus a
few metadata keys such as ``version``).  Every accessor in this module is
tolerant of missing or malformed nodes: it falls back to an empty/default
value instead of raising, so a partially broken config degrades gracefully.
"""

import json
import logging
import os
from functools import lru_cache  # noqa: F401  (kept: may be used by importers)
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml

logger = logging.getLogger(__name__)

CREWAI_AGENTS_PATH = os.getenv("CREWAI_AGENTS_PATH", "/app/config/crewai_agents.json")
CREWAI_TEAMS_PATH = os.getenv("CREWAI_TEAMS_PATH", "/app/config/crewai_teams.yml")
ROLES_BASE_PATH = os.getenv("ROLES_BASE_PATH", "/app/config/roles")

# Module-level caches; ``None`` means "not loaded yet".
_teams_config = None
_agents_config = None

# Top-level keys of the teams file that are metadata, not agent ids.
_METADATA_KEYS = ("schema_version", "version", "description")


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* unchanged if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _get_agent_cfg(agent_id: str) -> Dict[str, Any]:
    """Return the config mapping for *agent_id*, or ``{}`` if absent/malformed."""
    return _as_dict(load_teams_config().get(agent_id))


def _iter_keywords(keywords: Any) -> List[str]:
    """Normalize a hint entry to a list of non-empty keyword strings.

    A scalar string is treated as a single keyword (iterating it would match
    on individual characters).  ``None`` items and empty strings are dropped,
    since an empty keyword is a substring of every prompt.
    """
    if isinstance(keywords, str):
        keywords = [keywords]
    elif not isinstance(keywords, (list, tuple, set)):
        return []
    normalized = (str(kw) for kw in keywords if kw is not None)
    return [kw for kw in normalized if kw]


def load_agents_config():
    """Load basic agent config from crewai_agents.json.

    The parsed JSON is cached after the first call.  On any failure the error
    is logged and an empty dict is cached and returned.
    """
    global _agents_config
    if _agents_config is None:
        try:
            with open(CREWAI_AGENTS_PATH, "r", encoding="utf-8") as f:
                _agents_config = json.load(f)
            logger.info("Loaded agents config from %s", CREWAI_AGENTS_PATH)
        except Exception as e:  # best-effort: a broken file must not crash callers
            logger.error("Failed to load agents config: %s", e)
            _agents_config = {}
    return _agents_config


def load_teams_config():
    """Load teams/profiles config from crewai_teams.yml.

    The parsed mapping is cached after the first call.  If the file cannot be
    read or parsed, or its root is not a mapping (e.g. the file is empty), the
    problem is logged and an empty dict is cached and returned.
    """
    global _teams_config
    if _teams_config is None:
        try:
            with open(CREWAI_TEAMS_PATH, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except Exception as e:  # best-effort: a broken file must not crash callers
            logger.error("Failed to load teams config: %s", e)
            _teams_config = {}
        else:
            if isinstance(loaded, dict):
                _teams_config = loaded
                version = loaded.get("version", "unknown")
                logger.info(
                    "Loaded teams config v%s from %s", version, CREWAI_TEAMS_PATH
                )
            else:
                # safe_load returns None for an empty document and may return
                # a list/scalar for other roots; neither is usable here.
                logger.error(
                    "Failed to load teams config: root of %s is %s, expected a mapping",
                    CREWAI_TEAMS_PATH,
                    type(loaded).__name__,
                )
                _teams_config = {}
    return _teams_config


def load_role_prompt(prompt_ref: str) -> str:
    """Load role prompt from .md file.

    *prompt_ref* is resolved relative to ``ROLES_BASE_PATH``.  Returns the
    file's text, a placeholder string when the file does not exist, or ``""``
    when *prompt_ref* is empty or reading fails.
    """
    if not prompt_ref:
        return ""
    try:
        # NOTE(review): prompt_ref is joined as-is, so an absolute path or
        # "../" segments escape ROLES_BASE_PATH. Fine while the teams file is
        # trusted; confirm before feeding externally supplied refs.
        prompt_path = Path(ROLES_BASE_PATH) / prompt_ref
        if prompt_path.exists():
            return prompt_path.read_text(encoding="utf-8")
        logger.warning("Role prompt not found: %s", prompt_path)
        return f"# Role: {prompt_ref}\n(prompt file missing)"
    except Exception as e:
        logger.error("Error loading role prompt %s: %s", prompt_ref, e)
        return ""


def get_agent_profiles(agent_id: str) -> list:
    """Get list of available profile names for an agent."""
    return list(_as_dict(_get_agent_cfg(agent_id).get("profiles")))


def get_default_profile(agent_id: str) -> str:
    """Get default profile name for an agent (``"default"`` if unset)."""
    return _get_agent_cfg(agent_id).get("default_profile") or "default"


def get_profile_hints(agent_id: str) -> dict:
    """Get profile selection hints (profile name -> keywords) for an agent."""
    return _as_dict(_get_agent_cfg(agent_id).get("profile_hints"))


def select_profile(agent_id: str, prompt: str) -> str:
    """Select appropriate profile based on prompt keywords.

    Hints are scanned in config order; the first profile with a keyword that
    occurs (case-insensitively) in *prompt* wins.  Falls back to the agent's
    default profile when nothing matches or *prompt* is empty/not a string.
    """
    prompt_lower = prompt.lower() if isinstance(prompt, str) else ""
    if prompt_lower:
        for profile_name, keywords in get_profile_hints(agent_id).items():
            for kw in _iter_keywords(keywords):
                if kw.lower() in prompt_lower:
                    logger.info(
                        "Selected profile %s for %s (matched: %s)",
                        profile_name,
                        agent_id,
                        kw,
                    )
                    return profile_name
    return get_default_profile(agent_id)


def get_profile_config(agent_id: str, profile: Optional[str] = None) -> dict:
    """Get full profile configuration for an agent.

    When *profile* is ``None`` the agent's default profile is used.  Returns
    ``{}`` if the agent or profile is unknown or malformed.
    """
    profiles = _as_dict(_get_agent_cfg(agent_id).get("profiles"))
    if profile is None:
        profile = get_default_profile(agent_id)
    return _as_dict(profiles.get(profile))


def get_team_members(agent_id: str, profile: Optional[str] = None) -> list:
    """Get team members with resolved prompts.

    Each member is returned as a shallow copy with an added ``system_prompt``
    key loaded from its ``system_prompt_ref``.  Non-mapping entries are
    skipped with a warning.
    """
    team = get_profile_config(agent_id, profile).get("team")
    if not isinstance(team, (list, tuple)):
        return []
    resolved = []
    for member in team:
        if not isinstance(member, dict):
            logger.warning(
                "Skipping malformed team member for %s: %r", agent_id, member
            )
            continue
        resolved_member = dict(member)  # copy: never mutate the cached config
        resolved_member["system_prompt"] = load_role_prompt(
            member.get("system_prompt_ref", "")
        )
        resolved.append(resolved_member)
    return resolved


def get_synthesis_config(agent_id: str, profile: Optional[str] = None) -> dict:
    """Get synthesis config with resolved prompt.

    Returns ``{}`` when the profile has no synthesis section; otherwise a
    shallow copy with an added ``system_prompt`` key.
    """
    synthesis = _as_dict(get_profile_config(agent_id, profile).get("synthesis"))
    if not synthesis:
        return {}
    resolved = dict(synthesis)  # copy: never mutate the cached config
    resolved["system_prompt"] = load_role_prompt(
        synthesis.get("system_prompt_ref", "")
    )
    return resolved


def get_team_settings(agent_id: str, profile: Optional[str] = None) -> dict:
    """Get team execution settings (name, parallelism, concurrency cap)."""
    profile_cfg = get_profile_config(agent_id, profile)
    return {
        "team_name": profile_cfg.get("team_name", f"{agent_id} team"),
        "parallel_roles": profile_cfg.get("parallel_roles", True),
        "max_concurrency": profile_cfg.get("max_concurrency", 3),
    }


def get_delegation_config(agent_id: str, profile: Optional[str] = None) -> dict:
    """Get delegation config for an agent.

    Returns ``{"enabled": False}`` when the section is missing or is not a
    mapping (e.g. an empty ``delegation:`` key in the YAML).
    """
    delegation = get_profile_config(agent_id, profile).get("delegation")
    if not isinstance(delegation, dict):
        return {"enabled": False}
    return delegation


def can_delegate_to(
    agent_id: str, target_agent_id: str, profile: Optional[str] = None
) -> bool:
    """Check if agent can delegate to target.

    Delegation must be enabled, must not violate ``forbid_self``, and the
    target must be listed in ``allow_top_level_agents``.
    """
    deleg = get_delegation_config(agent_id, profile)
    if not deleg.get("enabled"):
        return False
    if deleg.get("forbid_self") and target_agent_id == agent_id:
        return False
    allowed = deleg.get("allow_top_level_agents") or []
    if isinstance(allowed, str):
        # A scalar means one agent id; ``in`` on a str would match substrings.
        allowed = [allowed]
    return target_agent_id in allowed


def is_orchestrator(agent_id: str) -> bool:
    """Check if agent has orchestrator capability (i.e. defines profiles)."""
    agent_cfg = load_teams_config().get(agent_id)
    # Require a real mapping: ``"profiles" in <str>`` would be a substring
    # test and ``in None`` would raise.
    return isinstance(agent_cfg, dict) and "profiles" in agent_cfg


def get_all_agents_summary() -> dict:
    """Get summary of all agents and their profiles.

    Returns a mapping ``agent_id -> {"profiles", "default_profile",
    "has_hints", "<profile>_roles"...}``.  Metadata keys and non-mapping
    entries in the teams file are skipped.
    """
    summary = {}
    for agent_id, agent_cfg in load_teams_config().items():
        if agent_id in _METADATA_KEYS or not isinstance(agent_cfg, dict):
            continue
        profiles = _as_dict(agent_cfg.get("profiles"))
        entry = {
            "profiles": list(profiles),
            "default_profile": agent_cfg.get("default_profile") or "default",
            "has_hints": bool(agent_cfg.get("profile_hints")),
        }
        # Add role counts per profile
        for pname, pcfg in profiles.items():
            team = _as_dict(pcfg).get("team")
            entry[f"{pname}_roles"] = (
                len(team) if isinstance(team, (list, tuple)) else 0
            )
        summary[agent_id] = entry
    return summary