Files
microdao-daarion/services/router/intent_router.py
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

162 lines
6.1 KiB
Python

"""
DAARION Platform - Intent Router
Routes messages to appropriate agents based on content analysis
"""
import re
import yaml
from typing import Optional, Dict, List, Tuple
from pathlib import Path
import structlog
logger = structlog.get_logger()
class IntentRouter:
"""Routes messages to agents based on intent detection"""
def __init__(self, registry_path: str = "agents_registry.yaml"):
self.registry_path = Path(registry_path)
self.registry = self._load_registry()
self.hard_routes = self._compile_hard_routes()
self.intent_keywords = self._build_keyword_index()
logger.info("intent_router_initialized",
agents=len(self.registry.get("agents", {})),
hard_routes=len(self.hard_routes))
def _load_registry(self) -> Dict:
"""Load agent registry from YAML"""
if not self.registry_path.exists():
logger.warning("registry_not_found", path=str(self.registry_path))
return {"agents": {}, "routing": {}}
with open(self.registry_path) as f:
return yaml.safe_load(f)
def _compile_hard_routes(self) -> List[Tuple[re.Pattern, str]]:
"""Compile regex patterns for hard routes"""
routes = []
for route in self.registry.get("routing", {}).get("hard_routes", []):
pattern = re.compile(route["pattern"], re.IGNORECASE)
routes.append((pattern, route["agent"]))
return routes
def _build_keyword_index(self) -> Dict[str, List[Tuple[str, float]]]:
"""Build keyword → agent mapping with confidence"""
index = {}
for route in self.registry.get("routing", {}).get("intent_routes", []):
agent = route["agent"]
threshold = route.get("confidence_threshold", 0.5)
for keyword in route["keywords"]:
kw = keyword.lower()
if kw not in index:
index[kw] = []
index[kw].append((agent, threshold))
return index
def route(self, message: str, source_agent: Optional[str] = None) -> Tuple[str, float, str]:
"""
Route message to appropriate agent.
Returns:
(agent_id, confidence, reason)
"""
message_lower = message.lower().strip()
# 1. Check hard routes first (commands)
for pattern, agent in self.hard_routes:
if pattern.match(message):
logger.info("hard_route_matched", agent=agent, pattern=pattern.pattern)
return (agent, 1.0, "hard_route")
# 2. Intent-based routing (keyword matching)
agent_scores = {}
matched_keywords = {}
for keyword, agents in self.intent_keywords.items():
if keyword in message_lower:
for agent, threshold in agents:
if agent not in agent_scores:
agent_scores[agent] = 0.0
matched_keywords[agent] = []
agent_scores[agent] += threshold
matched_keywords[agent].append(keyword)
if agent_scores:
# Normalize scores
max_score = max(agent_scores.values())
for agent in agent_scores:
agent_scores[agent] /= max(1, len(matched_keywords[agent]))
# Select best agent
best_agent = max(agent_scores, key=agent_scores.get)
confidence = min(agent_scores[best_agent], 1.0)
# Check if active
if self.is_agent_active(best_agent):
logger.info("intent_route_matched",
agent=best_agent,
confidence=confidence,
keywords=matched_keywords[best_agent])
return (best_agent, confidence, f"keywords: {matched_keywords[best_agent][:3]}")
# 3. Fallback
fallback = self.registry.get("routing", {}).get("fallback", {})
fallback_agent = fallback.get("agent", "helion")
logger.info("fallback_route", agent=fallback_agent)
return (fallback_agent, 0.3, "fallback")
def is_agent_active(self, agent_id: str) -> bool:
"""Check if agent is active"""
agent = self.registry.get("agents", {}).get(agent_id)
return agent and agent.get("active", False)
def get_agent_config(self, agent_id: str) -> Optional[Dict]:
"""Get full agent configuration"""
return self.registry.get("agents", {}).get(agent_id)
def get_memory_policy(self, agent_id: str) -> Dict:
"""Get agent memory access policy"""
agent = self.get_agent_config(agent_id)
if not agent:
return {}
return agent.get("memory_policy", {})
def get_data_policy(self, agent_id: str) -> Dict:
"""Get agent data sharing policy"""
agent = self.get_agent_config(agent_id)
if not agent:
return {}
return agent.get("data_policy", {})
def can_handoff(self, source_agent: str, target_agent: str) -> bool:
"""Check if handoff is allowed between agents"""
policy = self.get_data_policy(source_agent)
allowed = policy.get("handoff_allow", [])
return target_agent in allowed
def get_rate_limit(self, agent_id: str) -> int:
"""Get rate limit (requests per minute) for agent"""
agent = self.get_agent_config(agent_id)
if not agent:
return 30 # default
return agent.get("sla", {}).get("rate_limit_rpm", 30)
def list_active_agents(self) -> List[str]:
"""List all active agents"""
return [
agent_id
for agent_id, config in self.registry.get("agents", {}).items()
if config.get("active", False)
]
# Singleton instance
_router_instance = None
def get_intent_router() -> IntentRouter:
"""Get or create intent router instance"""
global _router_instance
if _router_instance is None:
_router_instance = IntentRouter()
return _router_instance