Files
microdao-daarion/services/router/registry_loader.py
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

100 lines
2.5 KiB
Python

"""
Router Agent Registry Loader
Loads agent routing configs from centralized registry JSON.
Feature flag controlled.
"""
import json
import os
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List
logger = logging.getLogger(__name__)

# Feature flag: the registry is used only when AGENT_REGISTRY_ENABLED is the
# string "true" (any letter case). It defaults to "true" when unset.
REGISTRY_ENABLED = os.getenv("AGENT_REGISTRY_ENABLED", "true").lower() == "true"

# config/router_agents.json, located three directory levels above this file.
REGISTRY_PATH = Path(__file__).parent.parent.parent / "config" / "router_agents.json"

# Memoised result of the last load attempt. `_loaded` is tracked separately
# because a failed or disabled load legitimately leaves `_cache` as None.
_cache: Optional[Dict[str, Any]] = None
_loaded = False


def _read_registry() -> Optional[Dict[str, Any]]:
    """Read and validate the registry file without touching the cache.

    Returns:
        The parsed JSON object, or None when the feature flag is off, the
        file does not exist, it cannot be read or parsed, or its top-level
        value is not a JSON object.
    """
    if not REGISTRY_ENABLED:
        logger.info("Router registry disabled by feature flag")
        return None

    if not REGISTRY_PATH.exists():
        logger.warning(f"Router registry not found: {REGISTRY_PATH}")
        return None

    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # locale of the host the service happens to run on.
        with open(REGISTRY_PATH, encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        # Best-effort loader: any read/parse failure is logged, not raised.
        logger.error(f"Failed to load router registry: {e}")
        return None

    # The accessors in this module call .get() / .items() on the registry, so
    # anything other than a JSON object is rejected here instead of cached.
    if not isinstance(data, dict):
        logger.error(
            "Failed to load router registry: expected a JSON object, "
            f"got {type(data).__name__}"
        )
        return None

    logger.info(f"Router registry loaded: {len(data)} agents")
    return data


def load_registry() -> Optional[Dict[str, Any]]:
    """Load router agents from registry JSON, caching the outcome.

    The first call performs the load; every later call returns the cached
    outcome (including a cached None) until reload_registry() resets it.

    Returns:
        The registry mapping, or None when the registry is disabled, missing,
        unreadable, or not a JSON object.
    """
    global _cache, _loaded
    if _loaded:
        return _cache

    # Publish to the cache only after validation, so the value returned by
    # the first call and by all subsequent calls is always the same.
    _cache = _read_registry()
    _loaded = True
    return _cache
def get_agent_routing(agent_id: str) -> Optional[Dict[str, Any]]:
    """Look up the routing entry stored for one agent.

    Args:
        agent_id: Key to look up in the loaded registry.

    Returns:
        The registry entry for ``agent_id``, or None when the registry is
        unavailable or empty, or has no entry under that key.
    """
    agents = load_registry()
    return agents.get(agent_id) if agents else None
def get_agent_keywords(agent_id: str) -> List[str]:
    """Return the ``keywords`` entry of an agent's routing config.

    Args:
        agent_id: Key to look up in the loaded registry.

    Returns:
        The value stored under ``"keywords"``, or an empty list when the
        agent has no (or an empty) routing config or the key is absent.
    """
    routing = get_agent_routing(agent_id)
    if not routing:
        return []
    return routing.get("keywords", [])
def get_agent_domains(agent_id: str) -> List[str]:
    """Return the ``domains`` entry of an agent's routing config.

    Args:
        agent_id: Key to look up in the loaded registry.

    Returns:
        The value stored under ``"domains"``, or an empty list when the
        agent has no (or an empty) routing config or the key is absent.
    """
    routing = get_agent_routing(agent_id)
    if not routing:
        return []
    return routing.get("domains", [])
def get_all_visible_agents() -> List[str]:
    """List the ids of all agents that are not marked internal.

    Returns:
        Agent ids, in registry order, whose config does not have
        ``"visibility"`` set to ``"internal"``. Empty when the registry is
        unavailable or empty.
    """
    agents = load_registry()
    if not agents:
        return []

    visible = []
    for agent_id, config in agents.items():
        if config.get("visibility") == "internal":
            continue
        visible.append(agent_id)
    return visible
def get_agent_llm_profile(agent_id: str) -> str:
    """Return the LLM profile configured for an agent.

    Args:
        agent_id: Key to look up in the loaded registry.

    Returns:
        The value stored under ``"default_llm"``, or ``"fast"`` when the
        agent has no (or an empty) routing config or the key is absent.
    """
    routing = get_agent_routing(agent_id)
    return routing.get("default_llm", "fast") if routing else "fast"
def reload_registry():
    """Discard the cached registry and load it again.

    Returns:
        Whatever load_registry() returns for the fresh load attempt.
    """
    global _cache, _loaded
    # Clearing both values makes the next load_registry() call redo the load.
    _cache, _loaded = None, False
    return load_registry()