""" DAARION Platform - Chat Isolation Module ========================================= Determines agent_id ONLY from chat_id / bot_token. NO cross-agent routing! """ import yaml import logging from pathlib import Path from typing import Dict, Optional, Tuple, Any from dataclasses import dataclass logger = logging.getLogger(__name__) @dataclass class ChatResolution: """Result of chat -> agent resolution""" agent_id: str agent_name: str nats_invoke: str nats_response: str is_private: bool chat_name: Optional[str] = None out_of_domain_response: Optional[str] = None class ChatIsolation: """ Manages strict chat -> agent isolation. Each chat belongs to exactly ONE agent. """ def __init__(self, config_path: str = None): if config_path is None: config_path = Path(__file__).parent / "agents_chat_map.yaml" self.config_path = Path(config_path) self.config: Dict[str, Any] = {} self.chat_map: Dict[int, str] = {} # chat_id -> agent_id self.agents: Dict[str, Dict] = {} # agent_id -> agent config self.bot_tokens: Dict[str, str] = {} # env_var -> agent_id self._load_config() def _load_config(self): """Load and validate configuration""" if not self.config_path.exists(): logger.error(f"Chat isolation config not found: {self.config_path}") return try: with open(self.config_path, 'r', encoding='utf-8') as f: self.config = yaml.safe_load(f) # Build chat_id -> agent_id mapping for agent_id, agent_data in self.config.get("agents", {}).items(): self.agents[agent_id] = agent_data for chat in agent_data.get("telegram_chats", []): if chat.get("type") == "private": # Private chats handled via bot token continue chat_id = chat.get("chat_id") if chat_id and chat.get("enabled", True): self.chat_map[chat_id] = agent_id # Build bot_token -> agent_id mapping for agent_id, env_var in self.config.get("bot_tokens", {}).items(): self.bot_tokens[env_var] = agent_id logger.info(f"✅ Chat isolation loaded: {len(self.chat_map)} group chats, {len(self.agents)} agents") except Exception as e: logger.error(f"Failed to load chat isolation config: {e}") def resolve_agent( self, chat_id: int, chat_type: str = "private", bot_token_env: str = None ) -> Optional[ChatResolution]: """ Resolve chat to agent. Priority: 1. For private chats: use bot_token to determine agent 2. For group chats: use chat_id mapping Returns: ChatResolution or None if chat is not configured """ agent_id = None chat_name = None is_private = chat_type == "private" # 1. Try to resolve by bot token (for private chats) if is_private and bot_token_env: agent_id = self.bot_tokens.get(bot_token_env) logger.debug(f"Private chat: resolved to {agent_id} via bot token") # 2. Try to resolve by chat_id (for groups) if not agent_id and chat_id in self.chat_map: agent_id = self.chat_map[chat_id] # Find chat name agent_data = self.agents.get(agent_id, {}) for chat in agent_data.get("telegram_chats", []): if chat.get("chat_id") == chat_id: chat_name = chat.get("name") break logger.debug(f"Group chat {chat_id}: resolved to {agent_id}") # 3. For private chats without specific mapping, try to infer from bot token if not agent_id and is_private: # Default: try to match webhook path or use fallback logger.warning(f"Private chat {chat_id}: no specific mapping, need bot_token_env") if not agent_id: logger.warning(f"Chat {chat_id} ({chat_type}): NO AGENT CONFIGURED") return None agent_data = self.agents.get(agent_id, {}) out_of_domain = agent_data.get("out_of_domain", {}).get("response_uk", "") return ChatResolution( agent_id=agent_id, agent_name=agent_data.get("name", agent_id), nats_invoke=agent_data.get("nats_invoke", f"agent.{agent_id}.invoke"), nats_response=agent_data.get("nats_response", f"agent.{agent_id}.response"), is_private=is_private, chat_name=chat_name, out_of_domain_response=out_of_domain ) def get_unknown_chat_response(self, lang: str = "uk") -> str: """Get response for unknown/unconfigured chats""" policy = self.config.get("unknown_chat_policy", {}) key = f"message_{lang}" return policy.get(key, policy.get("message_uk", "Чат не налаштований.")) def get_agent_domain(self, agent_id: str) -> list: """Get agent's domain keywords""" return self.agents.get(agent_id, {}).get("domain", []) def is_out_of_domain(self, agent_id: str, message: str) -> bool: """ Check if message is clearly outside agent's domain. NOTE: This is a simple heuristic, not for routing! Used only to generate polite out-of-domain responses. """ # Get other agents' domains other_domains = [] for aid, adata in self.agents.items(): if aid != agent_id: other_domains.extend(adata.get("domain", [])) # Simple keyword check message_lower = message.lower() # Check if message contains keywords from OTHER domains # AND does NOT contain keywords from THIS agent's domain own_domain = self.get_agent_domain(agent_id) has_own_keyword = any(kw in message_lower for kw in own_domain) has_other_keyword = any(kw in message_lower for kw in other_domains) # Only flag as out-of-domain if clearly about another domain return has_other_keyword and not has_own_keyword def get_out_of_domain_response(self, agent_id: str, lang: str = "uk") -> str: """Get polite out-of-domain response for agent""" agent_data = self.agents.get(agent_id, {}) key = f"response_{lang}" return agent_data.get("out_of_domain", {}).get(key, "") # Global instance _chat_isolation: Optional[ChatIsolation] = None def get_chat_isolation() -> ChatIsolation: """Get or create chat isolation instance""" global _chat_isolation if _chat_isolation is None: _chat_isolation = ChatIsolation() return _chat_isolation def resolve_agent_for_chat( chat_id: int, chat_type: str = "private", bot_token_env: str = None ) -> Optional[ChatResolution]: """ Convenience function to resolve agent for a chat. Usage: resolution = resolve_agent_for_chat(chat_id, "group") if resolution: agent_id = resolution.agent_id nats_subject = resolution.nats_invoke """ return get_chat_isolation().resolve_agent(chat_id, chat_type, bot_token_env)