import sys import os import json import logging import re import subprocess from pathlib import Path from crewai import Crew, Task from crews.agromatrix_crew.agents.stepan_orchestrator import build_stepan from crews.agromatrix_crew.agents.operations_agent import build_operations from crews.agromatrix_crew.agents.iot_agent import build_iot from crews.agromatrix_crew.agents.platform_agent import build_platform from crews.agromatrix_crew.agents.spreadsheet_agent import build_spreadsheet from crews.agromatrix_crew.agents.sustainability_agent import build_sustainability from crews.agromatrix_crew.audit import audit_event, new_trace from agromatrix_tools import tool_dictionary from agromatrix_tools import tool_operation_plan from crews.agromatrix_crew.operator_commands import route_operator_command, route_operator_text from crews.agromatrix_crew.memory_manager import ( load_user_profile, save_user_profile, load_farm_profile, save_farm_profile, update_profile_if_needed, ) from crews.agromatrix_crew.style_adapter import adapt_response_style, build_style_prefix from crews.agromatrix_crew.reflection_engine import reflect_on_response from crews.agromatrix_crew.light_reply import build_light_reply, classify_light_event from crews.agromatrix_crew.depth_classifier import classify_depth from crews.agromatrix_crew.telemetry import tlog from crews.agromatrix_crew.session_context import ( load_session, update_session, is_doc_focus_active, is_doc_focus_cooldown_active, DOC_FOCUS_TTL, DOC_FOCUS_COOLDOWN_S, ) from crews.agromatrix_crew.proactivity import maybe_add_proactivity from crews.agromatrix_crew.doc_facts import ( extract_doc_facts, merge_doc_facts, can_answer_from_facts, compute_scenario, format_facts_as_text, extract_fact_claims, build_self_correction, ) logger = logging.getLogger(__name__) # ── Doc Focus Gate helpers (v3.5 / v3.6 / v3.7) ──────────────────────────── # Логіка винесена в doc_focus.py (без залежностей від crewai/agromatrix_tools) from crews.agromatrix_crew.doc_focus import ( # noqa: E402 _is_doc_question, _detect_domain, detect_context_signals, build_mode_clarifier, ) from crews.agromatrix_crew.farm_state import ( # noqa: E402 detect_farm_state_updates, update_farm_state, build_farm_state_prefix, ) # ── v4.2: Vision → Agronomy Bridge ────────────────────────────────────────── # Fail-safe lazy import: vision_guard живе в gateway-bot/, не в crews/. # Якщо модуль недоступний (наприклад, юніт-тести без gateway-bot) — мовчки # пропускаємо; вся логіка нижче захищена try/except. def _vb_get_vision_lock(agent_id: str, chat_id: str) -> dict: try: from vision_guard import get_vision_lock as _gvl return _gvl(agent_id, chat_id) or {} except Exception: return {} # ── v4.7: FarmOS Farm State Bridge ─────────────────────────────────────────── # Читаємо збережений /farm state snapshot з memory-service. # Fail-closed: timeout 2s, не кидає виняток, не блокує відповідь. # Максимальний вік snapshot: 24h (86400s). _FARM_STATE_SNAPSHOT_TTL_S = 86400.0 def _load_farm_state_snapshot(chat_id: str) -> str | None: """ Завантажує snapshot тексту з memory-service (ключ farm_state:agromatrix:chat:{chat_id}). Повертає текст якщо snapshot є і не старший за 24h, інакше None. Fail-closed: будь-яка помилка → None. """ try: import httpx import json as _json from datetime import datetime, timezone mem_url = os.getenv( "AGX_MEMORY_SERVICE_URL", os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000"), ) fact_key = f"farm_state:agromatrix:chat:{chat_id}" synthetic_uid = f"farm:{chat_id}" # memory-service API: GET /facts/{fact_key}?user_id=... # (не /facts/get — той endpoint не існує) resp = httpx.get( f"{mem_url}/facts/{fact_key}", params={"user_id": synthetic_uid}, timeout=2.0, ) if resp.status_code != 200: return None data = resp.json() # fact_value_json може повертатися як рядок або dict val = data.get("fact_value_json") or data.get("fact_value") if isinstance(val, str): try: val = _json.loads(val) except Exception: return None if not isinstance(val, dict): return None # Перевірка TTL: generated_at має бути не старішим за 24h generated_at = val.get("generated_at", "") if generated_at: try: ts = datetime.fromisoformat(generated_at.replace("Z", "+00:00")) age_s = (datetime.now(timezone.utc) - ts).total_seconds() if age_s > _FARM_STATE_SNAPSHOT_TTL_S: return None except Exception: pass # якщо не можемо перевірити вік — все одно повертаємо snapshot text = val.get("text", "").strip() return text if text else None except Exception: return None # --------------------------------------------------------------------------- # Light / Deep activation layer — реалізація у depth_classifier.py # --------------------------------------------------------------------------- def _stepan_light_response(text: str, stepan, trace: dict, user_profile: dict | None) -> str: """ Відповідь Степана у Light mode. Спочатку намагається побудувати детерміновану (seeded) відповідь без LLM — для чітких greeting/thanks/ack/short_followup подій. Якщо без-LLM відповідь є — повертає її одразу (нульова затримка). Інакше — LLM одним агентом без делегування. Логує crew_launch=false. """ logger.info("crew_launch=false depth=light") # Fast path: no LLM needed for clear social events fast_reply = build_light_reply(text, user_profile) if fast_reply: audit_event({**trace, 'agent': 'stepan', 'action': 'light_nollm'}) return fast_reply # LLM path: single Stepan task, no sub-agents task = Task( description=( f"Повідомлення від користувача: {text}\n\n" "Дай коротку, природну, людську відповідь. " "Не запускай жодних операційних інструментів. " "Не форматуй як JSON. " "Не починай з 'Звісно', 'Чудово', 'Дозвольте'. " "Одне питання максимум, якщо потрібно." ), expected_output="Коротка розмовна відповідь українською, 1–4 речення.", agent=stepan, ) crew = Crew(agents=[stepan], tasks=[task], verbose=False) result = crew.kickoff() audit_event({**trace, 'agent': 'stepan', 'action': 'light_response'}) return adapt_response_style(str(result), user_profile) # --------------------------------------------------------------------------- def farmos_ui_hint(): port = os.getenv('FARMOS_UI_PORT', '18080') try: out = subprocess.check_output(['docker','ps','--format','{{.Names}}'], text=True) if 'farmos_ui_proxy' in out: return "\n[UI] farmOS доступний локально: http://127.0.0.1:{} (basic auth)".format(port) except Exception: return "" return "" def detect_intent(text: str) -> str: t = text.lower() if 'сплануй' in t and 'тиж' in t: return 'plan_week' if 'сплануй' in t: return 'plan_day' if 'критично' in t or 'на завтра' in t: return 'show_critical_tomorrow' if 'план/факт' in t or 'план факт' in t: return 'plan_vs_fact' if 'закрий план' in t: return 'close_plan' return 'general' def validate_payload(obj: dict): required = ['status', 'summary', 'artifacts', 'tool_calls', 'next_actions'] for k in required: if k not in obj: return False, f'missing:{k}' if obj['status'] not in ['ok', 'error']: return False, 'bad_status' if not isinstance(obj['summary'], str): return False, 'summary_not_string' if not isinstance(obj['artifacts'], list): return False, 'artifacts_not_list' if not isinstance(obj['tool_calls'], list): return False, 'tool_calls_not_list' if not isinstance(obj['next_actions'], list): return False, 'next_actions_not_list' return True, 'ok' def run_task_with_retry(agent, description: str, trace_id: str, max_retries: int = 2): instruction = "Return ONLY valid JSON matching schema in crews/agromatrix_crew/schema.json. No extra text." last_error = '' for attempt in range(max_retries + 1): desc = description if attempt == 0 else (description + "\n\n" + instruction) task = Task( description=desc, expected_output="JSON strictly matching schema.json", agent=agent ) crew = Crew(agents=[agent], tasks=[task], verbose=True) result = crew.kickoff() raw = str(result) try: data = json.loads(raw) ok, reason = validate_payload(data) if ok: return data last_error = reason except Exception as e: last_error = f'json_error:{e}' audit_event({ 'trace_id': trace_id, 'agent': agent.role, 'action': 'json_validation_failed', 'error': last_error }) return { 'status': 'error', 'summary': f'JSON validation failed: {last_error}', 'artifacts': [], 'tool_calls': [], 'next_actions': [] } def handle_message( text: str, user_id: str = '', chat_id: str = '', trace_id: str = '', ops_mode: bool = False, last_pending_list: list | None = None, # injected by memory layer (Промт 3); None until that layer is wired user_profile: dict | None = None, farm_profile: dict | None = None, has_doc_context: bool = False, # Doc Bridge (v3.1): короткий контекст активного документа з gateway doc_context: dict | None = None, # Chat History Bridge (v3.2): текст переписки з memory-service (до 40 повідомлень) chat_history: str = '', ) -> str: trace = new_trace(text) if trace_id: trace['trace_id'] = trace_id os.environ['AGX_TRACE_ID'] = trace['trace_id'] os.environ['AGX_USER_ID'] = str(user_id) os.environ['AGX_CHAT_ID'] = str(chat_id) os.environ['AGX_OPS_MODE'] = '1' if ops_mode else '0' # Load profiles (fail-safe: always returns a valid dict) if user_profile is None and user_id: user_profile = load_user_profile(str(user_id)) if farm_profile is None and chat_id: # v2.8: pass user_id for lazy legacy migration farm_profile = load_farm_profile(str(chat_id), user_id=str(user_id) if user_id else None) # operator commands (unchanged routing) if text.strip().startswith('/'): op_res = route_operator_command(text, str(user_id), str(chat_id)) if op_res: return json.dumps(op_res, ensure_ascii=False) elif ops_mode: op_res = route_operator_text(text, str(user_id), str(chat_id), last_pending_list=last_pending_list) if op_res: return json.dumps(op_res, ensure_ascii=False) # Load session context (v3: TTL 15 min, in-memory) session = load_session(str(chat_id)) # ── v4: FARM STATE UPDATE ──────────────────────────────────────────────── # Оновлюємо farm_state до будь-якої іншої логіки (ізольовано, fail-safe). # Не впливає на doc_mode, depth classifier, memory_manager. _farm_updates = detect_farm_state_updates(text or "") if _farm_updates: update_farm_state(session, _farm_updates) tlog(logger, "farm_state_updated", chat_id=str(chat_id), fields=",".join(_farm_updates.keys())) # ── v4.2: VISION → AGRONOMY BRIDGE ────────────────────────────────────── # Читаємо vision lock (per agent_id:chat_id) і зберігаємо label у session. # Тільки якщо lock свіжий (TTL перевіряє get_vision_lock всередині). # User override ("це соняшник") вже записаний у lock.user_label. # Fail-safe: будь-яка помилка → пропускаємо. _agent_id_str = os.getenv("AGX_AGENT_ID", "agromatrix") try: _vb_lock = _vb_get_vision_lock(_agent_id_str, str(chat_id)) if _vb_lock: _vb_label = (_vb_lock.get("user_label") or _vb_lock.get("label") or "").strip() if _vb_label: session["vision_last_label"] = _vb_label tlog(logger, "vision_bridge_label_loaded", chat_id=str(chat_id), label=_vb_label) except Exception: pass # ── v4.2: USER TEXT OVERRIDE for vision_last_label ─────────────────────── # Якщо юзер явно змінює культуру текстом ("тепер це кукурудза") → # перезаписуємо vision_last_label безпосередньо. # Використовуємо той самий detect_user_override з vision_guard (fail-safe). if text: try: from vision_guard import detect_user_override as _vb_detect_override _vb_text_label = _vb_detect_override(text) if _vb_text_label: session["vision_last_label"] = _vb_text_label tlog(logger, "vision_bridge_text_override", chat_id=str(chat_id), label=_vb_text_label) except Exception: pass # ── DOC CONTEXT FALLBACK (v3.3) ────────────────────────────────────────── # Якщо gateway не передав doc_context — пробуємо підтягнути chat-scoped з memory. # Це дозволяє Stepan бачити документ навіть якщо gateway не синхронізований. if not doc_context and chat_id: try: import httpx as _httpx import os as _os _mem_url = _os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000") _agent_id_env = _os.getenv("AGX_AGENT_ID", "agromatrix") _aid = _agent_id_env.lower() _fact_user = f"chat:{_aid}:{chat_id}" _fact_key = f"doc_context_chat:{_aid}:{chat_id}" with _httpx.Client(timeout=3.0) as _hc: _r = _hc.post( f"{_mem_url}/facts/get", json={"user_id": _fact_user, "fact_key": _fact_key}, ) if _r.status_code == 200: _fdata = _r.json() _fval = _fdata.get("fact_value_json") or {} if isinstance(_fval, str): import json as _json _fval = _json.loads(_fval) if _fval.get("doc_id") or _fval.get("file_unique_id"): doc_context = _fval has_doc_context = True tlog(logger, "doc_context_fallback_loaded", chat_id=str(chat_id), doc_id=str(_fval.get("doc_id", ""))[:16]) except Exception as _fbe: logger.debug("Doc context fallback failed (non-blocking): %s", _fbe) # ── DOC ANCHOR RESET (v3.3) ────────────────────────────────────────────── # Якщо doc_id змінився — скидаємо doc_facts і fact_claims попереднього документу. # Fix D: active_doc_id пріоритетний — він явно фіксується при upload (навіть без тексту), # тому перший text-запит після upload одразу має правильний anchor. _current_doc_id: str | None = None if doc_context: _current_doc_id = ( doc_context.get("active_doc_id") or doc_context.get("doc_id") or doc_context.get("file_unique_id") or None ) if _current_doc_id: _prev_doc_id = session.get("active_doc_id") if _prev_doc_id and _prev_doc_id != _current_doc_id: session["doc_facts"] = {} session["fact_claims"] = [] tlog(logger, "doc_anchor_reset", old=str(_prev_doc_id)[:16], new=str(_current_doc_id)[:16]) # ── DOC FOCUS GATE (v3.5 / v3.6) ──────────────────────────────────────── import time as _time_mod _now_ts = _time_mod.time() # TTL auto-expire (самозцілювальний) if session.get("doc_focus") and not is_doc_focus_active(session, _now_ts): _expired_age = round(_now_ts - (session.get("doc_focus_ts") or 0.0)) session["doc_focus"] = False session["doc_focus_ts"] = 0.0 tlog(logger, "doc_focus_expired", chat_id=str(chat_id), ttl_s=_expired_age, last_doc=str(session.get("active_doc_id", ""))[:16]) _signals = detect_context_signals(text) _domain = _detect_domain(text, logger=logger) _focus_active = is_doc_focus_active(session, _now_ts) _cooldown_active = is_doc_focus_cooldown_active(session, _now_ts) # Auto-clear doc_focus + встановити cooldown при зміні домену (web/vision) _df_cooldown_until_update: float | None = None if _focus_active and _domain in ("vision", "web"): session["doc_focus"] = False session["doc_focus_ts"] = 0.0 _focus_active = False _cooldown_until = _now_ts + DOC_FOCUS_COOLDOWN_S session["doc_focus_cooldown_until"] = _cooldown_until _cooldown_active = True _df_cooldown_until_update = _cooldown_until tlog(logger, "doc_focus_cleared", chat_id=str(chat_id), reason=_domain) tlog(logger, "doc_focus_cooldown_set", chat_id=str(chat_id), seconds=int(DOC_FOCUS_COOLDOWN_S), reason=_domain) # Визначаємо context_mode з gating-правилами (v3.6) _context_mode: str _doc_denied_reason: str | None = None if _domain in ("doc",): _is_explicit = _signals["has_explicit_doc_token"] # Rule 1: Cooldown блокує implicit doc (не explicit) if _cooldown_active and not _is_explicit: _context_mode = "general" _doc_denied_reason = "cooldown" _ttl_left = int((session.get("doc_focus_cooldown_until") or 0.0) - _now_ts) tlog(logger, "doc_mode_denied", chat_id=str(chat_id), reason="cooldown", ttl_left=_ttl_left) # Rule 2: Без explicit — дозволити doc тільки якщо є fact-сигнал або факти покривають elif not _is_explicit: _session_facts_gate = session.get("doc_facts") or {} try: from crews.agromatrix_crew.doc_facts import can_answer_from_facts as _cafg _can_use_facts, _ = _cafg(text, _session_facts_gate) except Exception: _can_use_facts = False if _signals["has_fact_signal"] or _can_use_facts: # Дозволяємо doc-mode через факт-сигнал _context_mode = "doc" else: _context_mode = "general" _doc_denied_reason = "no_fact_signal" tlog(logger, "doc_mode_denied", chat_id=str(chat_id), reason="no_fact_signal", has_facts=bool(_session_facts_gate)) # Rule 3: Explicit doc-токен завжди дозволяє doc (навіть при cooldown) else: _context_mode = "doc" elif _domain == "general" and (_focus_active or _current_doc_id) and _signals["has_fact_signal"]: # Факт-сигнал без doc-домену + active doc → дозволяємо doc якщо фокус живий _context_mode = "doc" if _focus_active else "general" else: _context_mode = "general" # Активуємо/продовжуємо doc_focus при успішному doc-mode if _context_mode == "doc" and _current_doc_id: if not _focus_active: session["doc_focus"] = True session["doc_focus_ts"] = _now_ts _focus_active = True tlog(logger, "doc_focus_set", chat_id=str(chat_id), reason="doc_question_reactivated", doc_id=str(_current_doc_id)[:16]) tlog(logger, "context_mode", chat_id=str(chat_id), mode=_context_mode, domain=_domain, focus=_focus_active, cooldown=_cooldown_active) # v3.6: Якщо doc-mode заблокований — повернути clarifier одразу (без LLM) if _doc_denied_reason in ("cooldown", "no_fact_signal") and _domain == "doc": _clarifier = build_mode_clarifier(text) update_session(str(chat_id), text, depth="light", agents=[], last_question=None) return _clarifier # ── PHOTO CONTEXT GATE (v3.5 fix) ─────────────────────────────────────── # Якщо щойно було фото (< 120с) і запит короткий (<=3 слів) і без explicit doc — # відповідаємо коротким уточненням замість light/general відповіді без контексту. _last_photo_ts = float(session.get("last_photo_ts") or 0.0) _photo_ctx_ttl = 120.0 _photo_just_sent = (_now_ts - _last_photo_ts) < _photo_ctx_ttl and _last_photo_ts > 0 if _photo_just_sent and len(text.split()) <= 3 and not _signals.get("has_explicit_doc_token"): _photo_age = int(_now_ts - _last_photo_ts) tlog(logger, "photo_context_gate", chat_id=str(chat_id), age_s=_photo_age, words=len(text.split())) update_session(str(chat_id), text, depth="light", agents=[], last_question=None) return "Що саме хочеш дізнатися про фото?" # ── CONFIRMATION GATE (v3.1) ──────────────────────────────────────────── # Якщо є pending_action у сесії і повідомлення — підтвердження, # підставляємо контекст попереднього кроку і йдемо в deep. _CONFIRMATION_WORDS = re.compile( r"^(так|зроби|ок|ok|окей|погоджуюсь|погодився|роби|давай|підтверджую|yes|go|sure)[\W]*$", re.IGNORECASE | re.UNICODE, ) pending_action = session.get("pending_action") if pending_action and _CONFIRMATION_WORDS.match(text.strip()): tlog(logger, "confirmation_consumed", chat_id=str(chat_id), intent=pending_action.get("intent")) # Розширюємо контекст: використовуємо збережений intent і what_to_do_next text = pending_action.get("what_to_do_next") or text has_doc_context = has_doc_context or bool(pending_action.get("doc_context")) # ── DOC BRIDGE (v3.1/v3.2 / v3.5) ────────────────────────────────────── # PROMPT B: doc_context підмішуємо в промпт ТІЛЬКИ якщо context_mode == "doc". # Якщо "general" — документ є але мовчимо про нього (не нав'язуємо "у цьому звіті"). _doc_summary_snippet: str = "" if doc_context and _context_mode == "doc": doc_id = doc_context.get("doc_id") or doc_context.get("id", "") doc_title = doc_context.get("title") or doc_context.get("filename", "") doc_summary = doc_context.get("extracted_summary") or doc_context.get("summary", "") has_doc_context = True tlog(logger, "doc_context_used", chat_id=str(chat_id), doc_id=doc_id, has_content=bool(doc_summary)) elif doc_context and _context_mode == "general": # Документ є, але поточний запит не про нього — не підмішуємо doc_id = doc_context.get("doc_id") or "" doc_title = "" doc_summary = "" tlog(logger, "doc_context_suppressed", chat_id=str(chat_id), reason="context_mode_general", doc_id=doc_id[:16] if doc_id else "") if doc_context and _context_mode == "doc": # Будуємо snippet для deep-mode промпту (тільки в doc mode) parts = [] if doc_title: parts.append(f"=== ДОКУМЕНТ: «{doc_title}» ===") if doc_summary: # До 3000 символів — достатньо для xlsx з кількома листами parts.append(f"ЗМІСТ ДОКУМЕНТА:\n{doc_summary[:3000]}") parts.append("=== КІНЕЦЬ ДОКУМЕНТА ===") elif doc_title: # Fix E: є документ але summary порожній. # Якщо file_id відомий — витяг можливий; НЕ просимо "надіслати ще раз". _doc_file_id = doc_context.get("file_id") or doc_context.get("file_unique_id") or "" if _doc_file_id: parts.append( f"(Вміст «{doc_title}» ще не витягнутий у цій сесії — " f"але файл є. Перевір ІСТОРІЮ ДІАЛОГУ нижче: там можуть бути " f"попередні відповіді про цей документ. Якщо в history нічого — " f"відповідай: 'Зараз витягую дані — дай секунду.' і запропонуй " f"1 конкретне уточнюючи питання.)" ) else: # file_id невідомий — тоді можна попросити надіслати знову parts.append( f"(Вміст «{doc_title}» недоступний. " f"Перевір ІСТОРІЮ ДІАЛОГУ — там можуть бути попередні відповіді. " f"Якщо в history нічого — попроси надіслати файл ще раз, одним реченням.)" ) _doc_summary_snippet = "\n".join(parts) # Light / Deep classification last_topic = (user_profile or {}).get('last_topic') depth = classify_depth( text, has_doc_context=has_doc_context, last_topic=last_topic, user_profile=user_profile, session=session, ) audit_event({**trace, 'agent': 'stepan', 'action': 'intake', 'depth': depth}) style_prefix = build_style_prefix(user_profile) stepan = build_stepan(style_prefix=style_prefix) # ── LIGHT MODE ──────────────────────────────────────────────────────────── if depth == "light": tlog(logger, "crew_launch", launched=False, depth="light") response = _stepan_light_response(text, stepan, trace, user_profile) update_profile_if_needed(str(user_id), str(chat_id), text, response, intent=None, depth="light") update_session(str(chat_id), text, depth="light", agents=[], last_question=None) return response # ── DEEP MODE ───────────────────────────────────────────────────────────── tlog(logger, "crew_launch", launched=True, depth="deep", agents=["stepan"]) audit_event({**trace, 'agent': 'stepan', 'action': 'deep_single_agent'}) # ── FACT LOCK (v3.2) ──────────────────────────────────────────────────── # Якщо в сесії є зафіксовані числові факти — відповідаємо з кешу без RAG. _session_facts: dict = session.get("doc_facts") or {} if _session_facts: _can_reuse, _reuse_keys = can_answer_from_facts(text, _session_facts) if _can_reuse: tlog(logger, "fact_reused", chat_id=str(chat_id), keys=",".join(_reuse_keys)) # Спочатку перевіряємо сценарний розрахунок _scen_ok, _scen_text = compute_scenario(text, _session_facts) if _scen_ok: _self_corr = build_self_correction(text, _session_facts, session, current_doc_id=_current_doc_id) final = (_self_corr + _scen_text) if _self_corr else _scen_text final = adapt_response_style(final, user_profile) _new_claims = extract_fact_claims(final) update_session(str(chat_id), text, depth="deep", agents=[], last_question=None, doc_facts=_session_facts, fact_claims=_new_claims, active_doc_id=_current_doc_id, doc_focus=True, doc_focus_ts=_now_ts) return final # Без сценарію — форматуємо відомі факти _facts_text = format_facts_as_text( {k: _session_facts[k] for k in _reuse_keys if k in _session_facts} ) _self_corr = build_self_correction(text, _session_facts, session, current_doc_id=_current_doc_id) final = (_self_corr + _facts_text) if _self_corr else _facts_text final = adapt_response_style(final, user_profile) _new_claims = extract_fact_claims(final) update_session(str(chat_id), text, depth="deep", agents=[], last_question=None, doc_facts=_session_facts, fact_claims=_new_claims, active_doc_id=_current_doc_id, doc_focus=True, doc_focus_ts=_now_ts) return final # Preflight normalization — лише збагачення контексту, НЕ блокатор. norm = {} try: norm = tool_dictionary.normalize_from_text(text, trace_id=trace['trace_id'], source='telegram') except Exception as _ne: logger.debug("normalize_from_text error (non-blocking): %s", _ne) _all_pending = [item for cat in norm.values() for item in cat if item.get('status') == 'pending'] if _all_pending: tlog(logger, "pending_terms_info", chat_id=str(chat_id), count=len(_all_pending)) intent = detect_intent(text) # Специфічні intent-и що потребують tool-виклику if intent in ['plan_week', 'plan_day']: try: plan_id = tool_operation_plan.create_plan({ 'scope': { 'field_ids': [i.get('normalized_id') for i in norm.get('fields', []) if i.get('status')=='ok'], 'crop_ids': [i.get('normalized_id') for i in norm.get('crops', []) if i.get('status')=='ok'], 'date_window': {'start': '', 'end': ''} }, 'tasks': [] }, trace_id=trace['trace_id'], source='telegram') return f"План створено: {plan_id}. Уточни дати та операції." except Exception: pass # Будуємо промпт для Степана — з doc_context і chat_history task_parts = [] # 0. v4: Farm State prefix (тільки не в doc і не web режимі) if _context_mode != "doc" and _domain not in ("web",): _farm_prefix = build_farm_state_prefix(session) if _farm_prefix: task_parts.append(_farm_prefix) tlog(logger, "farm_state_injected", chat_id=str(chat_id), crop=str((session.get("farm_state") or {}).get("current_crop", ""))[:20]) # 0b. v4.2: Vision → Agronomy Bridge prefix # Додаємо контекст культури з останнього фото, якщо: # - НЕ doc mode (документ має пріоритет) # - НЕ web mode # - vision_last_label є і не дублює farm_state (щоб не плутати Степана) if _context_mode != "doc" and _domain not in ("web",): _vb_label_now = (session.get("vision_last_label") or "").strip() _farm_crop = str((session.get("farm_state") or {}).get("current_crop", "")).strip() if _vb_label_now and _vb_label_now != _farm_crop: task_parts.append( f"SYSTEM NOTE (не виводь це в відповідь): " f"Культура з останнього фото — {_vb_label_now}. " "Використовуй як контекст для агрономічних питань." ) tlog(logger, "vision_bridge_injected", chat_id=str(chat_id), label=_vb_label_now) # 0c. v4.7: FarmOS Farm State Bridge # Читаємо збережений /farm state snapshot з memory-service. # Умови injection (аналогічно vision bridge): # - НЕ doc mode # - НЕ web domain # - snapshot не старший за 24h if _context_mode != "doc" and _domain not in ("web",): _fs_text = _load_farm_state_snapshot(str(chat_id)) if _fs_text: task_parts.append( f"SYSTEM NOTE (не виводь це в відповідь): " f"Farm state snapshot (FarmOS, актуально):\n{_fs_text}" ) tlog(logger, "farm_state_snapshot_loaded", chat_id=str(chat_id), found=True, preview=_fs_text[:40]) else: tlog(logger, "farm_state_snapshot_loaded", chat_id=str(chat_id), found=False) # 1. Документ (якщо є вміст) if _doc_summary_snippet: task_parts.append(_doc_summary_snippet) # 2. Контекст переписки (chat history з memory-service) if chat_history: # Беремо останні 3000 символів — достатньо для контексту _history_snippet = chat_history[-3000:] if len(chat_history) > 3000 else chat_history task_parts.append( f"=== ІСТОРІЯ ДІАЛОГУ (до 40 повідомлень) ===\n" f"{_history_snippet}\n" f"=== КІНЕЦЬ ІСТОРІЇ ===" ) # 3. Контекст профілю якщо є if farm_profile: fields = farm_profile.get("fields", []) if fields: task_parts.append(f"Поля господарства: {', '.join(str(f) for f in fields[:5])}") task_parts.append(f"Поточний запит: {text}") if _doc_summary_snippet: task_parts.append( "ІНСТРУКЦІЯ: У тебе є вміст документа вище. " "Відповідай ТІЛЬКИ на основі цього документа. " "Якщо є числа — цитуй їх точно. " "Якщо потрібного числа немає — скажи в якому рядку/колонці шукати (1 речення)." ) elif chat_history: task_parts.append( "ІНСТРУКЦІЯ: Використовуй ІСТОРІЮ ДІАЛОГУ вище для відповіді. " "Якщо в history є згадка документа або дані — спирайся на них. " "Відповідай коротко і конкретно українською." ) elif doc_context and (doc_context.get("file_id") or doc_context.get("file_unique_id")): # Fix E: файл є, але summary порожній і history немає → НЕ казати "немає даних" _f_name = doc_context.get("file_name") or "документ" task_parts.append( f"ІНСТРУКЦІЯ: Файл «{_f_name}» отримано, але вміст ще не витягнутий. " f"НЕ кажи 'немає даних' або 'надішли ще раз'. " f"Відповідай: 'Зараз опрацьовую — дай хвилину' і постав 1 уточнюючи питання " f"про те, що саме потрібно знайти у файлі." ) else: task_parts.append( "Дай коротку, конкретну відповідь українською. " "Якщо даних недостатньо — скажи що саме потрібно уточнити (1 питання)." ) tlog(logger, "deep_context_ready", chat_id=str(chat_id), has_doc=bool(_doc_summary_snippet), has_history=bool(chat_history), history_len=len(chat_history)) final_task = Task( description="\n\n".join(task_parts), expected_output="Коротка відповідь для користувача українською мовою.", agent=stepan ) crew = Crew(agents=[stepan], tasks=[final_task], verbose=False) result = crew.kickoff() raw_response = str(result) + farmos_ui_hint() styled_response = adapt_response_style(raw_response, user_profile) # Reflection (Deep mode only, never recursive) reflection = reflect_on_response(text, styled_response, user_profile, farm_profile) if reflection.get("style_shift") and user_profile: user_profile["style"] = reflection["style_shift"] if reflection.get("clarifying_question"): styled_response = styled_response.rstrip() + "\n\n" + reflection["clarifying_question"] if reflection.get("new_facts") and user_id: u = user_profile or {} for k, v in reflection["new_facts"].items(): if k == "new_crops": # Handled by update_profile_if_needed / FarmProfile pass elif k in ("name", "role"): u[k] = v if user_id: save_user_profile(str(user_id), u) audit_event({**trace, 'agent': 'stepan', 'action': 'reflection', 'confidence': reflection.get("confidence"), 'new_facts': list(reflection.get("new_facts", {}).keys())}) # Soft proactivity (v3: 1 речення max, за умовами) clarifying_q = reflection.get("clarifying_question") if reflection else None styled_response, _ = maybe_add_proactivity( styled_response, user_profile or {}, depth="deep", reflection=reflection ) update_profile_if_needed(str(user_id), str(chat_id), text, styled_response, intent=intent, depth="deep") # ── v3.7: STATE-AWARE DOC ACK ──────────────────────────────────────────── # Якщо doc_focus щойно встановлений (focus_active раніше не був) і context_mode==doc, # додаємо короткий префікс (max 60 символів), щоб уникнути "Так, пам'ятаю". _doc_just_activated = ( _context_mode == "doc" and _current_doc_id and not _focus_active # _focus_active = стан ДО активації у цьому запиті ) if _doc_just_activated: _ack = "По звіту дивлюсь." if _signals.get("has_explicit_doc_token") else "Працюємо зі звітом." # Тільки якщо відповідь не починається з нашого ack if not styled_response.startswith(_ack): styled_response = f"{_ack}\n{styled_response}" tlog(logger, "doc_focus_acknowledged", chat_id=str(chat_id), ack=_ack[:20], explicit=_signals.get("has_explicit_doc_token", False)) # ── FACT LOCK: витягуємо факти з відповіді і зберігаємо в session ────── _new_facts = extract_doc_facts(styled_response) _merged_facts: dict | None = None if _new_facts: _merged_facts = merge_doc_facts(_session_facts, _new_facts) _conflicts = _merged_facts.get("conflicts", {}) tlog(logger, "fact_locked", chat_id=str(chat_id), keys=",".join(k for k in _new_facts if k not in ("conflicts","needs_recheck")), conflicts=bool(_conflicts)) # Якщо конфлікт — додаємо 1 речення до відповіді if _conflicts: conflict_key = next(iter(_conflicts)) tlog(logger, "fact_conflict", chat_id=str(chat_id), key=conflict_key) styled_response = styled_response.rstrip() + ( f"\n\nБачу розбіжність по \"{conflict_key.replace('_uah','').replace('_ha','')}\". " "Підтверди, яке значення правильне." ) # ── SELF-CORRECTION: prefix якщо нова відповідь суперечить попередній ── _self_corr_prefix = build_self_correction(styled_response, _session_facts, session, current_doc_id=_current_doc_id) if _self_corr_prefix: styled_response = _self_corr_prefix + styled_response tlog(logger, "self_corrected", chat_id=str(chat_id)) # ── pending_action ─────────────────────────────────────────────────────── _pending_action: dict | None = None if clarifying_q and intent: _pending_action = { "intent": intent, "what_to_do_next": text, "doc_context": {"doc_id": doc_context.get("doc_id")} if doc_context else None, } _new_claims = extract_fact_claims(styled_response) # Якщо відповідь в doc-режимі і факти знайдено — вмикаємо/продовжуємо doc_focus _df_update: bool | None = None _df_ts_update: float | None = None if _context_mode == "doc" and _current_doc_id: _df_update = True _df_ts_update = _now_ts if not _focus_active: tlog(logger, "doc_focus_set", chat_id=str(chat_id), reason="deep_doc_answer", doc_id=str(_current_doc_id)[:16]) elif _context_mode == "general" and session.get("doc_focus"): # Відповідь в general-режимі — скидаємо фокус (не продовжуємо TTL) _df_update = False _df_ts_update = 0.0 update_session( str(chat_id), text, depth="deep", agents=["stepan"], last_question=clarifying_q, pending_action=_pending_action, doc_facts=_merged_facts if _merged_facts is not None else (_session_facts or None), fact_claims=_new_claims, active_doc_id=_current_doc_id, doc_focus=_df_update, doc_focus_ts=_df_ts_update, doc_focus_cooldown_until=_df_cooldown_until_update, ) # ── CONTEXT BLEED GUARD (v3.5 / v3.6 / v3.7) ──────────────────────────── # Якщо відповідь містить doc-фрази але context_mode == "general" → замінити. # Це блокує "витік" шаблонних фраз навіть коли doc_context не підмішувався. if _context_mode == "general": _BLEED_RE = re.compile( r"у\s+(?:цьому|наданому|даному)\s+документі" r"|в\s+(?:цьому|наданому|даному)\s+документі" r"|у\s+(?:цьому\s+)?звіті|в\s+(?:цьому\s+)?звіті", re.IGNORECASE | re.UNICODE, ) _bleed_match = _BLEED_RE.search(styled_response) if _bleed_match: _bleed_phrase = _bleed_match.group(0) tlog(logger, "doc_phrase_suppressed", chat_id=str(chat_id), phrase=_bleed_phrase[:40], mode="general") # v3.6: використовуємо контекстний clarifier замість фіксованої фрази styled_response = build_mode_clarifier(text) # ── UX-PHRASE GUARD (v3.7) ──────────────────────────────────────────────── # Заміна шаблонних фраз "Так, пам'ятаю" / "Не бачу його перед собою" тощо. _DOC_AWARENESS_RE = re.compile( r"(так,\s*пам['\u2019]ятаю|не\s+бачу\s+його|не\s+бачу\s+перед\s+собою" r"|мені\s+(?:не\s+)?доступний\s+документ)", re.IGNORECASE | re.UNICODE, ) if _DOC_AWARENESS_RE.search(styled_response): tlog(logger, "doc_ux_phrase_suppressed", chat_id=str(chat_id)) styled_response = re.sub( _DOC_AWARENESS_RE, lambda m: build_mode_clarifier(text), styled_response, count=1, ) # v3.7: Заміна "у цьому документі" у general при будь-якому тексті if _context_mode == "general": _DOC_MENTION_RE = re.compile( r"\bзвіт\b", re.IGNORECASE | re.UNICODE, ) if _DOC_MENTION_RE.search(styled_response) and "doc_facts" not in str(styled_response[:100]): tlog(logger, "doc_mention_blocked_in_general", chat_id=str(chat_id)) return styled_response def main(): if len(sys.argv) < 2: print('Usage: python run.py [--trace ] ""') sys.exit(1) args = sys.argv[1:] trace_override = None if args and args[0] == '--trace': trace_override = args[1] args = args[2:] user_request = args[0] output = handle_message(user_request, user_id=os.getenv('AGX_USER_ID',''), chat_id=os.getenv('AGX_CHAT_ID',''), trace_id=trace_override or '', ops_mode=os.getenv('AGX_OPS_MODE','0')=='1') print(output) if __name__ == '__main__': main()