""" DAARION Platform - Intent Router Routes messages to appropriate agents based on content analysis """ import re import yaml from typing import Optional, Dict, List, Tuple from pathlib import Path import structlog logger = structlog.get_logger() class IntentRouter: """Routes messages to agents based on intent detection""" def __init__(self, registry_path: str = "agents_registry.yaml"): self.registry_path = Path(registry_path) self.registry = self._load_registry() self.hard_routes = self._compile_hard_routes() self.intent_keywords = self._build_keyword_index() logger.info("intent_router_initialized", agents=len(self.registry.get("agents", {})), hard_routes=len(self.hard_routes)) def _load_registry(self) -> Dict: """Load agent registry from YAML""" if not self.registry_path.exists(): logger.warning("registry_not_found", path=str(self.registry_path)) return {"agents": {}, "routing": {}} with open(self.registry_path) as f: return yaml.safe_load(f) def _compile_hard_routes(self) -> List[Tuple[re.Pattern, str]]: """Compile regex patterns for hard routes""" routes = [] for route in self.registry.get("routing", {}).get("hard_routes", []): pattern = re.compile(route["pattern"], re.IGNORECASE) routes.append((pattern, route["agent"])) return routes def _build_keyword_index(self) -> Dict[str, List[Tuple[str, float]]]: """Build keyword → agent mapping with confidence""" index = {} for route in self.registry.get("routing", {}).get("intent_routes", []): agent = route["agent"] threshold = route.get("confidence_threshold", 0.5) for keyword in route["keywords"]: kw = keyword.lower() if kw not in index: index[kw] = [] index[kw].append((agent, threshold)) return index def route(self, message: str, source_agent: Optional[str] = None) -> Tuple[str, float, str]: """ Route message to appropriate agent. Returns: (agent_id, confidence, reason) """ message_lower = message.lower().strip() # 1. Check hard routes first (commands) for pattern, agent in self.hard_routes: if pattern.match(message): logger.info("hard_route_matched", agent=agent, pattern=pattern.pattern) return (agent, 1.0, "hard_route") # 2. Intent-based routing (keyword matching) agent_scores = {} matched_keywords = {} for keyword, agents in self.intent_keywords.items(): if keyword in message_lower: for agent, threshold in agents: if agent not in agent_scores: agent_scores[agent] = 0.0 matched_keywords[agent] = [] agent_scores[agent] += threshold matched_keywords[agent].append(keyword) if agent_scores: # Normalize scores max_score = max(agent_scores.values()) for agent in agent_scores: agent_scores[agent] /= max(1, len(matched_keywords[agent])) # Select best agent best_agent = max(agent_scores, key=agent_scores.get) confidence = min(agent_scores[best_agent], 1.0) # Check if active if self.is_agent_active(best_agent): logger.info("intent_route_matched", agent=best_agent, confidence=confidence, keywords=matched_keywords[best_agent]) return (best_agent, confidence, f"keywords: {matched_keywords[best_agent][:3]}") # 3. Fallback fallback = self.registry.get("routing", {}).get("fallback", {}) fallback_agent = fallback.get("agent", "helion") logger.info("fallback_route", agent=fallback_agent) return (fallback_agent, 0.3, "fallback") def is_agent_active(self, agent_id: str) -> bool: """Check if agent is active""" agent = self.registry.get("agents", {}).get(agent_id) return agent and agent.get("active", False) def get_agent_config(self, agent_id: str) -> Optional[Dict]: """Get full agent configuration""" return self.registry.get("agents", {}).get(agent_id) def get_memory_policy(self, agent_id: str) -> Dict: """Get agent memory access policy""" agent = self.get_agent_config(agent_id) if not agent: return {} return agent.get("memory_policy", {}) def get_data_policy(self, agent_id: str) -> Dict: """Get agent data sharing policy""" agent = self.get_agent_config(agent_id) if not agent: return {} return agent.get("data_policy", {}) def can_handoff(self, source_agent: str, target_agent: str) -> bool: """Check if handoff is allowed between agents""" policy = self.get_data_policy(source_agent) allowed = policy.get("handoff_allow", []) return target_agent in allowed def get_rate_limit(self, agent_id: str) -> int: """Get rate limit (requests per minute) for agent""" agent = self.get_agent_config(agent_id) if not agent: return 30 # default return agent.get("sla", {}).get("rate_limit_rpm", 30) def list_active_agents(self) -> List[str]: """List all active agents""" return [ agent_id for agent_id, config in self.registry.get("agents", {}).items() if config.get("active", False) ] # Singleton instance _router_instance = None def get_intent_router() -> IntentRouter: """Get or create intent router instance""" global _router_instance if _router_instance is None: _router_instance = IntentRouter() return _router_instance