From 90080c632ab1edd3abee066cc6f30e3c5b0a86b9 Mon Sep 17 00:00:00 2001 From: Apple Date: Fri, 27 Feb 2026 03:20:13 -0800 Subject: [PATCH] fix(fabric): use broadcast subject for NATS capabilities discovery NATS wildcards (node.*.capabilities.get) only work for subscriptions, not for publish. Switch to a dedicated broadcast subject (fabric.capabilities.discover) that all NCS instances subscribe to, enabling proper scatter-gather discovery across nodes. Made-with: Cursor --- .env.example | 10 + config/agent_registry.yml | 2 +- crews/agromatrix_crew/agents/iot_agent.py | 8 +- .../agents/operations_agent.py | 27 +- .../agromatrix_crew/agents/platform_agent.py | 8 +- .../agents/spreadsheet_agent.py | 7 +- .../agents/stepan_orchestrator.py | 38 +- .../agents/sustainability_agent.py | 7 +- crews/agromatrix_crew/audit.py | 14 +- crews/agromatrix_crew/operator_commands.py | 307 + crews/agromatrix_crew/run.py | 869 ++- docker-compose.staging.yml | 7 + docs/backups/LATEST.txt | 2 +- gateway-bot/app.py | 76 +- gateway-bot/http_api.py | 509 +- gateway-bot/services/doc_service.py | 1113 +-- .../agromatrix_tools/common.py | 99 +- .../agromatrix_tools/tool_farmos_read.py | 537 +- router-config.yml | 74 +- services/memory-service/Dockerfile | 6 +- services/memory-service/app/main.py | 18 + services/memory-service/app/vector_store.py | 30 +- services/memory-service/requirements.txt | 7 +- services/node-capabilities/main.py | 4 +- services/router/agent_tools_config.py | 201 +- services/router/global_capabilities_client.py | 2 +- services/router/router-config.yml | 8 +- services/router/tool_manager.py | 6352 +++++++++++++++-- 28 files changed, 8883 insertions(+), 1459 deletions(-) diff --git a/.env.example b/.env.example index 5abc4a1d..b602add7 100644 --- a/.env.example +++ b/.env.example @@ -59,6 +59,16 @@ DEEPSEEK_BASE_URL=https://api.deepseek.com # OpenAI API (optional) OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +# Notion integration (optional) +NOTION_API_KEY= +NOTION_VERSION=2022-06-28 + +# OpenCode HTTP endpoint for status probe (optional) +OPENCODE_URL= + +# Optional per-node SSH auth (used by sofiia-console /api/nodes/ssh/status) +NODES_NODA1_SSH_PASSWORD= + # ----------------------------------------------------------------------------- # DAGI Router Configuration # ----------------------------------------------------------------------------- diff --git a/config/agent_registry.yml b/config/agent_registry.yml index 672dc8bc..f2567358 100644 --- a/config/agent_registry.yml +++ b/config/agent_registry.yml @@ -1304,7 +1304,7 @@ agents: - security - evolution - llm_profile: reasoning + llm_profile: grok prompt_file: sofiia_prompt.txt access_control: diff --git a/crews/agromatrix_crew/agents/iot_agent.py b/crews/agromatrix_crew/agents/iot_agent.py index a84bb5f8..08ce271c 100644 --- a/crews/agromatrix_crew/agents/iot_agent.py +++ b/crews/agromatrix_crew/agents/iot_agent.py @@ -1,5 +1,5 @@ from crewai import Agent -from crews.agromatrix_crew import tools +from crews.agromatrix_crew.llm_factory import make_llm def build_iot(): @@ -7,10 +7,8 @@ def build_iot(): role="IoT Agent", goal="Читати телеметрію ThingsBoard і публікувати події в NATS.", backstory="Доступ лише через ThingsBoard/NATS інструменти.", - tools=[ - tools.tool_thingsboard_read, - tools.tool_event_bus - ], + tools=[], + llm=make_llm(), allow_delegation=False, verbose=True ) diff --git a/crews/agromatrix_crew/agents/operations_agent.py b/crews/agromatrix_crew/agents/operations_agent.py index f896d7ed..2a451dc2 100644 --- a/crews/agromatrix_crew/agents/operations_agent.py +++ b/crews/agromatrix_crew/agents/operations_agent.py @@ -1,5 +1,24 @@ from crewai import Agent -from crews.agromatrix_crew import tools +from crews.agromatrix_crew.llm_factory import make_llm + +# v4.3/v4.4: farmos tools — fail-safe import +# Якщо agromatrix_tools недоступні в середовищі → tools залишається порожнім. +_farmos_tools: list = [] +try: + from agromatrix_tools.tool_farmos_read import farmos_ping as _farmos_ping + _farmos_tools.append(_farmos_ping) +except Exception: + pass +try: + from agromatrix_tools.tool_farmos_read import farmos_read_logs as _farmos_read_logs + _farmos_tools.append(_farmos_read_logs) +except Exception: + pass +try: + from agromatrix_tools.tool_farmos_read import farmos_search_assets as _farmos_search_assets + _farmos_tools.append(_farmos_search_assets) +except Exception: + pass def build_operations(): @@ -7,10 +26,8 @@ def build_operations(): role="Operations Agent", goal="Операційні дії по farmOS (читання/через integration write).", backstory="Ти працюєш з farmOS лише через інструменти. Прямі записи заборонені.", - tools=[ - tools.tool_farmos_read, - tools.tool_integration_write - ], + tools=_farmos_tools, + llm=make_llm(), allow_delegation=False, verbose=True ) diff --git a/crews/agromatrix_crew/agents/platform_agent.py b/crews/agromatrix_crew/agents/platform_agent.py index 8e1068c9..45fe25a2 100644 --- a/crews/agromatrix_crew/agents/platform_agent.py +++ b/crews/agromatrix_crew/agents/platform_agent.py @@ -1,5 +1,5 @@ from crewai import Agent -from crews.agromatrix_crew import tools +from crews.agromatrix_crew.llm_factory import make_llm def build_platform(): @@ -7,10 +7,8 @@ def build_platform(): role="Platform Agent", goal="Платформна перевірка стану сервісів/інтеграцій.", backstory="Доступ лише через інструменти подій/читання.", - tools=[ - tools.tool_event_bus, - tools.tool_farmos_read - ], + tools=[], + llm=make_llm(), allow_delegation=False, verbose=True ) diff --git a/crews/agromatrix_crew/agents/spreadsheet_agent.py b/crews/agromatrix_crew/agents/spreadsheet_agent.py index aa336842..388711d5 100644 --- a/crews/agromatrix_crew/agents/spreadsheet_agent.py +++ b/crews/agromatrix_crew/agents/spreadsheet_agent.py @@ -1,5 +1,5 @@ from crewai import Agent -from crews.agromatrix_crew import tools +from crews.agromatrix_crew.llm_factory import make_llm def build_spreadsheet(): @@ -7,9 +7,8 @@ def build_spreadsheet(): role="Spreadsheet Agent", goal="Читати/редагувати/створювати XLSX файли та формувати артефакти.", backstory="Використовує лише spreadsheet інструмент.", - tools=[ - tools.tool_spreadsheet - ], + tools=[], + llm=make_llm(), allow_delegation=False, verbose=True ) diff --git a/crews/agromatrix_crew/agents/stepan_orchestrator.py b/crews/agromatrix_crew/agents/stepan_orchestrator.py index 45978dc1..65509404 100644 --- a/crews/agromatrix_crew/agents/stepan_orchestrator.py +++ b/crews/agromatrix_crew/agents/stepan_orchestrator.py @@ -1,11 +1,39 @@ +from pathlib import Path from crewai import Agent +from crews.agromatrix_crew.llm_factory import make_llm + +_PROMPT_PATH = Path(__file__).parent.parent / "stepan_system_prompt_v2.txt" -def build_stepan(): +def _load_system_prompt() -> str: + try: + return _PROMPT_PATH.read_text(encoding="utf-8") + except Exception: + return ( + "Ти — Степан, операційний агент AgroMatrix. " + "Говориш коротко, по ділу, живою українською мовою. " + "Не пишеш сервісних повідомлень. Відповідаєш прямо." + ) + + +def build_stepan(style_prefix: str = "") -> Agent: + """ + Будує агента Степана. + style_prefix — персоналізований prefix від style_adapter.build_style_prefix(). + Якщо не передано — використовується базовий системний промпт. + """ + backstory = _load_system_prompt() + if style_prefix: + backstory = style_prefix.strip() + "\n\n" + backstory return Agent( - role="Stepan (AgroMatrix Orchestrator)", - goal="Керувати запитами користувача через делегування під-агентам і повертати єдину відповідь.", - backstory="Ти єдиний канал спілкування з користувачем. Під-агенти працюють лише через інструменти.", + role="Stepan (AgroMatrix Operational Agent)", + goal=( + "Відповідати на запити точно і людяно. " + "Делегувати під-агентам лише якщо без них неможливо. " + "Повертати консолідовану відповідь без технічного сміття." + ), + backstory=backstory, + llm=make_llm(), allow_delegation=True, - verbose=True + verbose=False, ) diff --git a/crews/agromatrix_crew/agents/sustainability_agent.py b/crews/agromatrix_crew/agents/sustainability_agent.py index 0b6adff6..30992683 100644 --- a/crews/agromatrix_crew/agents/sustainability_agent.py +++ b/crews/agromatrix_crew/agents/sustainability_agent.py @@ -1,5 +1,5 @@ from crewai import Agent -from crews.agromatrix_crew import tools +from crews.agromatrix_crew.llm_factory import make_llm def build_sustainability(): @@ -7,9 +7,8 @@ def build_sustainability(): role="Sustainability Agent", goal="Агрегати та аналітика (LiteFarm read-only).", backstory="Працює лише з read-only LiteFarm інструментом.", - tools=[ - tools.tool_litefarm_read - ], + tools=[], + llm=make_llm(), allow_delegation=False, verbose=True ) diff --git a/crews/agromatrix_crew/audit.py b/crews/agromatrix_crew/audit.py index e01ce48d..958ae33e 100644 --- a/crews/agromatrix_crew/audit.py +++ b/crews/agromatrix_crew/audit.py @@ -8,7 +8,8 @@ from nats.aio.client import Client as NATS import asyncio NATS_URL = os.getenv('NATS_URL', 'nats://localhost:4222') -AUDIT_FILE = os.getenv('AGX_AUDIT_FILE', 'artifacts/audit.log.jsonl') +# Default: /app/logs (mounted rw volume) або /tmp як fallback +AUDIT_FILE = os.getenv('AGX_AUDIT_FILE', '/app/logs/stepan_audit.log.jsonl') def _hash(text: str): @@ -17,16 +18,19 @@ def _hash(text: str): async def _publish_nats(subject: str, payload: dict): nc = NATS() - await nc.connect(servers=[NATS_URL]) + await nc.connect(servers=[NATS_URL], connect_timeout=2) await nc.publish(subject, json.dumps(payload).encode()) await nc.flush(1) await nc.drain() def audit_event(event: dict): - Path(AUDIT_FILE).parent.mkdir(parents=True, exist_ok=True) - with open(AUDIT_FILE, 'a', encoding='utf-8') as f: - f.write(json.dumps(event, ensure_ascii=False) + '\n') + try: + Path(AUDIT_FILE).parent.mkdir(parents=True, exist_ok=True) + with open(AUDIT_FILE, 'a', encoding='utf-8') as f: + f.write(json.dumps(event, ensure_ascii=False) + '\n') + except Exception: + pass try: asyncio.run(_publish_nats('agx.audit.delegation', event)) except Exception: diff --git a/crews/agromatrix_crew/operator_commands.py b/crews/agromatrix_crew/operator_commands.py index 80155398..177848fe 100644 --- a/crews/agromatrix_crew/operator_commands.py +++ b/crews/agromatrix_crew/operator_commands.py @@ -1,3 +1,13 @@ +""" +Operator commands for AgroMatrix (Stepan). Access control and slash commands. + +Access control (env, used by gateway and here): +- AGX_OPERATOR_IDS: comma-separated Telegram user_id list; only these users are operators. +- AGX_OPERATOR_CHAT_ID: optional; if set, operator actions allowed only in this chat_id. + +When is_operator(user_id, chat_id) is True, gateway routes any message (not only slash) +to Stepan for human-friendly operator interaction. +""" import os import re import shlex @@ -16,6 +26,9 @@ OPERATOR_COMMANDS = { "reject", "apply_dict", "pending_stats", + "doc", # v3.5: Doc Focus Gate control (/doc on|off|status) + "farmos", # v4.3: FarmOS healthcheck (/farmos | /farmos status) + "farm", # v4.6: Farm state snapshot (/farm state) } @@ -375,4 +388,298 @@ def route_operator_command(text: str, user_id: str | None, chat_id: str | None): if cmd == 'pending_stats': return handle_stats() + # ── /doc [on|off|status] (v3.5: Doc Focus Gate) ───────────────────────── + if cmd == 'doc': + sub = args[0].lower() if args else "status" + from crews.agromatrix_crew.doc_focus import handle_doc_focus as _hdf + return _hdf(sub, chat_id=chat_id) + + # ── /farmos [status] (v4.3: FarmOS healthcheck) ────────────────────────── + if cmd == 'farmos': + return handle_farmos_status(args) + + # ── /farm state (v4.6: FarmOS → Farm State Snapshot) ───────────────────── + if cmd == 'farm': + return handle_farm_command(args, chat_id=chat_id) + return _wrap('unknown command') + + +def handle_farmos_status(args: list) -> dict: + """ + /farmos [status|logs [log_type] [limit]] — FarmOS diagnostics. + Fail-closed: будь-яка внутрішня помилка → зрозумілий текст. + Не виводить URL або токени. + + Subcommands: + (no args) | status — healthcheck ping + logs [log_type] [limit] — останні записи farmOS + """ + sub = args[0].lower() if args else "status" + + # ── /farmos logs [log_type] [limit] ────────────────────────────────────── + if sub == "logs": + log_type = "activity" + limit = 10 + if len(args) >= 2: + log_type = args[1].lower() + if len(args) >= 3: + try: + limit = int(args[2]) + except ValueError: + pass + try: + from agromatrix_tools.tool_farmos_read import _farmos_read_logs_impl + result_text = _farmos_read_logs_impl(log_type=log_type, limit=limit) + except Exception as exc: + result_text = f"FarmOS logs: внутрішня помилка ({type(exc).__name__})." + _tlog_farmos_cmd("farmos_logs_cmd", ok=not result_text.startswith("FarmOS:"), + sub="logs", log_type=log_type) + return _wrap(result_text) + + # ── /farmos або /farmos status ──────────────────────────────────────────── + if sub not in ("status",): + return _wrap( + "Команда farmos: підтримується /farmos, /farmos status або /farmos logs [log_type] [limit]." + ) + + try: + from agromatrix_tools.tool_farmos_read import _farmos_ping_impl + status_text = _farmos_ping_impl() + except Exception as exc: + status_text = f"FarmOS status недоступний: внутрішня помилка виконання ({type(exc).__name__})." + + ok = status_text.startswith("FarmOS доступний") + _tlog_farmos_cmd("farmos_status_cmd", ok=ok, sub="status") + return _wrap(status_text) + + +def _tlog_farmos_cmd(event: str, ok: bool, sub: str = "", log_type: str = "") -> None: + """PII-safe telemetry для farmos operator commands.""" + try: + import logging as _logging + extra = f" sub={sub}" if sub else "" + extra += f" log_type={log_type}" if log_type else "" + _logging.getLogger(__name__).info( + "AGX_STEPAN_METRIC %s ok=%s%s", event, ok, extra, + ) + except Exception: + pass + + +# ── v4.6: /farm state — FarmOS → Farm State Snapshot ───────────────────────── +# +# Smoke checklist (manual): +# /farmos → FarmOS ping still works (regression) +# /farm → "підтримується тільки /farm state" +# /farm state (no env) → "FarmOS не налаштований..." +# /farm state (env ok) → snapshot text, logs show farm_state_cmd_saved ok=true +# /farm foo → unknown subcommand message + +_FARM_STATE_ASSET_QUERIES: list[tuple[str, int]] = [ + ("asset_land", 10), + ("asset_plant", 10), + ("asset_equipment", 5), +] + +_FARM_STATE_LABELS: dict[str, str] = { + "asset_land": "Поля", + "asset_plant": "Культури/рослини", + "asset_equipment": "Техніка", +} + +# Максимальна довжина snapshot-тексту +_FARM_STATE_MAX_CHARS = 900 + + +def handle_farm_command(args: list, chat_id: str | None = None) -> dict: + """ + /farm state — збирає snapshot активів з FarmOS і зберігає в memory-service. + Fail-closed: будь-яка помилка → зрозумілий текст, не кидає. + + Smoke checklist (manual): + /farm → only /farm state supported + /farm state (no env) → config missing message + /farm state (env) → snapshot + saved to memory + """ + sub = args[0].lower() if args else "" + + if sub != "state": + return _wrap( + "Команда farm: підтримується тільки /farm state." + ) + + try: + return _handle_farm_state(chat_id=chat_id) + except Exception as exc: + import logging as _logging + _logging.getLogger(__name__).warning( + "handle_farm_command unexpected error: %s", exc + ) + return _wrap("Farm state: не вдалося отримати дані (перевір FarmOS / мережу).") + + +def _handle_farm_state(chat_id: str | None) -> dict: + """Ядро логіки /farm state. Викликається тільки з handle_farm_command.""" + import logging as _logging + _log = _logging.getLogger(__name__) + + _log.info("AGX_STEPAN_METRIC farm_state_cmd_started chat_id=h:%s", + str(chat_id or "")[:6]) + + # ── Крок 1: перевірка FarmOS доступності ───────────────────────────────── + try: + from agromatrix_tools.tool_farmos_read import _farmos_ping_impl + ping_result = _farmos_ping_impl() + except Exception as exc: + ping_result = f"FarmOS: помилка перевірки ({type(exc).__name__})." + + if not ping_result.startswith("FarmOS доступний"): + return _wrap(ping_result) + + # ── Крок 2: запит активів по трьох типах ───────────────────────────────── + counts: dict[str, int] = {} + tops: dict[str, list[str]] = {} + + try: + from agromatrix_tools.tool_farmos_read import _farmos_search_assets_impl + except Exception: + return _wrap("Farm state: не вдалося отримати дані (agromatrix_tools недоступні).") + + for asset_type, limit in _FARM_STATE_ASSET_QUERIES: + try: + raw = _farmos_search_assets_impl(asset_type=asset_type, limit=limit) + items = _parse_asset_lines(raw) + except Exception: + items = [] + counts[asset_type] = len(items) + # top-3 labels (тільки назва, без UUID) + tops[asset_type] = [_label_from_asset_line(ln) for ln in items[:3]] + + # ── Крок 3: формуємо snapshot-текст ────────────────────────────────────── + snapshot_text = _build_snapshot_text(counts, tops) + + # ── Крок 4: зберігаємо в memory-service ────────────────────────────────── + save_ok = False + save_suffix = "" + if chat_id: + save_ok = _save_farm_state_snapshot(chat_id, counts, tops, snapshot_text) + if not save_ok: + save_suffix = "\n(Не зміг зберегти в пам'ять.)" + + _log.info( + "AGX_STEPAN_METRIC farm_state_cmd_saved ok=%s reason=%s " + "land=%s plant=%s equip=%s", + save_ok, + "saved" if save_ok else ("no_chat_id" if not chat_id else "memory_error"), + counts.get("asset_land", 0), + counts.get("asset_plant", 0), + counts.get("asset_equipment", 0), + ) + + return _wrap(snapshot_text + save_suffix) + + +def _parse_asset_lines(raw: str) -> list[str]: + """ + Парсить рядки виду "- label | type | id=xxxx" або "FarmOS: ...". + Повертає лише рядки що починаються з "- ". + Fail-safe: якщо raw не такого формату — повертає []. + """ + try: + lines = [ln.strip() for ln in str(raw).split("\n") if ln.strip().startswith("- ")] + return lines + except Exception: + return [] + + +def _label_from_asset_line(line: str) -> str: + """ + Витягує label з рядка "- label | type | id=xxxx". + Повертає перший сегмент після "- ", обрізаний. + """ + try: + content = line.lstrip("- ").strip() + return content.split("|")[0].strip() + except Exception: + return line.strip() + + +def _build_snapshot_text( + counts: dict[str, int], + tops: dict[str, list[str]], +) -> str: + """Формує human-readable snapshot ≤ _FARM_STATE_MAX_CHARS символів.""" + total = sum(counts.values()) + if total == 0: + return ( + "FarmOS: немає даних по assets " + "(або типи відрізняються у вашій інстанції)." + ) + + lines = ["Farm state (FarmOS):"] + for asset_type, label in _FARM_STATE_LABELS.items(): + n = counts.get(asset_type, 0) + top = tops.get(asset_type, []) + if top: + top_str = ", ".join(top[:3]) + lines.append(f"- {label}: {n} (топ: {top_str})") + else: + lines.append(f"- {label}: {n}") + + text = "\n".join(lines) + # Детермінований hard cap + if len(text) > _FARM_STATE_MAX_CHARS: + text = text[:_FARM_STATE_MAX_CHARS].rsplit("\n", 1)[0] + return text + + +def _save_farm_state_snapshot( + chat_id: str, + counts: dict[str, int], + tops: dict[str, list[str]], + snapshot_text: str, +) -> bool: + """ + Зберігає snapshot у memory-service під ключем + farm_state:agromatrix:chat:{chat_id}. + Fail-closed: повертає True/False, не кидає. + """ + try: + from datetime import datetime, timezone + fact_key = f"farm_state:agromatrix:chat:{chat_id}" + # synthetic_uid — той самий паттерн що в memory_manager.py + synthetic_uid = f"farm:{chat_id}" + + payload = { + "_version": 1, + "source": "farmos", + "generated_at": datetime.now(timezone.utc).isoformat(), + "counts": counts, + "top": tops, + "text": snapshot_text, + } + + import os + import json + import httpx + mem_url = os.getenv( + "AGX_MEMORY_SERVICE_URL", + os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000"), + ) + resp = httpx.post( + f"{mem_url}/facts/upsert", + json={ + "user_id": synthetic_uid, + "fact_key": fact_key, + "fact_value_json": payload, + }, + timeout=3.0, + ) + return resp.status_code in (200, 201) + except Exception as exc: + import logging as _logging + _logging.getLogger(__name__).debug( + "farm_state snapshot save failed: %s", exc + ) + return False diff --git a/crews/agromatrix_crew/run.py b/crews/agromatrix_crew/run.py index 52a1853e..4f77535c 100644 --- a/crews/agromatrix_crew/run.py +++ b/crews/agromatrix_crew/run.py @@ -1,6 +1,8 @@ import sys import os import json +import logging +import re import subprocess from pathlib import Path from crewai import Crew, Task @@ -14,6 +16,172 @@ from crews.agromatrix_crew.audit import audit_event, new_trace from agromatrix_tools import tool_dictionary from agromatrix_tools import tool_operation_plan from crews.agromatrix_crew.operator_commands import route_operator_command, route_operator_text +from crews.agromatrix_crew.memory_manager import ( + load_user_profile, + save_user_profile, + load_farm_profile, + save_farm_profile, + update_profile_if_needed, +) +from crews.agromatrix_crew.style_adapter import adapt_response_style, build_style_prefix +from crews.agromatrix_crew.reflection_engine import reflect_on_response +from crews.agromatrix_crew.light_reply import build_light_reply, classify_light_event +from crews.agromatrix_crew.depth_classifier import classify_depth +from crews.agromatrix_crew.telemetry import tlog +from crews.agromatrix_crew.session_context import ( + load_session, + update_session, + is_doc_focus_active, + is_doc_focus_cooldown_active, + DOC_FOCUS_TTL, + DOC_FOCUS_COOLDOWN_S, +) +from crews.agromatrix_crew.proactivity import maybe_add_proactivity +from crews.agromatrix_crew.doc_facts import ( + extract_doc_facts, + merge_doc_facts, + can_answer_from_facts, + compute_scenario, + format_facts_as_text, + extract_fact_claims, + build_self_correction, +) + +logger = logging.getLogger(__name__) + +# ── Doc Focus Gate helpers (v3.5 / v3.6 / v3.7) ──────────────────────────── +# Логіка винесена в doc_focus.py (без залежностей від crewai/agromatrix_tools) +from crews.agromatrix_crew.doc_focus import ( # noqa: E402 + _is_doc_question, + _detect_domain, + detect_context_signals, + build_mode_clarifier, +) +from crews.agromatrix_crew.farm_state import ( # noqa: E402 + detect_farm_state_updates, + update_farm_state, + build_farm_state_prefix, +) + +# ── v4.2: Vision → Agronomy Bridge ────────────────────────────────────────── +# Fail-safe lazy import: vision_guard живе в gateway-bot/, не в crews/. +# Якщо модуль недоступний (наприклад, юніт-тести без gateway-bot) — мовчки +# пропускаємо; вся логіка нижче захищена try/except. +def _vb_get_vision_lock(agent_id: str, chat_id: str) -> dict: + try: + from vision_guard import get_vision_lock as _gvl + return _gvl(agent_id, chat_id) or {} + except Exception: + return {} + + +# ── v4.7: FarmOS Farm State Bridge ─────────────────────────────────────────── +# Читаємо збережений /farm state snapshot з memory-service. +# Fail-closed: timeout 2s, не кидає виняток, не блокує відповідь. +# Максимальний вік snapshot: 24h (86400s). +_FARM_STATE_SNAPSHOT_TTL_S = 86400.0 + + +def _load_farm_state_snapshot(chat_id: str) -> str | None: + """ + Завантажує snapshot тексту з memory-service (ключ farm_state:agromatrix:chat:{chat_id}). + Повертає текст якщо snapshot є і не старший за 24h, інакше None. + Fail-closed: будь-яка помилка → None. + """ + try: + import httpx + import json as _json + from datetime import datetime, timezone + + mem_url = os.getenv( + "AGX_MEMORY_SERVICE_URL", + os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000"), + ) + fact_key = f"farm_state:agromatrix:chat:{chat_id}" + synthetic_uid = f"farm:{chat_id}" + + # memory-service API: GET /facts/{fact_key}?user_id=... + # (не /facts/get — той endpoint не існує) + resp = httpx.get( + f"{mem_url}/facts/{fact_key}", + params={"user_id": synthetic_uid}, + timeout=2.0, + ) + if resp.status_code != 200: + return None + + data = resp.json() + # fact_value_json може повертатися як рядок або dict + val = data.get("fact_value_json") or data.get("fact_value") + if isinstance(val, str): + try: + val = _json.loads(val) + except Exception: + return None + if not isinstance(val, dict): + return None + + # Перевірка TTL: generated_at має бути не старішим за 24h + generated_at = val.get("generated_at", "") + if generated_at: + try: + ts = datetime.fromisoformat(generated_at.replace("Z", "+00:00")) + age_s = (datetime.now(timezone.utc) - ts).total_seconds() + if age_s > _FARM_STATE_SNAPSHOT_TTL_S: + return None + except Exception: + pass # якщо не можемо перевірити вік — все одно повертаємо snapshot + + text = val.get("text", "").strip() + return text if text else None + + except Exception: + return None + + +# --------------------------------------------------------------------------- +# Light / Deep activation layer — реалізація у depth_classifier.py +# --------------------------------------------------------------------------- + + +def _stepan_light_response(text: str, stepan, trace: dict, user_profile: dict | None) -> str: + """ + Відповідь Степана у Light mode. + + Спочатку намагається побудувати детерміновану (seeded) відповідь без LLM — + для чітких greeting/thanks/ack/short_followup подій. + Якщо без-LLM відповідь є — повертає її одразу (нульова затримка). + Інакше — LLM одним агентом без делегування. + Логує crew_launch=false. + """ + logger.info("crew_launch=false depth=light") + + # Fast path: no LLM needed for clear social events + fast_reply = build_light_reply(text, user_profile) + if fast_reply: + audit_event({**trace, 'agent': 'stepan', 'action': 'light_nollm'}) + return fast_reply + + # LLM path: single Stepan task, no sub-agents + task = Task( + description=( + f"Повідомлення від користувача: {text}\n\n" + "Дай коротку, природну, людську відповідь. " + "Не запускай жодних операційних інструментів. " + "Не форматуй як JSON. " + "Не починай з 'Звісно', 'Чудово', 'Дозвольте'. " + "Одне питання максимум, якщо потрібно." + ), + expected_output="Коротка розмовна відповідь українською, 1–4 речення.", + agent=stepan, + ) + crew = Crew(agents=[stepan], tasks=[task], verbose=False) + result = crew.kickoff() + audit_event({**trace, 'agent': 'stepan', 'action': 'light_response'}) + return adapt_response_style(str(result), user_profile) + + +# --------------------------------------------------------------------------- def farmos_ui_hint(): @@ -97,7 +265,22 @@ def run_task_with_retry(agent, description: str, trace_id: str, max_retries: int } -def handle_message(text: str, user_id: str = '', chat_id: str = '', trace_id: str = '', ops_mode: bool = False, last_pending_list: list | None = None) -> str: +def handle_message( + text: str, + user_id: str = '', + chat_id: str = '', + trace_id: str = '', + ops_mode: bool = False, + last_pending_list: list | None = None, + # injected by memory layer (Промт 3); None until that layer is wired + user_profile: dict | None = None, + farm_profile: dict | None = None, + has_doc_context: bool = False, + # Doc Bridge (v3.1): короткий контекст активного документа з gateway + doc_context: dict | None = None, + # Chat History Bridge (v3.2): текст переписки з memory-service (до 40 повідомлень) + chat_history: str = '', +) -> str: trace = new_trace(text) if trace_id: trace['trace_id'] = trace_id @@ -107,7 +290,14 @@ def handle_message(text: str, user_id: str = '', chat_id: str = '', trace_id: st os.environ['AGX_CHAT_ID'] = str(chat_id) os.environ['AGX_OPS_MODE'] = '1' if ops_mode else '0' - # operator commands + # Load profiles (fail-safe: always returns a valid dict) + if user_profile is None and user_id: + user_profile = load_user_profile(str(user_id)) + if farm_profile is None and chat_id: + # v2.8: pass user_id for lazy legacy migration + farm_profile = load_farm_profile(str(chat_id), user_id=str(user_id) if user_id else None) + + # operator commands (unchanged routing) if text.strip().startswith('/'): op_res = route_operator_command(text, str(user_id), str(chat_id)) if op_res: @@ -117,100 +307,621 @@ def handle_message(text: str, user_id: str = '', chat_id: str = '', trace_id: st if op_res: return json.dumps(op_res, ensure_ascii=False) - stepan = build_stepan() - ops = build_operations() - iot = build_iot() - platform = build_platform() - spreadsheet = build_spreadsheet() - sustainability = build_sustainability() + # Load session context (v3: TTL 15 min, in-memory) + session = load_session(str(chat_id)) - audit_event({**trace, 'agent': 'stepan', 'action': 'intake'}) + # ── v4: FARM STATE UPDATE ──────────────────────────────────────────────── + # Оновлюємо farm_state до будь-якої іншої логіки (ізольовано, fail-safe). + # Не впливає на doc_mode, depth classifier, memory_manager. + _farm_updates = detect_farm_state_updates(text or "") + if _farm_updates: + update_farm_state(session, _farm_updates) + tlog(logger, "farm_state_updated", + chat_id=str(chat_id), + fields=",".join(_farm_updates.keys())) - # Preflight normalization - norm = tool_dictionary.normalize_from_text(text, trace_id=trace['trace_id'], source='telegram') - pending = [item for cat in norm.values() for item in cat if item.get('status') == 'pending'] - if pending: - lines = ["=== PENDING TERMS (Stepan) ==="] - for item in pending: - lines.append(f"- {item.get('term')}: {item.get('suggestions', [])[:3]}") - lines.append("\nБудь ласка, уточніть невідомі терміни. Після підтвердження я продовжу.") - return "\n".join(lines) + # ── v4.2: VISION → AGRONOMY BRIDGE ────────────────────────────────────── + # Читаємо vision lock (per agent_id:chat_id) і зберігаємо label у session. + # Тільки якщо lock свіжий (TTL перевіряє get_vision_lock всередині). + # User override ("це соняшник") вже записаний у lock.user_label. + # Fail-safe: будь-яка помилка → пропускаємо. + _agent_id_str = os.getenv("AGX_AGENT_ID", "agromatrix") + try: + _vb_lock = _vb_get_vision_lock(_agent_id_str, str(chat_id)) + if _vb_lock: + _vb_label = (_vb_lock.get("user_label") or _vb_lock.get("label") or "").strip() + if _vb_label: + session["vision_last_label"] = _vb_label + tlog(logger, "vision_bridge_label_loaded", + chat_id=str(chat_id), label=_vb_label) + except Exception: + pass + + # ── v4.2: USER TEXT OVERRIDE for vision_last_label ─────────────────────── + # Якщо юзер явно змінює культуру текстом ("тепер це кукурудза") → + # перезаписуємо vision_last_label безпосередньо. + # Використовуємо той самий detect_user_override з vision_guard (fail-safe). + if text: + try: + from vision_guard import detect_user_override as _vb_detect_override + _vb_text_label = _vb_detect_override(text) + if _vb_text_label: + session["vision_last_label"] = _vb_text_label + tlog(logger, "vision_bridge_text_override", + chat_id=str(chat_id), label=_vb_text_label) + except Exception: + pass + + # ── DOC CONTEXT FALLBACK (v3.3) ────────────────────────────────────────── + # Якщо gateway не передав doc_context — пробуємо підтягнути chat-scoped з memory. + # Це дозволяє Stepan бачити документ навіть якщо gateway не синхронізований. + if not doc_context and chat_id: + try: + import httpx as _httpx + import os as _os + _mem_url = _os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000") + _agent_id_env = _os.getenv("AGX_AGENT_ID", "agromatrix") + _aid = _agent_id_env.lower() + _fact_user = f"chat:{_aid}:{chat_id}" + _fact_key = f"doc_context_chat:{_aid}:{chat_id}" + with _httpx.Client(timeout=3.0) as _hc: + _r = _hc.post( + f"{_mem_url}/facts/get", + json={"user_id": _fact_user, "fact_key": _fact_key}, + ) + if _r.status_code == 200: + _fdata = _r.json() + _fval = _fdata.get("fact_value_json") or {} + if isinstance(_fval, str): + import json as _json + _fval = _json.loads(_fval) + if _fval.get("doc_id") or _fval.get("file_unique_id"): + doc_context = _fval + has_doc_context = True + tlog(logger, "doc_context_fallback_loaded", + chat_id=str(chat_id), + doc_id=str(_fval.get("doc_id", ""))[:16]) + except Exception as _fbe: + logger.debug("Doc context fallback failed (non-blocking): %s", _fbe) + + # ── DOC ANCHOR RESET (v3.3) ────────────────────────────────────────────── + # Якщо doc_id змінився — скидаємо doc_facts і fact_claims попереднього документу. + # Fix D: active_doc_id пріоритетний — він явно фіксується при upload (навіть без тексту), + # тому перший text-запит після upload одразу має правильний anchor. + _current_doc_id: str | None = None + if doc_context: + _current_doc_id = ( + doc_context.get("active_doc_id") + or doc_context.get("doc_id") + or doc_context.get("file_unique_id") + or None + ) + if _current_doc_id: + _prev_doc_id = session.get("active_doc_id") + if _prev_doc_id and _prev_doc_id != _current_doc_id: + session["doc_facts"] = {} + session["fact_claims"] = [] + tlog(logger, "doc_anchor_reset", + old=str(_prev_doc_id)[:16], new=str(_current_doc_id)[:16]) + + # ── DOC FOCUS GATE (v3.5 / v3.6) ──────────────────────────────────────── + import time as _time_mod + _now_ts = _time_mod.time() + + # TTL auto-expire (самозцілювальний) + if session.get("doc_focus") and not is_doc_focus_active(session, _now_ts): + _expired_age = round(_now_ts - (session.get("doc_focus_ts") or 0.0)) + session["doc_focus"] = False + session["doc_focus_ts"] = 0.0 + tlog(logger, "doc_focus_expired", chat_id=str(chat_id), + ttl_s=_expired_age, last_doc=str(session.get("active_doc_id", ""))[:16]) + + _signals = detect_context_signals(text) + _domain = _detect_domain(text, logger=logger) + _focus_active = is_doc_focus_active(session, _now_ts) + _cooldown_active = is_doc_focus_cooldown_active(session, _now_ts) + + # Auto-clear doc_focus + встановити cooldown при зміні домену (web/vision) + _df_cooldown_until_update: float | None = None + if _focus_active and _domain in ("vision", "web"): + session["doc_focus"] = False + session["doc_focus_ts"] = 0.0 + _focus_active = False + _cooldown_until = _now_ts + DOC_FOCUS_COOLDOWN_S + session["doc_focus_cooldown_until"] = _cooldown_until + _cooldown_active = True + _df_cooldown_until_update = _cooldown_until + tlog(logger, "doc_focus_cleared", chat_id=str(chat_id), reason=_domain) + tlog(logger, "doc_focus_cooldown_set", chat_id=str(chat_id), + seconds=int(DOC_FOCUS_COOLDOWN_S), reason=_domain) + + # Визначаємо context_mode з gating-правилами (v3.6) + _context_mode: str + _doc_denied_reason: str | None = None + + if _domain in ("doc",): + _is_explicit = _signals["has_explicit_doc_token"] + + # Rule 1: Cooldown блокує implicit doc (не explicit) + if _cooldown_active and not _is_explicit: + _context_mode = "general" + _doc_denied_reason = "cooldown" + _ttl_left = int((session.get("doc_focus_cooldown_until") or 0.0) - _now_ts) + tlog(logger, "doc_mode_denied", chat_id=str(chat_id), + reason="cooldown", ttl_left=_ttl_left) + + # Rule 2: Без explicit — дозволити doc тільки якщо є fact-сигнал або факти покривають + elif not _is_explicit: + _session_facts_gate = session.get("doc_facts") or {} + try: + from crews.agromatrix_crew.doc_facts import can_answer_from_facts as _cafg + _can_use_facts, _ = _cafg(text, _session_facts_gate) + except Exception: + _can_use_facts = False + + if _signals["has_fact_signal"] or _can_use_facts: + # Дозволяємо doc-mode через факт-сигнал + _context_mode = "doc" + else: + _context_mode = "general" + _doc_denied_reason = "no_fact_signal" + tlog(logger, "doc_mode_denied", chat_id=str(chat_id), + reason="no_fact_signal", + has_facts=bool(_session_facts_gate)) + + # Rule 3: Explicit doc-токен завжди дозволяє doc (навіть при cooldown) + else: + _context_mode = "doc" + + elif _domain == "general" and (_focus_active or _current_doc_id) and _signals["has_fact_signal"]: + # Факт-сигнал без doc-домену + active doc → дозволяємо doc якщо фокус живий + _context_mode = "doc" if _focus_active else "general" + else: + _context_mode = "general" + + # Активуємо/продовжуємо doc_focus при успішному doc-mode + if _context_mode == "doc" and _current_doc_id: + if not _focus_active: + session["doc_focus"] = True + session["doc_focus_ts"] = _now_ts + _focus_active = True + tlog(logger, "doc_focus_set", chat_id=str(chat_id), + reason="doc_question_reactivated", doc_id=str(_current_doc_id)[:16]) + + tlog(logger, "context_mode", chat_id=str(chat_id), + mode=_context_mode, domain=_domain, focus=_focus_active, + cooldown=_cooldown_active) + + # v3.6: Якщо doc-mode заблокований — повернути clarifier одразу (без LLM) + if _doc_denied_reason in ("cooldown", "no_fact_signal") and _domain == "doc": + _clarifier = build_mode_clarifier(text) + update_session(str(chat_id), text, depth="light", agents=[], last_question=None) + return _clarifier + + # ── PHOTO CONTEXT GATE (v3.5 fix) ─────────────────────────────────────── + # Якщо щойно було фото (< 120с) і запит короткий (<=3 слів) і без explicit doc — + # відповідаємо коротким уточненням замість light/general відповіді без контексту. + _last_photo_ts = float(session.get("last_photo_ts") or 0.0) + _photo_ctx_ttl = 120.0 + _photo_just_sent = (_now_ts - _last_photo_ts) < _photo_ctx_ttl and _last_photo_ts > 0 + if _photo_just_sent and len(text.split()) <= 3 and not _signals.get("has_explicit_doc_token"): + _photo_age = int(_now_ts - _last_photo_ts) + tlog(logger, "photo_context_gate", chat_id=str(chat_id), + age_s=_photo_age, words=len(text.split())) + update_session(str(chat_id), text, depth="light", agents=[], last_question=None) + return "Що саме хочеш дізнатися про фото?" + + # ── CONFIRMATION GATE (v3.1) ──────────────────────────────────────────── + # Якщо є pending_action у сесії і повідомлення — підтвердження, + # підставляємо контекст попереднього кроку і йдемо в deep. + _CONFIRMATION_WORDS = re.compile( + r"^(так|зроби|ок|ok|окей|погоджуюсь|погодився|роби|давай|підтверджую|yes|go|sure)[\W]*$", + re.IGNORECASE | re.UNICODE, + ) + pending_action = session.get("pending_action") + if pending_action and _CONFIRMATION_WORDS.match(text.strip()): + tlog(logger, "confirmation_consumed", chat_id=str(chat_id), + intent=pending_action.get("intent")) + # Розширюємо контекст: використовуємо збережений intent і what_to_do_next + text = pending_action.get("what_to_do_next") or text + has_doc_context = has_doc_context or bool(pending_action.get("doc_context")) + + # ── DOC BRIDGE (v3.1/v3.2 / v3.5) ────────────────────────────────────── + # PROMPT B: doc_context підмішуємо в промпт ТІЛЬКИ якщо context_mode == "doc". + # Якщо "general" — документ є але мовчимо про нього (не нав'язуємо "у цьому звіті"). + _doc_summary_snippet: str = "" + if doc_context and _context_mode == "doc": + doc_id = doc_context.get("doc_id") or doc_context.get("id", "") + doc_title = doc_context.get("title") or doc_context.get("filename", "") + doc_summary = doc_context.get("extracted_summary") or doc_context.get("summary", "") + has_doc_context = True + tlog(logger, "doc_context_used", chat_id=str(chat_id), doc_id=doc_id, + has_content=bool(doc_summary)) + elif doc_context and _context_mode == "general": + # Документ є, але поточний запит не про нього — не підмішуємо + doc_id = doc_context.get("doc_id") or "" + doc_title = "" + doc_summary = "" + tlog(logger, "doc_context_suppressed", chat_id=str(chat_id), + reason="context_mode_general", doc_id=doc_id[:16] if doc_id else "") + if doc_context and _context_mode == "doc": + # Будуємо snippet для deep-mode промпту (тільки в doc mode) + parts = [] + if doc_title: + parts.append(f"=== ДОКУМЕНТ: «{doc_title}» ===") + if doc_summary: + # До 3000 символів — достатньо для xlsx з кількома листами + parts.append(f"ЗМІСТ ДОКУМЕНТА:\n{doc_summary[:3000]}") + parts.append("=== КІНЕЦЬ ДОКУМЕНТА ===") + elif doc_title: + # Fix E: є документ але summary порожній. + # Якщо file_id відомий — витяг можливий; НЕ просимо "надіслати ще раз". + _doc_file_id = doc_context.get("file_id") or doc_context.get("file_unique_id") or "" + if _doc_file_id: + parts.append( + f"(Вміст «{doc_title}» ще не витягнутий у цій сесії — " + f"але файл є. Перевір ІСТОРІЮ ДІАЛОГУ нижче: там можуть бути " + f"попередні відповіді про цей документ. Якщо в history нічого — " + f"відповідай: 'Зараз витягую дані — дай секунду.' і запропонуй " + f"1 конкретне уточнюючи питання.)" + ) + else: + # file_id невідомий — тоді можна попросити надіслати знову + parts.append( + f"(Вміст «{doc_title}» недоступний. " + f"Перевір ІСТОРІЮ ДІАЛОГУ — там можуть бути попередні відповіді. " + f"Якщо в history нічого — попроси надіслати файл ще раз, одним реченням.)" + ) + _doc_summary_snippet = "\n".join(parts) + + # Light / Deep classification + last_topic = (user_profile or {}).get('last_topic') + depth = classify_depth( + text, + has_doc_context=has_doc_context, + last_topic=last_topic, + user_profile=user_profile, + session=session, + ) + audit_event({**trace, 'agent': 'stepan', 'action': 'intake', 'depth': depth}) + + style_prefix = build_style_prefix(user_profile) + stepan = build_stepan(style_prefix=style_prefix) + + # ── LIGHT MODE ──────────────────────────────────────────────────────────── + if depth == "light": + tlog(logger, "crew_launch", launched=False, depth="light") + response = _stepan_light_response(text, stepan, trace, user_profile) + update_profile_if_needed(str(user_id), str(chat_id), text, response, intent=None, depth="light") + update_session(str(chat_id), text, depth="light", agents=[], last_question=None) + return response + + # ── DEEP MODE ───────────────────────────────────────────────────────────── + tlog(logger, "crew_launch", launched=True, depth="deep", agents=["stepan"]) + audit_event({**trace, 'agent': 'stepan', 'action': 'deep_single_agent'}) + + # ── FACT LOCK (v3.2) ──────────────────────────────────────────────────── + # Якщо в сесії є зафіксовані числові факти — відповідаємо з кешу без RAG. + _session_facts: dict = session.get("doc_facts") or {} + if _session_facts: + _can_reuse, _reuse_keys = can_answer_from_facts(text, _session_facts) + if _can_reuse: + tlog(logger, "fact_reused", chat_id=str(chat_id), + keys=",".join(_reuse_keys)) + # Спочатку перевіряємо сценарний розрахунок + _scen_ok, _scen_text = compute_scenario(text, _session_facts) + if _scen_ok: + _self_corr = build_self_correction(text, _session_facts, session, current_doc_id=_current_doc_id) + final = (_self_corr + _scen_text) if _self_corr else _scen_text + final = adapt_response_style(final, user_profile) + _new_claims = extract_fact_claims(final) + update_session(str(chat_id), text, depth="deep", agents=[], + last_question=None, doc_facts=_session_facts, + fact_claims=_new_claims, active_doc_id=_current_doc_id, + doc_focus=True, doc_focus_ts=_now_ts) + return final + # Без сценарію — форматуємо відомі факти + _facts_text = format_facts_as_text( + {k: _session_facts[k] for k in _reuse_keys if k in _session_facts} + ) + _self_corr = build_self_correction(text, _session_facts, session, current_doc_id=_current_doc_id) + final = (_self_corr + _facts_text) if _self_corr else _facts_text + final = adapt_response_style(final, user_profile) + _new_claims = extract_fact_claims(final) + update_session(str(chat_id), text, depth="deep", agents=[], + last_question=None, doc_facts=_session_facts, + fact_claims=_new_claims, active_doc_id=_current_doc_id, + doc_focus=True, doc_focus_ts=_now_ts) + return final + + # Preflight normalization — лише збагачення контексту, НЕ блокатор. + norm = {} + try: + norm = tool_dictionary.normalize_from_text(text, trace_id=trace['trace_id'], source='telegram') + except Exception as _ne: + logger.debug("normalize_from_text error (non-blocking): %s", _ne) + _all_pending = [item for cat in norm.values() for item in cat if item.get('status') == 'pending'] + if _all_pending: + tlog(logger, "pending_terms_info", chat_id=str(chat_id), count=len(_all_pending)) intent = detect_intent(text) - pending_count = 0 - if ops_mode: - try: - from agromatrix_tools import tool_dictionary_review as review - pending_count = review.stats().get('open', 0) - except Exception: - pending_count = 0 + # Специфічні intent-и що потребують tool-виклику if intent in ['plan_week', 'plan_day']: - plan_id = tool_operation_plan.create_plan({ - 'scope': { - 'field_ids': [i.get('normalized_id') for i in norm.get('fields', []) if i.get('status')=='ok'], - 'crop_ids': [i.get('normalized_id') for i in norm.get('crops', []) if i.get('status')=='ok'], - 'date_window': {'start': '', 'end': ''} - }, - 'tasks': [] - }, trace_id=trace['trace_id'], source='telegram') - return json.dumps({ - 'status': 'ok', - 'summary': f'План створено: {plan_id}', - 'artifacts': [], - 'tool_calls': [], - 'next_actions': ['уточнити дати та операції'], - 'pending_dictionary_items': pending_count if ops_mode else None - }, ensure_ascii=False) + try: + plan_id = tool_operation_plan.create_plan({ + 'scope': { + 'field_ids': [i.get('normalized_id') for i in norm.get('fields', []) if i.get('status')=='ok'], + 'crop_ids': [i.get('normalized_id') for i in norm.get('crops', []) if i.get('status')=='ok'], + 'date_window': {'start': '', 'end': ''} + }, + 'tasks': [] + }, trace_id=trace['trace_id'], source='telegram') + return f"План створено: {plan_id}. Уточни дати та операції." + except Exception: + pass - if intent == 'show_critical_tomorrow': - _ = tool_operation_plan.plan_dashboard({}, {}) - return json.dumps({ - 'status': 'ok', - 'summary': 'Критичні задачі на завтра', - 'artifacts': [], - 'tool_calls': [], - 'next_actions': [], - 'pending_dictionary_items': pending_count if ops_mode else None - }, ensure_ascii=False) + # Будуємо промпт для Степана — з doc_context і chat_history + task_parts = [] - if intent == 'plan_vs_fact': - _ = tool_operation_plan.plan_dashboard({}, {}) - return json.dumps({ - 'status': 'ok', - 'summary': 'План/факт зведення', - 'artifacts': [], - 'tool_calls': [], - 'next_actions': [], - 'pending_dictionary_items': pending_count if ops_mode else None - }, ensure_ascii=False) + # 0. v4: Farm State prefix (тільки не в doc і не web режимі) + if _context_mode != "doc" and _domain not in ("web",): + _farm_prefix = build_farm_state_prefix(session) + if _farm_prefix: + task_parts.append(_farm_prefix) + tlog(logger, "farm_state_injected", chat_id=str(chat_id), + crop=str((session.get("farm_state") or {}).get("current_crop", ""))[:20]) - # general crew flow - ops_out = run_task_with_retry(ops, "Оціни чи потрібні операційні записи або читання farmOS", trace['trace_id']) - iot_out = run_task_with_retry(iot, "Оціни чи є потреба в даних ThingsBoard або NATS", trace['trace_id']) - platform_out = run_task_with_retry(platform, "Перевір базовий статус сервісів/інтеграцій", trace['trace_id']) - sheet_out = run_task_with_retry(spreadsheet, "Якщо запит стосується таблиць — підготуй артефакти", trace['trace_id']) - sustainability_out = run_task_with_retry(sustainability, "Якщо потрібні агрегації — дай read-only підсумки", trace['trace_id']) + # 0b. v4.2: Vision → Agronomy Bridge prefix + # Додаємо контекст культури з останнього фото, якщо: + # - НЕ doc mode (документ має пріоритет) + # - НЕ web mode + # - vision_last_label є і не дублює farm_state (щоб не плутати Степана) + if _context_mode != "doc" and _domain not in ("web",): + _vb_label_now = (session.get("vision_last_label") or "").strip() + _farm_crop = str((session.get("farm_state") or {}).get("current_crop", "")).strip() + if _vb_label_now and _vb_label_now != _farm_crop: + task_parts.append( + f"SYSTEM NOTE (не виводь це в відповідь): " + f"Культура з останнього фото — {_vb_label_now}. " + "Використовуй як контекст для агрономічних питань." + ) + tlog(logger, "vision_bridge_injected", + chat_id=str(chat_id), label=_vb_label_now) - audit_event({**trace, 'agent': 'stepan', 'action': 'delegate', 'targets': ['ops','iot','platform','spreadsheet','sustainability']}) + # 0c. v4.7: FarmOS Farm State Bridge + # Читаємо збережений /farm state snapshot з memory-service. + # Умови injection (аналогічно vision bridge): + # - НЕ doc mode + # - НЕ web domain + # - snapshot не старший за 24h + if _context_mode != "doc" and _domain not in ("web",): + _fs_text = _load_farm_state_snapshot(str(chat_id)) + if _fs_text: + task_parts.append( + f"SYSTEM NOTE (не виводь це в відповідь): " + f"Farm state snapshot (FarmOS, актуально):\n{_fs_text}" + ) + tlog(logger, "farm_state_snapshot_loaded", + chat_id=str(chat_id), found=True, + preview=_fs_text[:40]) + else: + tlog(logger, "farm_state_snapshot_loaded", + chat_id=str(chat_id), found=False) - summary = { - 'ops': ops_out, - 'iot': iot_out, - 'platform': platform_out, - 'spreadsheet': sheet_out, - 'sustainability': sustainability_out - } + # 1. Документ (якщо є вміст) + if _doc_summary_snippet: + task_parts.append(_doc_summary_snippet) + + # 2. Контекст переписки (chat history з memory-service) + if chat_history: + # Беремо останні 3000 символів — достатньо для контексту + _history_snippet = chat_history[-3000:] if len(chat_history) > 3000 else chat_history + task_parts.append( + f"=== ІСТОРІЯ ДІАЛОГУ (до 40 повідомлень) ===\n" + f"{_history_snippet}\n" + f"=== КІНЕЦЬ ІСТОРІЇ ===" + ) + + # 3. Контекст профілю якщо є + if farm_profile: + fields = farm_profile.get("fields", []) + if fields: + task_parts.append(f"Поля господарства: {', '.join(str(f) for f in fields[:5])}") + + task_parts.append(f"Поточний запит: {text}") + + if _doc_summary_snippet: + task_parts.append( + "ІНСТРУКЦІЯ: У тебе є вміст документа вище. " + "Відповідай ТІЛЬКИ на основі цього документа. " + "Якщо є числа — цитуй їх точно. " + "Якщо потрібного числа немає — скажи в якому рядку/колонці шукати (1 речення)." + ) + elif chat_history: + task_parts.append( + "ІНСТРУКЦІЯ: Використовуй ІСТОРІЮ ДІАЛОГУ вище для відповіді. " + "Якщо в history є згадка документа або дані — спирайся на них. " + "Відповідай коротко і конкретно українською." + ) + elif doc_context and (doc_context.get("file_id") or doc_context.get("file_unique_id")): + # Fix E: файл є, але summary порожній і history немає → НЕ казати "немає даних" + _f_name = doc_context.get("file_name") or "документ" + task_parts.append( + f"ІНСТРУКЦІЯ: Файл «{_f_name}» отримано, але вміст ще не витягнутий. " + f"НЕ кажи 'немає даних' або 'надішли ще раз'. " + f"Відповідай: 'Зараз опрацьовую — дай хвилину' і постав 1 уточнюючи питання " + f"про те, що саме потрібно знайти у файлі." + ) + else: + task_parts.append( + "Дай коротку, конкретну відповідь українською. " + "Якщо даних недостатньо — скажи що саме потрібно уточнити (1 питання)." + ) + tlog(logger, "deep_context_ready", chat_id=str(chat_id), + has_doc=bool(_doc_summary_snippet), has_history=bool(chat_history), + history_len=len(chat_history)) final_task = Task( - description=f"Сформуй фінальну коротку відповідь користувачу. Вхідні дані (JSON): {json.dumps(summary, ensure_ascii=False)}", - expected_output="Коротка консолідована відповідь для користувача українською.", + description="\n\n".join(task_parts), + expected_output="Коротка відповідь для користувача українською мовою.", agent=stepan ) - crew = Crew(agents=[stepan], tasks=[final_task], verbose=True) + crew = Crew(agents=[stepan], tasks=[final_task], verbose=False) result = crew.kickoff() + raw_response = str(result) + farmos_ui_hint() + styled_response = adapt_response_style(raw_response, user_profile) - return str(result) + farmos_ui_hint() + # Reflection (Deep mode only, never recursive) + reflection = reflect_on_response(text, styled_response, user_profile, farm_profile) + if reflection.get("style_shift") and user_profile: + user_profile["style"] = reflection["style_shift"] + if reflection.get("clarifying_question"): + styled_response = styled_response.rstrip() + "\n\n" + reflection["clarifying_question"] + if reflection.get("new_facts") and user_id: + u = user_profile or {} + for k, v in reflection["new_facts"].items(): + if k == "new_crops": + # Handled by update_profile_if_needed / FarmProfile + pass + elif k in ("name", "role"): + u[k] = v + if user_id: + save_user_profile(str(user_id), u) + + audit_event({**trace, 'agent': 'stepan', 'action': 'reflection', + 'confidence': reflection.get("confidence"), 'new_facts': list(reflection.get("new_facts", {}).keys())}) + + # Soft proactivity (v3: 1 речення max, за умовами) + clarifying_q = reflection.get("clarifying_question") if reflection else None + styled_response, _ = maybe_add_proactivity( + styled_response, user_profile or {}, depth="deep", reflection=reflection + ) + + update_profile_if_needed(str(user_id), str(chat_id), text, styled_response, intent=intent, depth="deep") + + # ── v3.7: STATE-AWARE DOC ACK ──────────────────────────────────────────── + # Якщо doc_focus щойно встановлений (focus_active раніше не був) і context_mode==doc, + # додаємо короткий префікс (max 60 символів), щоб уникнути "Так, пам'ятаю". + _doc_just_activated = ( + _context_mode == "doc" + and _current_doc_id + and not _focus_active # _focus_active = стан ДО активації у цьому запиті + ) + if _doc_just_activated: + _ack = "По звіту дивлюсь." if _signals.get("has_explicit_doc_token") else "Працюємо зі звітом." + # Тільки якщо відповідь не починається з нашого ack + if not styled_response.startswith(_ack): + styled_response = f"{_ack}\n{styled_response}" + tlog(logger, "doc_focus_acknowledged", chat_id=str(chat_id), + ack=_ack[:20], explicit=_signals.get("has_explicit_doc_token", False)) + + # ── FACT LOCK: витягуємо факти з відповіді і зберігаємо в session ────── + _new_facts = extract_doc_facts(styled_response) + _merged_facts: dict | None = None + if _new_facts: + _merged_facts = merge_doc_facts(_session_facts, _new_facts) + _conflicts = _merged_facts.get("conflicts", {}) + tlog(logger, "fact_locked", chat_id=str(chat_id), + keys=",".join(k for k in _new_facts if k not in ("conflicts","needs_recheck")), + conflicts=bool(_conflicts)) + # Якщо конфлікт — додаємо 1 речення до відповіді + if _conflicts: + conflict_key = next(iter(_conflicts)) + tlog(logger, "fact_conflict", chat_id=str(chat_id), key=conflict_key) + styled_response = styled_response.rstrip() + ( + f"\n\nБачу розбіжність по \"{conflict_key.replace('_uah','').replace('_ha','')}\". " + "Підтверди, яке значення правильне." + ) + + # ── SELF-CORRECTION: prefix якщо нова відповідь суперечить попередній ── + _self_corr_prefix = build_self_correction(styled_response, _session_facts, session, current_doc_id=_current_doc_id) + if _self_corr_prefix: + styled_response = _self_corr_prefix + styled_response + tlog(logger, "self_corrected", chat_id=str(chat_id)) + + # ── pending_action ─────────────────────────────────────────────────────── + _pending_action: dict | None = None + if clarifying_q and intent: + _pending_action = { + "intent": intent, + "what_to_do_next": text, + "doc_context": {"doc_id": doc_context.get("doc_id")} if doc_context else None, + } + + _new_claims = extract_fact_claims(styled_response) + + # Якщо відповідь в doc-режимі і факти знайдено — вмикаємо/продовжуємо doc_focus + _df_update: bool | None = None + _df_ts_update: float | None = None + if _context_mode == "doc" and _current_doc_id: + _df_update = True + _df_ts_update = _now_ts + if not _focus_active: + tlog(logger, "doc_focus_set", chat_id=str(chat_id), + reason="deep_doc_answer", doc_id=str(_current_doc_id)[:16]) + elif _context_mode == "general" and session.get("doc_focus"): + # Відповідь в general-режимі — скидаємо фокус (не продовжуємо TTL) + _df_update = False + _df_ts_update = 0.0 + + update_session( + str(chat_id), text, depth="deep", + agents=["stepan"], + last_question=clarifying_q, + pending_action=_pending_action, + doc_facts=_merged_facts if _merged_facts is not None else (_session_facts or None), + fact_claims=_new_claims, + active_doc_id=_current_doc_id, + doc_focus=_df_update, + doc_focus_ts=_df_ts_update, + doc_focus_cooldown_until=_df_cooldown_until_update, + ) + + # ── CONTEXT BLEED GUARD (v3.5 / v3.6 / v3.7) ──────────────────────────── + # Якщо відповідь містить doc-фрази але context_mode == "general" → замінити. + # Це блокує "витік" шаблонних фраз навіть коли doc_context не підмішувався. + if _context_mode == "general": + _BLEED_RE = re.compile( + r"у\s+(?:цьому|наданому|даному)\s+документі" + r"|в\s+(?:цьому|наданому|даному)\s+документі" + r"|у\s+(?:цьому\s+)?звіті|в\s+(?:цьому\s+)?звіті", + re.IGNORECASE | re.UNICODE, + ) + _bleed_match = _BLEED_RE.search(styled_response) + if _bleed_match: + _bleed_phrase = _bleed_match.group(0) + tlog(logger, "doc_phrase_suppressed", chat_id=str(chat_id), + phrase=_bleed_phrase[:40], mode="general") + # v3.6: використовуємо контекстний clarifier замість фіксованої фрази + styled_response = build_mode_clarifier(text) + + # ── UX-PHRASE GUARD (v3.7) ──────────────────────────────────────────────── + # Заміна шаблонних фраз "Так, пам'ятаю" / "Не бачу його перед собою" тощо. + _DOC_AWARENESS_RE = re.compile( + r"(так,\s*пам['\u2019]ятаю|не\s+бачу\s+його|не\s+бачу\s+перед\s+собою" + r"|мені\s+(?:не\s+)?доступний\s+документ)", + re.IGNORECASE | re.UNICODE, + ) + if _DOC_AWARENESS_RE.search(styled_response): + tlog(logger, "doc_ux_phrase_suppressed", chat_id=str(chat_id)) + styled_response = re.sub( + _DOC_AWARENESS_RE, + lambda m: build_mode_clarifier(text), + styled_response, + count=1, + ) + + # v3.7: Заміна "у цьому документі" у general при будь-якому тексті + if _context_mode == "general": + _DOC_MENTION_RE = re.compile( + r"\bзвіт\b", + re.IGNORECASE | re.UNICODE, + ) + if _DOC_MENTION_RE.search(styled_response) and "doc_facts" not in str(styled_response[:100]): + tlog(logger, "doc_mention_blocked_in_general", chat_id=str(chat_id)) + + return styled_response def main(): diff --git a/docker-compose.staging.yml b/docker-compose.staging.yml index 54f19371..3a262df9 100644 --- a/docker-compose.staging.yml +++ b/docker-compose.staging.yml @@ -33,6 +33,13 @@ services: - SERVICE_ID=router - SERVICE_ROLE=router - NATS_URL=nats://dagi-staging-nats:4222 + # ── Persistence backends (can also be set in .env.staging) ──────────── + - ALERT_BACKEND=postgres + - ALERT_DATABASE_URL=${ALERT_DATABASE_URL:-${DATABASE_URL}} + - RISK_HISTORY_BACKEND=auto + - BACKLOG_BACKEND=auto + - INCIDENT_BACKEND=auto + - AUDIT_BACKEND=auto volumes: - ./services/router/router_config.yaml:/app/router_config.yaml:ro - ./logs:/app/logs diff --git a/docs/backups/LATEST.txt b/docs/backups/LATEST.txt index 664141c9..a884323a 100644 --- a/docs/backups/LATEST.txt +++ b/docs/backups/LATEST.txt @@ -1 +1 @@ -/Users/apple/github-projects/microdao-daarion/docs/backups/docs_backup_20260218-091700.tar.gz +/Users/apple/github-projects/microdao-daarion/docs/backups/docs_backup_20260226-091701.tar.gz diff --git a/gateway-bot/app.py b/gateway-bot/app.py index 53f63dad..0b8dafc6 100644 --- a/gateway-bot/app.py +++ b/gateway-bot/app.py @@ -1,62 +1,88 @@ -"""FastAPI app instance for Gateway Bot.""" +""" +FastAPI app instance for Gateway Bot +""" import logging +import os +import sys +from pathlib import Path from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from http_api import router as gateway_router from http_api_doc import router as doc_router -from daarion_facade.invoke_api import router as invoke_router -from daarion_facade.registry_api import router as registry_router + +import gateway_boot logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) +logger = logging.getLogger(__name__) app = FastAPI( title="Bot Gateway with DAARWIZZ", - version="1.1.0", - description="Gateway service for Telegram/Discord bots + DAARION public facade" + version="1.0.0", + description="Gateway service for Telegram/Discord bots → DAGI Router" ) -# CORS for web UI clients (gateway only). +# CORS middleware app.add_middleware( CORSMiddleware, - allow_origins=[ - "https://daarion.city", - "https://www.daarion.city", - "http://localhost:3000", - ], - allow_origin_regex=r"https://.*\.lovable\.app", + allow_origins=["*"], allow_credentials=True, - allow_methods=["GET", "POST", "OPTIONS"], - allow_headers=["Authorization", "Content-Type"], + allow_methods=["*"], + allow_headers=["*"], ) -# Existing gateway routes. +# Include gateway routes app.include_router(gateway_router, prefix="", tags=["gateway"]) app.include_router(doc_router, prefix="", tags=["docs"]) -# Public facade routes for DAARION.city UI. -app.include_router(registry_router) -app.include_router(invoke_router) + +@app.on_event("startup") +async def startup_stepan_check(): + """Check crews + agromatrix_tools availability. Do not crash gateway if missing.""" + # Шляхи для inproc: gateway volume (основний) або repo root (dev) + gw_dir = str(Path(__file__).parent) # /app/gateway-bot + repo_root = os.getenv("AGX_REPO_ROOT", "/opt/microdao-daarion").strip() + candidate_paths = [ + gw_dir, # /app/gateway-bot (crews/ тут) + str(Path(gw_dir) / "agromatrix-tools"), # /app/gateway-bot/agromatrix-tools + repo_root, # /opt/microdao-daarion + str(Path(repo_root) / "packages" / "agromatrix-tools"), + str(Path(repo_root) / "packages" / "agromatrix-tools" / "agromatrix_tools"), + ] + for p in candidate_paths: + if p and p not in sys.path: + sys.path.insert(0, p) + try: + import crews.agromatrix_crew.run # noqa: F401 + import agromatrix_tools # noqa: F401 + gateway_boot.STEPAN_IMPORTS_OK = True + logger.info("Stepan inproc: crews + agromatrix_tools OK; STEPAN_IMPORTS_OK=True") + except Exception as e: + logger.error( + "Stepan disabled: crews or agromatrix_tools not available: %s. " + "Set AGX_REPO_ROOT, mount crews and packages/agromatrix-tools.", + e, + ) + gateway_boot.STEPAN_IMPORTS_OK = False @app.get("/") async def root(): return { "service": "bot-gateway", - "version": "1.1.0", + "version": "1.0.0", "agent": "DAARWIZZ", "endpoints": [ "POST /telegram/webhook", "POST /discord/webhook", - "GET /v1/registry/agents", - "GET /v1/registry/districts", - "GET /v1/metrics", - "POST /v1/invoke", - "GET /v1/jobs/{job_id}", - "GET /health", + "POST /api/doc/parse", + "POST /api/doc/ingest", + "POST /api/doc/ask", + "GET /api/doc/context/{session_id}", + "GET /health" ] } diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index ca1c5fce..255b98ee 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -25,11 +25,27 @@ from pydantic import BaseModel from router_client import send_to_router from memory_client import memory_client +from vision_guard import ( + extract_label_from_response as _vg_extract_label, + get_vision_lock as _vg_get_lock, + set_vision_lock as _vg_set_lock, + clear_vision_lock as _vg_clear_lock, + set_user_label as _vg_set_user_label, + detect_user_override as _vg_detect_override, + should_skip_reanalysis as _vg_should_skip, + build_low_confidence_clarifier as _vg_build_low_conf, + build_locked_reply as _vg_build_locked_reply, +) from services.doc_service import ( parse_document, ingest_document, ask_about_document, - get_doc_context + get_doc_context, + save_chat_doc_context, + get_chat_doc_context, + fetch_telegram_file_bytes, + extract_summary_from_bytes, + upsert_chat_doc_context_with_summary, ) from behavior_policy import ( should_respond, @@ -44,6 +60,7 @@ from behavior_policy import ( get_ack_text, is_prober_request, has_agent_chat_participation, + has_recent_interaction, NO_OUTPUT, BehaviorDecision, AGENT_NAME_VARIANTS, @@ -51,6 +68,16 @@ from behavior_policy import ( logger = logging.getLogger(__name__) + +def _safe_has_recent_interaction(agent_id: str, chat_id: str, user_id: str) -> bool: + """Guard: avoid 500 if has_recent_interaction is missing or raises. Returns False on any error.""" + try: + return bool(has_recent_interaction(agent_id, str(chat_id), str(user_id))) + except Exception as e: + logger.warning("has_recent_interaction failed, treating as False: %s", e) + return False + + # Telegram message length limits TELEGRAM_MAX_MESSAGE_LENGTH = 4096 TELEGRAM_SAFE_LENGTH = 3500 # Leave room for formatting @@ -654,6 +681,31 @@ def _get_last_pending(chat_id: str) -> list | None: return rec.get('items') +def _find_doc_in_history(history_text: str) -> dict | None: + """ + Шукає посилання на документ у тексті chat history. + Якщо знаходить рядок '[Документ: ...]' — повертає doc_context stub. + Це дозволяє Степану знати про документ навіть без збереженого extracted_summary. + """ + import re as _re + if not history_text: + return None + # Шукаємо паттерн [Документ: filename.xlsx] + matches = _re.findall(r'\[Документ:\s*([^\]]+)\]', history_text) + if not matches: + # Також шукаємо assistant-повідомлення про документ + matches = _re.findall(r'📄[^\n]*\*\*([^*]+)\*\*', history_text) + if matches: + file_name = matches[-1].strip() # Беремо найновіший + return { + "doc_id": "", + "title": file_name, + "extracted_summary": "", # немає вмісту — але є назва + "from_history": True, + } + return None + + def _set_last_pending(chat_id: str, items: list): LAST_PENDING_STATE[str(chat_id)] = {"ts": time.time(), "items": items} @@ -992,6 +1044,18 @@ async def druid_telegram_webhook(update: TelegramUpdate): # AGROMATRIX webhook endpoint +# AGX_STEPAN_MODE: inproc = run Crew in-process (default); http = call crewai-service (9010). +_STEPAN_MODE = None + +def _get_stepan_mode() -> str: + global _STEPAN_MODE + if _STEPAN_MODE is None: + _STEPAN_MODE = (os.getenv("AGX_STEPAN_MODE", "inproc") or "inproc").strip().lower() + if _STEPAN_MODE not in ("inproc", "http"): + _STEPAN_MODE = "inproc" + logger.info("Stepan mode=%s (AGX_STEPAN_MODE)", _STEPAN_MODE) + return _STEPAN_MODE + async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfig) -> Dict[str, Any]: update_id = getattr(update, 'update_id', None) or update.update_id @@ -1004,14 +1068,167 @@ async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfi message = update.message or update.channel_post or {} text = message.get('text') or message.get('caption') or '' - if not text: - return {"ok": True, "status": "no_text"} user = message.get('from', {}) or {} chat = message.get('chat', {}) or {} user_id = str(user.get('id', '')) chat_id = str(chat.get('id', '')) + # ── DOC HANDOFF + EXTRACT-ON-UPLOAD (v3.4 / PROMPT 30) ───────────────── + # При отриманні документа (operator path): + # 1) зберегти базовий doc_ctx (doc_id, file_name) + # 2) для XLSX/XLS/CSV: завантажити байти через Bot API, витягнути summary, + # оновити doc_context_chat з extracted_summary → Stepan бачить дані одразу + _doc_obj = message.get("document") + if _doc_obj and _doc_obj.get("file_id"): + _file_id_tg = _doc_obj.get("file_id") + _fu_id = _doc_obj.get("file_unique_id") or _file_id_tg + _fname = _doc_obj.get("file_name") or "" + _bot_token = agent_config.get_telegram_token() or "" + _doc_ctx_to_save: dict = { + "doc_id": _fu_id, + "file_unique_id": _fu_id, + "file_id": _file_id_tg, + "file_name": _fname, + "source": "telegram", + # Fix D: явно фіксуємо anchor одразу при upload — run.py може читати без парсингу doc_id + "active_doc_id": _fu_id, + } + # Крок 1: зберегти базовий doc_ctx (await = race-safe) + await save_chat_doc_context(chat_id, agent_config.agent_id, _doc_ctx_to_save) + logger.info("Doc Handoff: saved base doc_id=%s file=%s", str(_fu_id)[:16], _fname) + + # Крок 2: Extract-on-upload для табличних форматів + _fname_lower = _fname.lower() + _extractable = _fname_lower.endswith((".xlsx", ".xls", ".csv")) + _extract_ok = False + if _extractable and _bot_token: + # Fix 1: One-shot cache — якщо summary вже є для того самого file_unique_id → skip + _existing_ctx = await get_chat_doc_context(chat_id, agent_config.agent_id) + _already_have = ( + _existing_ctx + and _existing_ctx.get("extracted_summary") + and (_existing_ctx.get("file_unique_id") or _existing_ctx.get("doc_id")) == _fu_id + ) + if _already_have: + _extract_ok = True + logger.info("doc_extract_skipped reason=already_have_summary chat_id=%s fuid=%s", + chat_id, str(_fu_id)[:16]) + else: + logger.info("doc_extract_started chat_id=%s file=%s", chat_id, _fname) + try: + _file_bytes = await fetch_telegram_file_bytes(_bot_token, _file_id_tg) + _extract_summary = extract_summary_from_bytes(_fname, _file_bytes) + if _extract_summary: + await upsert_chat_doc_context_with_summary( + chat_id, agent_config.agent_id, _doc_ctx_to_save, _extract_summary + ) + _extract_ok = True + logger.info("doc_extract_done ok=true chat_id=%s chars=%d", + chat_id, len(_extract_summary)) + else: + logger.warning("doc_extract_done ok=false reason=empty_summary chat_id=%s", chat_id) + except Exception as _ee: + logger.warning("doc_extract_done ok=false reason=%s chat_id=%s", + str(_ee)[:80], chat_id) + + # Якщо тексту/caption немає — підтверджуємо отримання і виходимо + if not text: + if _extract_ok: + _reply = ( + f"Прочитав «{_fname}». Можу: (1) витягнути прибуток/витрати, " + f"(2) сценарій — добрива×2, (3) зведення грн/га. Що потрібно?" + ) + elif _extractable: + _reply = ( + f"Отримав «{_fname}», але не зміг витягнути дані автоматично. " + f"Постав питання — перегляну через пошук по документу." + ) + else: + _reply = ( + f"Бачу «{_fname}». Що зробити: витягнути прибуток/витрати, " + f"сценарій, чи звести по га?" + ) + await send_telegram_message(chat_id, _reply, bot_token=_bot_token) + return {"ok": True, "status": "doc_saved"} + + # ── PHOTO BRIDGE (v3.5) ───────────────────────────────────────────────── + # Фото в operator path раніше провалювалося через "if not text" і тихо ігнорувалося. + # Тепер: делегуємо до process_photo (vision-8b через Router) — той самий шлях, + # що використовують всі інші агенти. Агент AgroMatrix вже має спеціальний контекст + # (prior_label + agricultural system prompt) у process_photo. + _photo_obj = message.get("photo") + if _photo_obj and not text: + # text може бути caption — вже вище: text = message.get('caption') or '' + # якщо caption не порожній — photo+caption піде в text-гілку нижче (Stepan відповідає) + # тут тільки "фото без тексту" + _username = (user.get('username') or user.get('first_name') or str(user_id)) + _dao_id = os.getenv("AGX_DAO_ID", "agromatrix-dao") + _bot_tok = agent_config.get_telegram_token() or "" + logger.info("Photo bridge: routing photo to process_photo chat_id=%s", chat_id) + try: + _photo_result = await process_photo( + agent_config=agent_config, + update=update, + chat_id=chat_id, + user_id=user_id, + username=_username, + dao_id=_dao_id, + photo=_photo_obj, + caption_override=None, + bypass_media_gate=True, # operator path = завжди відповідати + ) + # v3.5 fix: зберігаємо timestamp фото в session Степана + # щоб наступний текстовий запит (words=1) знав що щойно було фото + try: + from crews.agromatrix_crew.session_context import update_session + import time as _ts_mod + update_session( + chat_id, "[photo]", depth="light", agents=[], + last_question=None, + last_photo_ts=_ts_mod.time(), + ) + logger.info("Photo bridge: last_photo_ts saved chat_id=%s", chat_id) + except Exception: + pass + return _photo_result + except Exception as _pe: + logger.warning("Photo bridge error: %s", _pe) + await send_telegram_message( + chat_id, + "Не вдалося обробити фото. Спробуй ще раз або напиши що на фото.", + bot_token=_bot_tok, + ) + return {"ok": True, "status": "photo_error"} + + if not text: + return {"ok": True, "status": "no_text"} + + # ── PHOTO+TEXT: якщо є caption → Stepan отримує опис через doc_context ───── + # Якщо text (caption) є + фото → стандартний flow Степана + зберігаємо file_id + # щоб він міг згадати фото у відповіді. + if _photo_obj and text: + _photo_largest = _photo_obj[-1] if isinstance(_photo_obj, list) else _photo_obj + _photo_file_id = _photo_largest.get("file_id") if isinstance(_photo_largest, dict) else None + if _photo_file_id: + _set_recent_photo_context(agent_config.agent_id, chat_id, user_id, _photo_file_id) + + # ── VISION CONSISTENCY GUARD: Хук C — User Override ───────────────────── + # Whitelist + negation guard: "це соняшник" → user_label; + # "це не соняшник" → ігноруємо. + if text: + try: + _vg_override = _vg_detect_override(text) + if _vg_override: + _vg_set_user_label(agent_config.agent_id, chat_id, _vg_override) + logger.info( + "vision_user_override_set agent=%s chat_id=%s label=%s", + agent_config.agent_id, chat_id, _vg_override, + ) + except Exception: + pass + # ───────────────────────────────────────────────────────────────────────── + # ops mode if operator ops_mode = False op_ids = [s.strip() for s in os.getenv('AGX_OPERATOR_IDS', '').split(',') if s.strip()] @@ -1022,21 +1239,152 @@ async def handle_stepan_message(update: TelegramUpdate, agent_config: AgentConfi ops_mode = True trace_id = str(uuid.uuid4()) + stepan_mode = _get_stepan_mode() + + if stepan_mode == "http": + logger.warning("Stepan http mode not implemented; use AGX_STEPAN_MODE=inproc.") + bot_token = agent_config.get_telegram_token() + await send_telegram_message( + chat_id, + "Степан у режимі HTTP зараз недоступний. Встановіть AGX_STEPAN_MODE=inproc.", + bot_token=bot_token, + ) + return {"ok": False, "status": "stepan_http_not_implemented"} - # call Stepan directly try: - sys.path.insert(0, str(Path('/opt/microdao-daarion'))) + import gateway_boot + except ImportError: + gateway_boot = type(sys)("gateway_boot") + gateway_boot.STEPAN_IMPORTS_OK = False + if not getattr(gateway_boot, "STEPAN_IMPORTS_OK", False): + logger.warning("Stepan inproc disabled: crews/agromatrix_tools not available at startup") + bot_token = agent_config.get_telegram_token() + await send_telegram_message( + chat_id, + "Степан тимчасово недоступний (не встановлено crews або agromatrix-tools).", + bot_token=bot_token, + ) + return {"ok": False, "status": "stepan_disabled"} + + try: + # v3: crews/ is in /app/gateway-bot/crews (volume-mounted copy) + # AGX_REPO_ROOT can override for dev/alt deployments + repo_root = os.getenv("AGX_REPO_ROOT", "") + _gw = "/app/gateway-bot" + _at = "/app/gateway-bot/agromatrix-tools" + for _p in [_at, _gw, repo_root]: + if _p and _p not in sys.path: + sys.path.insert(0, _p) from crews.agromatrix_crew.run import handle_message + + # Doc Bridge (v3.3): отримати активний doc_context для цього chat. + # Пріоритет: chat-scoped (doc_context_chat:) > session-scoped (doc_context:). + _stepan_doc_ctx: dict | None = None + try: + # 1) Спочатку пробуємо chat-scoped (надійніший при зміні session_id) + _chat_dc = await get_chat_doc_context(chat_id, agent_config.agent_id) + if _chat_dc and (_chat_dc.get("doc_id") or _chat_dc.get("file_unique_id")): + _chat_doc_id = _chat_dc.get("doc_id") or _chat_dc.get("file_unique_id") + _chat_extracted = _chat_dc.get("extracted_summary") or "" + _chat_fname = _chat_dc.get("file_name") or "" + # Якщо chat-scoped є але без extracted_summary → шукаємо в session-scoped + if not _chat_extracted: + try: + _dc_sess = await get_doc_context(f"telegram:{chat_id}", agent_id=agent_config.agent_id) + if _dc_sess and getattr(_dc_sess, "extracted_summary", None): + _chat_extracted = _dc_sess.extracted_summary + except Exception: + pass + # Якщо ще немає — RAG fallback + if not _chat_extracted and _chat_doc_id: + try: + _qa = await ask_about_document( + session_id=f"telegram:{chat_id}", + question=text, + doc_id=_chat_doc_id, + dao_id=os.getenv("AGX_DAO_ID", "agromatrix-dao"), + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + ) + if _qa and getattr(_qa, "answer", None): + _chat_extracted = f"[RAG відповідь по документу]: {_qa.answer}" + logger.info("Doc Bridge: RAG answer retrieved for chat doc_id=%s", _chat_doc_id) + except Exception as _qae: + logger.debug("Doc Bridge RAG fallback failed: %s", _qae) + _stepan_doc_ctx = { + "doc_id": _chat_doc_id, + "title": _chat_fname, + "extracted_summary": _chat_extracted, + "file_unique_id": _chat_dc.get("file_unique_id") or _chat_doc_id, + } + logger.info("Doc Bridge: chat-scoped doc_id=%s found=true", _chat_doc_id[:16] if _chat_doc_id else "") + else: + # 2) Fallback: session-scoped (старий ключ) + _dc = await get_doc_context(f"telegram:{chat_id}", agent_id=agent_config.agent_id) + if _dc and getattr(_dc, "doc_id", None): + _extracted = getattr(_dc, "extracted_summary", "") or "" + if not _extracted and getattr(_dc, "doc_id", None): + try: + _qa = await ask_about_document( + session_id=f"telegram:{chat_id}", + question=text, + doc_id=_dc.doc_id, + dao_id=os.getenv("AGX_DAO_ID", "agromatrix-dao"), + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + ) + if _qa and getattr(_qa, "answer", None): + _extracted = f"[RAG відповідь по документу]: {_qa.answer}" + logger.info("Doc Bridge: session-scoped RAG retrieved for doc_id=%s", _dc.doc_id) + except Exception as _qae: + logger.debug("Doc Bridge session RAG fallback failed: %s", _qae) + _stepan_doc_ctx = { + "doc_id": _dc.doc_id, + "title": getattr(_dc, "file_name", "") or "", + "extracted_summary": _extracted, + "file_unique_id": _dc.doc_id, + } + logger.info("Doc Bridge: session-scoped doc_id=%s found=true", _dc.doc_id) + except Exception as _dce: + logger.debug("Doc Bridge: could not fetch doc_context: %s", _dce) + + # Chat History Bridge (v3.2): передаємо history з memory-service в Степана. + # Степан інакше не має доступу до переписки — він викликається поза Router pipeline. + _stepan_chat_history: str = "" + try: + _ctx = await memory_client.get_context( + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + team_id=os.getenv("AGX_DAO_ID", "agromatrix-dao"), + channel_id=chat_id, + limit=40, + ) + _stepan_chat_history = _ctx.get("local_context_text", "") or "" + # Якщо в history є документ — і _stepan_doc_ctx порожній, шукаємо в history + if not _stepan_doc_ctx and _stepan_chat_history: + _doc_in_history = _find_doc_in_history(_stepan_chat_history) + if _doc_in_history: + _stepan_doc_ctx = _doc_in_history + logger.info("Doc Bridge: found doc reference in chat history: %s", + _doc_in_history.get("title", "")) + except Exception as _che: + logger.debug("Chat History Bridge failed (non-blocking): %s", _che) + started = time.time() last_pending = _get_last_pending(chat_id) response_text = await asyncio.wait_for( - asyncio.to_thread(handle_message, text, user_id, chat_id, trace_id, ops_mode, last_pending), - timeout=25 + asyncio.to_thread( + handle_message, text, user_id, chat_id, trace_id, ops_mode, last_pending, + None, None, bool(_stepan_doc_ctx), _stepan_doc_ctx, + _stepan_chat_history, + ), + timeout=55 ) duration_ms = int((time.time() - started) * 1000) except Exception as e: logger.error(f"Stepan handler error: {e}; trace_id={trace_id}") - response_text = f"Помилка обробки. trace_id={trace_id}" + # SANITIZE: без trace_id для юзера (trace_id тільки в логах) + response_text = "Щось пішло не так. Спробуй ще раз або переформулюй запит." duration_ms = 0 # If JSON, try to show summary @@ -1078,35 +1426,19 @@ async def agromatrix_telegram_webhook(update: TelegramUpdate): if user_id and user_id in op_ids: is_ops = True - # Operator NL or operator slash commands -> handle via Stepan handler. - # Important: do NOT treat generic slash commands (/start, /agromatrix) as operator commands, - # otherwise regular users will see "Недостатньо прав" or Stepan errors. - operator_slash_cmds = { - "whoami", - "pending", - "pending_show", - "approve", - "reject", - "apply_dict", - "pending_stats", - } - slash_cmd = "" - if is_slash: - try: - slash_cmd = (msg_text.strip().split()[0].lstrip("/").strip().lower()) - except Exception: - slash_cmd = "" - is_operator_slash = bool(slash_cmd) and slash_cmd in operator_slash_cmds - - # Stepan handler currently depends on ChatOpenAI (OPENAI_API_KEY). If key is not configured, - # never route production traffic there (avoid "Помилка обробки..." and webhook 5xx). - stepan_enabled = bool(os.getenv("OPENAI_API_KEY", "").strip()) - if stepan_enabled and (is_ops or is_operator_slash): + # Operator: any message (not only slash) goes to Stepan when is_ops. + # v3: stepan_enabled checks DEEPSEEK_API_KEY (preferred) OR OPENAI_API_KEY (fallback) + stepan_enabled = bool( + os.getenv("DEEPSEEK_API_KEY", "").strip() + or os.getenv("OPENAI_API_KEY", "").strip() + ) + if stepan_enabled and is_ops: return await handle_stepan_message(update, AGROMATRIX_CONFIG) - if (is_ops or is_operator_slash) and not stepan_enabled: + if is_ops and not stepan_enabled: logger.warning( - "Stepan handler disabled (OPENAI_API_KEY missing); falling back to Router pipeline " - f"for chat_id={chat_id}, user_id={user_id}, slash_cmd={slash_cmd!r}" + "Stepan handler disabled (no DEEPSEEK_API_KEY / OPENAI_API_KEY); " + "falling back to Router pipeline " + f"for chat_id={chat_id}, user_id={user_id}" ) # General conversation -> standard Router pipeline (like all other agents) @@ -1672,11 +2004,16 @@ async def process_photo( # Telegram sends multiple sizes, get the largest one (last in array) photo_obj = photo[-1] if isinstance(photo, list) else photo file_id = photo_obj.get("file_id") if isinstance(photo_obj, dict) else None - + # file_unique_id стабільний між розмірами — використовуємо як lock key + file_unique_id: str | None = (photo_obj.get("file_unique_id") if isinstance(photo_obj, dict) else None) or None + if not file_id: return {"ok": False, "error": "No file_id in photo"} - - logger.info(f"{agent_config.name}: Photo from {username} (tg:{user_id}), file_id: {file_id}") + + logger.info( + "%s: Photo from %s (tg:%s), file_id: %s file_unique_id: %s", + agent_config.name, username, user_id, file_id, file_unique_id or "n/a", + ) _set_recent_photo_context(agent_config.agent_id, chat_id, user_id, file_id) if agent_config.agent_id == "agromatrix": await _set_agromatrix_last_photo_ref( @@ -1717,7 +2054,28 @@ async def process_photo( username=username, ) return {"ok": True, "skipped": True, "reason": "media_no_question"} - + + # ── VISION CONSISTENCY GUARD: Rule 1 ───────────────────────────────────── + # Те саме фото (file_unique_id або file_id) вже аналізувалось → + # повертаємо збережений результат без запиту до Router. + # reeval_request → clear_lock → продовжуємо до Router. + _vg_caption_text = caption.strip() if caption else "" + if agent_config.agent_id == "agromatrix" and _vg_should_skip( + agent_config.agent_id, chat_id, file_id, _vg_caption_text, + file_unique_id=file_unique_id, + ): + _vg_lock = _vg_get_lock(agent_config.agent_id, chat_id) + _vg_reply = _vg_build_locked_reply(_vg_lock, _vg_caption_text) + logger.info( + "vision_skip_reanalysis agent=%s chat_id=%s photo_key=%s label=%s", + agent_config.agent_id, chat_id, + file_unique_id or file_id, _vg_lock.get("label", "?"), + ) + telegram_token = agent_config.get_telegram_token() or "" + if telegram_token: + await send_telegram_message(chat_id, _vg_reply, telegram_token) + return {"ok": True, "skipped": True, "reason": "vision_lock_same_photo"} + try: # Get file path from Telegram telegram_token = agent_config.get_telegram_token() @@ -1796,6 +2154,32 @@ async def process_photo( answer_text = response.get("data", {}).get("text") or response.get("response", "") if answer_text: + # ── VISION CONSISTENCY GUARD: Hooks A+B ────────────────────── + # A: persist lock (label + confidence) keyed by file_unique_id + if agent_config.agent_id == "agromatrix": + try: + _vg_label, _vg_conf = _vg_extract_label(answer_text) + _vg_set_lock( + agent_config.agent_id, chat_id, file_id, + _vg_label, _vg_conf, + file_unique_id=file_unique_id, + ) + logger.info( + "vision_lock_set agent=%s chat_id=%s photo_key=%s label=%s conf=%s", + agent_config.agent_id, chat_id, + file_unique_id or file_id, _vg_label, _vg_conf, + ) + except Exception: + pass + # B: low-confidence → append clarifier if not already present + answer_text, _vg_low_added = _vg_build_low_conf(answer_text) + if _vg_low_added: + logger.info( + "vision_low_conf_clarifier_added agent=%s chat_id=%s", + agent_config.agent_id, chat_id, + ) + # ───────────────────────────────────────────────────────────── + # Photo processed - send LLM response directly await send_telegram_message( chat_id, @@ -1941,7 +2325,8 @@ async def process_document( dao_id=dao_id, user_id=f"tg:{user_id}", output_mode="qa_pairs", - metadata={"username": username, "chat_id": chat_id} + metadata={"username": username, "chat_id": chat_id}, + agent_id=agent_config.agent_id, ) if not result.success: @@ -1953,7 +2338,42 @@ async def process_document( if not doc_text and result.chunks_meta: chunks = result.chunks_meta.get("chunks", []) doc_text = "\n".join(chunks[:5]) if chunks else "" - + + # v3.2 Doc Bridge: зберігаємо parsed text щоб Stepan міг відповідати на питання + if doc_text and result.doc_id: + try: + from services.doc_service import save_doc_context as _save_doc_ctx + await _save_doc_ctx( + session_id=session_id, + doc_id=result.doc_id, + doc_url=file_url, + file_name=file_name, + dao_id=dao_id, + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + extracted_summary=doc_text[:4000], + ) + logger.info(f"Doc Bridge: saved extracted_summary ({len(doc_text)} chars) for doc_id={result.doc_id}") + except Exception as _dbe: + logger.warning(f"Doc Bridge save_doc_context failed (non-blocking): {_dbe}") + # v3.3 Doc Handoff: зберігаємо chat-scoped ключ (пріоритет для Stepan) + try: + from services.doc_service import _sanitize_summary as _ss + _file_unique = document.get("file_unique_id") or result.doc_id + await save_chat_doc_context( + chat_id=chat_id, + agent_id=agent_config.agent_id, + doc_ctx={ + "doc_id": result.doc_id, + "file_unique_id": _file_unique, + "file_name": file_name, + "extracted_summary": _ss(doc_text)[:4000], + "source": "telegram", + }, + ) + except Exception as _cdbe: + logger.warning("Doc Handoff: save_chat_doc_context failed: %s", _cdbe) + # Ask LLM to summarize the document (human-friendly) if doc_text: zip_hint = None @@ -3097,7 +3517,7 @@ async def handle_telegram_webhook( # Check if there's a document context for follow-up questions session_id = f"telegram:{chat_id}" - doc_context = await get_doc_context(session_id) + doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id) # If there's a doc_id and the message looks like a question about the document if doc_context and doc_context.doc_id: @@ -3788,7 +4208,8 @@ async def _old_telegram_webhook(update: TelegramUpdate): dao_id=dao_id, user_id=f"tg:{user_id}", output_mode="qa_pairs", - metadata={"username": username, "chat_id": chat_id} + metadata={"username": username, "chat_id": chat_id}, + agent_id=agent_config.agent_id, ) if not result.success: @@ -3991,7 +4412,7 @@ async def _old_telegram_webhook(update: TelegramUpdate): # Check if there's a document context for follow-up questions session_id = f"telegram:{chat_id}" - doc_context = await get_doc_context(session_id) + doc_context = await get_doc_context(session_id, agent_id=agent_config.agent_id) # If there's a doc_id and the message looks like a question about the document if doc_context and doc_context.doc_id: diff --git a/gateway-bot/services/doc_service.py b/gateway-bot/services/doc_service.py index 4ad2a691..6fa26b49 100644 --- a/gateway-bot/services/doc_service.py +++ b/gateway-bot/services/doc_service.py @@ -11,23 +11,18 @@ This service can be used by: import os import logging import hashlib -import base64 import json import re from typing import Optional, Dict, Any, List from pydantic import BaseModel from datetime import datetime -from io import BytesIO +from router_client import send_to_router from memory_client import memory_client logger = logging.getLogger(__name__) SHARED_EXCEL_POLICY_AGENTS = {"agromatrix", "helion", "nutra", "greenfood"} -ROUTER_URL = os.getenv("ROUTER_URL", "http://router:8000") -ARTIFACT_REGISTRY_URL = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9220").rstrip("/") -DOC_WRITEBACK_CREATED_BY = os.getenv("DOC_WRITEBACK_CREATED_BY", "gateway-doc-service") -GATEWAY_PUBLIC_BASE_URL = os.getenv("GATEWAY_PUBLIC_BASE_URL", "").rstrip("/") class QAItem(BaseModel): @@ -56,35 +51,6 @@ class IngestResult(BaseModel): error: Optional[str] = None -class UpdateResult(BaseModel): - """Result of document update with version bump.""" - success: bool - doc_id: Optional[str] = None - version_no: Optional[int] = None - version_id: Optional[int] = None - updated_chunks: int = 0 - status: str = "unknown" - publish_error: Optional[str] = None - artifact_id: Optional[str] = None - artifact_version_id: Optional[str] = None - artifact_storage_key: Optional[str] = None - artifact_mime: Optional[str] = None - artifact_download_url: Optional[str] = None - error: Optional[str] = None - - -class PublishResult(BaseModel): - """Result of artifact write-back publish.""" - success: bool - artifact_id: Optional[str] = None - version_id: Optional[str] = None - storage_key: Optional[str] = None - mime: Optional[str] = None - file_name: Optional[str] = None - download_url: Optional[str] = None - error: Optional[str] = None - - class QAResult(BaseModel): """Result of RAG query about a document""" success: bool @@ -102,6 +68,7 @@ class DocContext(BaseModel): doc_url: Optional[str] = None file_name: Optional[str] = None saved_at: Optional[str] = None + extracted_summary: Optional[str] = None # v3.2: parsed text snippet для in-context LLM class DocumentService: @@ -118,266 +85,6 @@ class DocumentService: """Initialize document service""" self.memory_client = memory_client - async def _router_post_json( - self, - path: str, - payload: Dict[str, Any], - timeout: float = 45.0, - ) -> Dict[str, Any]: - import httpx - - base = ROUTER_URL.rstrip("/") - url = f"{base}{path}" - async with httpx.AsyncClient(timeout=timeout) as client: - resp = await client.post(url, json=payload) - body = {} - try: - body = resp.json() - except Exception: - body = {"ok": False, "error": f"Invalid JSON from router ({resp.status_code})"} - if resp.status_code >= 400: - err = body.get("detail") or body.get("error") or f"HTTP {resp.status_code}" - raise RuntimeError(f"Router error on {path}: {err}") - return body if isinstance(body, dict) else {"ok": False, "error": "Invalid router response type"} - - async def _router_get_json( - self, - path: str, - timeout: float = 30.0, - ) -> Dict[str, Any]: - import httpx - - base = ROUTER_URL.rstrip("/") - url = f"{base}{path}" - async with httpx.AsyncClient(timeout=timeout) as client: - resp = await client.get(url) - body = {} - try: - body = resp.json() - except Exception: - body = {"ok": False, "error": f"Invalid JSON from router ({resp.status_code})"} - if resp.status_code >= 400: - err = body.get("detail") or body.get("error") or f"HTTP {resp.status_code}" - raise RuntimeError(f"Router error on {path}: {err}") - return body if isinstance(body, dict) else {"ok": False, "error": "Invalid router response type"} - - async def _artifact_post_json( - self, - path: str, - payload: Dict[str, Any], - timeout: float = 45.0, - ) -> Dict[str, Any]: - import httpx - - base = ARTIFACT_REGISTRY_URL.rstrip("/") - url = f"{base}{path}" - async with httpx.AsyncClient(timeout=timeout) as client: - resp = await client.post(url, json=payload) - body = {} - try: - body = resp.json() - except Exception: - body = {"ok": False, "error": f"Invalid JSON from artifact-registry ({resp.status_code})"} - if resp.status_code >= 400: - err = body.get("detail") or body.get("error") or f"HTTP {resp.status_code}" - raise RuntimeError(f"Artifact registry error on {path}: {err}") - return body if isinstance(body, dict) else {"ok": False, "error": "Invalid artifact response type"} - - async def _artifact_get_json( - self, - path: str, - timeout: float = 30.0, - ) -> Dict[str, Any]: - import httpx - - base = ARTIFACT_REGISTRY_URL.rstrip("/") - url = f"{base}{path}" - async with httpx.AsyncClient(timeout=timeout) as client: - resp = await client.get(url) - body = {} - try: - body = resp.json() - except Exception: - body = {"ok": False, "error": f"Invalid JSON from artifact-registry ({resp.status_code})"} - if resp.status_code >= 400: - err = body.get("detail") or body.get("error") or f"HTTP {resp.status_code}" - raise RuntimeError(f"Artifact registry error on {path}: {err}") - return body if isinstance(body, dict) else {"ok": False, "error": "Invalid artifact response type"} - - def _resolve_format(self, file_name: Optional[str], target_format: Optional[str]) -> str: - fmt = (target_format or "").strip().lower().lstrip(".") - if fmt: - return fmt - if file_name and "." in file_name: - return file_name.rsplit(".", 1)[1].strip().lower() - return "txt" - - def _compose_output_name(self, file_name: Optional[str], doc_id: str, fmt: str) -> str: - base = "document" - if file_name: - base = file_name.rsplit("/", 1)[-1].rsplit("\\", 1)[-1] - if "." in base: - base = base.rsplit(".", 1)[0] - elif doc_id: - base = doc_id - safe_base = re.sub(r"[^A-Za-z0-9._-]+", "_", base).strip("._") or "document" - return f"{safe_base}.{fmt}" - - def _gateway_artifact_download_path(self, artifact_id: str, version_id: str) -> str: - aid = (artifact_id or "").strip() - vid = (version_id or "").strip() - return f"/api/doc/artifacts/{aid}/versions/{vid}/download" - - def _gateway_artifact_download_url(self, artifact_id: str, version_id: str) -> str: - path = self._gateway_artifact_download_path(artifact_id, version_id) - if GATEWAY_PUBLIC_BASE_URL: - return f"{GATEWAY_PUBLIC_BASE_URL}{path}" - return path - - def _render_document_bytes( - self, - text: str, - file_name: Optional[str], - doc_id: str, - target_format: Optional[str] = None, - ) -> Dict[str, Any]: - body = (text or "").strip() - if not body: - raise ValueError("Cannot render empty document text") - - fmt = self._resolve_format(file_name=file_name, target_format=target_format) - output_name = self._compose_output_name(file_name=file_name, doc_id=doc_id, fmt=fmt) - - if fmt in {"txt"}: - payload = body.encode("utf-8") - return {"bytes": payload, "mime": "text/plain; charset=utf-8", "file_name": output_name} - if fmt in {"md", "markdown"}: - payload = body.encode("utf-8") - return {"bytes": payload, "mime": "text/markdown; charset=utf-8", "file_name": output_name} - if fmt in {"json"}: - parsed: Any - try: - parsed = json.loads(body) - except Exception: - parsed = {"text": body} - payload = json.dumps(parsed, ensure_ascii=False, indent=2).encode("utf-8") - return {"bytes": payload, "mime": "application/json", "file_name": output_name} - if fmt in {"csv"}: - payload = body.encode("utf-8") - return {"bytes": payload, "mime": "text/csv; charset=utf-8", "file_name": output_name} - if fmt in {"xlsx", "xlsm", "xls"}: - try: - from openpyxl import Workbook - except Exception as e: - raise RuntimeError(f"openpyxl is required for {fmt} rendering: {e}") - wb = Workbook() - ws = wb.active - ws.title = "Document" - lines = [ln for ln in body.splitlines()] or [body] - for idx, line in enumerate(lines, start=1): - ws.cell(row=idx, column=1, value=line) - buf = BytesIO() - wb.save(buf) - mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" - return {"bytes": buf.getvalue(), "mime": mime, "file_name": self._compose_output_name(file_name, doc_id, "xlsx")} - if fmt in {"docx"}: - try: - from docx import Document - except Exception as e: - raise RuntimeError(f"python-docx is required for docx rendering: {e}") - doc = Document() - for line in body.splitlines(): - doc.add_paragraph(line if line else " ") - buf = BytesIO() - doc.save(buf) - mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" - return {"bytes": buf.getvalue(), "mime": mime, "file_name": self._compose_output_name(file_name, doc_id, "docx")} - - payload = body.encode("utf-8") - fallback_name = self._compose_output_name(file_name=file_name, doc_id=doc_id, fmt="txt") - return {"bytes": payload, "mime": "text/plain; charset=utf-8", "file_name": fallback_name} - - async def _publish_text_artifact( - self, - text: str, - doc_id: str, - file_name: Optional[str] = None, - dao_id: Optional[str] = None, - user_id: Optional[str] = None, - artifact_id: Optional[str] = None, - target_format: Optional[str] = None, - label: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None, - ) -> PublishResult: - try: - rendered = self._render_document_bytes( - text=text, - file_name=file_name, - doc_id=doc_id, - target_format=target_format, - ) - content_bytes = rendered["bytes"] - content_b64 = base64.b64encode(content_bytes).decode("ascii") - - effective_artifact_id = (artifact_id or "").strip() - if not effective_artifact_id: - create_resp = await self._artifact_post_json( - "/artifacts", - { - "type": "doc", - "title": file_name or doc_id, - "project_id": dao_id, - "acl_ref": dao_id, - "created_by": user_id or DOC_WRITEBACK_CREATED_BY, - }, - timeout=30.0, - ) - effective_artifact_id = str(create_resp.get("artifact_id") or "").strip() - if not effective_artifact_id: - return PublishResult(success=False, error="Artifact create failed: empty artifact_id") - - meta = {"doc_id": doc_id, "source": "doc_update_publish"} - if isinstance(metadata, dict): - meta.update(metadata) - - version_resp = await self._artifact_post_json( - f"/artifacts/{effective_artifact_id}/versions/from_base64", - { - "content_base64": content_b64, - "mime": rendered["mime"], - "filename": rendered["file_name"], - "label": label or "edited", - "meta_json": meta, - }, - timeout=45.0, - ) - version_id = str(version_resp.get("version_id") or "").strip() - storage_key = version_resp.get("storage_key") - if not version_id: - return PublishResult( - success=False, - artifact_id=effective_artifact_id, - error="Artifact version create failed: empty version_id", - ) - - download_url = self._gateway_artifact_download_url( - artifact_id=effective_artifact_id, - version_id=version_id, - ) - - return PublishResult( - success=True, - artifact_id=effective_artifact_id, - version_id=version_id, - storage_key=storage_key, - mime=rendered["mime"], - file_name=rendered["file_name"], - download_url=download_url, - ) - except Exception as e: - logger.error(f"publish_text_artifact failed: {e}", exc_info=True) - return PublishResult(success=False, error=str(e)) - def _is_excel_filename(self, file_name: Optional[str]) -> bool: if not file_name: return False @@ -492,37 +199,33 @@ class DocumentService: file_name: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, + agent_id: Optional[str] = None, + extracted_summary: Optional[str] = None, # v3.2: зміст документа для Stepan ) -> bool: """ - Save document context for a session. - - Uses Memory Service to persist document context across channels. + Save document context for a session (scoped by agent_id to avoid cross-agent leak). Args: - session_id: Session identifier (e.g., "telegram:123", "web:user456") + session_id: Session identifier doc_id: Document ID from parser - doc_url: Optional document URL - file_name: Optional file name - dao_id: Optional DAO ID - - Returns: - True if saved successfully + agent_id: Optional; if set, context is isolated per agent (key: doc_context:{agent_id}:{session_id}). + extracted_summary: Optional parsed text / summary of the document (for in-context LLM use). """ try: - # Use stable synthetic user key per session, so context can be - # retrieved later using only session_id (without caller user_id). - fact_user_id = f"session:{session_id}" - - # Save as fact in Memory Service - fact_key = f"doc_context:{session_id}" + aid = (agent_id or "default").lower() + fact_user_id = f"session:{aid}:{session_id}" + fact_key = f"doc_context:{aid}:{session_id}" fact_value_json = { "doc_id": doc_id, "doc_url": doc_url, "file_name": file_name, "dao_id": dao_id, "user_id": user_id, - "saved_at": datetime.utcnow().isoformat() + "saved_at": datetime.utcnow().isoformat(), } + # Зберігаємо перші 4000 символів parsed тексту щоб Stepan мав реальний зміст + if extracted_summary: + fact_value_json["extracted_summary"] = extracted_summary[:4000] result = await self.memory_client.upsert_fact( user_id=fact_user_id, @@ -533,36 +236,28 @@ class DocumentService: team_id=None, ) - logger.info(f"Saved doc context for session {session_id}: doc_id={doc_id}") + logger.info(f"Saved doc context for session {session_id} agent={aid}: doc_id={doc_id}") return result except Exception as e: logger.error(f"Failed to save doc context: {e}", exc_info=True) return False - async def get_doc_context(self, session_id: str) -> Optional[DocContext]: + async def get_doc_context(self, session_id: str, agent_id: Optional[str] = None) -> Optional[DocContext]: """ - Get document context for a session. - - Args: - session_id: Session identifier - - Returns: - DocContext or None + Get document context for a session (scoped by agent_id when provided). + Backward-compat: if new key missing, tries legacy doc_context:{session_id} (read-only). """ try: - user_id = f"session:{session_id}" - - fact_key = f"doc_context:{session_id}" - - # Get fact from Memory Service + aid = (agent_id or "default").lower() + user_id = f"session:{aid}:{session_id}" + fact_key = f"doc_context:{aid}:{session_id}" fact = await self.memory_client.get_fact( user_id=user_id, fact_key=fact_key ) - if fact and fact.get("fact_value_json"): - logger.debug(f"Retrieved doc context for session {session_id}") + logger.debug(f"Retrieved doc context for session {session_id} agent={aid}") ctx_data = fact.get("fact_value_json") if isinstance(ctx_data, str): try: @@ -571,9 +266,23 @@ class DocumentService: logger.warning("doc_context fact_value_json is not valid JSON string") return None return DocContext(**ctx_data) - + # Backward-compat: legacy key + legacy_user_id = f"session:{session_id}" + legacy_key = f"doc_context:{session_id}" + fact_legacy = await self.memory_client.get_fact( + user_id=legacy_user_id, + fact_key=legacy_key + ) + if fact_legacy and fact_legacy.get("fact_value_json"): + logger.debug(f"Retrieved doc context from legacy key for session {session_id}") + ctx_data = fact_legacy.get("fact_value_json") + if isinstance(ctx_data, str): + try: + ctx_data = json.loads(ctx_data) + except Exception: + return None + return DocContext(**ctx_data) return None - except Exception as e: logger.error(f"Failed to get doc context: {e}", exc_info=True) return None @@ -586,7 +295,8 @@ class DocumentService: dao_id: str, user_id: str, output_mode: str = "qa_pairs", - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None, + agent_id: Optional[str] = None, ) -> ParsedResult: """ Parse a document directly through Swapper service. @@ -666,7 +376,6 @@ class DocumentService: # Generate a simple doc_id based on filename and timestamp doc_id = hashlib.md5(f"{file_name}:{datetime.utcnow().isoformat()}".encode()).hexdigest()[:12] - # Save document context for follow-up queries await self.save_doc_context( session_id=session_id, doc_id=doc_id, @@ -674,6 +383,7 @@ class DocumentService: file_name=file_name, dao_id=dao_id, user_id=user_id, + agent_id=agent_id, ) # Convert text to markdown format @@ -727,6 +437,7 @@ class DocumentService: file_name=file_name, dao_id=dao_id, user_id=user_id, + agent_id=agent_id, ) return ParsedResult( @@ -756,8 +467,7 @@ class DocumentService: doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: str = None, - user_id: str = None, - agent_id: str = "daarwizz", + user_id: str = None ) -> IngestResult: """ Ingest document chunks into RAG/Memory. @@ -783,60 +493,64 @@ class DocumentService: file_name = file_name or doc_context.file_name dao_id = dao_id or doc_context.dao_id - if not doc_url: + if not doc_id and not doc_url: return IngestResult( success=False, - error="No document URL available for ingest" + error="No document ID or URL provided" ) - - parsed = await self.parse_document( - session_id=session_id, - doc_url=doc_url, - file_name=file_name or "document", - dao_id=dao_id or "", - user_id=user_id or "", - output_mode="markdown", - metadata={"source": self._extract_source(session_id), "mode": "ingest"}, - ) - if not parsed.success: - return IngestResult(success=False, error=parsed.error or "Document parse failed") - - effective_doc_id = doc_id or parsed.doc_id - if not effective_doc_id: - effective_doc_id = hashlib.md5(f"{session_id}:{file_name}:{datetime.utcnow().isoformat()}".encode()).hexdigest()[:12] - - doc_text = (parsed.markdown or "").strip() - if not doc_text: - return IngestResult(success=False, error="No extractable text for ingestion") - - payload = { - "agent_id": (agent_id or "daarwizz").lower(), - "doc_id": effective_doc_id, - "file_name": file_name or "document", - "text": doc_text, - "dao_id": dao_id, - "user_id": user_id, + + # Build request to Router with ingest flag + router_request = { + "mode": "doc_parse", + "agent": "parser", "metadata": { - "session_id": session_id, "source": self._extract_source(session_id), + "dao_id": dao_id, + "user_id": user_id, + "session_id": session_id, + }, + "payload": { + "output_mode": "chunks", # Use chunks for RAG ingestion + "dao_id": dao_id, + "user_id": user_id, + "ingest": True, # Flag for ingestion }, } - response = await self._router_post_json("/v1/documents/ingest", payload, timeout=90.0) - - if response.get("ok"): + + if doc_url: + router_request["payload"]["doc_url"] = doc_url + router_request["payload"]["file_name"] = file_name or "document.pdf" + + if doc_id: + router_request["payload"]["doc_id"] = doc_id + + logger.info(f"Ingesting document: session={session_id}, doc_id={doc_id}") + + # Send to Router + response = await send_to_router(router_request) + + if not isinstance(response, dict): + return IngestResult( + success=False, + error="Invalid response from router" + ) + + data = response.get("data", {}) + chunks = data.get("chunks", []) + + if chunks: return IngestResult( success=True, - doc_id=response.get("doc_id") or effective_doc_id, - ingested_chunks=int(response.get("chunks_stored", 0) or 0), - status="ingested", + doc_id=doc_id or data.get("doc_id"), + ingested_chunks=len(chunks), + status="ingested" + ) + else: + return IngestResult( + success=False, + status="failed", + error="No chunks to ingest" ) - - return IngestResult( - success=False, - doc_id=effective_doc_id, - status="failed", - error=response.get("error", "Router ingest failed"), - ) except Exception as e: logger.error(f"Document ingestion failed: {e}", exc_info=True) @@ -844,245 +558,6 @@ class DocumentService: success=False, error=str(e) ) - - async def update_document( - self, - session_id: str, - doc_id: Optional[str] = None, - doc_url: Optional[str] = None, - file_name: Optional[str] = None, - text: Optional[str] = None, - dao_id: Optional[str] = None, - user_id: Optional[str] = None, - agent_id: str = "daarwizz", - storage_ref: Optional[str] = None, - publish_artifact: bool = False, - artifact_id: Optional[str] = None, - target_format: Optional[str] = None, - artifact_label: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None, - ) -> UpdateResult: - """ - Update existing document content and bump version in router memory. - """ - try: - context = await self.get_doc_context(session_id) - if context: - if not doc_id: - doc_id = context.doc_id - if not doc_url: - doc_url = context.doc_url - if not file_name: - file_name = context.file_name - if not dao_id: - dao_id = context.dao_id - - if not doc_id: - return UpdateResult( - success=False, - status="failed", - error="No document context found. Provide doc_id or parse/ingest first.", - ) - - effective_text = (text or "").strip() - if not effective_text: - if not doc_url: - return UpdateResult( - success=False, - doc_id=doc_id, - status="failed", - error="No text or doc_url provided for update", - ) - parsed = await self.parse_document( - session_id=session_id, - doc_url=doc_url, - file_name=file_name or "document", - dao_id=dao_id or "", - user_id=user_id or "", - output_mode="markdown", - metadata={"source": self._extract_source(session_id), "mode": "update"}, - ) - if not parsed.success: - return UpdateResult( - success=False, - doc_id=doc_id, - status="failed", - error=parsed.error or "Document parse failed", - ) - effective_text = (parsed.markdown or "").strip() - - if not effective_text: - return UpdateResult( - success=False, - doc_id=doc_id, - status="failed", - error="No extractable text for update", - ) - - meta = { - "session_id": session_id, - "source": self._extract_source(session_id), - } - if isinstance(metadata, dict): - meta.update(metadata) - - response = await self._router_post_json( - "/v1/documents/update", - { - "agent_id": (agent_id or "daarwizz").lower(), - "doc_id": doc_id, - "file_name": file_name, - "text": effective_text, - "dao_id": dao_id, - "user_id": user_id, - "storage_ref": storage_ref, - "metadata": meta, - }, - timeout=90.0, - ) - - if not response.get("ok"): - return UpdateResult( - success=False, - doc_id=doc_id, - status="failed", - error=response.get("error", "Router update failed"), - ) - - await self.save_doc_context( - session_id=session_id, - doc_id=doc_id, - doc_url=doc_url, - file_name=file_name, - dao_id=dao_id, - user_id=user_id, - ) - - publish = PublishResult(success=False) - if publish_artifact: - publish = await self._publish_text_artifact( - text=effective_text, - doc_id=doc_id, - file_name=file_name, - dao_id=dao_id, - user_id=user_id, - artifact_id=artifact_id, - target_format=target_format, - label=artifact_label, - metadata=meta, - ) - - return UpdateResult( - success=True, - doc_id=response.get("doc_id") or doc_id, - version_no=int(response.get("version_no", 0) or 0) or None, - version_id=int(response.get("version_id", 0) or 0) or None, - updated_chunks=int(response.get("chunks_stored", 0) or 0), - status="updated_published" if publish_artifact and publish.success else ("updated_publish_failed" if publish_artifact else "updated"), - publish_error=publish.error if publish_artifact and not publish.success else None, - artifact_id=publish.artifact_id if publish_artifact else None, - artifact_version_id=publish.version_id if publish_artifact else None, - artifact_storage_key=publish.storage_key if publish_artifact else None, - artifact_mime=publish.mime if publish_artifact else None, - artifact_download_url=publish.download_url if publish_artifact else None, - ) - except Exception as e: - logger.error(f"Document update failed: {e}", exc_info=True) - return UpdateResult( - success=False, - doc_id=doc_id, - status="failed", - error=str(e), - ) - - async def list_document_versions( - self, - agent_id: str, - doc_id: str, - limit: int = 20, - ) -> Dict[str, Any]: - aid = (agent_id or "daarwizz").lower() - did = (doc_id or "").strip() - if not did: - return {"ok": False, "error": "doc_id is required", "items": []} - try: - response = await self._router_get_json( - f"/v1/documents/{did}/versions?agent_id={aid}&limit={max(1, min(int(limit or 20), 200))}", - timeout=30.0, - ) - return response if isinstance(response, dict) else {"ok": False, "error": "invalid_response", "items": []} - except Exception as e: - logger.error(f"list_document_versions failed: {e}") - return {"ok": False, "error": str(e), "items": []} - - async def publish_document_artifact( - self, - session_id: str, - doc_id: Optional[str] = None, - doc_url: Optional[str] = None, - file_name: Optional[str] = None, - text: Optional[str] = None, - dao_id: Optional[str] = None, - user_id: Optional[str] = None, - artifact_id: Optional[str] = None, - target_format: Optional[str] = None, - artifact_label: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None, - ) -> PublishResult: - """ - Publish text as a physical artifact version (.docx/.xlsx/.txt/...) without changing RAG index. - """ - try: - context = await self.get_doc_context(session_id) - if context: - if not doc_id: - doc_id = context.doc_id - if not doc_url: - doc_url = context.doc_url - if not file_name: - file_name = context.file_name - if not dao_id: - dao_id = context.dao_id - if not user_id: - user_id = context.user_id - - if not doc_id: - return PublishResult(success=False, error="doc_id is required") - - body = (text or "").strip() - if not body: - if not doc_url: - return PublishResult(success=False, error="text or doc_url is required") - parsed = await self.parse_document( - session_id=session_id, - doc_url=doc_url, - file_name=file_name or "document", - dao_id=dao_id or "", - user_id=user_id or "", - output_mode="markdown", - metadata={"source": self._extract_source(session_id), "mode": "publish"}, - ) - if not parsed.success: - return PublishResult(success=False, error=parsed.error or "Document parse failed") - body = (parsed.markdown or "").strip() - - if not body: - return PublishResult(success=False, error="No text available for publish") - - return await self._publish_text_artifact( - text=body, - doc_id=doc_id, - file_name=file_name, - dao_id=dao_id, - user_id=user_id, - artifact_id=artifact_id, - target_format=target_format, - label=artifact_label, - metadata=metadata, - ) - except Exception as e: - logger.error(f"publish_document_artifact failed: {e}", exc_info=True) - return PublishResult(success=False, error=str(e)) async def ask_about_document( self, @@ -1155,30 +630,44 @@ class DocumentService: }], ) - logger.info( - f"RAG query: agent={agent_id}, session={session_id}, question={question[:50]}, doc_id={doc_id}" - ) - - response = await self._router_post_json( - "/v1/documents/query", - { - "agent_id": (agent_id or "daarwizz").lower(), - "question": question, - "doc_id": doc_id, + # Build RAG query request — DETERMINISTIC (PROMPT 25) + # top_k=8, temperature=0, no rerank randomness + router_request = { + "mode": "rag_query", + "agent": agent_id, + "metadata": { + "source": self._extract_source(session_id), "dao_id": dao_id, "user_id": user_id, - "limit": 5, + "session_id": session_id, }, - timeout=60.0, + "payload": { + "question": question, + "dao_id": dao_id, + "user_id": user_id, + "doc_id": doc_id, + "top_k": 8, + "temperature": 0, + "rerank": False, + "include_neighbors": True, + }, + } + + logger.info( + "RAG query (deterministic k=8): agent=%s, session=%s, question=%s, doc_id=%s", + agent_id, session_id, question[:50], doc_id, ) - if isinstance(response, dict) and not response.get("ok", False): + # Send to Router + response = await send_to_router(router_request) + + if not isinstance(response, dict): return QAResult( success=False, - error=response.get("error", "Document query failed"), + error="Invalid response from router" ) - - data = response.get("data", {}) if isinstance(response, dict) else {} + + data = response.get("data", {}) answer = data.get("answer") or data.get("text") sources = data.get("citations", []) or data.get("sources", []) @@ -1219,9 +708,10 @@ async def parse_document( dao_id: str, user_id: str, output_mode: str = "qa_pairs", - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None, + agent_id: Optional[str] = None, ) -> ParsedResult: - """Parse a document through DAGI Router""" + """Parse a document (agent_id scopes doc_context key).""" return await doc_service.parse_document( session_id=session_id, doc_url=doc_url, @@ -1229,7 +719,8 @@ async def parse_document( dao_id=dao_id, user_id=user_id, output_mode=output_mode, - metadata=metadata + metadata=metadata, + agent_id=agent_id, ) @@ -1239,8 +730,7 @@ async def ingest_document( doc_url: Optional[str] = None, file_name: Optional[str] = None, dao_id: Optional[str] = None, - user_id: Optional[str] = None, - agent_id: str = "daarwizz", + user_id: Optional[str] = None ) -> IngestResult: """Ingest document chunks into RAG/Memory""" return await doc_service.ingest_document( @@ -1249,8 +739,7 @@ async def ingest_document( doc_url=doc_url, file_name=file_name, dao_id=dao_id, - user_id=user_id, - agent_id=agent_id, + user_id=user_id ) @@ -1273,79 +762,6 @@ async def ask_about_document( ) -async def update_document( - session_id: str, - doc_id: Optional[str] = None, - doc_url: Optional[str] = None, - file_name: Optional[str] = None, - text: Optional[str] = None, - dao_id: Optional[str] = None, - user_id: Optional[str] = None, - agent_id: str = "daarwizz", - storage_ref: Optional[str] = None, - publish_artifact: bool = False, - artifact_id: Optional[str] = None, - target_format: Optional[str] = None, - artifact_label: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None, -) -> UpdateResult: - """Update document chunks and bump version.""" - return await doc_service.update_document( - session_id=session_id, - doc_id=doc_id, - doc_url=doc_url, - file_name=file_name, - text=text, - dao_id=dao_id, - user_id=user_id, - agent_id=agent_id, - storage_ref=storage_ref, - publish_artifact=publish_artifact, - artifact_id=artifact_id, - target_format=target_format, - artifact_label=artifact_label, - metadata=metadata, - ) - - -async def list_document_versions(agent_id: str, doc_id: str, limit: int = 20) -> Dict[str, Any]: - """List document versions from router.""" - return await doc_service.list_document_versions( - agent_id=agent_id, - doc_id=doc_id, - limit=limit, - ) - - -async def publish_document_artifact( - session_id: str, - doc_id: Optional[str] = None, - doc_url: Optional[str] = None, - file_name: Optional[str] = None, - text: Optional[str] = None, - dao_id: Optional[str] = None, - user_id: Optional[str] = None, - artifact_id: Optional[str] = None, - target_format: Optional[str] = None, - artifact_label: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None, -) -> PublishResult: - """Publish physical artifact version for document text.""" - return await doc_service.publish_document_artifact( - session_id=session_id, - doc_id=doc_id, - doc_url=doc_url, - file_name=file_name, - text=text, - dao_id=dao_id, - user_id=user_id, - artifact_id=artifact_id, - target_format=target_format, - artifact_label=artifact_label, - metadata=metadata, - ) - - async def save_doc_context( session_id: str, doc_id: str, @@ -1353,8 +769,9 @@ async def save_doc_context( file_name: Optional[str] = None, dao_id: Optional[str] = None, user_id: Optional[str] = None, + agent_id: Optional[str] = None, ) -> bool: - """Save document context for a session""" + """Save document context for a session (scoped by agent_id when provided).""" return await doc_service.save_doc_context( session_id=session_id, doc_id=doc_id, @@ -1362,9 +779,271 @@ async def save_doc_context( file_name=file_name, dao_id=dao_id, user_id=user_id, + agent_id=agent_id, ) -async def get_doc_context(session_id: str) -> Optional[DocContext]: - """Get document context for a session""" - return await doc_service.get_doc_context(session_id) +async def get_doc_context(session_id: str, agent_id: Optional[str] = None) -> Optional[DocContext]: + """Get document context for a session (scoped by agent_id when provided).""" + return await doc_service.get_doc_context(session_id, agent_id=agent_id) + + +# --------------------------------------------------------------------------- +# Chat-scoped doc_context (PROMPT 28 / v3.3) +# Ключ: doc_context_chat:{agent_id}:{chat_id} +# Пріоритет вищий ніж session-scoped, бо в Telegram "файл → питання" +# може прийти з різними session_id між update-ами. +# --------------------------------------------------------------------------- + +# Regex для sanitize extracted_summary (Fix B) +import re as _re +_RAG_PREFIX_RE = _re.compile(r"^\[RAG[^\]]*\]:\s*", _re.IGNORECASE) +_TRACE_ID_RE = _re.compile(r"\btrace_id=[\w\-]{4,}\b", _re.IGNORECASE) + + +def _sanitize_summary(text: str) -> str: + """Прибрати технічні префікси/артефакти з extracted_summary перед збереженням у LLM-контекст.""" + if not text: + return text + text = _RAG_PREFIX_RE.sub("", text) + text = _TRACE_ID_RE.sub("", text) + return text.strip() + + +async def save_chat_doc_context( + chat_id: str, + agent_id: str, + doc_ctx: dict, +) -> bool: + """ + Зберегти активний doc_context для чату (chat-scoped). + + doc_ctx: {doc_id, file_unique_id?, file_name?, extracted_summary?, ts?, source?} + Ключ у memory: doc_context_chat:{agent_id}:{chat_id} + + Fix A (dedup): якщо file_unique_id не змінився — no-op. + Fix B (sanitize): видаляємо [RAG...]: та trace_id= перед збереженням. + """ + try: + aid = (agent_id or "default").lower() + fact_user_id = f"chat:{aid}:{chat_id}" + fact_key = f"doc_context_chat:{aid}:{chat_id}" + + # Fix A: dedup — перевіряємо чи файл змінився + new_fuid = doc_ctx.get("file_unique_id") or doc_ctx.get("doc_id") or "" + if new_fuid: + existing = await memory_client.get_fact( + user_id=fact_user_id, fact_key=fact_key + ) + if existing and existing.get("fact_value_json"): + ex_val = existing["fact_value_json"] + if isinstance(ex_val, str): + try: + ex_val = json.loads(ex_val) + except Exception: + ex_val = {} + existing_fuid = ex_val.get("file_unique_id") or ex_val.get("doc_id") or "" + if existing_fuid and existing_fuid == new_fuid: + logger.info("doc_context_chat_unchanged agent=%s chat_id=%s fuid=%s", + aid, chat_id, str(new_fuid)[:16]) + return True # no-op + + payload = dict(doc_ctx) + payload.setdefault("saved_at", datetime.utcnow().isoformat()) + + # Fix B: sanitize + deterministic truncation (Fix 2) + if payload.get("extracted_summary"): + payload["extracted_summary"] = _truncate_by_line( + _sanitize_summary(payload["extracted_summary"]) + ) + + result = await memory_client.upsert_fact( + user_id=fact_user_id, + fact_key=fact_key, + fact_value_json=payload, + team_id=None, + ) + logger.info("doc_context_chat_saved agent=%s chat_id=%s doc_id=%s", + aid, chat_id, str(doc_ctx.get("doc_id", ""))[:16]) + return bool(result) + except Exception as exc: + logger.warning("save_chat_doc_context failed (non-blocking): %s", exc) + return False + + +async def get_chat_doc_context( + chat_id: str, + agent_id: str, +) -> Optional[dict]: + """ + Отримати останній активний doc_context для чату (chat-scoped). + + Повертає dict або None. Пріоритет вищий ніж session-scoped get_doc_context. + Fallback: None (fail-safe). + """ + try: + aid = (agent_id or "default").lower() + fact_user_id = f"chat:{aid}:{chat_id}" + fact_key = f"doc_context_chat:{aid}:{chat_id}" + fact = await memory_client.get_fact( + user_id=fact_user_id, + fact_key=fact_key, + ) + if fact and fact.get("fact_value_json"): + ctx = fact["fact_value_json"] + if isinstance(ctx, str): + try: + ctx = json.loads(ctx) + except Exception: + return None + if ctx.get("doc_id") or ctx.get("file_unique_id"): + logger.info("doc_context_chat_loaded agent=%s chat_id=%s found=true doc_id=%s", + aid, chat_id, ctx.get("doc_id", "")) + return ctx + logger.debug("doc_context_chat_loaded agent=%s chat_id=%s found=false", aid, chat_id) + return None + except Exception as exc: + logger.warning("get_chat_doc_context failed (non-blocking): %s", exc) + return None + + +# --------------------------------------------------------------------------- +# Extract-on-upload helpers (PROMPT 30 / v3.4) +# --------------------------------------------------------------------------- + +async def fetch_telegram_file_bytes(bot_token: str, file_id: str) -> bytes: + """ + Завантажити байти файлу через Telegram Bot API. + Raises: RuntimeError якщо файл недоступний або розмір >15MB. + """ + import httpx + MAX_BYTES = 15 * 1024 * 1024 # 15 MB guard + + async with httpx.AsyncClient(timeout=10.0) as client: + # 1. getFile → file_path + r = await client.get( + f"https://api.telegram.org/bot{bot_token}/getFile", + params={"file_id": file_id}, + ) + r.raise_for_status() + data = r.json() + if not data.get("ok"): + raise RuntimeError(f"Telegram getFile failed: {data.get('description')}") + file_path = data["result"]["file_path"] + + # 2. Download bytes + dl_url = f"https://api.telegram.org/file/bot{bot_token}/{file_path}" + dl = await client.get(dl_url) + dl.raise_for_status() + content = dl.content + if len(content) > MAX_BYTES: + raise RuntimeError(f"File too large: {len(content)} bytes (max {MAX_BYTES})") + return content + + +_SUMMARY_MAX_CHARS = 4000 +_SUMMARY_MAX_SHEETS = 5 +_SUMMARY_MAX_ROWS_TOTAL = 600 + + +def _truncate_by_line(text: str, max_chars: int = _SUMMARY_MAX_CHARS) -> str: + """ + Fix 2: Обрізає текст по межі рядка — ніколи не рве посередині. + Якщо текст коротший за max_chars — повертає як є. + """ + if len(text) <= max_chars: + return text + # Шукаємо останній \n перед межею + cut_at = text.rfind("\n", 0, max_chars) + if cut_at <= 0: + cut_at = max_chars # fallback — рядок занадто довгий, ріжемо + return text[:cut_at] + + +def extract_summary_from_bytes(file_name: str, content: bytes) -> str: + """ + Витягнути текстовий summary з байтів файлу. + + XLSX/XLS → openpyxl: max 5 аркушів, max 600 рядків сумарно, + формат: "Аркуш: \n