Files
microdao-daarion/telegram-infrastructure/telegram-gateway/app/router_handler.py
Apple a6e531a098 fix: NODE1_REPAIR - healthchecks, dependencies, SSR env, telegram gateway
TASK_PHASE_NODE1_REPAIR:
- Fix daarion-web SSR: use CITY_API_BASE_URL instead of 127.0.0.1
- Fix auth API routes: use AUTH_API_URL env var
- Add wget to Dockerfiles for healthchecks (stt, ocr, web-search, swapper, vector-db, rag)
- Update healthchecks to use wget instead of curl
- Fix vector-db-service: update torch==2.4.0, sentence-transformers==2.6.1
- Fix rag-service: correct haystack imports for v2.x
- Fix telegram-gateway: remove msg.ack() for non-JetStream NATS
- Add /health endpoint to nginx mvp-routes.conf
- Add room_role, is_public, sort_order columns to city_rooms migration
- Add TASK_PHASE_NODE1_REPAIR.md and DEPLOY_NODE1_REPAIR.md docs

Previous tasks included:
- TASK 039-044: Orchestrator rooms, Matrix chat cleanup, CrewAI integration
2025-11-29 05:17:08 -08:00

502 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
NATS subscriber для обробки подій agent.telegram.update
Викликає Router через HTTP API та відправляє відповідь назад в Telegram
"""
import asyncio
import json
import logging
from typing import Dict, Any
import httpx
import nats
from .config import settings
from .models import TelegramUpdateEvent, TelegramSendCommand
from .telegram_listener import telegram_listener
logger = logging.getLogger(__name__)
class RouterHandler:
"""Обробник подій з NATS, який викликає Router та відправляє відповіді"""
def __init__(self):
self._nc = None
self._sub = None
self._router_url = settings.ROUTER_BASE_URL
self._running = False
async def connect(self):
"""Підключитися до NATS"""
if self._nc is None or self._nc.is_closed:
self._nc = await nats.connect(settings.NATS_URL)
logger.info(f"✅ RouterHandler connected to NATS at {settings.NATS_URL}")
async def start_subscription(self):
"""Підписатися на події agent.telegram.update"""
await self.connect()
async def message_handler(msg):
"""Обробка повідомлення з NATS"""
try:
data = json.loads(msg.data.decode())
event = TelegramUpdateEvent(**data)
logger.info(
f"📥 Received NATS event: agent={event.agent_id}, "
f"chat={event.chat_id}, text_len={len(event.text or '')}"
)
# Обробити подію асинхронно
asyncio.create_task(self._handle_telegram_event(event))
# NOTE: No ack() for core NATS (non-JetStream) subscriptions
# JetStream ack is only needed when using JetStream consumers
except Exception as e:
logger.error(f"❌ Error processing NATS message: {e}", exc_info=True)
# Підписатися на subject
self._sub = await self._nc.subscribe("agent.telegram.update", cb=message_handler)
self._running = True
logger.info("✅ Subscribed to NATS subject: agent.telegram.update")
async def _handle_telegram_event(self, event: TelegramUpdateEvent):
"""Обробити подію Telegram та викликати Router"""
try:
metadata = event.metadata or {}
# Обробка фото (Vision Encoder)
if "photo" in metadata:
await self._handle_photo(event, metadata)
return
# Обробка документів (Parser Service)
if "document" in metadata:
await self._handle_document(event, metadata)
return
# Звичайні текстові повідомлення
if not event.text:
logger.debug(f"Skipping event without text: agent={event.agent_id}")
return
# Отримати системний промпт для агента
system_prompt = self._get_system_prompt(event.agent_id)
# Викликати Router через HTTP API
# Структура: payload.context.system_prompt (як очікує Router)
router_request = {
"message": event.text,
"mode": "chat",
"agent": event.agent_id,
"source": "telegram",
"user_id": f"tg:{event.user_id}",
"session_id": f"telegram:{event.chat_id}",
"payload": {
"context": {
"agent_name": event.agent_id.upper(),
"system_prompt": system_prompt, # Системний промпт для агента
}
}
}
# Детальне логування перед відправкою
payload_keys = list(router_request.get('payload', {}).keys())
context_keys = list(router_request.get('payload', {}).get('context', {}).keys())
sp_len = len(system_prompt) if system_prompt else 0
logger.info(
f"📞 Calling Router: agent={event.agent_id}, chat={event.chat_id}"
)
logger.info(
f" payload.keys={payload_keys}, "
f"context.keys={context_keys}, "
f"system_prompt_len={sp_len}"
)
if system_prompt:
logger.info(f" system_prompt preview: {system_prompt[:80]}...")
# Логування JSON перед відправкою (з повним system_prompt для діагностики)
import json
full_json = json.dumps(router_request, ensure_ascii=False)
logger.info(f"📤 Full request JSON length: {len(full_json)} bytes")
logger.info(f"📤 Payload.context.system_prompt in request: {router_request.get('payload', {}).get('context', {}).get('system_prompt', '')[:100]}...")
async with httpx.AsyncClient(timeout=120.0) as client: # Збільшено timeout до 120 сек
logger.info(f"📡 Sending HTTP POST to {self._router_url}/route")
response = await client.post(
f"{self._router_url}/route",
json=router_request
)
logger.info(f"📡 Router response status: {response.status_code}")
# Перевірка на 502 Bad Gateway
if response.status_code == 502:
logger.error(f"❌ Router returned 502 Bad Gateway for agent={event.agent_id}")
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text="⚠️ Вибач, зараз велике навантаження. Спробуй через хвилину."
)
return
response.raise_for_status()
result = response.json()
# Отримати відповідь
answer = None
if isinstance(result, dict):
answer = (
result.get("data", {}).get("text") or
result.get("data", {}).get("answer") or
result.get("response") or
result.get("text")
)
if not answer:
logger.warning(f"⚠️ No answer from Router for agent={event.agent_id}")
answer = "Вибач, зараз не можу відповісти."
logger.info(f"📤 Sending response: agent={event.agent_id}, chat={event.chat_id}, len={len(answer)}")
# Перевірити чи треба відповідати голосом (якщо користувач надіслав voice)
raw_update = event.raw_update or {}
should_reply_voice = raw_update.get("voice") or raw_update.get("audio") or raw_update.get("video_note")
if should_reply_voice:
# Синтезувати голос
audio_bytes = await self._text_to_speech(answer)
if audio_bytes:
logger.info(f"🔊 Sending voice response: agent={event.agent_id}, audio_size={len(audio_bytes)}")
await telegram_listener.send_voice(
agent_id=event.agent_id,
chat_id=event.chat_id,
audio_bytes=audio_bytes
)
else:
# Fallback to text
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text=answer
)
else:
# Звичайна текстова відповідь
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text=answer
)
logger.info(f"✅ Response sent: agent={event.agent_id}, chat={event.chat_id}")
except httpx.HTTPError as e:
logger.error(f"❌ HTTP error calling Router: {e}")
# Відправити повідомлення про помилку користувачу
try:
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text="❌ Помилка зв'язку з сервером. Спробуй ще раз."
)
except:
pass
except Exception as e:
logger.error(f"❌ Error handling Telegram event: {e}", exc_info=True)
async def _handle_photo(self, event: TelegramUpdateEvent, metadata: Dict[str, Any]):
"""Обробити фото через Swapper vision-8b модель"""
try:
photo_info = metadata.get("photo", {})
file_url = photo_info.get("file_url", "")
caption = event.text or ""
logger.info(f"🖼️ Processing photo: agent={event.agent_id}, url={file_url[:50]}...")
# Відправити до Router з specialist_vision_8b через Swapper
router_request = {
"message": f"Опиши це зображення детально: {file_url}",
"mode": "chat",
"agent": event.agent_id,
"metadata": {
"source": "telegram",
"chat_id": event.chat_id,
"file_url": file_url,
"has_image": True,
},
}
# Override LLM to use specialist_vision_8b for image understanding
router_request["metadata"]["use_llm"] = "specialist_vision_8b"
try:
async with httpx.AsyncClient(timeout=90.0) as client:
response = await client.post(f"{self._router_url}/route", json=router_request)
response.raise_for_status()
result = response.json()
if result.get("ok"):
answer_text = result.get("data", {}).get("text") or result.get("response", "")
if answer_text:
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text=f"✅ **Фото оброблено**\n\n{answer_text}"
)
return
# Якщо помилка
error_msg = result.get("error", "Unknown error")
logger.error(f"Router error: {error_msg}")
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text=f"Вибач, не вдалося обробити фото: {error_msg}"
)
except Exception as e:
logger.error(f"Error calling Router: {e}", exc_info=True)
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text="Вибач, не вдалося обробити фото. Переконайся, що Swapper Service з vision-8b моделлю запущений."
)
logger.info(f"✅ Photo response sent: agent={event.agent_id}, chat={event.chat_id}")
except Exception as e:
logger.error(f"❌ Error handling photo: {e}", exc_info=True)
try:
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text="❌ Помилка обробки зображення."
)
except:
pass
async def _handle_document(self, event: TelegramUpdateEvent, metadata: Dict[str, Any]):
"""Обробити PDF через Parser Service"""
try:
doc_info = metadata.get("document", {})
file_url = doc_info.get("file_url", "")
file_name = doc_info.get("file_name", "document.pdf")
logger.info(f"📄 Processing document: agent={event.agent_id}, file={file_name}")
# Викликати Parser Service через DAGI Router
parsed_content = await self._parse_document(file_url, file_name)
# Якщо є питання в caption/text - відповісти на основі parsed content
user_question = event.text
if user_question and user_question != f"[DOCUMENT] {file_name}":
# Додати parsed content до контексту
system_prompt = self._get_system_prompt(event.agent_id)
enhanced_text = f"Користувач запитує про документ '{file_name}':\n{user_question}\n\n[DOCUMENT_CONTENT]:\n{parsed_content[:2000]}"
# Викликати Router для відповіді
router_request = {
"message": enhanced_text,
"mode": "chat",
"agent": event.agent_id,
"source": "telegram",
"user_id": f"tg:{event.user_id}",
"session_id": f"telegram:{event.chat_id}",
"payload": {
"context": {
"agent_name": event.agent_id.upper(),
"system_prompt": system_prompt,
}
}
}
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(
f"{self._router_url}/route",
json=router_request
)
if response.status_code == 502:
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text="⚠️ Вибач, зараз велике навантаження. Спробуй через хвилину."
)
return
response.raise_for_status()
result = response.json()
# Отримати відповідь
answer = (
result.get("data", {}).get("text") or
result.get("data", {}).get("answer") or
result.get("response") or
result.get("text") or
"Вибач, не зміг проаналізувати документ."
)
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text=answer
)
else:
# Просто парсинг без питання
summary = parsed_content[:500] if parsed_content else "Документ оброблено"
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text=f"✅ Документ '{file_name}' оброблено.\n\n{summary}...\n\nЗадай питання про нього!"
)
logger.info(f"✅ Document response sent: agent={event.agent_id}, chat={event.chat_id}")
except Exception as e:
logger.error(f"❌ Error handling document: {e}", exc_info=True)
try:
await telegram_listener.send_message(
agent_id=event.agent_id,
chat_id=event.chat_id,
text="❌ Помилка обробки документу. Спробуй ще раз."
)
except:
pass
async def _parse_document(self, doc_url: str, file_name: str) -> str:
"""Викликати Parser Service для PDF"""
try:
logger.info(f"📡 Calling Parser Service: url={doc_url[:50]}..., file={file_name}")
async with httpx.AsyncClient(timeout=90.0) as client:
# Виклик DAGI Router з mode: "doc_parse"
response = await client.post(
f"{self._router_url}/route",
json={
"mode": "doc_parse",
"agent": "parser",
"payload": {
"context": {
"doc_url": doc_url,
"file_name": file_name,
"output_mode": "markdown"
}
}
}
)
response.raise_for_status()
result = response.json()
# Витягнути parsed content
if "data" in result:
markdown = result["data"].get("markdown", "")
if markdown:
return markdown
# Fallback
return result.get("text", "") or result.get("response", "") or "Документ оброблено"
except Exception as e:
logger.error(f"❌ Parser Service error: {e}")
return "[Не вдалося прочитати документ]"
async def _text_to_speech(self, text: str) -> bytes:
"""Синтезувати голос через TTS Service"""
try:
logger.info(f"🔊 Calling TTS Service: text_len={len(text)}")
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
"http://dagi-tts:9100/tts",
json={
"text": text[:500], # Обмежуємо довжину для TTS
"lang": "uk"
}
)
response.raise_for_status()
audio_bytes = response.content
logger.info(f"✅ TTS response: {len(audio_bytes)} bytes")
return audio_bytes
except Exception as e:
logger.error(f"❌ TTS Service error: {e}")
return b"" # Fallback to text
def _get_system_prompt(self, agent_id: str) -> str:
"""Отримати системний промпт для агента"""
# Системні промпти для агентів
prompts = {
"helion": """Ти - Helion, AI-агент платформи Energy Union екосистеми DAARION.city.
Допомагай користувачам з технологіями EcoMiner/BioMiner, токеномікою та DAO governance.
Твої основні функції:
- Консультації з енергетичними технологіями (сонячні панелі, вітряки, біогаз)
- Пояснення токеноміки Energy Union (ENERGY токен, стейкінг, винагороди)
- Допомога з onboarding в DAO
- Відповіді на питання про EcoMiner/BioMiner устаткування
Стиль спілкування:
- професійний, технічний, але зрозумілий
- точний у цифрах та даних
- конструктивний у рекомендаціях
Важливо:
- Не вигадуй дані, яких немає в системі
- Якщо дані недоступні — чесно скажи про це
- Не давай фінансових порад без консультації з експертами""",
"daarwizz": """Ти — DAARWIZZ, офіційний AI-агент екосистеми DAARION.city.
Допомагай учасникам з microDAO, ролями та процесами.
Відповідай коротко, практично, враховуй RBAC контекст користувача.""",
"greenfood": """Ти — GREENFOOD Assistant, фронтовий оркестратор ERP-системи для крафтових виробників, хабів та покупців.
Твоя місія: зрозуміти, хто з тобою говорить (комітент, менеджер складу, логіст, бухгалтер, маркетолог, покупець), виявити намір і делегувати завдання спеціалізованим агентам GREENFOOD.
У твоєму розпорядженні 12 спеціалізованих агентів:
- Product & Catalog (каталог товарів)
- Batch & Quality (партії та якість)
- Vendor Success (успіх комітентів)
- Warehouse (склад)
- Logistics & Delivery (доставка)
- Seller (продажі)
- Customer Care (підтримка)
- Finance & Pricing (фінанси)
- SMM & Campaigns (маркетинг)
- SEO & Web (SEO)
- Analytics & BI (аналітика)
- Compliance & Audit (аудит)
Правила роботи:
- Спочатку уточнюй роль і контекст
- Перетворюй запит на чітку дію
- Не вигадуй дані - якщо чогось немає, чесно кажи
- Завжди давай коротке резюме: що зроблено, наступні кроки
Відповідай українською, чітко та по-діловому.""",
}
prompt = prompts.get(agent_id.lower(), "")
if prompt:
logger.debug(f"Using system prompt for agent={agent_id}, len={len(prompt)}")
else:
logger.warning(f"No system prompt found for agent={agent_id}")
return prompt
async def close(self):
"""Закрити підписку та з'єднання"""
self._running = False
if self._sub:
await self._sub.unsubscribe()
if self._nc and not self._nc.is_closed:
await self._nc.drain()
await self._nc.close()
logger.info("RouterHandler closed")
router_handler = RouterHandler()