## Agents Added - Alateya: R&D, biotech, innovations - Clan (Spirit): Community spirit agent - Eonarch: Consciousness evolution agent ## Changes - docker-compose.node1.yml: Added tokens for all 3 new agents - gateway-bot/http_api.py: Added configs and webhook endpoints - gateway-bot/clan_prompt.txt: New prompt file - gateway-bot/eonarch_prompt.txt: New prompt file ## Fixes - Fixed ROUTER_URL from :9102 to :8000 (internal container port) - All 9 Telegram agents now working ## Documentation - Created PROJECT-MASTER-INDEX.md - single entry point - Added various status documents and scripts Tokens configured: - Helion, NUTRA, Agromatrix (existing) - Alateya, Clan, Eonarch (new) - Druid, GreenFood, DAARWIZZ (configured)
469 lines
17 KiB
Python
469 lines
17 KiB
Python
"""
|
||
Telegram History Recovery
|
||
Автоматичне відновлення історії повідомлень для агентів
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
from typing import List, Dict, Optional, Set
|
||
from datetime import datetime, timedelta
|
||
import httpx
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Configuration
|
||
HISTORY_LIMIT = int(os.getenv("TELEGRAM_HISTORY_LIMIT", "100")) # Кількість повідомлень для відновлення
|
||
MIN_COLLECTION_SIZE = int(os.getenv("MIN_COLLECTION_SIZE", "10")) # Мінімальний розмір колекції
|
||
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
|
||
ROUTER_URL = os.getenv("ROUTER_URL", "http://localhost:9101")
|
||
|
||
|
||
class TelegramHistoryRecovery:
|
||
"""Система відновлення історії Telegram для агентів"""
|
||
|
||
def __init__(self):
|
||
self.http_client = httpx.AsyncClient(timeout=30.0)
|
||
self.processed_messages: Set[int] = set() # Кеш оброблених message_id
|
||
|
||
async def check_collection_health(self, agent_id: str) -> Dict[str, any]:
|
||
"""
|
||
Перевірити здоров'я колекції агента
|
||
|
||
Returns:
|
||
{
|
||
"exists": bool,
|
||
"points_count": int,
|
||
"needs_recovery": bool
|
||
}
|
||
"""
|
||
try:
|
||
collection_name = f"{agent_id}_messages"
|
||
url = f"{QDRANT_URL}/collections/{collection_name}"
|
||
|
||
response = await self.http_client.get(url)
|
||
|
||
if response.status_code == 404:
|
||
logger.warning(f"Collection {collection_name} не існує")
|
||
return {"exists": False, "points_count": 0, "needs_recovery": True}
|
||
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
points_count = data.get("result", {}).get("points_count", 0)
|
||
needs_recovery = points_count < MIN_COLLECTION_SIZE
|
||
|
||
logger.info(f"Collection {collection_name}: {points_count} points, needs_recovery={needs_recovery}")
|
||
|
||
return {
|
||
"exists": True,
|
||
"points_count": points_count,
|
||
"needs_recovery": needs_recovery
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Помилка перевірки колекції {agent_id}: {e}")
|
||
return {"exists": False, "points_count": 0, "needs_recovery": True}
|
||
|
||
async def fetch_telegram_history(
|
||
self,
|
||
bot_token: str,
|
||
chat_id: int,
|
||
limit: int = HISTORY_LIMIT
|
||
) -> List[Dict]:
|
||
"""
|
||
Отримати історію повідомлень з Telegram
|
||
|
||
Note: Telegram API не має прямого методу для отримання історії.
|
||
Використовуємо getUpdates з offset для отримання останніх повідомлень.
|
||
"""
|
||
try:
|
||
messages = []
|
||
|
||
# Telegram не дає прямий доступ до історії чату через Bot API
|
||
# Альтернативний підхід: зберігати message_id і використовувати forwardMessage
|
||
# Або інтегруватися з MTProto для повного доступу
|
||
|
||
# Для спрощення: припускаємо що ми можемо отримати останні updates
|
||
url = f"https://api.telegram.org/bot{bot_token}/getUpdates"
|
||
params = {
|
||
"limit": limit,
|
||
"timeout": 1
|
||
}
|
||
|
||
response = await self.http_client.get(url, params=params)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
if not data.get("ok"):
|
||
logger.error(f"Telegram API error: {data}")
|
||
return []
|
||
|
||
updates = data.get("result", [])
|
||
|
||
for update in updates:
|
||
message = update.get("message")
|
||
if message and message.get("chat", {}).get("id") == chat_id:
|
||
messages.append(message)
|
||
|
||
logger.info(f"Отримано {len(messages)} повідомлень з Telegram для chat {chat_id}")
|
||
return messages
|
||
|
||
except Exception as e:
|
||
logger.error(f"Помилка отримання історії Telegram: {e}")
|
||
return []
|
||
|
||
async def check_message_exists(
|
||
self,
|
||
agent_id: str,
|
||
message_id: int
|
||
) -> bool:
|
||
"""
|
||
Перевірити чи повідомлення вже є в Qdrant
|
||
"""
|
||
if message_id in self.processed_messages:
|
||
return True
|
||
|
||
try:
|
||
collection_name = f"{agent_id}_messages"
|
||
url = f"{QDRANT_URL}/collections/{collection_name}/points/scroll"
|
||
|
||
payload = {
|
||
"filter": {
|
||
"must": [
|
||
{
|
||
"key": "message_id",
|
||
"match": {"value": message_id}
|
||
}
|
||
]
|
||
},
|
||
"limit": 1
|
||
}
|
||
|
||
response = await self.http_client.post(url, json=payload)
|
||
|
||
if response.status_code == 404:
|
||
return False
|
||
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
|
||
points = data.get("result", {}).get("points", [])
|
||
exists = len(points) > 0
|
||
|
||
if exists:
|
||
self.processed_messages.add(message_id)
|
||
|
||
return exists
|
||
|
||
except Exception as e:
|
||
logger.error(f"Помилка перевірки message_id={message_id}: {e}")
|
||
return False
|
||
|
||
async def ingest_message(
|
||
self,
|
||
agent_id: str,
|
||
message: Dict,
|
||
bot_token: str
|
||
) -> bool:
|
||
"""
|
||
Відправити повідомлення на інжест через Router
|
||
"""
|
||
try:
|
||
message_id = message.get("message_id")
|
||
text = message.get("text", "")
|
||
|
||
if not text or not message_id:
|
||
return False
|
||
|
||
# Перевірити чи вже є
|
||
if await self.check_message_exists(agent_id, message_id):
|
||
logger.debug(f"Message {message_id} вже існує, пропускаю")
|
||
return True
|
||
|
||
# Відправити на інжест через Router
|
||
from_user = message.get("from", {})
|
||
chat = message.get("chat", {})
|
||
|
||
payload = {
|
||
"message": text,
|
||
"mode": "ingest_history", # Спеціальний режим для історичних повідомлень
|
||
"agent": agent_id,
|
||
"metadata": {
|
||
"source": "telegram_history_recovery",
|
||
"message_id": message_id,
|
||
"user_id": f"tg:{from_user.get('id')}",
|
||
"chat_id": str(chat.get("id")),
|
||
"username": from_user.get("username", ""),
|
||
"date": message.get("date"),
|
||
"is_historical": True
|
||
}
|
||
}
|
||
|
||
response = await self.http_client.post(
|
||
f"{ROUTER_URL}/chat",
|
||
json=payload,
|
||
timeout=10.0
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
self.processed_messages.add(message_id)
|
||
logger.debug(f"✅ Інжест message {message_id} успішний")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ Помилка інжесту message {message_id}: {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"Помилка інжесту повідомлення: {e}")
|
||
return False
|
||
|
||
async def recover_chat_history(
|
||
self,
|
||
agent_id: str,
|
||
bot_token: str,
|
||
chat_id: int,
|
||
limit: int = HISTORY_LIMIT
|
||
) -> Dict[str, any]:
|
||
"""
|
||
Відновити історію чату для агента
|
||
|
||
Returns:
|
||
{
|
||
"success": bool,
|
||
"messages_fetched": int,
|
||
"messages_ingested": int,
|
||
"messages_skipped": int
|
||
}
|
||
"""
|
||
logger.info(f"🔄 Починаю відновлення історії для {agent_id}, chat={chat_id}, limit={limit}")
|
||
|
||
# Отримати повідомлення з Telegram
|
||
messages = await self.fetch_telegram_history(bot_token, chat_id, limit)
|
||
|
||
if not messages:
|
||
logger.warning(f"Не отримано повідомлень для {agent_id}")
|
||
return {
|
||
"success": False,
|
||
"messages_fetched": 0,
|
||
"messages_ingested": 0,
|
||
"messages_skipped": 0
|
||
}
|
||
|
||
# Інжестити кожне повідомлення
|
||
ingested = 0
|
||
skipped = 0
|
||
|
||
for message in messages:
|
||
success = await self.ingest_message(agent_id, message, bot_token)
|
||
if success:
|
||
ingested += 1
|
||
else:
|
||
skipped += 1
|
||
|
||
# Невелика затримка щоб не перевантажити систему
|
||
await asyncio.sleep(0.1)
|
||
|
||
result = {
|
||
"success": True,
|
||
"messages_fetched": len(messages),
|
||
"messages_ingested": ingested,
|
||
"messages_skipped": skipped
|
||
}
|
||
|
||
logger.info(f"✅ Відновлення завершено для {agent_id}: {result}")
|
||
return result
|
||
|
||
async def auto_recover_on_startup(
|
||
self,
|
||
agents: List[Dict[str, str]]
|
||
) -> Dict[str, any]:
|
||
"""
|
||
Автоматичне відновлення при старті Gateway
|
||
|
||
Args:
|
||
agents: List of {"agent_id": str, "bot_token": str, "chat_id": int}
|
||
|
||
Returns:
|
||
{
|
||
"total_agents": int,
|
||
"agents_recovered": int,
|
||
"results": {agent_id: result}
|
||
}
|
||
"""
|
||
logger.info(f"🚀 Автоматичне відновлення при старті для {len(agents)} агентів")
|
||
|
||
results = {}
|
||
agents_recovered = 0
|
||
|
||
for agent_config in agents:
|
||
agent_id = agent_config.get("agent_id")
|
||
bot_token = agent_config.get("bot_token")
|
||
chat_id = agent_config.get("chat_id")
|
||
|
||
if not all([agent_id, bot_token, chat_id]):
|
||
logger.warning(f"Пропускаю {agent_id}: неповна конфігурація")
|
||
continue
|
||
|
||
# Перевірити стан колекції
|
||
health = await self.check_collection_health(agent_id)
|
||
|
||
if health["needs_recovery"]:
|
||
logger.info(f"🔧 Агент {agent_id} потребує відновлення (points={health['points_count']})")
|
||
result = await self.recover_chat_history(agent_id, bot_token, chat_id)
|
||
results[agent_id] = result
|
||
|
||
if result["success"]:
|
||
agents_recovered += 1
|
||
else:
|
||
logger.info(f"✅ Агент {agent_id} в порядку (points={health['points_count']})")
|
||
results[agent_id] = {"status": "healthy", "points_count": health["points_count"]}
|
||
|
||
summary = {
|
||
"total_agents": len(agents),
|
||
"agents_recovered": agents_recovered,
|
||
"results": results
|
||
}
|
||
|
||
logger.info(f"🏁 Автоматичне відновлення завершено: {summary}")
|
||
return summary
|
||
|
||
async def nightly_sync(
|
||
self,
|
||
agents: List[Dict[str, str]]
|
||
) -> Dict[str, any]:
|
||
"""
|
||
Нічна синхронізація історії (cron job о 04:00)
|
||
|
||
Оновлює тільки активні чати (з повідомленнями за останні 7 днів)
|
||
"""
|
||
logger.info(f"🌙 Нічна синхронізація для {len(agents)} агентів")
|
||
|
||
results = {}
|
||
|
||
for agent_config in agents:
|
||
agent_id = agent_config.get("agent_id")
|
||
bot_token = agent_config.get("bot_token")
|
||
chat_id = agent_config.get("chat_id")
|
||
|
||
if not all([agent_id, bot_token, chat_id]):
|
||
continue
|
||
|
||
# Синхронізувати останні 20 повідомлень (швидше)
|
||
result = await self.recover_chat_history(agent_id, bot_token, chat_id, limit=20)
|
||
results[agent_id] = result
|
||
|
||
# Затримка між агентами
|
||
await asyncio.sleep(1)
|
||
|
||
logger.info(f"🌙 Нічна синхронізація завершена: {results}")
|
||
return results
|
||
|
||
async def close(self):
|
||
"""Закрити HTTP клієнт"""
|
||
await self.http_client.aclose()
|
||
|
||
|
||
# Singleton instance
|
||
recovery_service = TelegramHistoryRecovery()
|
||
|
||
|
||
async def auto_recover_on_startup_all_agents():
|
||
"""
|
||
Helper функція для запуску при старті Gateway.
|
||
Автоматично виявляє агентів з PostgreSQL та .env токенів
|
||
"""
|
||
import psycopg2
|
||
from psycopg2.extras import RealDictCursor
|
||
|
||
# Конфігурація з .env
|
||
agents_config = [
|
||
{
|
||
"agent_id": "helion",
|
||
"bot_token": os.getenv("HELION_TELEGRAM_BOT_TOKEN"),
|
||
},
|
||
{
|
||
"agent_id": "nutra",
|
||
"bot_token": os.getenv("NUTRA_TELEGRAM_BOT_TOKEN"),
|
||
},
|
||
{
|
||
"agent_id": "agromatrix",
|
||
"bot_token": os.getenv("AGROMATRIX_TELEGRAM_BOT_TOKEN"),
|
||
},
|
||
{
|
||
"agent_id": "greenfood",
|
||
"bot_token": os.getenv("GREENFOOD_TELEGRAM_BOT_TOKEN"),
|
||
},
|
||
{
|
||
"agent_id": "daarwizz",
|
||
"bot_token": os.getenv("TELEGRAM_BOT_TOKEN"), # Загальний токен
|
||
},
|
||
]
|
||
|
||
# Підключення до PostgreSQL для отримання chat_id
|
||
try:
|
||
# Спершу спробувати з .env, потім дефолтний URL
|
||
db_url = os.getenv("DATABASE_URL")
|
||
if not db_url:
|
||
db_url = "postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_memory"
|
||
conn = psycopg2.connect(db_url)
|
||
cursor = conn.cursor(cursor_factory=RealDictCursor)
|
||
|
||
# Отримати унікальні chat_id з бази
|
||
# Схема: fact_key = 'doc_context:telegram:CHAT_ID'
|
||
cursor.execute("""
|
||
SELECT DISTINCT split_part(fact_key, ':', 3) as chat_id
|
||
FROM user_facts
|
||
WHERE fact_key LIKE 'doc_context:telegram:%'
|
||
AND split_part(fact_key, ':', 3) != ''
|
||
ORDER BY chat_id
|
||
""")
|
||
|
||
# Зібрати всі chat_id
|
||
all_chat_ids = []
|
||
for row in cursor.fetchall():
|
||
chat_id = row["chat_id"]
|
||
|
||
try:
|
||
all_chat_ids.append(int(chat_id))
|
||
except ValueError:
|
||
logger.warning(f"Невалідний chat_id {chat_id}")
|
||
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
logger.info(f"Знайдено {len(all_chat_ids)} унікальних Telegram чатів: {all_chat_ids}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Помилка підключення до БД для chat_id: {e}")
|
||
logger.info("Відновлення пропущено через недоступність БД")
|
||
return {"status": "skipped", "reason": "database unavailable"}
|
||
|
||
# Сформувати список агентів для recovery
|
||
agents = []
|
||
for config in agents_config:
|
||
agent_id = config["agent_id"]
|
||
bot_token = config["bot_token"]
|
||
|
||
if not bot_token:
|
||
logger.debug(f"Пропускаю {agent_id}: немає токену в .env")
|
||
continue
|
||
|
||
# Додати всі чати для цього агента
|
||
if not all_chat_ids:
|
||
logger.debug(f"Пропускаю {agent_id}: немає активних чатів в БД")
|
||
continue
|
||
|
||
# Додати кожен чат для цього агента
|
||
for chat_id in all_chat_ids:
|
||
agents.append({
|
||
"agent_id": agent_id,
|
||
"bot_token": bot_token,
|
||
"chat_id": chat_id
|
||
})
|
||
|
||
if not agents:
|
||
logger.info("Немає агентів для відновлення")
|
||
return {"status": "no_agents", "agents": []}
|
||
|
||
logger.info(f"Запуск відновлення для {len(agents)} агент-чат пар")
|
||
return await recovery_service.auto_recover_on_startup(agents)
|