fix(fabric): use broadcast subject for NATS capabilities discovery

NATS wildcards (node.*.capabilities.get) only work for subscriptions,
not for publish. Switch to a dedicated broadcast subject
(fabric.capabilities.discover) that all NCS instances subscribe to,
enabling proper scatter-gather discovery across nodes.

Made-with: Cursor
This commit is contained in:
Apple
2026-02-27 03:20:13 -08:00
parent a6531507df
commit 90080c632a
28 changed files with 8883 additions and 1459 deletions

View File

@@ -25,11 +25,27 @@ from pydantic import BaseModel
from router_client import send_to_router
from memory_client import memory_client
from vision_guard import (
extract_label_from_response as _vg_extract_label,
get_vision_lock as _vg_get_lock,
set_vision_lock as _vg_set_lock,
clear_vision_lock as _vg_clear_lock,
set_user_label as _vg_set_user_label,
detect_user_override as _vg_detect_override,
should_skip_reanalysis as _vg_should_skip,
build_low_confidence_clarifier as _vg_build_low_conf,
build_locked_reply as _vg_build_locked_reply,
)
from services.doc_service import (
parse_document,
ingest_document,
ask_about_document,
get_doc_context
get_doc_context,
save_chat_doc_context,
get_chat_doc_context,
fetch_telegram_file_bytes,
extract_summary_from_bytes,
upsert_chat_doc_context_with_summary,
)
from behavior_policy import (
should_respond,
@@ -44,6 +60,7 @@ from behavior_policy import (
get_ack_text,
is_prober_request,
has_agent_chat_participation,
has_recent_interaction,
NO_OUTPUT,
BehaviorDecision,
AGENT_NAME_VARIANTS,
@@ -51,6 +68,16 @@ from behavior_policy import (
logger = logging.getLogger(__name__)
def _safe_has_recent_interaction(agent_id: str, chat_id: str, user_id: str) -> bool:
"""Guard: avoid 500 if has_recent_interaction is missing or raises. Returns False on any error."""
try:
return bool(has_recent_interaction(agent_id, str(chat_id), str(user_id)))
except Exception as e:
logger.warning("has_recent_interaction failed, treating as False: %s", e)
return False
# Telegram message length limits
TELEGRAM_MAX_MESSAGE_LENGTH = 4096
TELEGRAM_SAFE_LENGTH = 3500 # Leave room for formatting
@@ -654,6 +681,31 @@ def _get_last_pending(chat_id: str) -> list | None:
return rec.get('items')
def _find_doc_in_history(history_text: str) -> dict | None:
"""
Шукає посилання на документ у тексті chat history.
Якщо знаходить рядок '[Документ: ...]' — повертає doc_context stub.
Це дозволяє Степану знати про документ навіть без збереженого extracted_summary.
"""
import re as _re
if not history_text:
return None
# Шукаємо паттерн [Документ: filename.xlsx]
matches = _re.findall(r'\[Документ:\s*([^\]]+)\]', history_text)
if not matches:
# Також шукаємо assistant-повідомлення про документ
matches = _re.findall(r'📄[^\n]*\*\*([^*]+)\*\*', history_text)
if matches:
file_name = matches[-1].strip() # Беремо найновіший
return {
"doc_id": "",
"title": file_name,
"extracted_summary": "", # немає вмісту — але є назва
"from_history": True,
}
return None
def _set_last_pending(chat_id: str, items: list):
LAST_PENDING_STATE[str(chat_id)] = {"ts": time.time(), "items": items}
@@ -992,6 +1044,18 @@ async def druid_telegram_webhook(update: TelegramUpdate):
# AGROMATRIX webhook endpoint
# AGX_STEPAN_MODE: inproc = run Crew in-process (default); http = call crewai-service (9010).
_STEPAN_MODE = None
def _get_stepan_mode() -> str:
global _STEPAN_MODE
if _STEPAN_MODE is None:
_STEPAN_MODE = (os.getenv("AGX_STEPAN_MODE", "inproc") or "inproc").strip().lower()
if _STEPAN_MODE not in ("inproc", "http"):
_STEPAN_MODE = "inproc"
logger.info("Stepan mode=%s (AGX_STEPAN_MODE)", _STEPAN_MODE)
return _STEPAN_MODE
async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfig) -> Dict[str, Any]:
update_id = getattr(update, 'update_id', None) or update.update_id
@@ -1004,14 +1068,167 @@ async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfi
message = update.message or update.channel_post or {}
text = message.get('text') or message.get('caption') or ''
if not text:
return {"ok": True, "status": "no_text"}
user = message.get('from', {}) or {}
chat = message.get('chat', {}) or {}
user_id = str(user.get('id', ''))
chat_id = str(chat.get('id', ''))
# ── DOC HANDOFF + EXTRACT-ON-UPLOAD (v3.4 / PROMPT 30) ─────────────────
# При отриманні документа (operator path):
# 1) зберегти базовий doc_ctx (doc_id, file_name)
# 2) для XLSX/XLS/CSV: завантажити байти через Bot API, витягнути summary,
# оновити doc_context_chat з extracted_summary → Stepan бачить дані одразу
_doc_obj = message.get("document")
if _doc_obj and _doc_obj.get("file_id"):
_file_id_tg = _doc_obj.get("file_id")
_fu_id = _doc_obj.get("file_unique_id") or _file_id_tg
_fname = _doc_obj.get("file_name") or ""
_bot_token = agent_config.get_telegram_token() or ""
_doc_ctx_to_save: dict = {
"doc_id": _fu_id,
"file_unique_id": _fu_id,
"file_id": _file_id_tg,
"file_name": _fname,
"source": "telegram",
# Fix D: явно фіксуємо anchor одразу при upload — run.py може читати без парсингу doc_id
"active_doc_id": _fu_id,
}
# Крок 1: зберегти базовий doc_ctx (await = race-safe)
await save_chat_doc_context(chat_id, agent_config.agent_id, _doc_ctx_to_save)
logger.info("Doc Handoff: saved base doc_id=%s file=%s", str(_fu_id)[:16], _fname)
# Крок 2: Extract-on-upload для табличних форматів
_fname_lower = _fname.lower()
_extractable = _fname_lower.endswith((".xlsx", ".xls", ".csv"))
_extract_ok = False
if _extractable and _bot_token:
# Fix 1: One-shot cache — якщо summary вже є для того самого file_unique_id → skip
_existing_ctx = await get_chat_doc_context(chat_id, agent_config.agent_id)
_already_have = (
_existing_ctx
and _existing_ctx.get("extracted_summary")
and (_existing_ctx.get("file_unique_id") or _existing_ctx.get("doc_id")) == _fu_id
)
if _already_have:
_extract_ok = True
logger.info("doc_extract_skipped reason=already_have_summary chat_id=%s fuid=%s",
chat_id, str(_fu_id)[:16])
else:
logger.info("doc_extract_started chat_id=%s file=%s", chat_id, _fname)
try:
_file_bytes = await fetch_telegram_file_bytes(_bot_token, _file_id_tg)
_extract_summary = extract_summary_from_bytes(_fname, _file_bytes)
if _extract_summary:
await upsert_chat_doc_context_with_summary(
chat_id, agent_config.agent_id, _doc_ctx_to_save, _extract_summary
)
_extract_ok = True
logger.info("doc_extract_done ok=true chat_id=%s chars=%d",
chat_id, len(_extract_summary))
else:
logger.warning("doc_extract_done ok=false reason=empty_summary chat_id=%s", chat_id)
except Exception as _ee:
logger.warning("doc_extract_done ok=false reason=%s chat_id=%s",
str(_ee)[:80], chat_id)
# Якщо тексту/caption немає — підтверджуємо отримання і виходимо
if not text:
if _extract_ok:
_reply = (
f"Прочитав «{_fname}». Можу: (1) витягнути прибуток/витрати, "
f"(2) сценарій — добрива×2, (3) зведення грн/га. Що потрібно?"
)
elif _extractable:
_reply = (
f"Отримав «{_fname}», але не зміг витягнути дані автоматично. "
f"Постав питання — перегляну через пошук по документу."
)
else:
_reply = (
f"Бачу «{_fname}». Що зробити: витягнути прибуток/витрати, "
f"сценарій, чи звести по га?"
)
await send_telegram_message(chat_id, _reply, bot_token=_bot_token)
return {"ok": True, "status": "doc_saved"}
# ── PHOTO BRIDGE (v3.5) ─────────────────────────────────────────────────
# Фото в operator path раніше провалювалося через "if not text" і тихо ігнорувалося.
# Тепер: делегуємо до process_photo (vision-8b через Router) — той самий шлях,
# що використовують всі інші агенти. Агент AgroMatrix вже має спеціальний контекст
# (prior_label + agricultural system prompt) у process_photo.
_photo_obj = message.get("photo")
if _photo_obj and not text:
# text може бути caption — вже вище: text = message.get('caption') or ''
# якщо caption не порожній — photo+caption піде в text-гілку нижче (Stepan відповідає)
# тут тільки "фото без тексту"
_username = (user.get('username') or user.get('first_name') or str(user_id))
_dao_id = os.getenv("AGX_DAO_ID", "agromatrix-dao")
_bot_tok = agent_config.get_telegram_token() or ""
logger.info("Photo bridge: routing photo to process_photo chat_id=%s", chat_id)
try:
_photo_result = await process_photo(
agent_config=agent_config,
update=update,
chat_id=chat_id,
user_id=user_id,
username=_username,
dao_id=_dao_id,
photo=_photo_obj,
caption_override=None,
bypass_media_gate=True, # operator path = завжди відповідати
)
# v3.5 fix: зберігаємо timestamp фото в session Степана
# щоб наступний текстовий запит (words=1) знав що щойно було фото
try:
from crews.agromatrix_crew.session_context import update_session
import time as _ts_mod
update_session(
chat_id, "[photo]", depth="light", agents=[],
last_question=None,
last_photo_ts=_ts_mod.time(),
)
logger.info("Photo bridge: last_photo_ts saved chat_id=%s", chat_id)
except Exception:
pass
return _photo_result
except Exception as _pe:
logger.warning("Photo bridge error: %s", _pe)
await send_telegram_message(
chat_id,
"Не вдалося обробити фото. Спробуй ще раз або напиши що на фото.",
bot_token=_bot_tok,
)
return {"ok": True, "status": "photo_error"}
if not text:
return {"ok": True, "status": "no_text"}
# ── PHOTO+TEXT: якщо є caption → Stepan отримує опис через doc_context ─────
# Якщо text (caption) є + фото → стандартний flow Степана + зберігаємо file_id
# щоб він міг згадати фото у відповіді.
if _photo_obj and text:
_photo_largest = _photo_obj[-1] if isinstance(_photo_obj, list) else _photo_obj
_photo_file_id = _photo_largest.get("file_id") if isinstance(_photo_largest, dict) else None
if _photo_file_id:
_set_recent_photo_context(agent_config.agent_id, chat_id, user_id, _photo_file_id)
# ── VISION CONSISTENCY GUARD: Хук C — User Override ─────────────────────
# Whitelist + negation guard: "це соняшник" → user_label;
# "це не соняшник" → ігноруємо.
if text:
try:
_vg_override = _vg_detect_override(text)
if _vg_override:
_vg_set_user_label(agent_config.agent_id, chat_id, _vg_override)
logger.info(
"vision_user_override_set agent=%s chat_id=%s label=%s",
agent_config.agent_id, chat_id, _vg_override,
)
except Exception:
pass
# ─────────────────────────────────────────────────────────────────────────
# ops mode if operator
ops_mode = False
op_ids = [s.strip() for s in os.getenv('AGX_OPERATOR_IDS', '').split(',') if s.strip()]
@@ -1022,21 +1239,152 @@ async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfi
ops_mode = True
trace_id = str(uuid.uuid4())
stepan_mode = _get_stepan_mode()
if stepan_mode == "http":
logger.warning("Stepan http mode not implemented; use AGX_STEPAN_MODE=inproc.")
bot_token = agent_config.get_telegram_token()
await send_telegram_message(
chat_id,
"Степан у режимі HTTP зараз недоступний. Встановіть AGX_STEPAN_MODE=inproc.",
bot_token=bot_token,
)
return {"ok": False, "status": "stepan_http_not_implemented"}
# call Stepan directly
try:
sys.path.insert(0, str(Path('/opt/microdao-daarion')))
import gateway_boot
except ImportError:
gateway_boot = type(sys)("gateway_boot")
gateway_boot.STEPAN_IMPORTS_OK = False
if not getattr(gateway_boot, "STEPAN_IMPORTS_OK", False):
logger.warning("Stepan inproc disabled: crews/agromatrix_tools not available at startup")
bot_token = agent_config.get_telegram_token()
await send_telegram_message(
chat_id,
"Степан тимчасово недоступний (не встановлено crews або agromatrix-tools).",
bot_token=bot_token,
)
return {"ok": False, "status": "stepan_disabled"}
try:
# v3: crews/ is in /app/gateway-bot/crews (volume-mounted copy)
# AGX_REPO_ROOT can override for dev/alt deployments
repo_root = os.getenv("AGX_REPO_ROOT", "")
_gw = "/app/gateway-bot"
_at = "/app/gateway-bot/agromatrix-tools"
for _p in [_at, _gw, repo_root]:
if _p and _p not in sys.path:
sys.path.insert(0, _p)
from crews.agromatrix_crew.run import handle_message
# Doc Bridge (v3.3): отримати активний doc_context для цього chat.
# Пріоритет: chat-scoped (doc_context_chat:) > session-scoped (doc_context:).
_stepan_doc_ctx: dict | None = None
try:
# 1) Спочатку пробуємо chat-scoped (надійніший при зміні session_id)
_chat_dc = await get_chat_doc_context(chat_id, agent_config.agent_id)
if _chat_dc and (_chat_dc.get("doc_id") or _chat_dc.get("file_unique_id")):
_chat_doc_id = _chat_dc.get("doc_id") or _chat_dc.get("file_unique_id")
_chat_extracted = _chat_dc.get("extracted_summary") or ""
_chat_fname = _chat_dc.get("file_name") or ""
# Якщо chat-scoped є але без extracted_summary → шукаємо в session-scoped
if not _chat_extracted:
try:
_dc_sess = await get_doc_context(f"telegram:{chat_id}", agent_id=agent_config.agent_id)
if _dc_sess and getattr(_dc_sess, "extracted_summary", None):
_chat_extracted = _dc_sess.extracted_summary
except Exception:
pass
# Якщо ще немає — RAG fallback
if not _chat_extracted and _chat_doc_id:
try:
_qa = await ask_about_document(
session_id=f"telegram:{chat_id}",
question=text,
doc_id=_chat_doc_id,
dao_id=os.getenv("AGX_DAO_ID", "agromatrix-dao"),
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
)
if _qa and getattr(_qa, "answer", None):
_chat_extracted = f"[RAG відповідь по документу]: {_qa.answer}"
logger.info("Doc Bridge: RAG answer retrieved for chat doc_id=%s", _chat_doc_id)
except Exception as _qae:
logger.debug("Doc Bridge RAG fallback failed: %s", _qae)
_stepan_doc_ctx = {
"doc_id": _chat_doc_id,
"title": _chat_fname,
"extracted_summary": _chat_extracted,
"file_unique_id": _chat_dc.get("file_unique_id") or _chat_doc_id,
}
logger.info("Doc Bridge: chat-scoped doc_id=%s found=true", _chat_doc_id[:16] if _chat_doc_id else "")
else:
# 2) Fallback: session-scoped (старий ключ)
_dc = await get_doc_context(f"telegram:{chat_id}", agent_id=agent_config.agent_id)
if _dc and getattr(_dc, "doc_id", None):
_extracted = getattr(_dc, "extracted_summary", "") or ""
if not _extracted and getattr(_dc, "doc_id", None):
try:
_qa = await ask_about_document(
session_id=f"telegram:{chat_id}",
question=text,
doc_id=_dc.doc_id,
dao_id=os.getenv("AGX_DAO_ID", "agromatrix-dao"),
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
)
if _qa and getattr(_qa, "answer", None):
_extracted = f"[RAG відповідь по документу]: {_qa.answer}"
logger.info("Doc Bridge: session-scoped RAG retrieved for doc_id=%s", _dc.doc_id)
except Exception as _qae:
logger.debug("Doc Bridge session RAG fallback failed: %s", _qae)
_stepan_doc_ctx = {
"doc_id": _dc.doc_id,
"title": getattr(_dc, "file_name", "") or "",
"extracted_summary": _extracted,
"file_unique_id": _dc.doc_id,
}
logger.info("Doc Bridge: session-scoped doc_id=%s found=true", _dc.doc_id)
except Exception as _dce:
logger.debug("Doc Bridge: could not fetch doc_context: %s", _dce)
# Chat History Bridge (v3.2): передаємо history з memory-service в Степана.
# Степан інакше не має доступу до переписки — він викликається поза Router pipeline.
_stepan_chat_history: str = ""
try:
_ctx = await memory_client.get_context(
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
team_id=os.getenv("AGX_DAO_ID", "agromatrix-dao"),
channel_id=chat_id,
limit=40,
)
_stepan_chat_history = _ctx.get("local_context_text", "") or ""
# Якщо в history є документ — і _stepan_doc_ctx порожній, шукаємо в history
if not _stepan_doc_ctx and _stepan_chat_history:
_doc_in_history = _find_doc_in_history(_stepan_chat_history)
if _doc_in_history:
_stepan_doc_ctx = _doc_in_history
logger.info("Doc Bridge: found doc reference in chat history: %s",
_doc_in_history.get("title", ""))
except Exception as _che:
logger.debug("Chat History Bridge failed (non-blocking): %s", _che)
started = time.time()
last_pending = _get_last_pending(chat_id)
response_text = await asyncio.wait_for(
asyncio.to_thread(handle_message, text, user_id, chat_id, trace_id, ops_mode, last_pending),
timeout=25
asyncio.to_thread(
handle_message, text, user_id, chat_id, trace_id, ops_mode, last_pending,
None, None, bool(_stepan_doc_ctx), _stepan_doc_ctx,
_stepan_chat_history,
),
timeout=55
)
duration_ms = int((time.time() - started) * 1000)
except Exception as e:
logger.error(f"Stepan handler error: {e}; trace_id={trace_id}")
response_text = f"Помилка обробки. trace_id={trace_id}"
# SANITIZE: без trace_id для юзера (trace_id тільки в логах)
response_text = "Щось пішло не так. Спробуй ще раз або переформулюй запит."
duration_ms = 0
# If JSON, try to show summary
@@ -1078,35 +1426,19 @@ async def agromatrix_telegram_webhook(update: TelegramUpdate):
if user_id and user_id in op_ids:
is_ops = True
# Operator NL or operator slash commands -> handle via Stepan handler.
# Important: do NOT treat generic slash commands (/start, /agromatrix) as operator commands,
# otherwise regular users will see "Недостатньо прав" or Stepan errors.
operator_slash_cmds = {
"whoami",
"pending",
"pending_show",
"approve",
"reject",
"apply_dict",
"pending_stats",
}
slash_cmd = ""
if is_slash:
try:
slash_cmd = (msg_text.strip().split()[0].lstrip("/").strip().lower())
except Exception:
slash_cmd = ""
is_operator_slash = bool(slash_cmd) and slash_cmd in operator_slash_cmds
# Stepan handler currently depends on ChatOpenAI (OPENAI_API_KEY). If key is not configured,
# never route production traffic there (avoid "Помилка обробки..." and webhook 5xx).
stepan_enabled = bool(os.getenv("OPENAI_API_KEY", "").strip())
if stepan_enabled and (is_ops or is_operator_slash):
# Operator: any message (not only slash) goes to Stepan when is_ops.
# v3: stepan_enabled checks DEEPSEEK_API_KEY (preferred) OR OPENAI_API_KEY (fallback)
stepan_enabled = bool(
os.getenv("DEEPSEEK_API_KEY", "").strip()
or os.getenv("OPENAI_API_KEY", "").strip()
)
if stepan_enabled and is_ops:
return await handle_stepan_message(update, AGROMATRIX_CONFIG)
if (is_ops or is_operator_slash) and not stepan_enabled:
if is_ops and not stepan_enabled:
logger.warning(
"Stepan handler disabled (OPENAI_API_KEY missing); falling back to Router pipeline "
f"for chat_id={chat_id}, user_id={user_id}, slash_cmd={slash_cmd!r}"
"Stepan handler disabled (no DEEPSEEK_API_KEY / OPENAI_API_KEY); "
"falling back to Router pipeline "
f"for chat_id={chat_id}, user_id={user_id}"
)
# General conversation -> standard Router pipeline (like all other agents)
@@ -1672,11 +2004,16 @@ async def process_photo(
# Telegram sends multiple sizes, get the largest one (last in array)
photo_obj = photo[-1] if isinstance(photo, list) else photo
file_id = photo_obj.get("file_id") if isinstance(photo_obj, dict) else None
# file_unique_id стабільний між розмірами — використовуємо як lock key
file_unique_id: str | None = (photo_obj.get("file_unique_id") if isinstance(photo_obj, dict) else None) or None
if not file_id:
return {"ok": False, "error": "No file_id in photo"}
logger.info(f"{agent_config.name}: Photo from {username} (tg:{user_id}), file_id: {file_id}")
logger.info(
"%s: Photo from %s (tg:%s), file_id: %s file_unique_id: %s",
agent_config.name, username, user_id, file_id, file_unique_id or "n/a",
)
_set_recent_photo_context(agent_config.agent_id, chat_id, user_id, file_id)
if agent_config.agent_id == "agromatrix":
await _set_agromatrix_last_photo_ref(
@@ -1717,7 +2054,28 @@ async def process_photo(
username=username,
)
return {"ok": True, "skipped": True, "reason": "media_no_question"}
# ── VISION CONSISTENCY GUARD: Rule 1 ─────────────────────────────────────
# Те саме фото (file_unique_id або file_id) вже аналізувалось →
# повертаємо збережений результат без запиту до Router.
# reeval_request → clear_lock → продовжуємо до Router.
_vg_caption_text = caption.strip() if caption else ""
if agent_config.agent_id == "agromatrix" and _vg_should_skip(
agent_config.agent_id, chat_id, file_id, _vg_caption_text,
file_unique_id=file_unique_id,
):
_vg_lock = _vg_get_lock(agent_config.agent_id, chat_id)
_vg_reply = _vg_build_locked_reply(_vg_lock, _vg_caption_text)
logger.info(
"vision_skip_reanalysis agent=%s chat_id=%s photo_key=%s label=%s",
agent_config.agent_id, chat_id,
file_unique_id or file_id, _vg_lock.get("label", "?"),
)
telegram_token = agent_config.get_telegram_token() or ""
if telegram_token:
await send_telegram_message(chat_id, _vg_reply, telegram_token)
return {"ok": True, "skipped": True, "reason": "vision_lock_same_photo"}
try:
# Get file path from Telegram
telegram_token = agent_config.get_telegram_token()
@@ -1796,6 +2154,32 @@ async def process_photo(
answer_text = response.get("data", {}).get("text") or response.get("response", "")
if answer_text:
# ── VISION CONSISTENCY GUARD: Hooks A+B ──────────────────────
# A: persist lock (label + confidence) keyed by file_unique_id
if agent_config.agent_id == "agromatrix":
try:
_vg_label, _vg_conf = _vg_extract_label(answer_text)
_vg_set_lock(
agent_config.agent_id, chat_id, file_id,
_vg_label, _vg_conf,
file_unique_id=file_unique_id,
)
logger.info(
"vision_lock_set agent=%s chat_id=%s photo_key=%s label=%s conf=%s",
agent_config.agent_id, chat_id,
file_unique_id or file_id, _vg_label, _vg_conf,
)
except Exception:
pass
# B: low-confidence → append clarifier if not already present
answer_text, _vg_low_added = _vg_build_low_conf(answer_text)
if _vg_low_added:
logger.info(
"vision_low_conf_clarifier_added agent=%s chat_id=%s",
agent_config.agent_id, chat_id,
)
# ─────────────────────────────────────────────────────────────
# Photo processed - send LLM response directly
await send_telegram_message(
chat_id,
@@ -1941,7 +2325,8 @@ async def process_document(
dao_id=dao_id,
user_id=f"tg:{user_id}",
output_mode="qa_pairs",
metadata={"username": username, "chat_id": chat_id}
metadata={"username": username, "chat_id": chat_id},
agent_id=agent_config.agent_id,
)
if not result.success:
@@ -1953,7 +2338,42 @@ async def process_document(
if not doc_text and result.chunks_meta:
chunks = result.chunks_meta.get("chunks", [])
doc_text = "\n".join(chunks[:5]) if chunks else ""
# v3.2 Doc Bridge: зберігаємо parsed text щоб Stepan міг відповідати на питання
if doc_text and result.doc_id:
try:
from services.doc_service import save_doc_context as _save_doc_ctx
await _save_doc_ctx(
session_id=session_id,
doc_id=result.doc_id,
doc_url=file_url,
file_name=file_name,
dao_id=dao_id,
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
extracted_summary=doc_text[:4000],
)
logger.info(f"Doc Bridge: saved extracted_summary ({len(doc_text)} chars) for doc_id={result.doc_id}")
except Exception as _dbe:
logger.warning(f"Doc Bridge save_doc_context failed (non-blocking): {_dbe}")
# v3.3 Doc Handoff: зберігаємо chat-scoped ключ (пріоритет для Stepan)
try:
from services.doc_service import _sanitize_summary as _ss
_file_unique = document.get("file_unique_id") or result.doc_id
await save_chat_doc_context(
chat_id=chat_id,
agent_id=agent_config.agent_id,
doc_ctx={
"doc_id": result.doc_id,
"file_unique_id": _file_unique,
"file_name": file_name,
"extracted_summary": _ss(doc_text)[:4000],
"source": "telegram",
},
)
except Exception as _cdbe:
logger.warning("Doc Handoff: save_chat_doc_context failed: %s", _cdbe)
# Ask LLM to summarize the document (human-friendly)
if doc_text:
zip_hint = None
@@ -3097,7 +3517,7 @@ async def handle_telegram_webhook(
# Check if there's a document context for follow-up questions
session_id = f"telegram:{chat_id}"
doc_context = await get_doc_context(session_id)
doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id)
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id:
@@ -3788,7 +4208,8 @@ async def _old_telegram_webhook(update: TelegramUpdate):
dao_id=dao_id,
user_id=f"tg:{user_id}",
output_mode="qa_pairs",
metadata={"username": username, "chat_id": chat_id}
metadata={"username": username, "chat_id": chat_id},
agent_id=agent_config.agent_id,
)
if not result.success:
@@ -3991,7 +4412,7 @@ async def _old_telegram_webhook(update: TelegramUpdate):
# Check if there's a document context for follow-up questions
session_id = f"telegram:{chat_id}"
doc_context = await get_doc_context(session_id)
doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id)
# If there's a doc_id and the message looks like a question about the document
if doc_context and doc_context.doc_id: