""" Telegram History Recovery Автоматичне відновлення історії повідомлень для агентів """ import asyncio import logging import os from typing import List, Dict, Optional, Set from datetime import datetime, timedelta import httpx logger = logging.getLogger(__name__) # Configuration HISTORY_LIMIT = int(os.getenv("TELEGRAM_HISTORY_LIMIT", "100")) # Кількість повідомлень для відновлення MIN_COLLECTION_SIZE = int(os.getenv("MIN_COLLECTION_SIZE", "10")) # Мінімальний розмір колекції QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333") ROUTER_URL = os.getenv("ROUTER_URL", "http://localhost:9101") class TelegramHistoryRecovery: """Система відновлення історії Telegram для агентів""" def __init__(self): self.http_client = httpx.AsyncClient(timeout=30.0) self.processed_messages: Set[int] = set() # Кеш оброблених message_id async def check_collection_health(self, agent_id: str) -> Dict[str, any]: """ Перевірити здоров'я колекції агента Returns: { "exists": bool, "points_count": int, "needs_recovery": bool } """ try: collection_name = f"{agent_id}_messages" url = f"{QDRANT_URL}/collections/{collection_name}" response = await self.http_client.get(url) if response.status_code == 404: logger.warning(f"Collection {collection_name} не існує") return {"exists": False, "points_count": 0, "needs_recovery": True} response.raise_for_status() data = response.json() points_count = data.get("result", {}).get("points_count", 0) needs_recovery = points_count < MIN_COLLECTION_SIZE logger.info(f"Collection {collection_name}: {points_count} points, needs_recovery={needs_recovery}") return { "exists": True, "points_count": points_count, "needs_recovery": needs_recovery } except Exception as e: logger.error(f"Помилка перевірки колекції {agent_id}: {e}") return {"exists": False, "points_count": 0, "needs_recovery": True} async def fetch_telegram_history( self, bot_token: str, chat_id: int, limit: int = HISTORY_LIMIT ) -> List[Dict]: """ Отримати історію повідомлень з Telegram Note: Telegram API не має прямого методу для отримання історії. Використовуємо getUpdates з offset для отримання останніх повідомлень. """ try: messages = [] # Telegram не дає прямий доступ до історії чату через Bot API # Альтернативний підхід: зберігати message_id і використовувати forwardMessage # Або інтегруватися з MTProto для повного доступу # Для спрощення: припускаємо що ми можемо отримати останні updates url = f"https://api.telegram.org/bot{bot_token}/getUpdates" params = { "limit": limit, "timeout": 1 } response = await self.http_client.get(url, params=params) response.raise_for_status() data = response.json() if not data.get("ok"): logger.error(f"Telegram API error: {data}") return [] updates = data.get("result", []) for update in updates: message = update.get("message") if message and message.get("chat", {}).get("id") == chat_id: messages.append(message) logger.info(f"Отримано {len(messages)} повідомлень з Telegram для chat {chat_id}") return messages except Exception as e: logger.error(f"Помилка отримання історії Telegram: {e}") return [] async def check_message_exists( self, agent_id: str, message_id: int ) -> bool: """ Перевірити чи повідомлення вже є в Qdrant """ if message_id in self.processed_messages: return True try: collection_name = f"{agent_id}_messages" url = f"{QDRANT_URL}/collections/{collection_name}/points/scroll" payload = { "filter": { "must": [ { "key": "message_id", "match": {"value": message_id} } ] }, "limit": 1 } response = await self.http_client.post(url, json=payload) if response.status_code == 404: return False response.raise_for_status() data = response.json() points = data.get("result", {}).get("points", []) exists = len(points) > 0 if exists: self.processed_messages.add(message_id) return exists except Exception as e: logger.error(f"Помилка перевірки message_id={message_id}: {e}") return False async def ingest_message( self, agent_id: str, message: Dict, bot_token: str ) -> bool: """ Відправити повідомлення на інжест через Router """ try: message_id = message.get("message_id") text = message.get("text", "") if not text or not message_id: return False # Перевірити чи вже є if await self.check_message_exists(agent_id, message_id): logger.debug(f"Message {message_id} вже існує, пропускаю") return True # Відправити на інжест через Router from_user = message.get("from", {}) chat = message.get("chat", {}) payload = { "message": text, "mode": "ingest_history", # Спеціальний режим для історичних повідомлень "agent": agent_id, "metadata": { "source": "telegram_history_recovery", "message_id": message_id, "user_id": f"tg:{from_user.get('id')}", "chat_id": str(chat.get("id")), "username": from_user.get("username", ""), "date": message.get("date"), "is_historical": True } } response = await self.http_client.post( f"{ROUTER_URL}/chat", json=payload, timeout=10.0 ) if response.status_code == 200: self.processed_messages.add(message_id) logger.debug(f"✅ Інжест message {message_id} успішний") return True else: logger.error(f"❌ Помилка інжесту message {message_id}: {response.status_code}") return False except Exception as e: logger.error(f"Помилка інжесту повідомлення: {e}") return False async def recover_chat_history( self, agent_id: str, bot_token: str, chat_id: int, limit: int = HISTORY_LIMIT ) -> Dict[str, any]: """ Відновити історію чату для агента Returns: { "success": bool, "messages_fetched": int, "messages_ingested": int, "messages_skipped": int } """ logger.info(f"🔄 Починаю відновлення історії для {agent_id}, chat={chat_id}, limit={limit}") # Отримати повідомлення з Telegram messages = await self.fetch_telegram_history(bot_token, chat_id, limit) if not messages: logger.warning(f"Не отримано повідомлень для {agent_id}") return { "success": False, "messages_fetched": 0, "messages_ingested": 0, "messages_skipped": 0 } # Інжестити кожне повідомлення ingested = 0 skipped = 0 for message in messages: success = await self.ingest_message(agent_id, message, bot_token) if success: ingested += 1 else: skipped += 1 # Невелика затримка щоб не перевантажити систему await asyncio.sleep(0.1) result = { "success": True, "messages_fetched": len(messages), "messages_ingested": ingested, "messages_skipped": skipped } logger.info(f"✅ Відновлення завершено для {agent_id}: {result}") return result async def auto_recover_on_startup( self, agents: List[Dict[str, str]] ) -> Dict[str, any]: """ Автоматичне відновлення при старті Gateway Args: agents: List of {"agent_id": str, "bot_token": str, "chat_id": int} Returns: { "total_agents": int, "agents_recovered": int, "results": {agent_id: result} } """ logger.info(f"🚀 Автоматичне відновлення при старті для {len(agents)} агентів") results = {} agents_recovered = 0 for agent_config in agents: agent_id = agent_config.get("agent_id") bot_token = agent_config.get("bot_token") chat_id = agent_config.get("chat_id") if not all([agent_id, bot_token, chat_id]): logger.warning(f"Пропускаю {agent_id}: неповна конфігурація") continue # Перевірити стан колекції health = await self.check_collection_health(agent_id) if health["needs_recovery"]: logger.info(f"🔧 Агент {agent_id} потребує відновлення (points={health['points_count']})") result = await self.recover_chat_history(agent_id, bot_token, chat_id) results[agent_id] = result if result["success"]: agents_recovered += 1 else: logger.info(f"✅ Агент {agent_id} в порядку (points={health['points_count']})") results[agent_id] = {"status": "healthy", "points_count": health["points_count"]} summary = { "total_agents": len(agents), "agents_recovered": agents_recovered, "results": results } logger.info(f"🏁 Автоматичне відновлення завершено: {summary}") return summary async def nightly_sync( self, agents: List[Dict[str, str]] ) -> Dict[str, any]: """ Нічна синхронізація історії (cron job о 04:00) Оновлює тільки активні чати (з повідомленнями за останні 7 днів) """ logger.info(f"🌙 Нічна синхронізація для {len(agents)} агентів") results = {} for agent_config in agents: agent_id = agent_config.get("agent_id") bot_token = agent_config.get("bot_token") chat_id = agent_config.get("chat_id") if not all([agent_id, bot_token, chat_id]): continue # Синхронізувати останні 20 повідомлень (швидше) result = await self.recover_chat_history(agent_id, bot_token, chat_id, limit=20) results[agent_id] = result # Затримка між агентами await asyncio.sleep(1) logger.info(f"🌙 Нічна синхронізація завершена: {results}") return results async def close(self): """Закрити HTTP клієнт""" await self.http_client.aclose() # Singleton instance recovery_service = TelegramHistoryRecovery() async def auto_recover_on_startup_all_agents(): """ Helper функція для запуску при старті Gateway. Автоматично виявляє агентів з PostgreSQL та .env токенів """ import psycopg2 from psycopg2.extras import RealDictCursor # Конфігурація з .env agents_config = [ { "agent_id": "helion", "bot_token": os.getenv("HELION_TELEGRAM_BOT_TOKEN"), }, { "agent_id": "nutra", "bot_token": os.getenv("NUTRA_TELEGRAM_BOT_TOKEN"), }, { "agent_id": "agromatrix", "bot_token": os.getenv("AGROMATRIX_TELEGRAM_BOT_TOKEN"), }, { "agent_id": "greenfood", "bot_token": os.getenv("GREENFOOD_TELEGRAM_BOT_TOKEN"), }, { "agent_id": "daarwizz", "bot_token": os.getenv("TELEGRAM_BOT_TOKEN"), # Загальний токен }, ] # Підключення до PostgreSQL для отримання chat_id try: # Спершу спробувати з .env, потім дефолтний URL db_url = os.getenv("DATABASE_URL") if not db_url: db_url = "postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_memory" conn = psycopg2.connect(db_url) cursor = conn.cursor(cursor_factory=RealDictCursor) # Отримати унікальні chat_id з бази # Схема: fact_key = 'doc_context:telegram:CHAT_ID' cursor.execute(""" SELECT DISTINCT split_part(fact_key, ':', 3) as chat_id FROM user_facts WHERE fact_key LIKE 'doc_context:telegram:%' AND split_part(fact_key, ':', 3) != '' ORDER BY chat_id """) # Зібрати всі chat_id all_chat_ids = [] for row in cursor.fetchall(): chat_id = row["chat_id"] try: all_chat_ids.append(int(chat_id)) except ValueError: logger.warning(f"Невалідний chat_id {chat_id}") cursor.close() conn.close() logger.info(f"Знайдено {len(all_chat_ids)} унікальних Telegram чатів: {all_chat_ids}") except Exception as e: logger.error(f"Помилка підключення до БД для chat_id: {e}") logger.info("Відновлення пропущено через недоступність БД") return {"status": "skipped", "reason": "database unavailable"} # Сформувати список агентів для recovery agents = [] for config in agents_config: agent_id = config["agent_id"] bot_token = config["bot_token"] if not bot_token: logger.debug(f"Пропускаю {agent_id}: немає токену в .env") continue # Додати всі чати для цього агента if not all_chat_ids: logger.debug(f"Пропускаю {agent_id}: немає активних чатів в БД") continue # Додати кожен чат для цього агента for chat_id in all_chat_ids: agents.append({ "agent_id": agent_id, "bot_token": bot_token, "chat_id": chat_id }) if not agents: logger.info("Немає агентів для відновлення") return {"status": "no_agents", "agents": []} logger.info(f"Запуск відновлення для {len(agents)} агент-чат пар") return await recovery_service.auto_recover_on_startup(agents)