Files
microdao-daarion/gateway-bot/telegram_history_recovery.py
Apple 0c8bef82f4 feat: Add Alateya, Clan, Eonarch agents + fix gateway-router connection
## Agents Added
- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes
- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes
- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation
- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
2026-01-28 06:40:34 -08:00

469 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Telegram History Recovery
Автоматичне відновлення історії повідомлень для агентів
"""
import asyncio
import logging
import os
from typing import List, Dict, Optional, Set
from datetime import datetime, timedelta
import httpx
logger = logging.getLogger(__name__)
# Configuration
HISTORY_LIMIT = int(os.getenv("TELEGRAM_HISTORY_LIMIT", "100")) # Кількість повідомлень для відновлення
MIN_COLLECTION_SIZE = int(os.getenv("MIN_COLLECTION_SIZE", "10")) # Мінімальний розмір колекції
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
ROUTER_URL = os.getenv("ROUTER_URL", "http://localhost:9101")
class TelegramHistoryRecovery:
"""Система відновлення історії Telegram для агентів"""
def __init__(self):
self.http_client = httpx.AsyncClient(timeout=30.0)
self.processed_messages: Set[int] = set() # Кеш оброблених message_id
async def check_collection_health(self, agent_id: str) -> Dict[str, any]:
"""
Перевірити здоров'я колекції агента
Returns:
{
"exists": bool,
"points_count": int,
"needs_recovery": bool
}
"""
try:
collection_name = f"{agent_id}_messages"
url = f"{QDRANT_URL}/collections/{collection_name}"
response = await self.http_client.get(url)
if response.status_code == 404:
logger.warning(f"Collection {collection_name} не існує")
return {"exists": False, "points_count": 0, "needs_recovery": True}
response.raise_for_status()
data = response.json()
points_count = data.get("result", {}).get("points_count", 0)
needs_recovery = points_count < MIN_COLLECTION_SIZE
logger.info(f"Collection {collection_name}: {points_count} points, needs_recovery={needs_recovery}")
return {
"exists": True,
"points_count": points_count,
"needs_recovery": needs_recovery
}
except Exception as e:
logger.error(f"Помилка перевірки колекції {agent_id}: {e}")
return {"exists": False, "points_count": 0, "needs_recovery": True}
async def fetch_telegram_history(
self,
bot_token: str,
chat_id: int,
limit: int = HISTORY_LIMIT
) -> List[Dict]:
"""
Отримати історію повідомлень з Telegram
Note: Telegram API не має прямого методу для отримання історії.
Використовуємо getUpdates з offset для отримання останніх повідомлень.
"""
try:
messages = []
# Telegram не дає прямий доступ до історії чату через Bot API
# Альтернативний підхід: зберігати message_id і використовувати forwardMessage
# Або інтегруватися з MTProto для повного доступу
# Для спрощення: припускаємо що ми можемо отримати останні updates
url = f"https://api.telegram.org/bot{bot_token}/getUpdates"
params = {
"limit": limit,
"timeout": 1
}
response = await self.http_client.get(url, params=params)
response.raise_for_status()
data = response.json()
if not data.get("ok"):
logger.error(f"Telegram API error: {data}")
return []
updates = data.get("result", [])
for update in updates:
message = update.get("message")
if message and message.get("chat", {}).get("id") == chat_id:
messages.append(message)
logger.info(f"Отримано {len(messages)} повідомлень з Telegram для chat {chat_id}")
return messages
except Exception as e:
logger.error(f"Помилка отримання історії Telegram: {e}")
return []
async def check_message_exists(
self,
agent_id: str,
message_id: int
) -> bool:
"""
Перевірити чи повідомлення вже є в Qdrant
"""
if message_id in self.processed_messages:
return True
try:
collection_name = f"{agent_id}_messages"
url = f"{QDRANT_URL}/collections/{collection_name}/points/scroll"
payload = {
"filter": {
"must": [
{
"key": "message_id",
"match": {"value": message_id}
}
]
},
"limit": 1
}
response = await self.http_client.post(url, json=payload)
if response.status_code == 404:
return False
response.raise_for_status()
data = response.json()
points = data.get("result", {}).get("points", [])
exists = len(points) > 0
if exists:
self.processed_messages.add(message_id)
return exists
except Exception as e:
logger.error(f"Помилка перевірки message_id={message_id}: {e}")
return False
async def ingest_message(
self,
agent_id: str,
message: Dict,
bot_token: str
) -> bool:
"""
Відправити повідомлення на інжест через Router
"""
try:
message_id = message.get("message_id")
text = message.get("text", "")
if not text or not message_id:
return False
# Перевірити чи вже є
if await self.check_message_exists(agent_id, message_id):
logger.debug(f"Message {message_id} вже існує, пропускаю")
return True
# Відправити на інжест через Router
from_user = message.get("from", {})
chat = message.get("chat", {})
payload = {
"message": text,
"mode": "ingest_history", # Спеціальний режим для історичних повідомлень
"agent": agent_id,
"metadata": {
"source": "telegram_history_recovery",
"message_id": message_id,
"user_id": f"tg:{from_user.get('id')}",
"chat_id": str(chat.get("id")),
"username": from_user.get("username", ""),
"date": message.get("date"),
"is_historical": True
}
}
response = await self.http_client.post(
f"{ROUTER_URL}/chat",
json=payload,
timeout=10.0
)
if response.status_code == 200:
self.processed_messages.add(message_id)
logger.debug(f"✅ Інжест message {message_id} успішний")
return True
else:
logger.error(f"❌ Помилка інжесту message {message_id}: {response.status_code}")
return False
except Exception as e:
logger.error(f"Помилка інжесту повідомлення: {e}")
return False
async def recover_chat_history(
self,
agent_id: str,
bot_token: str,
chat_id: int,
limit: int = HISTORY_LIMIT
) -> Dict[str, any]:
"""
Відновити історію чату для агента
Returns:
{
"success": bool,
"messages_fetched": int,
"messages_ingested": int,
"messages_skipped": int
}
"""
logger.info(f"🔄 Починаю відновлення історії для {agent_id}, chat={chat_id}, limit={limit}")
# Отримати повідомлення з Telegram
messages = await self.fetch_telegram_history(bot_token, chat_id, limit)
if not messages:
logger.warning(f"Не отримано повідомлень для {agent_id}")
return {
"success": False,
"messages_fetched": 0,
"messages_ingested": 0,
"messages_skipped": 0
}
# Інжестити кожне повідомлення
ingested = 0
skipped = 0
for message in messages:
success = await self.ingest_message(agent_id, message, bot_token)
if success:
ingested += 1
else:
skipped += 1
# Невелика затримка щоб не перевантажити систему
await asyncio.sleep(0.1)
result = {
"success": True,
"messages_fetched": len(messages),
"messages_ingested": ingested,
"messages_skipped": skipped
}
logger.info(f"✅ Відновлення завершено для {agent_id}: {result}")
return result
async def auto_recover_on_startup(
self,
agents: List[Dict[str, str]]
) -> Dict[str, any]:
"""
Автоматичне відновлення при старті Gateway
Args:
agents: List of {"agent_id": str, "bot_token": str, "chat_id": int}
Returns:
{
"total_agents": int,
"agents_recovered": int,
"results": {agent_id: result}
}
"""
logger.info(f"🚀 Автоматичне відновлення при старті для {len(agents)} агентів")
results = {}
agents_recovered = 0
for agent_config in agents:
agent_id = agent_config.get("agent_id")
bot_token = agent_config.get("bot_token")
chat_id = agent_config.get("chat_id")
if not all([agent_id, bot_token, chat_id]):
logger.warning(f"Пропускаю {agent_id}: неповна конфігурація")
continue
# Перевірити стан колекції
health = await self.check_collection_health(agent_id)
if health["needs_recovery"]:
logger.info(f"🔧 Агент {agent_id} потребує відновлення (points={health['points_count']})")
result = await self.recover_chat_history(agent_id, bot_token, chat_id)
results[agent_id] = result
if result["success"]:
agents_recovered += 1
else:
logger.info(f"✅ Агент {agent_id} в порядку (points={health['points_count']})")
results[agent_id] = {"status": "healthy", "points_count": health["points_count"]}
summary = {
"total_agents": len(agents),
"agents_recovered": agents_recovered,
"results": results
}
logger.info(f"🏁 Автоматичне відновлення завершено: {summary}")
return summary
async def nightly_sync(
self,
agents: List[Dict[str, str]]
) -> Dict[str, any]:
"""
Нічна синхронізація історії (cron job о 04:00)
Оновлює тільки активні чати (з повідомленнями за останні 7 днів)
"""
logger.info(f"🌙 Нічна синхронізація для {len(agents)} агентів")
results = {}
for agent_config in agents:
agent_id = agent_config.get("agent_id")
bot_token = agent_config.get("bot_token")
chat_id = agent_config.get("chat_id")
if not all([agent_id, bot_token, chat_id]):
continue
# Синхронізувати останні 20 повідомлень (швидше)
result = await self.recover_chat_history(agent_id, bot_token, chat_id, limit=20)
results[agent_id] = result
# Затримка між агентами
await asyncio.sleep(1)
logger.info(f"🌙 Нічна синхронізація завершена: {results}")
return results
async def close(self):
"""Закрити HTTP клієнт"""
await self.http_client.aclose()
# Singleton instance
recovery_service = TelegramHistoryRecovery()
async def auto_recover_on_startup_all_agents():
"""
Helper функція для запуску при старті Gateway.
Автоматично виявляє агентів з PostgreSQL та .env токенів
"""
import psycopg2
from psycopg2.extras import RealDictCursor
# Конфігурація з .env
agents_config = [
{
"agent_id": "helion",
"bot_token": os.getenv("HELION_TELEGRAM_BOT_TOKEN"),
},
{
"agent_id": "nutra",
"bot_token": os.getenv("NUTRA_TELEGRAM_BOT_TOKEN"),
},
{
"agent_id": "agromatrix",
"bot_token": os.getenv("AGROMATRIX_TELEGRAM_BOT_TOKEN"),
},
{
"agent_id": "greenfood",
"bot_token": os.getenv("GREENFOOD_TELEGRAM_BOT_TOKEN"),
},
{
"agent_id": "daarwizz",
"bot_token": os.getenv("TELEGRAM_BOT_TOKEN"), # Загальний токен
},
]
# Підключення до PostgreSQL для отримання chat_id
try:
# Спершу спробувати з .env, потім дефолтний URL
db_url = os.getenv("DATABASE_URL")
if not db_url:
db_url = "postgresql://daarion:DaarionDB2026!@dagi-postgres:5432/daarion_memory"
conn = psycopg2.connect(db_url)
cursor = conn.cursor(cursor_factory=RealDictCursor)
# Отримати унікальні chat_id з бази
# Схема: fact_key = 'doc_context:telegram:CHAT_ID'
cursor.execute("""
SELECT DISTINCT split_part(fact_key, ':', 3) as chat_id
FROM user_facts
WHERE fact_key LIKE 'doc_context:telegram:%'
AND split_part(fact_key, ':', 3) != ''
ORDER BY chat_id
""")
# Зібрати всі chat_id
all_chat_ids = []
for row in cursor.fetchall():
chat_id = row["chat_id"]
try:
all_chat_ids.append(int(chat_id))
except ValueError:
logger.warning(f"Невалідний chat_id {chat_id}")
cursor.close()
conn.close()
logger.info(f"Знайдено {len(all_chat_ids)} унікальних Telegram чатів: {all_chat_ids}")
except Exception as e:
logger.error(f"Помилка підключення до БД для chat_id: {e}")
logger.info("Відновлення пропущено через недоступність БД")
return {"status": "skipped", "reason": "database unavailable"}
# Сформувати список агентів для recovery
agents = []
for config in agents_config:
agent_id = config["agent_id"]
bot_token = config["bot_token"]
if not bot_token:
logger.debug(f"Пропускаю {agent_id}: немає токену в .env")
continue
# Додати всі чати для цього агента
if not all_chat_ids:
logger.debug(f"Пропускаю {agent_id}: немає активних чатів в БД")
continue
# Додати кожен чат для цього агента
for chat_id in all_chat_ids:
agents.append({
"agent_id": agent_id,
"bot_token": bot_token,
"chat_id": chat_id
})
if not agents:
logger.info("Немає агентів для відновлення")
return {"status": "no_agents", "agents": []}
logger.info(f"Запуск відновлення для {len(agents)} агент-чат пар")
return await recovery_service.auto_recover_on_startup(agents)