Gateway/Doc: source-lock, PII guard, intent retry, shared Excel contract (#4)

* gateway: enforce source-lock, pii guard, style profile, and intent retry

* doc-service: add shared deterministic excel answer contract

* gateway: auto-handle unresolved user questions in chat context

* gateway: fix greeting UX and reduce false photo-intent fallbacks

---------

Co-authored-by: Apple <apple@MacBook-Pro.local>
This commit is contained in:
Lord of Chaos
2026-02-21 10:16:43 +02:00
committed by GitHub
parent ce6c9ec60a
commit 815a287474
2 changed files with 566 additions and 56 deletions

View File

@@ -4,6 +4,7 @@ Handles incoming webhooks from Telegram, Discord, etc.
"""
import asyncio
import base64
import copy
import json
import re
import logging
@@ -67,6 +68,10 @@ PENDING_STATE_TTL = 1800 # 30 minutes
USER_LANGUAGE_PREFS: Dict[str, Dict[str, Any]] = {}
USER_LANGUAGE_PREF_TTL = 30 * 24 * 3600 # 30 days
# Per-user response style cache (agent:chat:user -> {style, ts})
USER_RESPONSE_STYLE_PREFS: Dict[str, Dict[str, Any]] = {}
USER_RESPONSE_STYLE_PREF_TTL = 30 * 24 * 3600 # 30 days
# Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts})
RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {}
RECENT_PHOTO_TTL = 30 * 60 # 30 minutes
@@ -139,17 +144,6 @@ def _looks_like_photo_followup(text: str) -> bool:
if any(m in t for m in direct_markers):
return True
# If user is correcting previous visual interpretation, route to vision again.
correction_markers = [
"неправильна відповідь", "не правильна відповідь", "не видумуй", "це не так",
"ти помилився", "ти помилилась", "неправильно визначив",
"wrong answer", "you are wrong", "that is incorrect",
"неправильный ответ", "это не так", "ты ошибся",
]
photo_topic_markers = ["фото", "зображ", "рослин", "image", "photo", "plant", "растен"]
if any(c in t for c in correction_markers) and any(p in t for p in photo_topic_markers):
return True
# Flexible forms: "що на ... фото/зображенні/світлині"
if re.search(r"(що|what|что)\s+на\s+.*(фото|зображ|світлин|image|photo)", t):
# Exclude common meta-questions
@@ -173,6 +167,113 @@ def _is_agromatrix_plant_intel_intent(agent_id: str, text: str) -> bool:
]
return any(m in tl for m in markers)
def _needs_photo_only_response(text: str) -> bool:
"""
Return True only for explicit requests to analyze/describe image content.
Do not trigger on meta-dialogue about previous mistakes.
"""
t = (text or "").strip().lower()
if not t:
return False
explicit_patterns = [
r"(що|what|что).{0,24}(на|in).{0,24}(фото|зображ|світлин|image|photo)",
r"(опиши|describe|проаналізуй|analyz|анализируй).{0,32}(фото|зображ|image|photo)",
r"(яка|какая|what).{0,28}(рослин|plant|культура).{0,28}(на|in).{0,28}(фото|image|photo)",
]
return any(re.search(p, t) for p in explicit_patterns)
def _is_simple_greeting(text: str) -> bool:
t = (text or "").strip().lower()
if not t:
return False
compact = re.sub(r"[^a-zаіїєґ0-9 ]+", "", t).strip()
greetings = {
"привіт", "вітаю", "добрий день", "доброго дня", "доброго вечора",
"hello", "hi", "hey", "good morning", "good evening",
}
if compact in greetings:
return True
# Short greeting variants like "привіт!" / "hi!"
return len(compact.split()) <= 3 and any(g in compact for g in greetings)
def _extract_unanswered_user_messages(
memory_context: Dict[str, Any],
current_user_id: str,
max_items: int = 3,
) -> List[str]:
events = memory_context.get("recent_events") or []
if not isinstance(events, list) or not current_user_id:
return []
def _normalize_tokens(raw: str) -> set:
toks = re.findall(r"[a-zA-Zа-яА-ЯіїєґІЇЄҐ0-9]{3,}", (raw or "").lower())
stop = {
"що", "як", "коли", "де", "хто", "чому", "який", "яка", "яке", "скільки", "чи",
"what", "how", "when", "where", "who", "why", "which",
"and", "for", "the", "this", "that", "with", "from",
}
return {t for t in toks if t not in stop}
def _looks_like_ack_or_generic(raw: str) -> bool:
t = (raw or "").strip().lower()
if not t:
return True
markers = [
"привіт", "вітаю", "чим можу допомогти", "ок", "добре", "дякую", "готово",
"hello", "hi", "how can i help", "thanks", "okay", "done",
]
return any(m in t for m in markers) and len(t) < 180
def _assistant_resolves_question(question_text: str, assistant_text: str) -> bool:
if _looks_like_ack_or_generic(assistant_text):
return False
q_tokens = _normalize_tokens(question_text)
a_tokens = _normalize_tokens(assistant_text)
if not q_tokens or not a_tokens:
return False
overlap = len(q_tokens.intersection(a_tokens))
# Require partial semantic overlap, otherwise do not auto-close.
return overlap >= 2 or (overlap >= 1 and len(q_tokens) <= 3)
pending: List[Dict[str, str]] = []
for ev in events:
role = str(ev.get("role") or ev.get("type") or "").lower()
text = str(ev.get("body_text") or "").strip()
if not text:
continue
if role == "user" and str(ev.get("user_id") or "") == current_user_id and _is_question_like(text):
pending.append({"text": text})
continue
if role in ("assistant", "agent") and pending:
# Resolve only matching question; do not auto-close all pending items.
resolved_idx = None
for idx, item in enumerate(pending):
if _assistant_resolves_question(item["text"], text):
resolved_idx = idx
break
if resolved_idx is not None:
pending.pop(resolved_idx)
if len(pending) > max_items:
pending = pending[-max_items:]
return [p["text"] for p in pending]
def _is_question_like(text: str) -> bool:
if not text:
return False
t = text.strip().lower()
if "?" in t:
return True
return bool(
re.search(
r"\b(що|як|чому|коли|де|хто|чи|what|why|how|when|where|who|зачем|почему|когда|где|кто|ли)\b",
t,
)
)
def _cleanup_user_language_prefs() -> None:
now = time.time()
@@ -181,6 +282,13 @@ def _cleanup_user_language_prefs() -> None:
del USER_LANGUAGE_PREFS[k]
def _cleanup_user_response_style_prefs() -> None:
    """Evict response-style cache entries older than the configured TTL.

    An entry is stale when its ``ts`` timestamp (0 if missing) is more than
    ``USER_RESPONSE_STYLE_PREF_TTL`` seconds in the past.
    """
    current_time = time.time()
    stale_keys = []
    # Collect first, delete afterwards: the dict must not change size while
    # it is being iterated.
    for key, entry in USER_RESPONSE_STYLE_PREFS.items():
        age = current_time - float(entry.get("ts", 0))
        if age > USER_RESPONSE_STYLE_PREF_TTL:
            stale_keys.append(key)
    for key in stale_keys:
        del USER_RESPONSE_STYLE_PREFS[key]
def _normalize_lang_code(raw: Optional[str]) -> Optional[str]:
if not raw:
return None
@@ -1252,7 +1360,134 @@ def should_force_concise_reply(text: str) -> bool:
# For regular Q&A in chat keep first response concise by default.
return True
def _detect_response_style_signal(text: str) -> Optional[str]:
t = (text or "").strip().lower()
if not t:
return None
concise_markers = ["коротко", "коротка відповідь", "лаконічно", "brief", "short answer"]
detailed_markers = ["детально", "розгорнуто", "поясни детальніше", "deep dive", "step by step"]
if any(m in t for m in detailed_markers):
return "detailed"
if any(m in t for m in concise_markers):
return "concise"
return None
async def resolve_response_style_preference(
    agent_id: str,
    chat_id: str,
    user_id: str,
    text: str,
    team_id: Optional[str],
) -> str:
    """Resolve the response style ("concise" or "detailed") for this user.

    Resolution order:
      1. An explicit signal in ``text`` wins; it is cached in memory and
         written to the memory service as a ``communication_profile`` fact.
      2. Otherwise the in-process cache entry for agent/chat/user is used.
      3. Otherwise the stored fact is read and, if valid, cached.
      4. Otherwise the default ``"concise"`` is returned (and not cached).

    Args:
        agent_id: Agent identifier; part of the cache key and fact key.
        chat_id: Chat identifier; part of the cache key.
        user_id: User identifier; stored in the memory service as ``tg:<id>``.
        text: Current user message, scanned for an explicit style signal.
        team_id: Passed through to the memory client calls.

    Returns:
        ``"concise"`` or ``"detailed"`` (or whatever style string a cached
        entry holds).
    """
    _cleanup_user_response_style_prefs()
    cache_key = f"{agent_id}:{chat_id}:{user_id}"
    signal = _detect_response_style_signal(text)
    now = time.time()

    # 1. Explicit request in the current message: remember and persist it.
    if signal in ("concise", "detailed"):
        USER_RESPONSE_STYLE_PREFS[cache_key] = {"style": signal, "ts": now}
        await memory_client.upsert_fact(
            user_id=f"tg:{user_id}",
            fact_key=f"communication_profile:{agent_id}",
            fact_value_json={"response_style": signal, "updated_at": datetime.utcnow().isoformat()},
            team_id=team_id,
        )
        return signal

    # 2. In-process cache.
    cached_entry = USER_RESPONSE_STYLE_PREFS.get(cache_key)
    if cached_entry:
        return str(cached_entry.get("style") or "concise")

    # 3. Stored fact.
    stored_fact = await memory_client.get_fact(
        user_id=f"tg:{user_id}",
        fact_key=f"communication_profile:{agent_id}",
        team_id=team_id,
    )
    if not stored_fact:
        return "concise"

    payload = stored_fact.get("fact_value_json") if isinstance(stored_fact, dict) else None
    if isinstance(payload, str):
        # The value may arrive serialized; an unparsable value is ignored.
        try:
            payload = json.loads(payload)
        except Exception:
            payload = None
    if not isinstance(payload, dict):
        return "concise"

    stored_style = str(payload.get("response_style") or "").lower()
    if stored_style not in ("concise", "detailed"):
        return "concise"

    USER_RESPONSE_STYLE_PREFS[cache_key] = {"style": stored_style, "ts": now}
    return stored_style
def _redact_private_patterns(text: str) -> str:
if not text:
return ""
sanitized = text
sanitized = re.sub(r"(?i)(email|e-mail)\s*[:\-]?\s*[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", r"\1: [redacted]", sanitized)
sanitized = re.sub(r"(?i)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", "[redacted-email]", sanitized)
sanitized = re.sub(r"(?<!\d)(\+?\d[\d\s\-\(\)]{8,}\d)(?!\d)", "[redacted-phone]", sanitized)
return sanitized
def _is_private_profile_dump_request(user_text: str) -> bool:
t = (user_text or "").lower()
if not t:
return False
markers = [
"приватн", "контакт ментор", "телефон ментор", "email ментор",
"всі контакти", "скинь контакти", "private data", "mentor contacts",
]
return any(m in t for m in markers)
def _block_private_profile_dump(user_text: str) -> Optional[str]:
    """Return a refusal message when the user asks for private data.

    Args:
        user_text: Raw user message.

    Returns:
        The fixed refusal text when ``_is_private_profile_dump_request``
        flags the message; ``None`` otherwise.
    """
    if _is_private_profile_dump_request(user_text):
        return (
            "Не можу надавати приватні дані людей (контакти, особисті профілі). "
            "Можу дати лише публічну, узагальнену інформацію."
        )
    return None
def _is_numeric_question(text: str) -> bool:
t = (text or "").lower()
if not t:
return False
markers = ["скільки", "сума", "витрат", "добрив", "грн", "кг", "вартість", "cost", "amount", "total", "spent"]
return any(m in t for m in markers)
def _has_numeric_answer_contract(answer_text: str) -> bool:
a = (answer_text or "").lower()
if not a:
return False
has_value = bool(re.search(r"\d", a))
has_unit = any(u in a for u in ("грн", "uah", "usd", "eur", "кг", "kg", "%"))
has_source = any(s in a for s in ("рядок", "лист", "sheet", "row", "джерело"))
return has_value and has_unit and has_source
def _answer_seems_off_intent(user_text: str, answer_text: str) -> bool:
    """Heuristically flag an answer that drifted away from the user's request.

    Two situations are flagged:
      * the question is numeric but the answer lacks value + unit + source;
      * the user mentions a spreadsheet/file/report but the answer talks
        about an intro, structure, presentation or slides.

    Args:
        user_text: Raw user message.
        answer_text: Answer produced for it.

    Returns:
        True when the answer looks off-intent; False otherwise, including
        when either text is empty.
    """
    question = (user_text or "").lower()
    answer = (answer_text or "").lower()
    if not (question and answer):
        return False

    if _is_numeric_question(question) and not _has_numeric_answer_contract(answer_text):
        return True

    file_markers = ("excel", "xlsx", "таблиц", "файл", "звіт")
    if not any(marker in question for marker in file_markers):
        return False
    presentation_markers = ("вступ", "структура", "презентаці", "слайд")
    return any(marker in answer for marker in presentation_markers)
def _sanitize_agent_answer_v2(agent_id: str, user_text: str, answer_text: str) -> str:
    """Apply the privacy guard to an agent answer before it is sent.

    A request for private profile data is answered with the fixed refusal
    text instead of the generated answer. Otherwise e-mail addresses and
    phone numbers in the answer are redacted.

    Args:
        agent_id: Agent identifier (currently not used by the guard).
        user_text: Raw user message that produced the answer.
        answer_text: Generated answer; ``None`` is treated as empty.

    Returns:
        The refusal text, or the redacted answer.
    """
    refusal = _block_private_profile_dump(user_text)
    return refusal if refusal else _redact_private_patterns(answer_text or "")
def _strip_answer_markup_noise(answer_text: str) -> str:
if not answer_text:
return ""
@@ -2652,14 +2887,33 @@ async def handle_telegram_webhook(
text = update.message.get("text", "")
caption = update.message.get("caption", "")
# Friendly greeting fast-path for better UX and less mechanical replies.
if _is_simple_greeting(text):
greeting_reply = (
f"Привіт, {username or 'друже'}! Я {agent_config.name}. "
"Можу допомогти з фото рослин, Excel-звітами та короткими практичними порадами."
)
await send_telegram_message(chat_id, greeting_reply, telegram_token)
await memory_client.save_chat_turn(
agent_id=agent_config.agent_id,
team_id=dao_id,
user_id=f"tg:{user_id}",
message=text,
response=greeting_reply,
channel_id=chat_id,
scope="short_term",
save_agent_response=True,
agent_metadata={"greeting_fast_path": True},
username=username,
)
return {"ok": True, "agent": agent_config.agent_id, "mode": "greeting_fast_path"}
# Photo/image intent guard:
# if text references a photo/image, try to resolve latest file_id and route to vision.
photo_intent = False
if text:
tl = text.lower()
photo_intent = _looks_like_photo_followup(text) or any(
k in tl for k in ("фото", "зображ", "світлин", "image", "photo")
)
photo_intent = _looks_like_photo_followup(text)
if not photo_intent:
# Robust fallback for common formulations like "що на цьому фото?"
photo_intent = bool(
@@ -2706,8 +2960,7 @@ async def handle_telegram_webhook(
return followup_result
# Hard guard: don't send photo-related requests to text LLM path when image context is missing.
is_question_like = ("?" in text) or any(k in tl for k in ("що", "опиши", "проанал", "what", "describe", "analy", "что"))
if is_question_like:
if _needs_photo_only_response(text):
await send_telegram_message(
chat_id,
"Бачу питання про фото, але не знайшов зображення в історії сесії. Надішли фото ще раз з коротким питанням, і я одразу проаналізую.",
@@ -2792,32 +3045,52 @@ async def handle_telegram_webhook(
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id:
# Check if it's a question (simple heuristic: contains question words or ends with ?)
is_question = (
"?" in text or
any(word in text.lower() for word in ["що", "як", "чому", "коли", "де", "хто", "чи"])
)
is_question = _is_question_like(text)
if is_question:
logger.info(f"{agent_config.name}: Follow-up question detected for doc_id={doc_context.doc_id}")
# Try RAG query first
rag_result = await ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}"
)
try:
rag_result = await asyncio.wait_for(
ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
),
timeout=25.0,
)
except asyncio.TimeoutError:
logger.warning(
f"{agent_config.name}: doc follow-up timeout for doc_id={doc_context.doc_id}; "
"fallback to regular chat path"
)
rag_result = None
if rag_result.success and rag_result.answer:
if rag_result and rag_result.success and rag_result.answer:
# Truncate if too long for Telegram
answer = rag_result.answer
answer = postprocess_agent_answer(
agent_id=agent_config.agent_id,
user_text=text or "",
answer_text=rag_result.answer,
force_detailed=should_force_detailed_reply(text),
needs_complex_reasoning=requires_complex_reasoning(text),
)
if len(answer) > TELEGRAM_SAFE_LENGTH:
answer = answer[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_"
answer = _sanitize_agent_answer_v2(agent_config.agent_id, text or "", answer)
await send_telegram_message(chat_id, answer, telegram_token)
return {"ok": True, "agent": "parser", "mode": "rag_query"}
# Fall through to regular chat if RAG query fails
# Source-lock: with active document context answer only from that document.
await send_telegram_message(
chat_id,
"Не знайшов точну відповідь у поточному документі. Уточни питання або надішли файл повторно.",
telegram_token,
)
return {"ok": True, "agent": "parser", "mode": "source_lock_no_answer"}
# ========================================
# BEHAVIOR POLICY v2.1: Check if should respond
@@ -2870,6 +3143,19 @@ async def handle_telegram_webhook(
)
respond_decision = sowa_decision.should_respond
respond_reason = sowa_decision.reason
# AgroMatrix usability guard:
# In dedicated chats users often ask short operational questions without explicit mention.
# Do not silence clear question turns for agromatrix.
if (
sowa_decision.action == "SILENT"
and agent_config.agent_id == "agromatrix"
and _is_question_like(text)
):
sowa_decision.action = "FULL"
sowa_decision.should_respond = True
respond_decision = True
respond_reason = "agromatrix_question_guard"
if sowa_decision.action == "SILENT":
logger.info(f"\U0001f507 SOWA: Agent {agent_config.agent_id} NOT responding. Reason: {respond_reason}")
@@ -2972,11 +3258,36 @@ async def handle_telegram_webhook(
training_prefix = "[РЕЖИМ НАВЧАННЯ - відповідай на це повідомлення, ти в навчальній групі Agent Preschool]\n\n"
logger.info(f"🎓 Training mode activated for chat {chat_id}")
unresolved_questions = _extract_unanswered_user_messages(
memory_context=memory_context,
current_user_id=f"tg:{user_id}",
max_items=3,
)
unresolved_non_current: List[str] = []
unresolved_block = ""
if unresolved_questions:
# Do not duplicate current prompt if it matches one pending message.
unresolved_non_current = [q for q in unresolved_questions if q.strip() != (text or "").strip()]
if unresolved_non_current:
unresolved_block = (
"[КРИТИЧНО: є невідповідані питання цього користувача. "
"Спочатку коротко відповідай на них, потім на поточне повідомлення. "
"Не змінюй тему і не ігноруй pending-питання.]\n"
"[Невідповідані питання цього користувача]\n"
+ "\n".join(f"- {q}" for q in unresolved_non_current)
+ "\n\n"
)
if local_history:
# Add conversation history to message for better context understanding
message_with_context = f"{training_prefix}[Контекст розмови]\n{local_history}\n\n[Поточне повідомлення від {username}]\n{text}"
message_with_context = (
f"{training_prefix}"
f"[Контекст розмови]\n{local_history}\n\n"
f"{unresolved_block}"
f"[Поточне повідомлення від {username}]\n{text}"
)
else:
message_with_context = f"{training_prefix}{text}"
message_with_context = f"{training_prefix}{unresolved_block}{text}"
preferred_lang = await resolve_preferred_language_persistent(
chat_id=chat_id,
@@ -2986,6 +3297,15 @@ async def handle_telegram_webhook(
team_id=dao_id,
)
preferred_lang_label = preferred_language_label(preferred_lang)
response_style_pref = await resolve_response_style_preference(
agent_id=agent_config.agent_id,
chat_id=chat_id,
user_id=str(user_id),
text=text or "",
team_id=dao_id,
)
force_detailed = should_force_detailed_reply(text) or response_style_pref == "detailed"
force_concise = (not force_detailed) and (should_force_concise_reply(text) or response_style_pref == "concise")
# Build request to Router
system_prompt = agent_config.system_prompt
@@ -3014,6 +3334,9 @@ async def handle_telegram_webhook(
"is_training_group": is_training_group,
"preferred_response_language": preferred_lang,
"preferred_response_language_label": preferred_lang_label,
"response_style_preference": response_style_pref,
"has_unresolved_questions": bool(unresolved_non_current),
"unresolved_questions_count": len(unresolved_non_current),
},
"context": {
"agent_name": agent_config.name,
@@ -3026,13 +3349,13 @@ async def handle_telegram_webhook(
},
}
if should_force_detailed_reply(text):
if force_detailed:
router_request["metadata"]["force_detailed"] = True
if _is_agromatrix_plant_intel_intent(agent_config.agent_id, text):
router_request["metadata"]["crewai_profile"] = "plant_intel"
if should_force_concise_reply(text):
if force_concise:
# IMPORTANT: preserve conversation context! Only append concise instruction
router_request["metadata"]["force_concise"] = True
router_request["message"] = (
@@ -3051,12 +3374,18 @@ async def handle_telegram_webhook(
router_request["metadata"]["provider"] = "cloud_deepseek"
router_request["metadata"]["reason"] = "auto_complex"
if not should_force_concise_reply(text):
if not force_concise:
router_request["message"] = (
router_request["message"]
+ f"\n\n(Мова відповіді: {preferred_lang_label}.)"
+ "\n(Не потрібно щоразу представлятися по імені або писати шаблонне: 'чим можу допомогти'.)"
)
if unresolved_non_current:
router_request["message"] = (
router_request["message"]
+ "\n\n(Пріоритет відповіді: 1) закрий невідповідані питання користувача; "
"2) дай відповідь на поточне повідомлення. Якщо питання пов'язані, дай одну узгоджену відповідь.)"
)
# Send to Router
logger.info(f"Sending to Router: agent={agent_config.agent_id}, dao={dao_id}, user=tg:{user_id}")
@@ -3128,6 +3457,30 @@ async def handle_telegram_webhook(
username=username,
)
return {"ok": True, "skipped": True, "reason": "no_output_from_llm"}
# Retry policy: if response drifts from current intent, do one strict reroute.
if _answer_seems_off_intent(text or "", answer_text):
try:
strict_request = copy.deepcopy(router_request)
strict_request["metadata"]["intent_retry"] = 1
strict_request["metadata"]["disable_tools"] = True
strict_request["metadata"]["max_tool_rounds"] = 1
strict_request["metadata"]["temperature"] = 0.1
strict_request["message"] = (
f"{message_with_context}\n\n"
"(Жорстка інструкція: відповідай тільки на ПОТОЧНЕ питання користувача. "
"Не змінюй тему, не генеруй презентацію/план, якщо цього не просили. "
"Для числових питань: дай value + unit + джерело (лист/рядок).)"
)
retry_response = await send_to_router(strict_request)
if isinstance(retry_response, dict) and retry_response.get("ok"):
retry_text = retry_response.get("data", {}).get("text") or retry_response.get("response", "")
if retry_text and not is_no_output_response(retry_text):
answer_text = retry_text
router_request = strict_request
logger.info("Intent retry succeeded with strict prompt")
except Exception as retry_err:
logger.warning(f"Intent retry failed: {retry_err}")
force_detailed_reply = bool(router_request.get("metadata", {}).get("force_detailed"))
answer_text = postprocess_agent_answer(
@@ -3137,7 +3490,7 @@ async def handle_telegram_webhook(
force_detailed=force_detailed_reply,
needs_complex_reasoning=needs_complex_reasoning,
)
answer_text = _sanitize_agent_answer(answer_text, user_id=user_id)
answer_text = _sanitize_agent_answer_v2(agent_config.agent_id, text or "", answer_text)
# Skip Telegram sending for prober requests (chat_id=0)
if is_prober:
@@ -3581,26 +3934,39 @@ async def _old_telegram_webhook(update: TelegramUpdate):
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id:
# Check if it's a question (simple heuristic: contains question words or ends with ?)
is_question = (
"?" in text or
any(word in text.lower() for word in ["що", "як", "чому", "коли", "де", "хто", "чи"])
)
is_question = _is_question_like(text)
if is_question:
logger.info(f"Follow-up question detected for doc_id={doc_context.doc_id}")
# Try RAG query first
rag_result = await ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}"
)
try:
rag_result = await asyncio.wait_for(
ask_about_document(
session_id=session_id,
question=text,
doc_id=doc_context.doc_id,
dao_id=dao_id or doc_context.dao_id,
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
),
timeout=25.0,
)
except asyncio.TimeoutError:
logger.warning(
f"Doc follow-up timeout for agent={agent_config.agent_id}, "
f"doc_id={doc_context.doc_id}; fallback to regular chat path"
)
rag_result = None
if rag_result.success and rag_result.answer:
if rag_result and rag_result.success and rag_result.answer:
# Truncate if too long for Telegram
answer = rag_result.answer
answer = postprocess_agent_answer(
agent_id=agent_config.agent_id,
user_text=text or "",
answer_text=rag_result.answer,
force_detailed=should_force_detailed_reply(text),
needs_complex_reasoning=requires_complex_reasoning(text),
)
if len(answer) > TELEGRAM_SAFE_LENGTH:
answer = answer[:TELEGRAM_SAFE_LENGTH] + "\n\n_... (відповідь обрізано)_"

View File

@@ -12,6 +12,7 @@ import os
import logging
import hashlib
import json
import re
from typing import Optional, Dict, Any, List
from pydantic import BaseModel
from datetime import datetime
@@ -21,6 +22,8 @@ from memory_client import memory_client
logger = logging.getLogger(__name__)
SHARED_EXCEL_POLICY_AGENTS = {"agromatrix", "helion", "nutra", "greenfood"}
class QAItem(BaseModel):
"""Single Q&A pair"""
@@ -80,6 +83,112 @@ class DocumentService:
def __init__(self):
"""Initialize document service"""
self.memory_client = memory_client
def _is_excel_filename(self, file_name: Optional[str]) -> bool:
if not file_name:
return False
lower = file_name.lower()
return lower.endswith(".xlsx") or lower.endswith(".xls")
def _is_numeric_question(self, question: str) -> bool:
t = (question or "").lower()
if not t:
return False
markers = [
"скільки", "сума", "витрат", "добрив", "грн", "uah", "usd", "eur",
"сколько", "amount", "total", "spent", "cost", "value",
]
return any(m in t for m in markers)
def _extract_query_tokens(self, question: str) -> List[str]:
tokens = re.findall(r"[a-zA-Zа-яА-ЯіїєґІЇЄҐ0-9]{3,}", (question or "").lower())
stop = {
"яка", "який", "яке", "which", "what", "скільки", "сума", "була",
"витрачена", "write", "show", "give", "please", "мені", "будь", "ласка",
"тому", "цьому", "цей", "this", "that", "for", "and", "the",
}
return [t for t in tokens if t not in stop]
async def _try_answer_excel_question(
self,
question: str,
doc_url: Optional[str],
file_name: Optional[str],
) -> Optional[str]:
if not doc_url or not self._is_numeric_question(question):
return None
try:
import httpx
from io import BytesIO
import openpyxl
except Exception:
return None
query_tokens = self._extract_query_tokens(question)
if not query_tokens:
query_tokens = ["сума", "витрати", "добрив"]
try:
async with httpx.AsyncClient(timeout=20.0) as client:
resp = await client.get(doc_url)
if resp.status_code != 200:
return None
content = resp.content
wb = openpyxl.load_workbook(BytesIO(content), data_only=True, read_only=True)
best = None
best_score = -1
fallback = None
for ws in wb.worksheets:
for row_idx, row in enumerate(ws.iter_rows(values_only=True), start=1):
label = ""
numeric_value = None
for cell in row:
if isinstance(cell, (int, float)) and numeric_value is None:
numeric_value = float(cell)
elif isinstance(cell, str) and not label:
label = cell.strip()
if numeric_value is None:
continue
label_low = label.lower()
score = sum(1 for t in query_tokens if t in label_low)
if score > best_score:
best_score = score
best = {
"sheet": ws.title,
"row": row_idx,
"label": label or "n/a",
"value": numeric_value,
}
if fallback is None and any(m in label_low for m in ("добрив", "fertiliz", "удобр")):
fallback = {
"sheet": ws.title,
"row": row_idx,
"label": label or "n/a",
"value": numeric_value,
}
picked = best if best and best_score > 0 else fallback
if not picked:
return None
value = picked["value"]
if abs(value - int(value)) < 1e-9:
value_str = f"{int(value):,}".replace(",", " ")
else:
value_str = f"{value:,.2f}".replace(",", " ").replace(".", ",")
unit = "грн" if self._is_numeric_question(question) else ""
unit_part = f" {unit}" if unit else ""
file_part = f' у файлі "{file_name}"' if file_name else ""
return (
f"За{file_part}: {value_str}{unit_part}. "
f"Джерело: лист {picked['sheet']}, рядок {picked['row']} ({picked['label']})."
)
except Exception as e:
logger.warning(f"Excel deterministic answer failed: {e}")
return None
async def save_doc_context(
self,
@@ -451,7 +560,8 @@ class DocumentService:
question: str,
doc_id: Optional[str] = None,
dao_id: Optional[str] = None,
user_id: Optional[str] = None
user_id: Optional[str] = None,
agent_id: str = "daarwizz"
) -> QAResult:
"""
Ask a question about a document using RAG query.
@@ -468,11 +578,20 @@ class DocumentService:
"""
try:
# If doc_id not provided, try to get from context
doc_url = None
file_name = None
if not doc_id:
doc_context = await self.get_doc_context(session_id)
if doc_context:
doc_id = doc_context.doc_id
dao_id = dao_id or doc_context.dao_id
doc_url = doc_context.doc_url
file_name = doc_context.file_name
else:
doc_context = await self.get_doc_context(session_id)
if doc_context:
doc_url = doc_context.doc_url
file_name = doc_context.file_name
if not doc_id:
return QAResult(
@@ -484,11 +603,32 @@ class DocumentService:
if not user_id:
parts = session_id.split(":", 1)
user_id = parts[1] if len(parts) > 1 else session_id
# Shared deterministic Excel policy for top-level agrarian agents.
if (
(agent_id or "").lower() in SHARED_EXCEL_POLICY_AGENTS
and self._is_excel_filename(file_name)
):
deterministic = await self._try_answer_excel_question(
question=question,
doc_url=doc_url,
file_name=file_name,
)
if deterministic:
return QAResult(
success=True,
answer=deterministic,
doc_id=doc_id,
sources=[{
"type": "excel_deterministic",
"file_name": file_name,
}],
)
# Build RAG query request
router_request = {
"mode": "rag_query",
"agent": "daarwizz",
"agent": agent_id,
"metadata": {
"source": self._extract_source(session_id),
"dao_id": dao_id,
@@ -503,7 +643,9 @@ class DocumentService:
},
}
logger.info(f"RAG query: session={session_id}, question={question[:50]}, doc_id={doc_id}")
logger.info(
f"RAG query: agent={agent_id}, session={session_id}, question={question[:50]}, doc_id={doc_id}"
)
# Send to Router
response = await send_to_router(router_request)
@@ -593,7 +735,8 @@ async def ask_about_document(
question: str,
doc_id: Optional[str] = None,
dao_id: Optional[str] = None,
user_id: Optional[str] = None
user_id: Optional[str] = None,
agent_id: str = "daarwizz"
) -> QAResult:
"""Ask a question about a document using RAG query"""
return await doc_service.ask_about_document(
@@ -601,7 +744,8 @@ async def ask_about_document(
question=question,
doc_id=doc_id,
dao_id=dao_id,
user_id=user_id
user_id=user_id,
agent_id=agent_id
)