Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.
Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles
Excluded from snapshot: venv/, .env, data/, backups, .tgz archives
Co-authored-by: Cursor <cursoragent@cursor.com>
131 lines
3.5 KiB
Python
131 lines
3.5 KiB
Python
"""
|
|
Agent Registry Loader
|
|
Loads agent configurations from centralized registry JSON.
|
|
Feature flag controlled - if disabled, returns None and caller uses legacy config.
|
|
"""
|
|
import json
|
|
import os
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Feature flag: the registry is enabled unless the AGENT_REGISTRY_ENABLED
# environment variable is set to something other than "true"
# (the comparison is case-insensitive; unset means enabled).
REGISTRY_ENABLED = os.environ.get("AGENT_REGISTRY_ENABLED", "true").lower() == "true"

# The generated registry JSON sits in the same directory as this module.
REGISTRY_PATH = Path(__file__).with_name("agent_registry.json")
|
|
|
|
|
|
def load_registry() -> Optional[Dict[str, Any]]:
    """
    Load the agent registry from the generated JSON file.

    This loader is best-effort and never raises: every failure is logged and
    reported as None, which callers treat as "fall back to legacy config".

    Returns:
        The parsed registry dict, or None when
        - the feature flag REGISTRY_ENABLED is off,
        - the file at REGISTRY_PATH does not exist,
        - the file cannot be read or parsed, or
        - the parsed document is not a JSON object whose optional "agents"
          entry is itself an object.
    """
    if not REGISTRY_ENABLED:
        logger.info("Agent registry disabled by feature flag")
        return None

    if not REGISTRY_PATH.exists():
        logger.warning(f"Registry file not found: {REGISTRY_PATH}")
        return None

    try:
        # JSON text is UTF-8; decode explicitly instead of relying on the
        # platform/locale default encoding, which may not be UTF-8.
        with open(REGISTRY_PATH, encoding="utf-8") as f:
            data = json.load(f)

        # Validate the shape up front so that accessors doing
        # registry.get("agents", {}).get(...) cannot fail later on a
        # malformed file.
        if not isinstance(data, dict):
            logger.error(
                f"Failed to load registry: expected a JSON object, got {type(data).__name__}"
            )
            return None

        agents = data.get("agents", {})
        if not isinstance(agents, dict):
            logger.error(
                f"Failed to load registry: 'agents' must be an object, got {type(agents).__name__}"
            )
            return None

        logger.info(f"Loaded agent registry v{data.get('version', 'unknown')} with {len(agents)} agents")
        return data
    except Exception as e:
        # Deliberately broad: an unreadable or corrupt registry must degrade
        # to the legacy configuration rather than crash the caller.
        logger.error(f"Failed to load registry: {e}")
        return None
|
|
|
|
|
|
def get_agent_info(agent_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up a single agent's entry in the registry.

    The registry is obtained through load_registry() on every call.

    Args:
        agent_id: Agent identifier used as the key under "agents".

    Returns:
        The agent's entry, or None when the registry is unavailable or empty,
        or when it has no entry for agent_id.
    """
    loaded = load_registry()
    if loaded:
        agents_section = loaded.get("agents", {})
        return agents_section.get(agent_id)

    # Registry disabled, missing, unreadable or empty.
    return None
|
|
|
|
|
|
def get_all_agents() -> Dict[str, Any]:
    """
    Return the "agents" section of the registry.

    The registry is obtained through load_registry() on every call.

    Returns:
        The value stored under "agents", or an empty dict when the registry
        is unavailable or empty, or has no "agents" key.
    """
    loaded = load_registry()
    return loaded.get("agents", {}) if loaded else {}
|
|
|
|
|
|
def is_agent_visible(agent_id: str, mode: str = "telegram") -> bool:
    """
    Decide whether an agent should be exposed in the given mode.

    Args:
        agent_id: Agent identifier.
        mode: "telegram" | "api" | "internal"

    Returns:
        True when any of the following holds:
        - no (or an empty) registry entry is available for the agent, e.g.
          because the registry is disabled (fallback to legacy behaviour);
        - mode is "telegram" and the agent's telegram_mode is "public" or
          "whitelist";
        - mode is "api" and the agent's visibility is not "internal";
        - mode is anything else.
        False otherwise.
    """
    entry = get_agent_info(agent_id)

    # Fallback: allow agents the registry knows nothing about.
    if not entry:
        return True

    if mode == "telegram":
        telegram_mode = entry.get("telegram_mode")
        return telegram_mode in ("public", "whitelist")

    if mode == "api":
        visibility = entry.get("visibility")
        return visibility != "internal"

    # Any other mode (e.g. "internal") is not restricted.
    return True
|
|
|
|
|
|
def get_canonical_role(agent_id: str) -> Optional[str]:
    """Return the agent's "canonical_role" value, or None if no entry or role is available."""
    entry = get_agent_info(agent_id)
    return entry.get("canonical_role") if entry else None
|
|
|
|
|
|
def get_agent_domains(agent_id: str) -> list:
    """Return the agent's "domains" value (routing hints), or an empty list if unavailable."""
    entry = get_agent_info(agent_id)
    if not entry:
        # No registry entry for this agent: no routing hints.
        return []
    return entry.get("domains", [])
|
|
|
|
|
|
# Process-wide registry cache.
# `_cache_loaded` is tracked separately from `_registry_cache` because None is
# a legitimate cached result (load_registry() returns None when the registry
# is disabled or unavailable).
_cache_loaded = False
_registry_cache = None
|
|
|
|
|
|
def get_cached_registry() -> Optional[Dict[str, Any]]:
    """
    Return the registry, loading it at most once per process.

    The first call stores whatever load_registry() returns (including None);
    later calls return that stored value until reload_registry() resets it.
    """
    global _registry_cache, _cache_loaded

    if _cache_loaded:
        return _registry_cache

    # First access: populate the cache and remember that we tried.
    _registry_cache = load_registry()
    _cache_loaded = True
    return _registry_cache
|
|
|
|
|
|
def reload_registry():
    """
    Drop the cached registry and load it again (e.g., after tools/agents generate).

    Returns:
        The freshly loaded value, as returned by get_cached_registry().
    """
    global _registry_cache, _cache_loaded

    # Reset the cache state so the next cached lookup hits load_registry().
    _registry_cache, _cache_loaded = None, False
    return get_cached_registry()
|