diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index 06ce285c..55803978 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -2,11 +2,13 @@ Bot Gateway HTTP API Handles incoming webhooks from Telegram, Discord, etc. """ +import asyncio import logging import os +import time import httpx from pathlib import Path -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List, Tuple from datetime import datetime from dataclasses import dataclass @@ -109,6 +111,18 @@ HELION_CONFIG = load_agent_config( default_prompt=f"Ти — {os.getenv('HELION_NAME', 'Helion')}, AI-агент платформи Energy Union. Допомагай учасникам з технологіями та токеномікою." ) +# GREENFOOD Configuration +GREENFOOD_CONFIG = load_agent_config( + agent_id="greenfood", + name=os.getenv("GREENFOOD_NAME", "GREENFOOD"), + prompt_path=os.getenv( + "GREENFOOD_PROMPT_PATH", + str(Path(__file__).parent / "greenfood_prompt.txt"), + ), + telegram_token_env="GREENFOOD_TELEGRAM_BOT_TOKEN", + default_prompt="Ти — GREENFOOD Assistant, AI-ERP для крафтових виробників та кооперативів. Допомагай з обліком партій, логістикою, бухгалтерією та продажами." +) + # Registry of all agents (для легкого додавання нових агентів) # # Щоб додати нового агента: @@ -136,6 +150,7 @@ HELION_CONFIG = load_agent_config( AGENT_REGISTRY: Dict[str, AgentConfig] = { "daarwizz": DAARWIZZ_CONFIG, "helion": HELION_CONFIG, + "greenfood": GREENFOOD_CONFIG, } # Backward compatibility @@ -143,6 +158,8 @@ DAARWIZZ_NAME = DAARWIZZ_CONFIG.name DAARWIZZ_SYSTEM_PROMPT = DAARWIZZ_CONFIG.system_prompt HELION_NAME = HELION_CONFIG.name HELION_SYSTEM_PROMPT = HELION_CONFIG.system_prompt +GREENFOOD_NAME = GREENFOOD_CONFIG.name +GREENFOOD_SYSTEM_PROMPT = GREENFOOD_CONFIG.system_prompt # ======================================== @@ -181,6 +198,90 @@ def get_dao_id(chat_id: str, source: str) -> str: return CHAT_TO_DAO.get(key, CHAT_TO_DAO["default"]) +# ======================================== +# Helper Functions +# ======================================== + +SERVICE_ACK_PREFIXES = ( + "📥 Імпортую", + "📄 Обробляю", + "Обробляю голосове", + "🎤", +) + + +def is_service_response(text: str) -> bool: + """Heuristic: визначає, чи відповідь є службовою (вітальна/ack).""" + if not text: + return True + stripped = text.strip() + if not stripped: + return True + if len(stripped) < 5: + return True + lower = stripped.lower() + return any(lower.startswith(prefix.lower()) for prefix in SERVICE_ACK_PREFIXES) + + +def extract_bot_mentions(text: str) -> List[str]: + """Витягує згадки інших ботів виду @NameBot.""" + if not text: + return [] + mentions = [] + for token in text.split(): + if token.startswith("@") and token[1:].lower().endswith("bot"): + mentions.append(token[1:]) + return mentions + + +def should_force_concise_reply(text: str) -> bool: + """Якщо коротке або без питального знаку — просимо агента відповісти стисло.""" + if not text: + return True + stripped = text.strip() + if len(stripped) <= 120 and "?" not in stripped: + return True + return False + + +COMPLEX_REASONING_KEYWORDS = [ + "стратег", "roadmap", "алгоритм", "architecture", "архітектур", + "прогноз", "scenario", "модель", "аналіз", "побудуй", "plan", "дослідж", + "симуляц", "forecast", "оптиміз", "розрахуй", "calculate", "predict" +] + + +def requires_complex_reasoning(text: str) -> bool: + if not text: + return False + stripped = text.strip() + if len(stripped) > 400: + return True + lower = stripped.lower() + return any(keyword in lower for keyword in COMPLEX_REASONING_KEYWORDS) + + +LAST_RESPONSE_CACHE: Dict[Tuple[str, str], Dict[str, Any]] = {} +LAST_RESPONSE_TTL = float(os.getenv("TELEGRAM_LAST_RESPONSE_TTL", "15")) + + +def get_cached_response(agent_id: str, chat_id: str, text: str) -> Optional[str]: + entry = LAST_RESPONSE_CACHE.get((agent_id, chat_id)) + if not entry: + return None + if entry["text"] == text and time.time() - entry["ts"] < LAST_RESPONSE_TTL: + return entry["answer"] + return None + + +def store_response_cache(agent_id: str, chat_id: str, text: str, answer: str) -> None: + LAST_RESPONSE_CACHE[(agent_id, chat_id)] = { + "text": text, + "answer": answer, + "ts": time.time(), + } + + # ======================================== # Helper Functions # ======================================== @@ -388,7 +489,9 @@ async def process_photo( message=f"[Photo: {file_id}]", response=answer_text, channel_id=chat_id, - scope="short_term" + scope="short_term", + save_agent_response=not is_service_response(answer_text), + agent_metadata={"context": "photo"}, ) return {"ok": True, "agent": agent_config.agent_id, "model": "specialist_vision_8b"} @@ -467,7 +570,6 @@ async def process_document( raise HTTPException(status_code=400, detail="Failed to get file from Telegram") file_url = f"https://api.telegram.org/file/bot{telegram_token}/{file_path}" - await send_telegram_message(chat_id, "📄 Обробляю PDF-документ... Це може зайняти кілька секунд.", telegram_token) session_id = f"telegram:{chat_id}" result = await parse_document( @@ -628,10 +730,29 @@ async def handle_telegram_webhook( user_id = str(from_user.get("id", "unknown")) chat_id = str(chat.get("id", "unknown")) username = from_user.get("username", "") + first_name = from_user.get("first_name") + last_name = from_user.get("last_name") + is_sender_bot = bool(from_user.get("is_bot") or (username and username.lower().endswith("bot"))) # Get DAO ID for this chat dao_id = get_dao_id(chat_id, "telegram") + # Оновлюємо факти про користувача/агента для побудови графу пам'яті + asyncio.create_task( + memory_client.upsert_fact( + user_id=f"tg:{user_id}", + fact_key="profile", + fact_value_json={ + "username": username, + "first_name": first_name, + "last_name": last_name, + "language_code": from_user.get("language_code"), + "is_bot": is_sender_bot, + }, + team_id=dao_id, + ) + ) + telegram_token = agent_config.get_telegram_token() if not telegram_token: raise HTTPException(status_code=500, detail=f"Telegram token not configured for {agent_config.name}") @@ -658,8 +779,6 @@ async def handle_telegram_webhook( file_path = await get_telegram_file_path(file_id, telegram_token) if file_path: file_url = f"https://api.telegram.org/file/bot{telegram_token}/{file_path}" - await send_telegram_message(chat_id, "📥 Імпортую документ у RAG...", telegram_token) - result = await ingest_document( session_id=session_id, doc_url=file_url, @@ -753,6 +872,28 @@ async def handle_telegram_webhook( raise HTTPException(status_code=400, detail="No text or voice in message") logger.info(f"{agent_config.name} Telegram message from {username} (tg:{user_id}) in chat {chat_id}: {text[:50]}") + mentioned_bots = extract_bot_mentions(text) + needs_complex_reasoning = requires_complex_reasoning(text) + + cached_answer = get_cached_response(agent_config.agent_id, chat_id, text) + if cached_answer: + await send_telegram_message(chat_id, cached_answer, telegram_token) + await memory_client.save_chat_turn( + agent_id=agent_config.agent_id, + team_id=dao_id, + user_id=f"tg:{user_id}", + message=text, + response=cached_answer, + channel_id=chat_id, + scope="short_term", + save_agent_response=not is_service_response(cached_answer), + agent_metadata={ + "cached_reply": True, + "mentioned_bots": mentioned_bots, + "requires_complex_reasoning": needs_complex_reasoning, + }, + ) + return {"ok": True, "agent": agent_config.agent_id, "cached": True} # Check if there's a document context for follow-up questions session_id = f"telegram:{chat_id}" @@ -809,14 +950,31 @@ async def handle_telegram_webhook( "session_id": f"tg:{chat_id}:{dao_id}", "username": username, "chat_id": chat_id, + "sender_is_bot": is_sender_bot, + "mentioned_bots": mentioned_bots, + "requires_complex_reasoning": needs_complex_reasoning, }, "context": { "agent_name": agent_config.name, "system_prompt": agent_config.system_prompt, "memory": memory_context, + "participants": { + "sender_is_bot": is_sender_bot, + "mentioned_bots": mentioned_bots, + }, }, } + if should_force_concise_reply(text): + router_request["message"] = ( + f"{text}\n\n(Інструкція: дай максимально коротку відповідь, якщо не просили деталей " + "і дочекайся додаткового питання.)" + ) + + if needs_complex_reasoning: + router_request["metadata"]["provider"] = "cloud_deepseek" + router_request["metadata"]["reason"] = "auto_complex" + # Send to Router logger.info(f"Sending to Router: agent={agent_config.agent_id}, dao={dao_id}, user=tg:{user_id}") response = await send_to_router(router_request) @@ -835,6 +993,23 @@ async def handle_telegram_webhook( # Send response back to Telegram await send_telegram_message(chat_id, answer_text, telegram_token) + await memory_client.save_chat_turn( + agent_id=agent_config.agent_id, + team_id=dao_id, + user_id=f"tg:{user_id}", + message=text, + response=answer_text, + channel_id=chat_id, + scope="short_term", + save_agent_response=not is_service_response(answer_text), + agent_metadata={ + "mentioned_bots": mentioned_bots, + "requires_complex_reasoning": needs_complex_reasoning, + }, + ) + + store_response_cache(agent_config.agent_id, chat_id, text, answer_text) + return {"ok": True, "agent": agent_config.agent_id} else: error_msg = response.get("error", "Unknown error") if isinstance(response, dict) else "Router error" @@ -911,8 +1086,6 @@ async def _old_telegram_webhook(update: TelegramUpdate): file_path = await get_telegram_file_path(file_id) if file_path: file_url = f"https://api.telegram.org/file/bot{telegram_token}/{file_path}" - await send_telegram_message(chat_id, "📥 Імпортую документ у RAG...") - result = await ingest_document( session_id=session_id, doc_url=file_url, @@ -984,9 +1157,6 @@ async def _old_telegram_webhook(update: TelegramUpdate): # Build file URL file_url = f"https://api.telegram.org/file/bot{telegram_token}/{file_path}" - # Send "Processing..." message - await send_telegram_message(chat_id, "📄 Обробляю PDF-документ... Це може зайняти кілька секунд.") - # Use doc_service for parsing session_id = f"telegram:{chat_id}" result = await parse_document( @@ -1105,7 +1275,9 @@ async def _old_telegram_webhook(update: TelegramUpdate): message=f"[Photo: {file_id}]", response=answer_text, channel_id=chat_id, - scope="short_term" + scope="short_term", + save_agent_response=not is_service_response(answer_text), + agent_metadata={"context": "photo"}, ) return {"ok": True, "agent": "daarwizz", "model": "specialist_vision_8b"} @@ -1261,7 +1433,9 @@ async def _old_telegram_webhook(update: TelegramUpdate): message=text, response=answer_text, channel_id=chat_id, - scope="short_term" + scope="short_term", + save_agent_response=not is_service_response(answer_text), + agent_metadata={"context": "legacy_daarwizz"}, ) # Send response back to Telegram @@ -1354,7 +1528,9 @@ async def discord_webhook(message: DiscordMessage): message=text, response=answer_text, channel_id=channel_id, - scope="short_term" + scope="short_term", + save_agent_response=not is_service_response(answer_text), + agent_metadata={"source": "discord"}, ) # TODO: Send response back to Discord @@ -1492,6 +1668,22 @@ async def helion_telegram_webhook(update: TelegramUpdate): raise HTTPException(status_code=500, detail=str(e)) +# ======================================== +# GREENFOOD Telegram Webhook +# ======================================== + +@router.post("/greenfood/telegram/webhook") +async def greenfood_telegram_webhook(update: TelegramUpdate): + """ + Handle Telegram webhook for GREENFOOD agent. + """ + try: + return await handle_telegram_webhook(GREENFOOD_CONFIG, update) + except Exception as e: + logger.error(f"Error handling GREENFOOD Telegram webhook: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + # Legacy code - will be removed after testing async def _old_helion_telegram_webhook(update: TelegramUpdate): """Стара версія - використовується для тестування""" @@ -1533,8 +1725,6 @@ async def _old_helion_telegram_webhook(update: TelegramUpdate): file_path = await get_telegram_file_path(file_id) if file_path: file_url = f"https://api.telegram.org/file/bot{helion_token}/{file_path}" - await send_telegram_message(chat_id, "📥 Імпортую документ у RAG...", helion_token) - result = await ingest_document( session_id=session_id, doc_url=file_url, @@ -1605,7 +1795,6 @@ async def _old_helion_telegram_webhook(update: TelegramUpdate): raise HTTPException(status_code=400, detail="Failed to get file from Telegram") file_url = f"https://api.telegram.org/file/bot{helion_token}/{file_path}" - await send_telegram_message(chat_id, "📄 Обробляю PDF-документ... Це може зайняти кілька секунд.", helion_token) session_id = f"telegram:{chat_id}" result = await parse_document( @@ -1720,7 +1909,9 @@ async def _old_helion_telegram_webhook(update: TelegramUpdate): message=f"[Photo: {file_id}]", response=answer_text, channel_id=chat_id, - scope="short_term" + scope="short_term", + save_agent_response=not is_service_response(answer_text), + agent_metadata={"context": "photo"}, ) return {"ok": True, "agent": "helion", "model": "specialist_vision_8b"} @@ -1745,6 +1936,8 @@ async def _old_helion_telegram_webhook(update: TelegramUpdate): raise HTTPException(status_code=400, detail="No text in message") logger.info(f"Helion Telegram message from {username} (tg:{user_id}) in chat {chat_id}: {text[:50]}") + mentioned_bots = extract_bot_mentions(text) + needs_complex_reasoning = requires_complex_reasoning(text) # Check if there's a document context for follow-up questions session_id = f"telegram:{chat_id}" @@ -1802,6 +1995,8 @@ async def _old_helion_telegram_webhook(update: TelegramUpdate): "session_id": f"tg:{chat_id}:{dao_id}", "username": username, "chat_id": chat_id, + "mentioned_bots": mentioned_bots, + "requires_complex_reasoning": needs_complex_reasoning, }, "context": { "agent_name": HELION_NAME, @@ -1831,7 +2026,13 @@ async def _old_helion_telegram_webhook(update: TelegramUpdate): message=text, response=answer_text, channel_id=chat_id, - scope="short_term" + scope="short_term", + save_agent_response=not is_service_response(answer_text), + agent_metadata={ + "context": "helion", + "mentioned_bots": mentioned_bots, + "requires_complex_reasoning": needs_complex_reasoning, + }, ) # Send response back to Telegram diff --git a/gateway-bot/memory_client.py b/gateway-bot/memory_client.py index d793e933..aab3beb5 100644 --- a/gateway-bot/memory_client.py +++ b/gateway-bot/memory_client.py @@ -1,16 +1,15 @@ -""" -Memory Service Client для Gateway -Використовується для отримання та збереження пам'яті діалогів -""" +import asyncio import os import logging -from typing import Optional, Dict, Any, List +import time +from typing import Optional, Dict, Any, List, Tuple from datetime import datetime import httpx logger = logging.getLogger(__name__) MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000") +CONTEXT_CACHE_TTL = float(os.getenv("MEMORY_CONTEXT_CACHE_TTL", "5")) class MemoryClient: @@ -19,6 +18,17 @@ class MemoryClient: def __init__(self, base_url: str = MEMORY_SERVICE_URL): self.base_url = base_url.rstrip("/") self.timeout = 10.0 + self._context_cache: Dict[str, Tuple[float, Dict[str, Any]]] = {} + + def _cache_key( + self, + user_id: str, + agent_id: str, + team_id: str, + channel_id: Optional[str], + limit: int + ) -> str: + return f"{user_id}:{agent_id}:{team_id}:{channel_id}:{limit}" async def get_context( self, @@ -30,26 +40,21 @@ class MemoryClient: ) -> Dict[str, Any]: """ Отримати контекст пам'яті для діалогу - - Повертає: - { - "facts": [...], # user_facts - "recent_events": [...], # останні agent_memory_events - "dialog_summaries": [...] # підсумки діалогів - } """ + cache_key = self._cache_key(user_id, agent_id, team_id, channel_id, limit) + cached = self._context_cache.get(cache_key) + now = time.monotonic() + if cached and now - cached[0] < CONTEXT_CACHE_TTL: + return cached[1] + try: async with httpx.AsyncClient(timeout=self.timeout) as client: - # Отримуємо user facts - facts_response = await client.get( + facts_request = client.get( f"{self.base_url}/facts", params={"user_id": user_id, "team_id": team_id, "limit": limit}, - headers={"Authorization": f"Bearer {user_id}"} # Заглушка + headers={"Authorization": f"Bearer {user_id}"} ) - facts = facts_response.json() if facts_response.status_code == 200 else [] - - # Отримуємо останні memory events - events_response = await client.get( + events_request = client.get( f"{self.base_url}/agents/{agent_id}/memory", params={ "team_id": team_id, @@ -60,10 +65,7 @@ class MemoryClient: }, headers={"Authorization": f"Bearer {user_id}"} ) - events = events_response.json().get("items", []) if events_response.status_code == 200 else [] - - # Отримуємо dialog summaries - summaries_response = await client.get( + summaries_request = client.get( f"{self.base_url}/summaries", params={ "team_id": team_id, @@ -73,13 +75,30 @@ class MemoryClient: }, headers={"Authorization": f"Bearer {user_id}"} ) - summaries = summaries_response.json().get("items", []) if summaries_response.status_code == 200 else [] - return { + facts_response, events_response, summaries_response = await asyncio.gather( + facts_request, events_request, summaries_request, return_exceptions=True + ) + + facts = facts_response.json() if isinstance(facts_response, httpx.Response) and facts_response.status_code == 200 else [] + events = ( + events_response.json().get("items", []) + if isinstance(events_response, httpx.Response) and events_response.status_code == 200 + else [] + ) + summaries = ( + summaries_response.json().get("items", []) + if isinstance(summaries_response, httpx.Response) and summaries_response.status_code == 200 + else [] + ) + + result = { "facts": facts, "recent_events": events, "dialog_summaries": summaries } + self._context_cache[cache_key] = (now, result) + return result except Exception as e: logger.warning(f"Memory context fetch failed: {e}") return { @@ -96,7 +115,9 @@ class MemoryClient: message: str, response: str, channel_id: Optional[str] = None, - scope: str = "short_term" + scope: str = "short_term", + save_agent_response: bool = True, + agent_metadata: Optional[Dict[str, Any]] = None ) -> bool: """ Зберегти один turn діалогу (повідомлення + відповідь) @@ -122,22 +143,27 @@ class MemoryClient: ) # Зберігаємо відповідь агента - agent_event = { - "agent_id": agent_id, - "team_id": team_id, - "channel_id": channel_id, - "user_id": user_id, - "scope": scope, - "kind": "message", - "body_text": response, - "body_json": {"type": "agent_response", "source": "telegram"} - } - - await client.post( - f"{self.base_url}/agents/{agent_id}/memory", - json=agent_event, - headers={"Authorization": f"Bearer {user_id}"} - ) + if save_agent_response and response: + agent_event = { + "agent_id": agent_id, + "team_id": team_id, + "channel_id": channel_id, + "user_id": user_id, + "scope": scope, + "kind": "message", + "body_text": response, + "body_json": { + "type": "agent_response", + "source": "telegram", + **(agent_metadata or {}) + } + } + + await client.post( + f"{self.base_url}/agents/{agent_id}/memory", + json=agent_event, + headers={"Authorization": f"Bearer {user_id}"} + ) return True except Exception as e: