import asyncio
import os
import logging
import time
from typing import Optional, Dict, Any, List, Set, Tuple
from datetime import datetime
from collections import deque

import httpx

logger = logging.getLogger(__name__)

MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")
# Seconds a fetched context stays cached (see MemoryClient.get_context).
CONTEXT_CACHE_TTL = float(os.getenv("MEMORY_CONTEXT_CACHE_TTL", "5"))
# Per-chat capacity of the in-memory fallback store.
LOCAL_CONTEXT_MAX_MESSAGES = int(os.getenv("LOCAL_CONTEXT_MAX_MESSAGES", "50"))

# Auto-summarize trigger configuration
# Number of saved turns in one chat before a summary is requested.
SUMMARIZE_TURN_THRESHOLD = int(os.getenv("SUMMARIZE_TURN_THRESHOLD", "30"))
# Minimum seconds between two summary requests for the same chat.
SUMMARIZE_DEBOUNCE_SECONDS = int(os.getenv("SUMMARIZE_DEBOUNCE_SECONDS", "300"))  # 5 min


# =====================================
# LOCAL CONTEXT STORE (fallback when Memory Service unavailable)
# =====================================

class LocalContextStore:
    """In-memory per-chat message history, used when the Memory Service is unavailable.

    Each chat keeps at most ``max_messages`` entries; the bounded ``deque``
    drops the oldest entry when a new one is appended to a full history.
    Contents are process-local and lost on restart.
    """

    def __init__(self, max_messages: int = LOCAL_CONTEXT_MAX_MESSAGES):
        self.max_messages = max_messages
        # {chat_id: deque([{"role": ..., "text": ..., "timestamp": ...}, ...])}
        self._store: Dict[str, deque] = {}

    def add_message(self, chat_id: str, role: str, text: str):
        """Append a message to the chat's history, creating the history on first use.

        Args:
            chat_id: Chat key.
            role: ``"user"`` for user messages; any other value is rendered as the assistant.
            text: Message text.
        """
        if chat_id not in self._store:
            self._store[chat_id] = deque(maxlen=self.max_messages)
        self._store[chat_id].append({
            "role": role,
            "text": text,
            # Naive local time; only stored, never used for ordering here.
            "timestamp": datetime.now().isoformat()
        })

    def get_context(self, chat_id: str, limit: int = 30) -> List[Dict[str, Any]]:
        """Return the chat's most recent messages, oldest first.

        Args:
            chat_id: Chat key.
            limit: Maximum number of messages to return; a falsy value (0/None)
                returns the whole stored history.

        Returns:
            A new list of message dicts; empty for an unknown chat.
        """
        if chat_id not in self._store:
            return []
        messages = list(self._store[chat_id])
        return messages[-limit:] if limit else messages

    def clear_chat(self, chat_id: str):
        """Forget the chat's history (no-op for an unknown chat)."""
        if chat_id in self._store:
            del self._store[chat_id]

    def format_for_prompt(self, chat_id: str, limit: int = 30) -> str:
        """Render recent messages as ``"User: ..."`` / ``"Assistant: ..."`` lines.

        Returns an empty string when the chat has no messages.
        """
        messages = self.get_context(chat_id, limit)
        if not messages:
            return ""
        lines = []
        for msg in messages:
            role = "User" if msg["role"] == "user" else "Assistant"
            lines.append(f"{role}: {msg['text']}")
        return "\n".join(lines)


# Global local context store
local_context = LocalContextStore()


class MemoryClient:
    """Async client for the Memory Service, with an in-memory fallback.

    Reads are cached per process for ``CONTEXT_CACHE_TTL`` seconds and fall back
    to ``local_context`` when the service fails. Writes always go to
    ``local_context`` first and then, best-effort, to the service.

    NOTE(review): every request sends the caller's user id as the bearer token;
    confirm that this is the intended auth scheme for the Memory Service.
    """

    def __init__(self, base_url: str = MEMORY_SERVICE_URL):
        self.base_url = base_url.rstrip("/")
        self.timeout = 10.0
        # cache_key -> (monotonic time stored, chat scope key, context dict).
        # The scope key lets save_chat_turn invalidate every cached view of a chat.
        self._context_cache: Dict[str, Tuple[float, str, Dict[str, Any]]] = {}
        # Auto-summarize state
        self._turn_counters: Dict[str, int] = {}
        self._last_summarize: Dict[str, float] = {}
        # Strong references to fire-and-forget tasks: the event loop keeps only
        # weak references, so an unreferenced task can be garbage-collected mid-run.
        self._background_tasks: Set[asyncio.Future] = set()

    @staticmethod
    def _chat_scope(agent_id: str, channel_id: Optional[str], user_id: Optional[str]) -> str:
        """Key identifying one conversation; shared by the local store, cache and summarizer."""
        return f"{agent_id}:{channel_id or user_id}"

    @staticmethod
    def _drop_none(params: Dict[str, Any]) -> Dict[str, Any]:
        """Remove None-valued query parameters.

        httpx encodes a None query value as an empty string (``key=``), which is
        not the same as omitting the parameter.
        """
        return {key: value for key, value in params.items() if value is not None}

    def _cache_key(
        self,
        user_id: str,
        agent_id: str,
        team_id: str,
        channel_id: Optional[str],
        limit: int
    ) -> str:
        return f"{user_id}:{agent_id}:{team_id}:{channel_id}:{limit}"

    def _cache_put(self, cache_key: str, scope: str, now: float, result: Dict[str, Any]) -> None:
        """Cache a context and evict entries that have outlived CONTEXT_CACHE_TTL.

        Eviction keeps the cache bounded by the set of recently active chats
        instead of growing with every user/channel ever seen.
        """
        expired = [
            key for key, (stored_at, _, _) in self._context_cache.items()
            if now - stored_at >= CONTEXT_CACHE_TTL
        ]
        for key in expired:
            del self._context_cache[key]
        self._context_cache[cache_key] = (now, scope, result)

    def _invalidate_chat_cache(self, scope: str) -> None:
        """Drop every cached context belonging to the given chat scope."""
        stale = [key for key, (_, entry_scope, _) in self._context_cache.items() if entry_scope == scope]
        for key in stale:
            del self._context_cache[key]

    @staticmethod
    def _collect_user_names(events: List[Dict[str, Any]]) -> Dict[str, str]:
        """Build a user_id -> display-name map from all events.

        The first name found per user wins. ``metadata.username`` is checked
        before ``sender_name``. ``sender_name`` values starting with ``"tg:"``
        are ignored because they are raw ids rather than names.
        """
        uid_to_name: Dict[str, str] = {}
        for event in events:
            uid = event.get("user_id", "")
            metadata = event.get("metadata") or {}  # key may be present with a null value
            uname = metadata.get("username") or ""
            if uid and uname and uid not in uid_to_name:
                uid_to_name[uid] = uname
            # Also try sender_name
            sender_name = event.get("sender_name", "")
            if uid and sender_name and not sender_name.startswith("tg:") and uid not in uid_to_name:
                uid_to_name[uid] = sender_name
        return uid_to_name

    @staticmethod
    def _resolve_sender(event: Dict[str, Any], uid_to_name: Dict[str, str]) -> str:
        """Best display name for a user event: sender_name, then metadata, then the id map."""
        sender = event.get("sender_name", "")
        if not sender:
            metadata = event.get("metadata") or {}
            sender = metadata.get("username") or metadata.get("first_name") or ""
        # Resolve tg:IDs (or a missing name) using the mapping
        if not sender or sender.startswith("tg:"):
            sender = uid_to_name.get(event.get("user_id", ""), sender)
        return sender

    def _build_remote_context(self, events: List[Dict[str, Any]], limit: int) -> Dict[str, Any]:
        """Convert Memory Service events into the context dict returned by get_context."""
        # Chronological order; a missing or null timestamp sorts first.
        events = sorted(events, key=lambda event: event.get("timestamp") or "")
        uid_to_name = self._collect_user_names(events)

        recent_events = [
            {
                "body_text": event.get("content", ""),
                "kind": event.get("kind", "message"),
                "type": "user" if event.get("role") == "user" else "agent",
                "role": event.get("role", "unknown"),
                "timestamp": event.get("timestamp"),
                "user_id": event.get("user_id"),
                "sender_name": event.get("sender_name"),
            }
            for event in events
            if event.get("content")
        ]

        # Prompt text; user lines carry the sender name so group chats stay readable.
        lines = []
        for event in events:
            content = event.get("content", "")
            if not content:
                continue
            if event.get("role") == "user":
                sender = self._resolve_sender(event, uid_to_name)
                role = f"[{sender}]" if sender else "User"
            else:
                role = "Assistant"
            lines.append(f"{role}: {content}")

        return {
            "facts": [],
            "recent_events": recent_events,
            "dialog_summaries": [],
            "local_context_text": "\n".join(lines[-limit:]),
        }

    async def get_context(
        self,
        user_id: str,
        agent_id: str,
        team_id: str,
        channel_id: Optional[str] = None,
        limit: int = 80
    ) -> Dict[str, Any]:
        """Return the memory context for a dialog.

        Results are cached for CONTEXT_CACHE_TTL seconds. On any Memory Service
        failure (network error, non-200 status, malformed payload) the local
        in-memory store is used instead.

        Args:
            user_id: Requesting user; also sent as the bearer token.
            agent_id: Agent whose memory is read.
            team_id: Only part of the cache key; not sent to the service.
            channel_id: Optional channel filter (omitted from the query when None).
            limit: Maximum number of events requested / prompt lines kept.

        Returns:
            Dict with ``facts``, ``recent_events``, ``dialog_summaries`` and
            ``local_context_text``.
        """
        cache_key = self._cache_key(user_id, agent_id, team_id, channel_id, limit)
        scope = self._chat_scope(agent_id, channel_id, user_id)
        now = time.monotonic()
        cached = self._context_cache.get(cache_key)
        if cached and now - cached[0] < CONTEXT_CACHE_TTL:
            return cached[2]

        # Try the Memory Service first
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                resp = await client.get(
                    f"{self.base_url}/agents/{agent_id}/memory",
                    params=self._drop_none({
                        "user_id": user_id,
                        "channel_id": channel_id,
                        "limit": limit,
                    }),
                    headers={"Authorization": f"Bearer {user_id}"},
                )
                if resp.status_code == 200:
                    data = resp.json()
                    result = self._build_remote_context(data.get("events") or [], limit)
                    self._cache_put(cache_key, scope, now, result)
                    return result
                logger.debug(f"Memory Service context fetch returned {resp.status_code}, using local")
        except Exception as e:
            logger.debug(f"Memory Service context fetch failed, using local: {e}")

        # FALLBACK: in-memory local context
        local_messages = local_context.get_context(scope, limit)
        result = {
            "facts": [],
            "recent_events": [
                {
                    "body_text": msg["text"],
                    "kind": "message",
                    "type": "user" if msg["role"] == "user" else "agent",
                }
                for msg in local_messages
            ],
            "dialog_summaries": [],
            "local_context_text": local_context.format_for_prompt(scope, limit),
        }
        self._cache_put(cache_key, scope, now, result)
        return result

    async def save_chat_turn(
        self,
        agent_id: str,
        team_id: str,
        user_id: str,
        message: str,
        response: str,
        channel_id: Optional[str] = None,
        scope: str = "short_term",
        save_agent_response: bool = True,
        agent_metadata: Optional[Dict[str, Any]] = None,
        username: Optional[str] = None
    ) -> bool:
        """Save one dialog turn (user message plus optional agent response).

        The turn is always written to the local store. It is then posted to the
        Memory Service on a best-effort basis; HTTP status codes of the posts are
        not checked. After the posts, an auto-summarize check is scheduled in the
        background.

        Returns:
            Always True, because the local store write has succeeded.
        """
        chat_key = self._chat_scope(agent_id, channel_id, user_id)

        # ALWAYS save to the local context
        local_context.add_message(chat_key, "user", message)
        if save_agent_response and response:
            local_context.add_message(chat_key, "assistant", response)
        # The conversation changed, so every cached context for this chat is stale.
        self._invalidate_chat_cache(chat_key)

        logger.info(
            f"💾 Saved to local context: chat={chat_key}, "
            f"messages={len(local_context.get_context(chat_key, limit=0))}"
        )

        # Try to save to the Memory Service (may be unavailable)
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                user_event = {
                    "agent_id": agent_id,
                    "team_id": team_id,
                    "channel_id": channel_id,
                    "user_id": user_id,
                    "scope": scope,
                    "kind": "message",
                    "body_text": message,
                    "body_json": {"type": "user_message", "source": "telegram", "username": username or ""}
                }
                await client.post(
                    f"{self.base_url}/agents/{agent_id}/memory",
                    json=user_event,
                    headers={"Authorization": f"Bearer {user_id}"}
                )

                if save_agent_response and response:
                    agent_event = {
                        "agent_id": agent_id,
                        "team_id": team_id,
                        "channel_id": channel_id,
                        "user_id": user_id,
                        "scope": scope,
                        "kind": "message",
                        "body_text": response,
                        "body_json": {
                            "type": "agent_response",
                            "source": "telegram",
                            **(agent_metadata or {})
                        }
                    }
                    await client.post(
                        f"{self.base_url}/agents/{agent_id}/memory",
                        json=agent_event,
                        headers={"Authorization": f"Bearer {user_id}"}
                    )

                # A get_context running during the posts may have cached a remote
                # snapshot that lacks this turn; drop it again.
                self._invalidate_chat_cache(chat_key)

                # Auto-summarize trigger (fire-and-forget, non-blocking)
                try:
                    task = asyncio.ensure_future(self._maybe_trigger_summarize(
                        agent_id=agent_id,
                        channel_id=channel_id,
                        user_id=user_id,
                        team_id=team_id
                    ))
                    # Hold a strong reference until the task finishes.
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)
                except Exception as trigger_err:
                    logger.debug(f"Summarize trigger scheduling failed: {trigger_err}")

            return True
        except Exception as e:
            # Memory Service unavailable, but the local context is already saved
            logger.debug(f"Memory Service unavailable (using local context): {e}")
            return True  # Return True because local context was saved

    async def _maybe_trigger_summarize(
        self,
        agent_id: str,
        channel_id,
        user_id: str,
        team_id=None
    ):
        """Request a server-side summary once a chat reaches SUMMARIZE_TURN_THRESHOLD turns.

        A per-chat turn counter plus a SUMMARIZE_DEBOUNCE_SECONDS debounce keep
        calls rare. While the debounce is active, the counter is not reset, so
        every later turn re-checks the debounce. ``team_id`` is accepted but not
        used. Failures are logged, never raised.
        """
        chat_key = self._chat_scope(agent_id, channel_id, user_id)

        # Increment turn counter
        self._turn_counters[chat_key] = self._turn_counters.get(chat_key, 0) + 1
        turn_count = self._turn_counters[chat_key]

        # Check threshold
        if turn_count < SUMMARIZE_TURN_THRESHOLD:
            return

        # Check debounce
        now = time.monotonic()
        last = self._last_summarize.get(chat_key, 0)
        if now - last < SUMMARIZE_DEBOUNCE_SECONDS:
            logger.debug(f"Summarize debounce active for {chat_key}, skipping")
            return

        # Reset counter and update timestamp before awaiting, so concurrent
        # turns for the same chat do not fire a second request.
        self._turn_counters[chat_key] = 0
        self._last_summarize[chat_key] = now

        # Fire-and-forget summarize request
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                resp = await client.post(
                    f"{self.base_url}/agents/{agent_id}/summarize",
                    json={
                        "channel_id": channel_id,
                        "user_id": user_id,
                        "max_events": 60,
                        "force": False
                    },
                    headers={"Authorization": f"Bearer {user_id}"}
                )
                if resp.status_code == 200:
                    data = resp.json()
                    logger.info(
                        f"Auto-summary created for {chat_key}: "
                        f"events={data.get('events_summarized', '?')}, "
                        f"summary_len={len(data.get('summary', ''))}"
                    )
                else:
                    logger.warning(
                        f"Auto-summary failed for {chat_key}: "
                        f"status={resp.status_code}, body={resp.text[:200]}"
                    )
        except Exception as e:
            logger.warning(f"Auto-summary request failed for {chat_key}: {e}")

    async def create_dialog_summary(
        self,
        team_id: str,
        channel_id: Optional[str],
        agent_id: str,
        user_id: Optional[str],
        period_start: datetime,
        period_end: datetime,
        summary_text: str,
        message_count: int = 0,
        participant_count: int = 0,
        topics: Optional[List[str]] = None,
        summary_json: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Store a dialog summary so that long conversations don't overflow the context.

        Returns:
            True when the service answers 200/201; False on any other status or error.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/summaries",
                    json={
                        "team_id": team_id,
                        "channel_id": channel_id,
                        "agent_id": agent_id,
                        "user_id": user_id,
                        "period_start": period_start.isoformat(),
                        "period_end": period_end.isoformat(),
                        "summary_text": summary_text,
                        "summary_json": summary_json,
                        "message_count": message_count,
                        "participant_count": participant_count,
                        "topics": topics or [],
                        "meta": {}
                    },
                    # Falls back to the literal token "system" when no user is attached.
                    headers={"Authorization": f"Bearer {user_id or 'system'}"}
                )
                return response.status_code in [200, 201]
        except Exception as e:
            logger.warning(f"Failed to create dialog summary: {e}")
            return False

    async def upsert_fact(
        self,
        user_id: str,
        fact_key: str,
        fact_value: Optional[str] = None,
        fact_value_json: Optional[Dict[str, Any]] = None,
        team_id: Optional[str] = None
    ) -> bool:
        """Create or update a user fact.

        Returns:
            True when the service answers 200/201; False on any other status or error.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/facts/upsert",
                    json={
                        "user_id": user_id,
                        "fact_key": fact_key,
                        "fact_value": fact_value,
                        "fact_value_json": fact_value_json,
                        "team_id": team_id
                    },
                    headers={"Authorization": f"Bearer {user_id}"}
                )
                return response.status_code in [200, 201]
        except Exception as e:
            logger.warning(f"Failed to upsert fact: {e}")
            return False

    async def get_fact(
        self,
        user_id: str,
        fact_key: str,
        team_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Fetch a user fact.

        ``team_id`` is omitted from the query when None (rather than being sent
        as an empty string), matching the null that ``upsert_fact`` stores.

        Returns:
            The decoded JSON body (expected to carry fact_value / fact_value_json)
            on HTTP 200; None on any other status or error.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.base_url}/facts/{fact_key}",
                    params=self._drop_none({
                        "user_id": user_id,
                        "team_id": team_id
                    }),
                    headers={"Authorization": f"Bearer {user_id}"}
                )
                if response.status_code == 200:
                    return response.json()
                return None
        except Exception as e:
            logger.warning(f"Failed to get fact: {e}")
            return None


# Global client instance
memory_client = MemoryClient()