gateway: enforce source-lock, pii guard, style profile, and intent retry

This commit is contained in:
Apple
2026-02-20 14:16:07 -08:00
parent 195eb9b7ac
commit e6c083a000

View File

@@ -4,6 +4,7 @@ Handles incoming webhooks from Telegram, Discord, etc.
"""
import asyncio
import base64
import copy
import json
import re
import logging
@@ -61,6 +62,10 @@ PENDING_STATE_TTL = 1800 # 30 minutes
USER_LANGUAGE_PREFS: Dict[str, Dict[str, Any]] = {}
USER_LANGUAGE_PREF_TTL = 30 * 24 * 3600 # 30 days, expressed in seconds
# Per-user response style cache (agent:chat:user -> {style, ts}).
# Entries whose "ts" is older than USER_RESPONSE_STYLE_PREF_TTL are evicted by
# _cleanup_user_response_style_prefs().
USER_RESPONSE_STYLE_PREFS: Dict[str, Dict[str, Any]] = {}
USER_RESPONSE_STYLE_PREF_TTL = 30 * 24 * 3600 # 30 days, expressed in seconds (compared against time.time() deltas)
# Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts})
RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {}
RECENT_PHOTO_TTL = 30 * 60 # 30 minutes, expressed in seconds
@@ -88,16 +93,120 @@ def _get_recent_photo_file_id(agent_id: str, chat_id: str, user_id: str) -> Opti
return rec.get("file_id")
def _extract_recent_photo_file_id_from_memory(memory_context: Dict[str, Any]) -> Optional[str]:
"""
Extract last seen Telegram photo file_id from memory context.
Looks for patterns like: [Photo: <file_id>]
"""
if not memory_context:
return None
pattern = re.compile(r"\[Photo:\s*([^\]\s]+)\]")
recent_events = memory_context.get("recent_events", []) or []
for ev in reversed(recent_events):
body = (ev.get("body_text") or "").strip()
if not body:
continue
m = pattern.search(body)
if m:
return m.group(1)
local_text = memory_context.get("local_context_text") or ""
for line in reversed(local_text.splitlines()):
m = pattern.search(line)
if m:
return m.group(1)
return None
def _looks_like_photo_followup(text: str) -> bool:
if not text:
return False
t = text.strip().lower()
markers = [
direct_markers = [
"що ти бачиш", "що на фото", "що на зображенні", "опиши фото", "подивись фото",
"що на цьому фото", "що на цій фотографії", "що на цій світлині",
"проаналізуй фото", "аналіз фото", "переглянь фото", "повернись до фото",
"яка це рослина", "що це за рослина", "що за рослина", "що за культура",
"яка культура", "визнач рослину",
"what do you see", "what is in the image", "describe the photo",
"analyze the photo", "analyze image", "what plant is this",
"что ты видишь", "что на фото", "опиши фото", "посмотри фото",
"проанализируй фото", "какое это растение", "что за растение",
]
return any(m in t for m in markers)
if any(m in t for m in direct_markers):
return True
# If user is correcting previous visual interpretation, route to vision again.
correction_markers = [
"неправильна відповідь", "не правильна відповідь", "не видумуй", "це не так",
"ти помилився", "ти помилилась", "неправильно визначив",
"wrong answer", "you are wrong", "that is incorrect",
"неправильный ответ", "это не так", "ты ошибся",
]
photo_topic_markers = ["фото", "зображ", "рослин", "image", "photo", "plant", "растен"]
if any(c in t for c in correction_markers) and any(p in t for p in photo_topic_markers):
return True
# Flexible forms: "що на ... фото/зображенні/світлині"
if re.search(r"(що|what|что)\s+на\s+.*(фото|зображ|світлин|image|photo)", t):
# Exclude common meta-questions
meta_exclude = ["канал", "чат", "бот", "нормально"]
if not any(ex in t for ex in meta_exclude):
return True
return False
def _extract_unanswered_user_messages(
memory_context: Dict[str, Any],
current_user_id: str,
max_items: int = 3,
) -> List[str]:
"""
Extract unresolved user questions from structured memory events.
A user message is considered unresolved if no later agent reply exists.
"""
events = memory_context.get("recent_events") or []
if not isinstance(events, list) or not current_user_id:
return []
pending: List[str] = []
for ev in events:
role = str(ev.get("role") or ev.get("type") or "").lower()
text = str(ev.get("body_text") or "").strip()
if not text:
continue
if role == "user" and str(ev.get("user_id") or "") == current_user_id:
pending.append(text)
continue
if role in ("assistant", "agent") and pending:
# Assume latest agent reply resolved the oldest pending user question.
pending.pop(0)
# Keep the latest unresolved items only.
if len(pending) > max_items:
pending = pending[-max_items:]
return pending
def _is_question_like(text: str) -> bool:
"""
Detect user questions without false positives from substring matches
(e.g. 'схоже' should not match 'що').
"""
if not text:
return False
t = text.strip().lower()
if "?" in t:
return True
# Ukrainian / Russian / English interrogatives with word boundaries.
return bool(
re.search(
r"\b(що|як|чому|коли|де|хто|чи|what|why|how|when|where|who|зачем|почему|когда|где|кто|ли)\b",
t,
)
)
def _cleanup_user_language_prefs() -> None:
@@ -107,6 +216,13 @@ def _cleanup_user_language_prefs() -> None:
del USER_LANGUAGE_PREFS[k]
def _cleanup_user_response_style_prefs() -> None:
    """Drop response-style cache entries older than USER_RESPONSE_STYLE_PREF_TTL."""
    current_ts = time.time()
    stale_keys = []
    for cache_key, record in USER_RESPONSE_STYLE_PREFS.items():
        age = current_ts - float(record.get("ts", 0))
        if age > USER_RESPONSE_STYLE_PREF_TTL:
            stale_keys.append(cache_key)
    # Delete after the scan so the dict is not mutated while being iterated.
    for cache_key in stale_keys:
        del USER_RESPONSE_STYLE_PREFS[cache_key]
def _normalize_lang_code(raw: Optional[str]) -> Optional[str]:
if not raw:
return None
@@ -855,6 +971,242 @@ def should_force_concise_reply(text: str) -> bool:
return True
def _detect_response_style_signal(text: str) -> Optional[str]:
t = (text or "").strip().lower()
if not t:
return None
concise_markers = ["коротко", "коротка відповідь", "лаконічно", "brief", "short answer"]
detailed_markers = ["детально", "розгорнуто", "поясни детальніше", "deep dive", "step by step"]
if any(m in t for m in detailed_markers):
return "detailed"
if any(m in t for m in concise_markers):
return "concise"
return None
async def resolve_response_style_preference(
    agent_id: str,
    chat_id: str,
    user_id: str,
    text: str,
    team_id: Optional[str],
) -> str:
    """
    Resolve the response style ("concise" or "detailed") for a user.

    Resolution order:
      1. an explicit style phrase in ``text`` — stored in the in-process cache
         and upserted as the ``communication_profile:<agent_id>`` fact;
      2. the in-process cache entry for ``agent:chat:user``;
      3. the stored ``communication_profile:<agent_id>`` fact (its
         ``fact_value_json`` may be a dict or a JSON string);
      4. the default, ``"concise"``.
    """
    valid_styles = ("concise", "detailed")

    _cleanup_user_response_style_prefs()
    cache_key = f"{agent_id}:{chat_id}:{user_id}"
    requested_style = _detect_response_style_signal(text)
    now = time.time()

    if requested_style in valid_styles:
        # Explicit request wins: remember it locally and persist it.
        USER_RESPONSE_STYLE_PREFS[cache_key] = {"style": requested_style, "ts": now}
        await memory_client.upsert_fact(
            user_id=f"tg:{user_id}",
            fact_key=f"communication_profile:{agent_id}",
            fact_value_json={"response_style": requested_style, "updated_at": datetime.utcnow().isoformat()},
            team_id=team_id,
        )
        return requested_style

    cached_entry = USER_RESPONSE_STYLE_PREFS.get(cache_key)
    if cached_entry:
        return str(cached_entry.get("style") or "concise")

    stored_fact = await memory_client.get_fact(
        user_id=f"tg:{user_id}",
        fact_key=f"communication_profile:{agent_id}",
        team_id=team_id,
    )
    payload = None
    if stored_fact and isinstance(stored_fact, dict):
        payload = stored_fact.get("fact_value_json")
    if isinstance(payload, str):
        # The fact value may arrive serialized; unparseable JSON is ignored.
        try:
            payload = json.loads(payload)
        except Exception:
            payload = None
    if not isinstance(payload, dict):
        return "concise"

    stored_style = str(payload.get("response_style") or "").lower()
    if stored_style not in valid_styles:
        return "concise"

    USER_RESPONSE_STYLE_PREFS[cache_key] = {"style": stored_style, "ts": now}
    return stored_style
def _redact_private_mentions(text: str) -> str:
if not text:
return ""
sanitized = text
sanitized = re.sub(r"(?i)(email|e-mail)\s*[:\-]?\s*[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", r"\1: [redacted]", sanitized)
sanitized = re.sub(r"(?i)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", "[redacted-email]", sanitized)
sanitized = re.sub(r"(?<!\d)(\+?\d[\d\s\-\(\)]{8,}\d)(?!\d)", "[redacted-phone]", sanitized)
return sanitized
def _is_private_profile_dump_request(user_text: str) -> bool:
t = (user_text or "").lower()
if not t:
return False
markers = [
"приватн", "контакт ментор", "телефон ментор", "email ментор",
"всі контакти", "скинь контакти", "private data", "mentor contacts",
]
return any(m in t for m in markers)
def _block_private_profile_dump(user_text: str) -> Optional[str]:
    """
    Return a fixed refusal message when the user asks for private profile data.

    Returns None when ``_is_private_profile_dump_request`` does not flag the text.
    """
    if _is_private_profile_dump_request(user_text):
        return (
            "Не можу надавати приватні дані людей (контакти, особисті профілі). "
            "Можу дати лише публічну, узагальнену інформацію."
        )
    return None
def _is_numeric_question(text: str) -> bool:
t = (text or "").lower()
if not t:
return False
markers = ["скільки", "сума", "витрат", "добрив", "грн", "кг", "вартість", "cost", "amount", "total", "spent"]
return any(m in t for m in markers)
def _has_numeric_answer_contract(answer_text: str) -> bool:
a = (answer_text or "").lower()
if not a:
return False
has_value = bool(re.search(r"\d", a))
has_unit = any(u in a for u in ("грн", "uah", "usd", "eur", "кг", "kg", "%"))
has_source = any(s in a for s in ("рядок", "лист", "sheet", "row", "джерело"))
return has_value and has_unit and has_source
def _answer_seems_off_intent(user_text: str, answer_text: str) -> bool:
u = (user_text or "").lower()
a = (answer_text or "").lower()
if not u or not a:
return False
if _is_numeric_question(u) and not _has_numeric_answer_contract(answer_text):
return True
if any(k in u for k in ("excel", "xlsx", "таблиц", "файл", "звіт")) and any(
k in a for k in ("вступ", "структура", "презентаці", "слайд")
):
return True
return False
def _sanitize_agent_answer(agent_id: str, user_text: str, answer_text: str) -> str:
    """
    Apply the privacy guard to an outgoing answer.

    If the user's request is flagged as a private-profile dump, the refusal
    message replaces the answer entirely; otherwise e-mails and phone-like
    sequences in the answer are masked. ``agent_id`` is currently not used.
    """
    refusal = _block_private_profile_dump(user_text)
    return refusal if refusal else _redact_private_mentions(answer_text or "")
def _strip_answer_markup_noise(answer_text: str) -> str:
if not answer_text:
return ""
cleaned = answer_text.strip()
cleaned = re.sub(r"^\s*\*{1,3}\s*коротка відповідь\s*:?\s*\*{0,3}\s*", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"^\s*\*{1,3}\s*відповідь\s*:?\s*\*{0,3}\s*", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"^\s*#+\s*", "", cleaned)
# Remove markdown emphasis noise that leaks into short answers
cleaned = cleaned.replace("**", "")
cleaned = cleaned.replace("__", "")
return cleaned.strip()
def _compress_bulleted_answer(answer_text: str, max_items: int = 3) -> str:
if not answer_text:
return ""
lines = [ln.strip() for ln in answer_text.splitlines() if ln.strip()]
bullet_lines: List[str] = []
for ln in lines:
normalized = ln.replace("**", "").replace("__", "").strip()
if re.match(r"^(\*?\s*[-*•]|\*?\s*\d+[\.\):])\s*", normalized):
item = re.sub(r"^(\*?\s*[-*•]|\*?\s*\d+[\.\):])\s*", "", normalized).strip()
item = re.sub(r"\s+", " ", item).strip(" -–—")
item = re.sub(r"\.{2,}", ".", item)
item = re.sub(r"\s+\.", ".", item)
# Keep concise mode truly short: first complete sentence from each bullet.
parts = re.split(r"(?<=[.!?…])\s+", item)
if parts:
item = parts[0].strip()
item = item.rstrip(":").strip()
if item:
bullet_lines.append(item)
if not bullet_lines:
return answer_text.strip()
picked = bullet_lines[:max_items]
joined = ". ".join(picked)
if joined and not joined.endswith((".", "!", "?")):
joined += "."
joined = re.sub(r"\s+", " ", joined).strip()
return joined or answer_text.strip()
def _limit_to_sentences(text: str, max_sentences: int = 3) -> str:
if not text:
return ""
parts = re.split(r"(?<=[.!?…])\s+", text.strip())
parts = [p.strip() for p in parts if p.strip()]
if len(parts) <= max_sentences:
return " ".join(parts).strip()
return " ".join(parts[:max_sentences]).strip()
def _agromatrix_rewrite_capability_limitations(user_text: str, answer_text: str) -> str:
if not answer_text:
return answer_text
low = answer_text.lower()
limitation_markers = (
"не можу бачити", "не можу переглядати зображення", "не маю доступу до зображень",
"працюю лише з текстом", "працюю виключно з текстом",
"cannot view images", "cannot analyze images", "as a text model",
)
if not any(m in low for m in limitation_markers):
return answer_text
ulow = (user_text or "").lower()
photo_markers = ("фото", "зображ", "image", "photo", "картин", "світлин")
if any(m in ulow for m in photo_markers):
return (
"Можу аналізувати фото. Надішли, будь ласка, зображення ще раз одним повідомленням "
"з коротким питанням, і я дам точний розбір."
)
return (
"Можу працювати природною мовою та з мультимодальністю: фото, голос і документи. "
"Сформулюй запит коротко, і я відповім по суті."
)
def postprocess_agent_answer(
    agent_id: str,
    user_text: str,
    answer_text: str,
    force_detailed: bool,
    needs_complex_reasoning: bool,
) -> str:
    """
    Shorten and clean an answer for the "agromatrix" agent.

    The answer is returned untouched when it is empty, when the agent is not
    "agromatrix", or when a detailed / complex-reasoning reply was requested.
    For user messages longer than 280 characters only the capability-limitation
    rewrite is applied. Otherwise markup noise is stripped, limitation
    disclaimers are rewritten, bullets are compressed to a single item and the
    result is capped at three sentences.
    """
    is_agromatrix = (agent_id or "").lower() == "agromatrix"
    # Keep detailed/complex answers intact.
    keep_as_is = force_detailed or needs_complex_reasoning
    if not answer_text or not is_agromatrix or keep_as_is:
        return answer_text

    if len((user_text or "").strip()) > 280:
        return _agromatrix_rewrite_capability_limitations(user_text, answer_text)

    without_markup = _strip_answer_markup_noise(answer_text)
    rewritten = _agromatrix_rewrite_capability_limitations(user_text, without_markup)
    single_item = _compress_bulleted_answer(rewritten, max_items=1)
    shortened = _limit_to_sentences(single_item, max_sentences=3)
    return shortened if shortened else answer_text
COMPLEX_REASONING_KEYWORDS = [
"стратег", "roadmap", "алгоритм", "architecture", "архітектур",
"прогноз", "scenario", "модель", "аналіз", "побудуй", "plan", "дослідж",
@@ -2148,12 +2500,45 @@ async def handle_telegram_webhook(
text = update.message.get("text", "")
caption = update.message.get("caption", "")
# If user asks about a recently sent photo, run vision on cached photo file_id.
if text and _looks_like_photo_followup(text):
# Photo/image intent guard:
# if text references a photo/image, try to resolve latest file_id and route to vision.
photo_intent = False
if text:
tl = text.lower()
photo_intent = _looks_like_photo_followup(text) or any(
k in tl for k in ("фото", "зображ", "світлин", "image", "photo")
)
if not photo_intent:
# Robust fallback for common formulations like "що на цьому фото?"
photo_intent = bool(
re.search(r"(що|what|что).{0,24}(цьому|этом|this).{0,24}(фото|зображ|світлин|image|photo)", tl)
)
if photo_intent:
recent_file_id = _get_recent_photo_file_id(agent_config.agent_id, chat_id, user_id)
# Fallback: recover latest photo file_id from memory-service context (survives process restarts).
if not recent_file_id:
try:
mc = await memory_client.get_context(
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
team_id=dao_id,
channel_id=chat_id,
limit=80,
)
recent_file_id = _extract_recent_photo_file_id_from_memory(mc)
if recent_file_id:
_set_recent_photo_context(agent_config.agent_id, chat_id, user_id, recent_file_id)
logger.info(
f"{agent_config.name}: Recovered photo file_id from memory context for follow-up: {recent_file_id}"
)
except Exception as e:
logger.warning(f"{agent_config.name}: failed to recover photo file_id from memory: {e}")
if recent_file_id:
logger.info(
f"{agent_config.name}: Detected follow-up photo question; using cached file_id={recent_file_id}"
f"{agent_config.name}: Photo intent detected; using file_id={recent_file_id}"
)
followup_result = await process_photo(
agent_config=agent_config,
@@ -2167,6 +2552,16 @@ async def handle_telegram_webhook(
bypass_media_gate=True,
)
return followup_result
# Hard guard: don't send photo-related requests to text LLM path when image context is missing.
is_question_like = ("?" in text) or any(k in tl for k in ("що", "опиши", "проанал", "what", "describe", "analy", "что"))
if is_question_like:
await send_telegram_message(
chat_id,
"Бачу питання про фото, але не знайшов зображення в історії сесії. Надішли фото ще раз з коротким питанням, і я одразу проаналізую.",
telegram_token,
)
return {"ok": True, "handled": True, "reason": "photo_followup_without_image_context"}
if not text and not caption:
# Check for unsupported message types and silently ignore
@@ -2218,32 +2613,52 @@ async def handle_telegram_webhook(
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id:
# Check if it's a question (simple heuristic: contains question words or ends with ?)
is_question = (
"?" in text or
any(word in text.lower() for word in ["що", "як", "чому", "коли", "де", "хто", "чи"])
)
is_question = _is_question_like(text)
if is_question:
logger.info(f"{agent_config.name}: Follow-up question detected for doc_id={doc_context.doc_id}")
# Try RAG query first
rag_result = await ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}"
)
try:
rag_result = await asyncio.wait_for(
ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
),
timeout=25.0,
)
except asyncio.TimeoutError:
logger.warning(
f"{agent_config.name}: doc follow-up timeout for doc_id={doc_context.doc_id}; "
"fallback to regular chat path"
)
rag_result = None
if rag_result.success and rag_result.answer:
if rag_result and rag_result.success and rag_result.answer:
# Truncate if too long for Telegram
answer = rag_result.answer
answer = postprocess_agent_answer(
agent_id=agent_config.agent_id,
user_text=text or "",
answer_text=rag_result.answer,
force_detailed=should_force_detailed_reply(text),
needs_complex_reasoning=requires_complex_reasoning(text),
)
if len(answer) > TELEGRAM_SAFE_LENGTH:
answer = answer[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_"
answer = _sanitize_agent_answer(agent_config.agent_id, text or "", answer)
await send_telegram_message(chat_id, answer, telegram_token)
return {"ok": True, "agent": "parser", "mode": "rag_query"}
# Fall through to regular chat if RAG query fails
# Source-lock: with active document context answer only from that document.
await send_telegram_message(
chat_id,
"Не знайшов точну відповідь у поточному документі. Уточни питання або надішли файл повторно.",
telegram_token,
)
return {"ok": True, "agent": "parser", "mode": "source_lock_no_answer"}
# ========================================
# BEHAVIOR POLICY v2.1: Check if should respond
@@ -2296,6 +2711,19 @@ async def handle_telegram_webhook(
)
respond_decision = sowa_decision.should_respond
respond_reason = sowa_decision.reason
# AgroMatrix usability guard:
# In dedicated chats users often ask short operational questions without explicit mention.
# Do not silence clear question turns for agromatrix.
if (
sowa_decision.action == "SILENT"
and agent_config.agent_id == "agromatrix"
and _is_question_like(text)
):
sowa_decision.action = "FULL"
sowa_decision.should_respond = True
respond_decision = True
respond_reason = "agromatrix_question_guard"
if sowa_decision.action == "SILENT":
logger.info(f"\U0001f507 SOWA: Agent {agent_config.agent_id} NOT responding. Reason: {respond_reason}")
@@ -2398,11 +2826,30 @@ async def handle_telegram_webhook(
training_prefix = "[РЕЖИМ НАВЧАННЯ - відповідай на це повідомлення, ти в навчальній групі Agent Preschool]\n\n"
logger.info(f"🎓 Training mode activated for chat {chat_id}")
unresolved_questions = _extract_unanswered_user_messages(
memory_context=memory_context,
current_user_id=f"tg:{user_id}",
max_items=3,
)
unresolved_block = ""
if unresolved_questions:
# Do not duplicate current prompt if it matches one pending message.
filtered = [q for q in unresolved_questions if q.strip() != (text or "").strip()]
if filtered:
unresolved_block = "[Невідповідані питання цього користувача]\n" + "\n".join(
f"- {q}" for q in filtered
) + "\n\n"
if local_history:
# Add conversation history to message for better context understanding
message_with_context = f"{training_prefix}[Контекст розмови]\n{local_history}\n\n[Поточне повідомлення від {username}]\n{text}"
message_with_context = (
f"{training_prefix}"
f"[Контекст розмови]\n{local_history}\n\n"
f"{unresolved_block}"
f"[Поточне повідомлення від {username}]\n{text}"
)
else:
message_with_context = f"{training_prefix}{text}"
message_with_context = f"{training_prefix}{unresolved_block}{text}"
preferred_lang = await resolve_preferred_language_persistent(
chat_id=chat_id,
@@ -2412,6 +2859,15 @@ async def handle_telegram_webhook(
team_id=dao_id,
)
preferred_lang_label = preferred_language_label(preferred_lang)
response_style_pref = await resolve_response_style_preference(
agent_id=agent_config.agent_id,
chat_id=chat_id,
user_id=str(user_id),
text=text or "",
team_id=dao_id,
)
force_detailed = should_force_detailed_reply(text) or response_style_pref == "detailed"
force_concise = (not force_detailed) and (should_force_concise_reply(text) or response_style_pref == "concise")
# Build request to Router
system_prompt = agent_config.system_prompt
@@ -2432,6 +2888,7 @@ async def handle_telegram_webhook(
"session_id": f"tg:{chat_id}:{dao_id}",
"username": username,
"chat_id": chat_id,
"raw_user_text": text,
"sender_is_bot": is_sender_bot,
"mentioned_bots": mentioned_bots,
"requires_complex_reasoning": needs_complex_reasoning,
@@ -2439,6 +2896,7 @@ async def handle_telegram_webhook(
"is_training_group": is_training_group,
"preferred_response_language": preferred_lang,
"preferred_response_language_label": preferred_lang_label,
"response_style_preference": response_style_pref,
},
"context": {
"agent_name": agent_config.name,
@@ -2451,10 +2909,10 @@ async def handle_telegram_webhook(
},
}
if should_force_detailed_reply(text):
if force_detailed:
router_request["metadata"]["force_detailed"] = True
if should_force_concise_reply(text):
if force_concise:
# IMPORTANT: preserve conversation context! Only append concise instruction
router_request["metadata"]["force_concise"] = True
router_request["message"] = (
@@ -2473,7 +2931,7 @@ async def handle_telegram_webhook(
router_request["metadata"]["provider"] = "cloud_deepseek"
router_request["metadata"]["reason"] = "auto_complex"
if not should_force_concise_reply(text):
if not force_concise:
router_request["message"] = (
router_request["message"]
+ f"\n\n(Мова відповіді: {preferred_lang_label}.)"
@@ -2550,10 +3008,40 @@ async def handle_telegram_webhook(
username=username,
)
return {"ok": True, "skipped": True, "reason": "no_output_from_llm"}
# Retry policy: if response drifts from current intent, do one strict reroute.
if _answer_seems_off_intent(text or "", answer_text):
try:
strict_request = copy.deepcopy(router_request)
strict_request["metadata"]["intent_retry"] = 1
strict_request["metadata"]["disable_tools"] = True
strict_request["metadata"]["max_tool_rounds"] = 1
strict_request["metadata"]["temperature"] = 0.1
strict_request["message"] = (
f"{message_with_context}\n\n"
"(Жорстка інструкція: відповідай тільки на ПОТОЧНЕ питання користувача. "
"Не змінюй тему, не генеруй презентацію/план, якщо цього не просили. "
"Для числових питань: дай value + unit + джерело (лист/рядок).)"
)
retry_response = await send_to_router(strict_request)
if isinstance(retry_response, dict) and retry_response.get("ok"):
retry_text = retry_response.get("data", {}).get("text") or retry_response.get("response", "")
if retry_text and not is_no_output_response(retry_text):
answer_text = retry_text
router_request = strict_request
logger.info("Intent retry succeeded with strict prompt")
except Exception as retry_err:
logger.warning(f"Intent retry failed: {retry_err}")
# Truncate if too long for Telegram
if len(answer_text) > TELEGRAM_SAFE_LENGTH:
answer_text = answer_text[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_"
force_detailed_reply = bool(router_request.get("metadata", {}).get("force_detailed"))
answer_text = postprocess_agent_answer(
agent_id=agent_config.agent_id,
user_text=text or "",
answer_text=answer_text,
force_detailed=force_detailed_reply,
needs_complex_reasoning=needs_complex_reasoning,
)
answer_text = _sanitize_agent_answer(agent_config.agent_id, text or "", answer_text)
# Skip Telegram sending for prober requests (chat_id=0)
if is_prober:
@@ -2591,7 +3079,9 @@ async def handle_telegram_webhook(
async with httpx.AsyncClient() as client:
files = {"photo": ("image.png", BytesIO(image_bytes), "image/png")}
data = {"chat_id": chat_id, "caption": answer_text}
# Telegram caption limit is 1024 chars.
safe_caption = (answer_text or "")[:1024]
data = {"chat_id": chat_id, "caption": safe_caption}
response_photo = await client.post(url, files=files, data=data, timeout=30.0)
response_photo.raise_for_status()
logger.info(f"✅ Sent generated image to Telegram chat {chat_id}")
@@ -2995,26 +3485,39 @@ async def _old_telegram_webhook(update: TelegramUpdate):
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id:
# Check if it's a question (simple heuristic: contains question words or ends with ?)
is_question = (
"?" in text or
any(word in text.lower() for word in ["що", "як", "чому", "коли", "де", "хто", "чи"])
)
is_question = _is_question_like(text)
if is_question:
logger.info(f"Follow-up question detected for doc_id={doc_context.doc_id}")
# Try RAG query first
rag_result = await ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}"
)
try:
rag_result = await asyncio.wait_for(
ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
),
timeout=25.0,
)
except asyncio.TimeoutError:
logger.warning(
f"Doc follow-up timeout for agent={agent_config.agent_id}, "
f"doc_id={doc_context.doc_id}; fallback to regular chat path"
)
rag_result = None
if rag_result.success and rag_result.answer:
if rag_result and rag_result.success and rag_result.answer:
# Truncate if too long for Telegram
answer = rag_result.answer
answer = postprocess_agent_answer(
agent_id=agent_config.agent_id,
user_text=text or "",
answer_text=rag_result.answer,
force_detailed=should_force_detailed_reply(text),
needs_complex_reasoning=requires_complex_reasoning(text),
)
if len(answer) > TELEGRAM_SAFE_LENGTH:
answer = answer[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_"
@@ -3532,44 +4035,51 @@ async def send_telegram_message(chat_id: str, text: str, bot_token: Optional[str
return False
# Defensive cleanup for occasional reasoning/markup leaks.
import re
safe_text = re.sub(r'<think>.*?</think>', '', text or "", flags=re.DOTALL)
safe_text = re.sub(r'<think>.*$', '', safe_text, flags=re.DOTALL)
safe_text = safe_text.strip() or "..."
token_id = telegram_token.split(":", 1)[0] if ":" in telegram_token else "unknown"
url = f"https://api.telegram.org/bot{telegram_token}/sendMessage"
payload = {
"chat_id": str(chat_id),
"text": safe_text,
"disable_web_page_preview": True,
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, timeout=15.0)
async def _send_chunk(chunk: str) -> bool:
payload = {
"chat_id": str(chat_id),
"text": chunk,
"disable_web_page_preview": True,
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, timeout=15.0)
if response.status_code >= 400:
err_desc = response.text[:300]
try:
body = response.json()
err_desc = body.get("description") or err_desc
except Exception:
pass
logger.error(
"Telegram sendMessage failed: bot_id=%s chat_id=%s status=%s desc=%s",
token_id,
chat_id,
response.status_code,
err_desc,
)
if response.status_code >= 400:
err_desc = response.text[:300]
try:
body = response.json()
err_desc = body.get("description") or err_desc
except Exception:
pass
logger.error(
"Telegram sendMessage failed: bot_id=%s chat_id=%s status=%s desc=%s",
token_id,
chat_id,
response.status_code,
err_desc,
)
return False
return True
except Exception as e:
logger.error("Telegram sendMessage exception: bot_id=%s chat_id=%s error=%s", token_id, chat_id, e)
return False
logger.info("Telegram message sent: bot_id=%s chat_id=%s", token_id, chat_id)
return True
except Exception as e:
logger.error("Telegram sendMessage exception: bot_id=%s chat_id=%s error=%s", token_id, chat_id, e)
return False
all_ok = True
chunks = _chunk_text(safe_text, max_len=TELEGRAM_MAX_MESSAGE_LENGTH)
for chunk in chunks:
sent = await _send_chunk(chunk)
all_ok = all_ok and sent
if all_ok:
logger.info("Telegram message sent: bot_id=%s chat_id=%s chunks=%s", token_id, chat_id, len(chunks))
return all_ok
# ========================================