From e6c083a00064ebe3d3ab817b618c0dc44357c65e Mon Sep 17 00:00:00 2001 From: Apple Date: Fri, 20 Feb 2026 14:16:07 -0800 Subject: [PATCH] gateway: enforce source-lock, pii guard, style profile, and intent retry --- gateway-bot/http_api.py | 652 +++++++++++++++++++++++++++++++++++----- 1 file changed, 581 insertions(+), 71 deletions(-) diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index aed587dd..acb99d1c 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -4,6 +4,7 @@ Handles incoming webhooks from Telegram, Discord, etc. """ import asyncio import base64 +import copy import json import re import logging @@ -61,6 +62,10 @@ PENDING_STATE_TTL = 1800 # 30 minutes USER_LANGUAGE_PREFS: Dict[str, Dict[str, Any]] = {} USER_LANGUAGE_PREF_TTL = 30 * 24 * 3600 # 30 days +# Per-user response style cache (agent:chat:user -> {style, ts}) +USER_RESPONSE_STYLE_PREFS: Dict[str, Dict[str, Any]] = {} +USER_RESPONSE_STYLE_PREF_TTL = 30 * 24 * 3600 # 30 days + # Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts}) RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {} RECENT_PHOTO_TTL = 30 * 60 # 30 minutes @@ -88,16 +93,120 @@ def _get_recent_photo_file_id(agent_id: str, chat_id: str, user_id: str) -> Opti return rec.get("file_id") +def _extract_recent_photo_file_id_from_memory(memory_context: Dict[str, Any]) -> Optional[str]: + """ + Extract last seen Telegram photo file_id from memory context. + Looks for patterns like: [Photo: ] + """ + if not memory_context: + return None + + pattern = re.compile(r"\[Photo:\s*([^\]\s]+)\]") + + recent_events = memory_context.get("recent_events", []) or [] + for ev in reversed(recent_events): + body = (ev.get("body_text") or "").strip() + if not body: + continue + m = pattern.search(body) + if m: + return m.group(1) + + local_text = memory_context.get("local_context_text") or "" + for line in reversed(local_text.splitlines()): + m = pattern.search(line) + if m: + return m.group(1) + return None + + def _looks_like_photo_followup(text: str) -> bool: if not text: return False t = text.strip().lower() - markers = [ + direct_markers = [ "що ти бачиш", "що на фото", "що на зображенні", "опиши фото", "подивись фото", + "що на цьому фото", "що на цій фотографії", "що на цій світлині", + "проаналізуй фото", "аналіз фото", "переглянь фото", "повернись до фото", + "яка це рослина", "що це за рослина", "що за рослина", "що за культура", + "яка культура", "визнач рослину", "what do you see", "what is in the image", "describe the photo", + "analyze the photo", "analyze image", "what plant is this", "что ты видишь", "что на фото", "опиши фото", "посмотри фото", + "проанализируй фото", "какое это растение", "что за растение", ] - return any(m in t for m in markers) + if any(m in t for m in direct_markers): + return True + + # If user is correcting previous visual interpretation, route to vision again. + correction_markers = [ + "неправильна відповідь", "не правильна відповідь", "не видумуй", "це не так", + "ти помилився", "ти помилилась", "неправильно визначив", + "wrong answer", "you are wrong", "that is incorrect", + "неправильный ответ", "это не так", "ты ошибся", + ] + photo_topic_markers = ["фото", "зображ", "рослин", "image", "photo", "plant", "растен"] + if any(c in t for c in correction_markers) and any(p in t for p in photo_topic_markers): + return True + + # Flexible forms: "що на ... фото/зображенні/світлині" + if re.search(r"(що|what|что)\s+на\s+.*(фото|зображ|світлин|image|photo)", t): + # Exclude common meta-questions + meta_exclude = ["канал", "чат", "бот", "нормально"] + if not any(ex in t for ex in meta_exclude): + return True + return False + + +def _extract_unanswered_user_messages( + memory_context: Dict[str, Any], + current_user_id: str, + max_items: int = 3, +) -> List[str]: + """ + Extract unresolved user questions from structured memory events. + A user message is considered unresolved if no later agent reply exists. + """ + events = memory_context.get("recent_events") or [] + if not isinstance(events, list) or not current_user_id: + return [] + + pending: List[str] = [] + for ev in events: + role = str(ev.get("role") or ev.get("type") or "").lower() + text = str(ev.get("body_text") or "").strip() + if not text: + continue + if role == "user" and str(ev.get("user_id") or "") == current_user_id: + pending.append(text) + continue + if role in ("assistant", "agent") and pending: + # Assume latest agent reply resolved the oldest pending user question. + pending.pop(0) + + # Keep the latest unresolved items only. + if len(pending) > max_items: + pending = pending[-max_items:] + return pending + + +def _is_question_like(text: str) -> bool: + """ + Detect user questions without false positives from substring matches + (e.g. 'схоже' should not match 'що'). + """ + if not text: + return False + t = text.strip().lower() + if "?" in t: + return True + # Ukrainian / Russian / English interrogatives with word boundaries. + return bool( + re.search( + r"\b(що|як|чому|коли|де|хто|чи|what|why|how|when|where|who|зачем|почему|когда|где|кто|ли)\b", + t, + ) + ) def _cleanup_user_language_prefs() -> None: @@ -107,6 +216,13 @@ def _cleanup_user_language_prefs() -> None: del USER_LANGUAGE_PREFS[k] +def _cleanup_user_response_style_prefs() -> None: + now = time.time() + expired = [k for k, v in USER_RESPONSE_STYLE_PREFS.items() if now - float(v.get("ts", 0)) > USER_RESPONSE_STYLE_PREF_TTL] + for k in expired: + del USER_RESPONSE_STYLE_PREFS[k] + + def _normalize_lang_code(raw: Optional[str]) -> Optional[str]: if not raw: return None @@ -855,6 +971,242 @@ def should_force_concise_reply(text: str) -> bool: return True +def _detect_response_style_signal(text: str) -> Optional[str]: + t = (text or "").strip().lower() + if not t: + return None + concise_markers = ["коротко", "коротка відповідь", "лаконічно", "brief", "short answer"] + detailed_markers = ["детально", "розгорнуто", "поясни детальніше", "deep dive", "step by step"] + if any(m in t for m in detailed_markers): + return "detailed" + if any(m in t for m in concise_markers): + return "concise" + return None + + +async def resolve_response_style_preference( + agent_id: str, + chat_id: str, + user_id: str, + text: str, + team_id: Optional[str], +) -> str: + _cleanup_user_response_style_prefs() + cache_key = f"{agent_id}:{chat_id}:{user_id}" + signal = _detect_response_style_signal(text) + now = time.time() + if signal in ("concise", "detailed"): + USER_RESPONSE_STYLE_PREFS[cache_key] = {"style": signal, "ts": now} + await memory_client.upsert_fact( + user_id=f"tg:{user_id}", + fact_key=f"communication_profile:{agent_id}", + fact_value_json={"response_style": signal, "updated_at": datetime.utcnow().isoformat()}, + team_id=team_id, + ) + return signal + + cached = USER_RESPONSE_STYLE_PREFS.get(cache_key) + if cached: + return str(cached.get("style") or "concise") + + fact = await memory_client.get_fact( + user_id=f"tg:{user_id}", + fact_key=f"communication_profile:{agent_id}", + team_id=team_id, + ) + if fact: + data = fact.get("fact_value_json") if isinstance(fact, dict) else None + if isinstance(data, str): + try: + data = json.loads(data) + except Exception: + data = None + if isinstance(data, dict): + style = str(data.get("response_style") or "").lower() + if style in ("concise", "detailed"): + USER_RESPONSE_STYLE_PREFS[cache_key] = {"style": style, "ts": now} + return style + return "concise" + + +def _redact_private_mentions(text: str) -> str: + if not text: + return "" + sanitized = text + sanitized = re.sub(r"(?i)(email|e-mail)\s*[:\-]?\s*[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", r"\1: [redacted]", sanitized) + sanitized = re.sub(r"(?i)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", "[redacted-email]", sanitized) + sanitized = re.sub(r"(? bool: + t = (user_text or "").lower() + if not t: + return False + markers = [ + "приватн", "контакт ментор", "телефон ментор", "email ментор", + "всі контакти", "скинь контакти", "private data", "mentor contacts", + ] + return any(m in t for m in markers) + + +def _block_private_profile_dump(user_text: str) -> Optional[str]: + if not _is_private_profile_dump_request(user_text): + return None + return ( + "Не можу надавати приватні дані людей (контакти, особисті профілі). " + "Можу дати лише публічну, узагальнену інформацію." + ) + + +def _is_numeric_question(text: str) -> bool: + t = (text or "").lower() + if not t: + return False + markers = ["скільки", "сума", "витрат", "добрив", "грн", "кг", "вартість", "cost", "amount", "total", "spent"] + return any(m in t for m in markers) + + +def _has_numeric_answer_contract(answer_text: str) -> bool: + a = (answer_text or "").lower() + if not a: + return False + has_value = bool(re.search(r"\d", a)) + has_unit = any(u in a for u in ("грн", "uah", "usd", "eur", "кг", "kg", "%")) + has_source = any(s in a for s in ("рядок", "лист", "sheet", "row", "джерело")) + return has_value and has_unit and has_source + + +def _answer_seems_off_intent(user_text: str, answer_text: str) -> bool: + u = (user_text or "").lower() + a = (answer_text or "").lower() + if not u or not a: + return False + + if _is_numeric_question(u) and not _has_numeric_answer_contract(answer_text): + return True + + if any(k in u for k in ("excel", "xlsx", "таблиц", "файл", "звіт")) and any( + k in a for k in ("вступ", "структура", "презентаці", "слайд") + ): + return True + return False + + +def _sanitize_agent_answer(agent_id: str, user_text: str, answer_text: str) -> str: + blocked = _block_private_profile_dump(user_text) + if blocked: + return blocked + sanitized = _redact_private_mentions(answer_text or "") + return sanitized + + +def _strip_answer_markup_noise(answer_text: str) -> str: + if not answer_text: + return "" + cleaned = answer_text.strip() + cleaned = re.sub(r"^\s*\*{1,3}\s*коротка відповідь\s*:?\s*\*{0,3}\s*", "", cleaned, flags=re.IGNORECASE) + cleaned = re.sub(r"^\s*\*{1,3}\s*відповідь\s*:?\s*\*{0,3}\s*", "", cleaned, flags=re.IGNORECASE) + cleaned = re.sub(r"^\s*#+\s*", "", cleaned) + # Remove markdown emphasis noise that leaks into short answers + cleaned = cleaned.replace("**", "") + cleaned = cleaned.replace("__", "") + return cleaned.strip() + + +def _compress_bulleted_answer(answer_text: str, max_items: int = 3) -> str: + if not answer_text: + return "" + lines = [ln.strip() for ln in answer_text.splitlines() if ln.strip()] + bullet_lines: List[str] = [] + for ln in lines: + normalized = ln.replace("**", "").replace("__", "").strip() + if re.match(r"^(\*?\s*[-*•]|\*?\s*\d+[\.\):])\s*", normalized): + item = re.sub(r"^(\*?\s*[-*•]|\*?\s*\d+[\.\):])\s*", "", normalized).strip() + item = re.sub(r"\s+", " ", item).strip(" -–—") + item = re.sub(r"\.{2,}", ".", item) + item = re.sub(r"\s+\.", ".", item) + # Keep concise mode truly short: first complete sentence from each bullet. + parts = re.split(r"(?<=[.!?…])\s+", item) + if parts: + item = parts[0].strip() + item = item.rstrip(":").strip() + if item: + bullet_lines.append(item) + if not bullet_lines: + return answer_text.strip() + picked = bullet_lines[:max_items] + joined = ". ".join(picked) + if joined and not joined.endswith((".", "!", "?")): + joined += "." + joined = re.sub(r"\s+", " ", joined).strip() + return joined or answer_text.strip() + + +def _limit_to_sentences(text: str, max_sentences: int = 3) -> str: + if not text: + return "" + parts = re.split(r"(?<=[.!?…])\s+", text.strip()) + parts = [p.strip() for p in parts if p.strip()] + if len(parts) <= max_sentences: + return " ".join(parts).strip() + return " ".join(parts[:max_sentences]).strip() + + +def _agromatrix_rewrite_capability_limitations(user_text: str, answer_text: str) -> str: + if not answer_text: + return answer_text + low = answer_text.lower() + limitation_markers = ( + "не можу бачити", "не можу переглядати зображення", "не маю доступу до зображень", + "працюю лише з текстом", "працюю виключно з текстом", + "cannot view images", "cannot analyze images", "as a text model", + ) + if not any(m in low for m in limitation_markers): + return answer_text + + ulow = (user_text or "").lower() + photo_markers = ("фото", "зображ", "image", "photo", "картин", "світлин") + if any(m in ulow for m in photo_markers): + return ( + "Можу аналізувати фото. Надішли, будь ласка, зображення ще раз одним повідомленням " + "з коротким питанням, і я дам точний розбір." + ) + + return ( + "Можу працювати природною мовою та з мультимодальністю: фото, голос і документи. " + "Сформулюй запит коротко, і я відповім по суті." + ) + + +def postprocess_agent_answer( + agent_id: str, + user_text: str, + answer_text: str, + force_detailed: bool, + needs_complex_reasoning: bool, +) -> str: + if not answer_text: + return answer_text + + if (agent_id or "").lower() != "agromatrix": + return answer_text + + # Keep detailed/complex answers intact. + if force_detailed or needs_complex_reasoning: + return answer_text + + user_text_len = len((user_text or "").strip()) + if user_text_len > 280: + return _agromatrix_rewrite_capability_limitations(user_text, answer_text) + + cleaned = _strip_answer_markup_noise(answer_text) + cleaned = _agromatrix_rewrite_capability_limitations(user_text, cleaned) + compact = _compress_bulleted_answer(cleaned, max_items=1) + short = _limit_to_sentences(compact, max_sentences=3) + return short or answer_text + + COMPLEX_REASONING_KEYWORDS = [ "стратег", "roadmap", "алгоритм", "architecture", "архітектур", "прогноз", "scenario", "модель", "аналіз", "побудуй", "plan", "дослідж", @@ -2148,12 +2500,45 @@ async def handle_telegram_webhook( text = update.message.get("text", "") caption = update.message.get("caption", "") - # If user asks about a recently sent photo, run vision on cached photo file_id. - if text and _looks_like_photo_followup(text): + # Photo/image intent guard: + # if text references a photo/image, try to resolve latest file_id and route to vision. + photo_intent = False + if text: + tl = text.lower() + photo_intent = _looks_like_photo_followup(text) or any( + k in tl for k in ("фото", "зображ", "світлин", "image", "photo") + ) + if not photo_intent: + # Robust fallback for common formulations like "що на цьому фото?" + photo_intent = bool( + re.search(r"(що|what|что).{0,24}(цьому|этом|this).{0,24}(фото|зображ|світлин|image|photo)", tl) + ) + + if photo_intent: recent_file_id = _get_recent_photo_file_id(agent_config.agent_id, chat_id, user_id) + + # Fallback: recover latest photo file_id from memory-service context (survives process restarts). + if not recent_file_id: + try: + mc = await memory_client.get_context( + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + team_id=dao_id, + channel_id=chat_id, + limit=80, + ) + recent_file_id = _extract_recent_photo_file_id_from_memory(mc) + if recent_file_id: + _set_recent_photo_context(agent_config.agent_id, chat_id, user_id, recent_file_id) + logger.info( + f"{agent_config.name}: Recovered photo file_id from memory context for follow-up: {recent_file_id}" + ) + except Exception as e: + logger.warning(f"{agent_config.name}: failed to recover photo file_id from memory: {e}") + if recent_file_id: logger.info( - f"{agent_config.name}: Detected follow-up photo question; using cached file_id={recent_file_id}" + f"{agent_config.name}: Photo intent detected; using file_id={recent_file_id}" ) followup_result = await process_photo( agent_config=agent_config, @@ -2167,6 +2552,16 @@ async def handle_telegram_webhook( bypass_media_gate=True, ) return followup_result + + # Hard guard: don't send photo-related requests to text LLM path when image context is missing. + is_question_like = ("?" in text) or any(k in tl for k in ("що", "опиши", "проанал", "what", "describe", "analy", "что")) + if is_question_like: + await send_telegram_message( + chat_id, + "Бачу питання про фото, але не знайшов зображення в історії сесії. Надішли фото ще раз з коротким питанням, і я одразу проаналізую.", + telegram_token, + ) + return {"ok": True, "handled": True, "reason": "photo_followup_without_image_context"} if not text and not caption: # Check for unsupported message types and silently ignore @@ -2218,32 +2613,52 @@ async def handle_telegram_webhook( # If there's a doc_id and the message looks like a question about the document if doc_context and doc_context.doc_id: - # Check if it's a question (simple heuristic: contains question words or ends with ?) - is_question = ( - "?" in text or - any(word in text.lower() for word in ["що", "як", "чому", "коли", "де", "хто", "чи"]) - ) + is_question = _is_question_like(text) if is_question: logger.info(f"{agent_config.name}: Follow-up question detected for doc_id={doc_context.doc_id}") # Try RAG query first - rag_result = await ask_about_document( - session_id=session_id, - question=text, - doc_id=doc_context.doc_id, - dao_id=dao_id or doc_context.dao_id, - user_id=f"tg:{user_id}" - ) + try: + rag_result = await asyncio.wait_for( + ask_about_document( + session_id=session_id, + question=text, + doc_id=doc_context.doc_id, + dao_id=dao_id or doc_context.dao_id, + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + ), + timeout=25.0, + ) + except asyncio.TimeoutError: + logger.warning( + f"{agent_config.name}: doc follow-up timeout for doc_id={doc_context.doc_id}; " + "fallback to regular chat path" + ) + rag_result = None - if rag_result.success and rag_result.answer: + if rag_result and rag_result.success and rag_result.answer: # Truncate if too long for Telegram - answer = rag_result.answer + answer = postprocess_agent_answer( + agent_id=agent_config.agent_id, + user_text=text or "", + answer_text=rag_result.answer, + force_detailed=should_force_detailed_reply(text), + needs_complex_reasoning=requires_complex_reasoning(text), + ) if len(answer) > TELEGRAM_SAFE_LENGTH: answer = answer[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_" + answer = _sanitize_agent_answer(agent_config.agent_id, text or "", answer) await send_telegram_message(chat_id, answer, telegram_token) return {"ok": True, "agent": "parser", "mode": "rag_query"} - # Fall through to regular chat if RAG query fails + # Source-lock: with active document context answer only from that document. + await send_telegram_message( + chat_id, + "Не знайшов точну відповідь у поточному документі. Уточни питання або надішли файл повторно.", + telegram_token, + ) + return {"ok": True, "agent": "parser", "mode": "source_lock_no_answer"} # ======================================== # BEHAVIOR POLICY v2.1: Check if should respond @@ -2296,6 +2711,19 @@ async def handle_telegram_webhook( ) respond_decision = sowa_decision.should_respond respond_reason = sowa_decision.reason + + # AgroMatrix usability guard: + # In dedicated chats users often ask short operational questions without explicit mention. + # Do not silence clear question turns for agromatrix. + if ( + sowa_decision.action == "SILENT" + and agent_config.agent_id == "agromatrix" + and _is_question_like(text) + ): + sowa_decision.action = "FULL" + sowa_decision.should_respond = True + respond_decision = True + respond_reason = "agromatrix_question_guard" if sowa_decision.action == "SILENT": logger.info(f"\U0001f507 SOWA: Agent {agent_config.agent_id} NOT responding. Reason: {respond_reason}") @@ -2398,11 +2826,30 @@ async def handle_telegram_webhook( training_prefix = "[РЕЖИМ НАВЧАННЯ - відповідай на це повідомлення, ти в навчальній групі Agent Preschool]\n\n" logger.info(f"🎓 Training mode activated for chat {chat_id}") + unresolved_questions = _extract_unanswered_user_messages( + memory_context=memory_context, + current_user_id=f"tg:{user_id}", + max_items=3, + ) + unresolved_block = "" + if unresolved_questions: + # Do not duplicate current prompt if it matches one pending message. + filtered = [q for q in unresolved_questions if q.strip() != (text or "").strip()] + if filtered: + unresolved_block = "[Невідповідані питання цього користувача]\n" + "\n".join( + f"- {q}" for q in filtered + ) + "\n\n" + if local_history: # Add conversation history to message for better context understanding - message_with_context = f"{training_prefix}[Контекст розмови]\n{local_history}\n\n[Поточне повідомлення від {username}]\n{text}" + message_with_context = ( + f"{training_prefix}" + f"[Контекст розмови]\n{local_history}\n\n" + f"{unresolved_block}" + f"[Поточне повідомлення від {username}]\n{text}" + ) else: - message_with_context = f"{training_prefix}{text}" + message_with_context = f"{training_prefix}{unresolved_block}{text}" preferred_lang = await resolve_preferred_language_persistent( chat_id=chat_id, @@ -2412,6 +2859,15 @@ async def handle_telegram_webhook( team_id=dao_id, ) preferred_lang_label = preferred_language_label(preferred_lang) + response_style_pref = await resolve_response_style_preference( + agent_id=agent_config.agent_id, + chat_id=chat_id, + user_id=str(user_id), + text=text or "", + team_id=dao_id, + ) + force_detailed = should_force_detailed_reply(text) or response_style_pref == "detailed" + force_concise = (not force_detailed) and (should_force_concise_reply(text) or response_style_pref == "concise") # Build request to Router system_prompt = agent_config.system_prompt @@ -2432,6 +2888,7 @@ async def handle_telegram_webhook( "session_id": f"tg:{chat_id}:{dao_id}", "username": username, "chat_id": chat_id, + "raw_user_text": text, "sender_is_bot": is_sender_bot, "mentioned_bots": mentioned_bots, "requires_complex_reasoning": needs_complex_reasoning, @@ -2439,6 +2896,7 @@ async def handle_telegram_webhook( "is_training_group": is_training_group, "preferred_response_language": preferred_lang, "preferred_response_language_label": preferred_lang_label, + "response_style_preference": response_style_pref, }, "context": { "agent_name": agent_config.name, @@ -2451,10 +2909,10 @@ async def handle_telegram_webhook( }, } - if should_force_detailed_reply(text): + if force_detailed: router_request["metadata"]["force_detailed"] = True - if should_force_concise_reply(text): + if force_concise: # IMPORTANT: preserve conversation context! Only append concise instruction router_request["metadata"]["force_concise"] = True router_request["message"] = ( @@ -2473,7 +2931,7 @@ async def handle_telegram_webhook( router_request["metadata"]["provider"] = "cloud_deepseek" router_request["metadata"]["reason"] = "auto_complex" - if not should_force_concise_reply(text): + if not force_concise: router_request["message"] = ( router_request["message"] + f"\n\n(Мова відповіді: {preferred_lang_label}.)" @@ -2550,10 +3008,40 @@ async def handle_telegram_webhook( username=username, ) return {"ok": True, "skipped": True, "reason": "no_output_from_llm"} + + # Retry policy: if response drifts from current intent, do one strict reroute. + if _answer_seems_off_intent(text or "", answer_text): + try: + strict_request = copy.deepcopy(router_request) + strict_request["metadata"]["intent_retry"] = 1 + strict_request["metadata"]["disable_tools"] = True + strict_request["metadata"]["max_tool_rounds"] = 1 + strict_request["metadata"]["temperature"] = 0.1 + strict_request["message"] = ( + f"{message_with_context}\n\n" + "(Жорстка інструкція: відповідай тільки на ПОТОЧНЕ питання користувача. " + "Не змінюй тему, не генеруй презентацію/план, якщо цього не просили. " + "Для числових питань: дай value + unit + джерело (лист/рядок).)" + ) + retry_response = await send_to_router(strict_request) + if isinstance(retry_response, dict) and retry_response.get("ok"): + retry_text = retry_response.get("data", {}).get("text") or retry_response.get("response", "") + if retry_text and not is_no_output_response(retry_text): + answer_text = retry_text + router_request = strict_request + logger.info("Intent retry succeeded with strict prompt") + except Exception as retry_err: + logger.warning(f"Intent retry failed: {retry_err}") - # Truncate if too long for Telegram - if len(answer_text) > TELEGRAM_SAFE_LENGTH: - answer_text = answer_text[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_" + force_detailed_reply = bool(router_request.get("metadata", {}).get("force_detailed")) + answer_text = postprocess_agent_answer( + agent_id=agent_config.agent_id, + user_text=text or "", + answer_text=answer_text, + force_detailed=force_detailed_reply, + needs_complex_reasoning=needs_complex_reasoning, + ) + answer_text = _sanitize_agent_answer(agent_config.agent_id, text or "", answer_text) # Skip Telegram sending for prober requests (chat_id=0) if is_prober: @@ -2591,7 +3079,9 @@ async def handle_telegram_webhook( async with httpx.AsyncClient() as client: files = {"photo": ("image.png", BytesIO(image_bytes), "image/png")} - data = {"chat_id": chat_id, "caption": answer_text} + # Telegram caption limit is 1024 chars. + safe_caption = (answer_text or "")[:1024] + data = {"chat_id": chat_id, "caption": safe_caption} response_photo = await client.post(url, files=files, data=data, timeout=30.0) response_photo.raise_for_status() logger.info(f"✅ Sent generated image to Telegram chat {chat_id}") @@ -2995,26 +3485,39 @@ async def _old_telegram_webhook(update: TelegramUpdate): # If there's a doc_id and the message looks like a question about the document if doc_context and doc_context.doc_id: - # Check if it's a question (simple heuristic: contains question words or ends with ?) - is_question = ( - "?" in text or - any(word in text.lower() for word in ["що", "як", "чому", "коли", "де", "хто", "чи"]) - ) + is_question = _is_question_like(text) if is_question: logger.info(f"Follow-up question detected for doc_id={doc_context.doc_id}") # Try RAG query first - rag_result = await ask_about_document( - session_id=session_id, - question=text, - doc_id=doc_context.doc_id, - dao_id=dao_id or doc_context.dao_id, - user_id=f"tg:{user_id}" - ) + try: + rag_result = await asyncio.wait_for( + ask_about_document( + session_id=session_id, + question=text, + doc_id=doc_context.doc_id, + dao_id=dao_id or doc_context.dao_id, + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + ), + timeout=25.0, + ) + except asyncio.TimeoutError: + logger.warning( + f"Doc follow-up timeout for agent={agent_config.agent_id}, " + f"doc_id={doc_context.doc_id}; fallback to regular chat path" + ) + rag_result = None - if rag_result.success and rag_result.answer: + if rag_result and rag_result.success and rag_result.answer: # Truncate if too long for Telegram - answer = rag_result.answer + answer = postprocess_agent_answer( + agent_id=agent_config.agent_id, + user_text=text or "", + answer_text=rag_result.answer, + force_detailed=should_force_detailed_reply(text), + needs_complex_reasoning=requires_complex_reasoning(text), + ) if len(answer) > TELEGRAM_SAFE_LENGTH: answer = answer[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_" @@ -3532,44 +4035,51 @@ async def send_telegram_message(chat_id: str, text: str, bot_token: Optional[str return False # Defensive cleanup for occasional reasoning/markup leaks. - import re safe_text = re.sub(r'.*?', '', text or "", flags=re.DOTALL) safe_text = re.sub(r'.*$', '', safe_text, flags=re.DOTALL) safe_text = safe_text.strip() or "..." token_id = telegram_token.split(":", 1)[0] if ":" in telegram_token else "unknown" url = f"https://api.telegram.org/bot{telegram_token}/sendMessage" - payload = { - "chat_id": str(chat_id), - "text": safe_text, - "disable_web_page_preview": True, - } - try: - async with httpx.AsyncClient() as client: - response = await client.post(url, json=payload, timeout=15.0) + async def _send_chunk(chunk: str) -> bool: + payload = { + "chat_id": str(chat_id), + "text": chunk, + "disable_web_page_preview": True, + } + try: + async with httpx.AsyncClient() as client: + response = await client.post(url, json=payload, timeout=15.0) - if response.status_code >= 400: - err_desc = response.text[:300] - try: - body = response.json() - err_desc = body.get("description") or err_desc - except Exception: - pass - logger.error( - "Telegram sendMessage failed: bot_id=%s chat_id=%s status=%s desc=%s", - token_id, - chat_id, - response.status_code, - err_desc, - ) + if response.status_code >= 400: + err_desc = response.text[:300] + try: + body = response.json() + err_desc = body.get("description") or err_desc + except Exception: + pass + logger.error( + "Telegram sendMessage failed: bot_id=%s chat_id=%s status=%s desc=%s", + token_id, + chat_id, + response.status_code, + err_desc, + ) + return False + return True + except Exception as e: + logger.error("Telegram sendMessage exception: bot_id=%s chat_id=%s error=%s", token_id, chat_id, e) return False - logger.info("Telegram message sent: bot_id=%s chat_id=%s", token_id, chat_id) - return True - except Exception as e: - logger.error("Telegram sendMessage exception: bot_id=%s chat_id=%s error=%s", token_id, chat_id, e) - return False + all_ok = True + chunks = _chunk_text(safe_text, max_len=TELEGRAM_MAX_MESSAGE_LENGTH) + for chunk in chunks: + sent = await _send_chunk(chunk) + all_ok = all_ok and sent + if all_ok: + logger.info("Telegram message sent: bot_id=%s chat_id=%s chunks=%s", token_id, chat_id, len(chunks)) + return all_ok # ========================================