Files
microdao-daarion/services/crewai-service/app/registry_loader.py
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

203 lines
6.6 KiB
Python

"""
CrewAI Registry Loader - Variant A (Profiles per Agent)
Loads team configurations from crewai_teams.yml with profile support.
"""
import os
import json
import yaml
import logging
from pathlib import Path
from functools import lru_cache
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Config locations; each one can be overridden through the environment
# variable of the same name, otherwise the /app/config default is used.
CREWAI_AGENTS_PATH = os.getenv("CREWAI_AGENTS_PATH", "/app/config/crewai_agents.json")
CREWAI_TEAMS_PATH = os.getenv("CREWAI_TEAMS_PATH", "/app/config/crewai_teams.yml")
# Directory that role prompt references are joined onto in load_role_prompt().
ROLES_BASE_PATH = os.getenv("ROLES_BASE_PATH", "/app/config/roles")
# Lazily filled caches: None until load_teams_config() / load_agents_config()
# has run once, then the parsed config (or {} after a failed load).
_teams_config = None
_agents_config = None
def load_agents_config():
    """Load the basic agent config from crewai_agents.json.

    The file at ``CREWAI_AGENTS_PATH`` is read at most once per process; the
    parsed result is cached in the module-level ``_agents_config`` and handed
    back on every later call.

    Returns:
        The parsed JSON document. An empty dict is returned (and cached, so
        the load is not retried) when the file cannot be read or parsed, or
        when the document is a bare JSON ``null``.
    """
    global _agents_config
    if _agents_config is None:
        try:
            # Decode explicitly as UTF-8 instead of relying on the process
            # locale, which may differ between hosts/containers.
            with open(CREWAI_AGENTS_PATH, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            # A literal `null` document would leave the cache at None and
            # force a re-read on every call; normalise it to an empty config.
            _agents_config = {} if loaded is None else loaded
            logger.info(f"Loaded agents config from {CREWAI_AGENTS_PATH}")
        except Exception as e:
            # Best effort: a missing/broken file must not take the service down.
            logger.error(f"Failed to load agents config: {e}")
            _agents_config = {}
    return _agents_config
def load_teams_config():
    """Load the teams/profiles config from crewai_teams.yml.

    The file at ``CREWAI_TEAMS_PATH`` is parsed at most once per process; the
    result is cached in the module-level ``_teams_config``.

    Returns:
        The top-level mapping of the YAML document. An empty dict is returned
        (and cached, so the load is not retried) when the file cannot be read
        or parsed, or when its top level is not a mapping (for example an
        empty file, which parses to ``None``).
    """
    global _teams_config
    if _teams_config is None:
        try:
            # Decode explicitly as UTF-8 instead of relying on the process locale.
            with open(CREWAI_TEAMS_PATH, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except Exception as e:
            # Best effort: a missing/broken file must not take the service down.
            logger.error(f"Failed to load teams config: {e}")
            _teams_config = {}
        else:
            if isinstance(loaded, dict):
                _teams_config = loaded
                version = _teams_config.get("version", "unknown")
                logger.info(f"Loaded teams config v{version} from {CREWAI_TEAMS_PATH}")
            else:
                # Every accessor in this module calls .get() on the config, so
                # anything but a mapping is unusable; say so explicitly rather
                # than surfacing an AttributeError message.
                logger.error(
                    "Failed to load teams config: expected a top-level mapping "
                    f"in {CREWAI_TEAMS_PATH}, got {type(loaded).__name__}"
                )
                _teams_config = {}
    return _teams_config
def load_role_prompt(prompt_ref: str) -> str:
    """Read the role prompt file that ``prompt_ref`` points to.

    ``prompt_ref`` is joined onto ``ROLES_BASE_PATH``.

    Returns:
        The file contents decoded as UTF-8; an empty string when
        ``prompt_ref`` is falsy or reading fails; a short placeholder text
        when the file does not exist.
    """
    if not prompt_ref:
        return ""

    role_file = Path(ROLES_BASE_PATH) / prompt_ref
    try:
        if not role_file.exists():
            logger.warning(f"Role prompt not found: {role_file}")
            return f"# Role: {prompt_ref}\n(prompt file missing)"
        return role_file.read_text(encoding="utf-8")
    except Exception as err:
        # Any I/O or decoding problem degrades to "no prompt".
        logger.error(f"Error loading role prompt {prompt_ref}: {err}")
        return ""
def get_agent_profiles(agent_id: str) -> list:
    """Return the names of the profiles configured for ``agent_id``.

    The list is empty when the agent is not present in the teams config or
    has no ``profiles`` entry.
    """
    agent_section = load_teams_config().get(agent_id, {})
    return list(agent_section.get("profiles", {}).keys())
def get_default_profile(agent_id: str) -> str:
    """Return the agent's ``default_profile`` setting.

    Falls back to the literal name ``"default"`` when the agent or the
    setting is absent from the teams config.
    """
    agent_section = load_teams_config().get(agent_id, {})
    return agent_section.get("default_profile", "default")
def get_profile_hints(agent_id: str) -> dict:
    """Return the agent's ``profile_hints`` entry from the teams config.

    select_profile() iterates this as profile name -> keywords. An empty dict
    is returned when the agent or the entry is absent.
    """
    agent_section = load_teams_config().get(agent_id, {})
    return agent_section.get("profile_hints", {})
def select_profile(agent_id: str, prompt: str) -> str:
    """Pick a profile for ``agent_id`` by keyword-matching the prompt.

    Profiles are tried in the order they appear in the agent's profile hints;
    the first profile with a keyword that occurs (case-insensitively) as a
    substring of ``prompt`` wins.

    Args:
        agent_id: Agent whose hints are consulted.
        prompt: User prompt; ``None`` is treated as an empty prompt.

    Returns:
        The matching profile name, or the agent's default profile when
        nothing matches.
    """
    hints = get_profile_hints(agent_id)
    if not isinstance(hints, dict):
        # A YAML key with no value parses to None; treat that as "no hints".
        hints = {}
    prompt_lower = (prompt or "").lower()
    for profile_name, keywords in hints.items():
        if isinstance(keywords, str):
            # A single scalar keyword must not be iterated character by
            # character, which would match almost any prompt.
            keywords = [keywords]
        for kw in keywords or ():
            # YAML may yield numbers/booleans for unquoted keywords.
            needle = kw if isinstance(kw, str) else str(kw)
            if needle.lower() in prompt_lower:
                logger.info(f"Selected profile {profile_name} for {agent_id} (matched: {kw})")
                return profile_name
    return get_default_profile(agent_id)
def get_profile_config(agent_id: str, profile: str = None) -> dict:
    """Return the configuration mapping of one profile of ``agent_id``.

    When ``profile`` is None the agent's default profile is used. An empty
    dict is returned when the agent or the profile is not configured.
    """
    available = load_teams_config().get(agent_id, {}).get("profiles", {})
    chosen = get_default_profile(agent_id) if profile is None else profile
    return available.get(chosen, {})
def get_team_members(agent_id: str, profile: str = None) -> list:
    """Return the profile's ``team`` entries with their prompts loaded.

    Each entry is shallow-copied and gains a ``system_prompt`` key holding
    the text loaded for its ``system_prompt_ref``; the config itself is not
    modified.
    """

    def _with_prompt(entry):
        # Copy first so the cached config stays untouched.
        enriched = dict(entry)
        enriched["system_prompt"] = load_role_prompt(entry.get("system_prompt_ref", ""))
        return enriched

    roster = get_profile_config(agent_id, profile).get("team", [])
    return [_with_prompt(entry) for entry in roster]
def get_synthesis_config(agent_id: str, profile: str = None) -> dict:
    """Return the profile's ``synthesis`` section with its prompt loaded.

    A non-empty section is shallow-copied and gains a ``system_prompt`` key
    holding the text loaded for its ``system_prompt_ref``. A missing or empty
    section is returned unchanged.
    """
    raw = get_profile_config(agent_id, profile).get("synthesis", {})
    if not raw:
        return raw

    ref = raw.get("system_prompt_ref", "")
    resolved = dict(raw)  # copy so the cached config stays untouched
    resolved["system_prompt"] = load_role_prompt(ref)
    return resolved
def get_team_settings(agent_id: str, profile: str = None) -> dict:
    """Return the execution settings of a profile's team.

    The result always has the keys ``team_name``, ``parallel_roles`` and
    ``max_concurrency``; values missing from the profile fall back to
    ``"<agent_id> team"``, ``True`` and ``3`` respectively.
    """
    settings = get_profile_config(agent_id, profile)
    fallbacks = {
        "team_name": f"{agent_id} team",
        "parallel_roles": True,
        "max_concurrency": 3,
    }
    return {key: settings.get(key, fallback) for key, fallback in fallbacks.items()}
def get_delegation_config(agent_id: str, profile: str = None) -> dict:
    """Return the profile's ``delegation`` section.

    When the profile has no such section, ``{"enabled": False}`` is returned.
    """
    settings = get_profile_config(agent_id, profile)
    disabled = {"enabled": False}
    return settings.get("delegation", disabled)
def can_delegate_to(agent_id: str, target_agent_id: str, profile: str = None) -> bool:
    """Tell whether ``agent_id`` may delegate to ``target_agent_id``.

    Delegation is refused when it is not enabled for the profile, or when
    ``forbid_self`` is set and the target is the agent itself. Otherwise the
    target must be listed under ``allow_top_level_agents``.
    """
    rules = get_delegation_config(agent_id, profile)
    if rules.get("enabled"):
        self_blocked = rules.get("forbid_self") and target_agent_id == agent_id
        if not self_blocked:
            return target_agent_id in rules.get("allow_top_level_agents", [])
    return False
def is_orchestrator(agent_id: str) -> bool:
    """Check whether the agent has orchestrator capability.

    An agent counts as an orchestrator when its entry in the teams config is
    a mapping that contains a ``profiles`` key.

    Returns:
        True for such an entry, False otherwise, including for unknown ids
        and for top-level metadata keys whose value is not a mapping.
    """
    agent_cfg = load_teams_config().get(agent_id)
    # Require a mapping: on a string value (e.g. a top-level "description")
    # `"profiles" in value` would be a substring test, and on a scalar such
    # as a version number it would raise TypeError.
    return isinstance(agent_cfg, dict) and "profiles" in agent_cfg
def get_all_agents_summary() -> dict:
    """Summarise every agent in the teams config.

    Top-level metadata keys and entries that are not mappings are skipped.
    Each agent maps to a dict with its profile names (``profiles``), its
    ``default_profile``, a ``has_hints`` flag, and one ``<profile>_roles``
    count per profile giving the length of that profile's ``team`` list.
    """
    meta_keys = ("schema_version", "version", "description")
    overview = {}
    for name, section in load_teams_config().items():
        if name in meta_keys or not isinstance(section, dict):
            continue
        profile_map = section.get("profiles", {})
        entry = {
            "profiles": list(profile_map.keys()),
            "default_profile": section.get("default_profile", "default"),
            "has_hints": bool(section.get("profile_hints")),
        }
        # Role counts per profile, appended after the fixed keys.
        entry.update(
            {
                f"{profile_name}_roles": len(profile_cfg.get("team", []))
                for profile_name, profile_cfg in profile_map.items()
            }
        )
        overview[name] = entry
    return overview