""" vision_guard.py — v4.0.1 Vision Consistency Guard. Зберігає lock на останній висновок vision для конкретного chat+photo: - vision_last_photo_key → file_unique_id (або file_id як fallback) - vision_last_label → витягнутий label (культура/діагноз) - vision_last_confidence → "high" | "low" | "unknown" - vision_user_label → підтвердження від юзера ("це соняшник") Правила: 1. Те саме фото (file_unique_id fallback file_id) → НЕ переоцінюємо без явного запиту "переоцінити/перевір ще раз". reeval_request → clear_lock → повний реаналіз. 2. Низький confidence → додаємо уточнення (якщо LLM сам не поставив '?'). 3. User override ("це соняшник") → whitelist + заборона негації. Записуємо як user_label; LLM не сперечається. Без залежностей від crewai, memory-service, httpx. Тільки in-memory TTL dict (per-process). Telemetry-теги (logger.info у caller): vision_lock_set, vision_skip_reanalysis, vision_user_override_set, vision_low_conf_clarifier_added, vision_reeval_forced """ from __future__ import annotations import logging import re import time logger = logging.getLogger(__name__) # ── In-memory store: key = "{agent_id}:{chat_id}" ─────────────────────────── _VISION_LOCK: dict[str, dict] = {} VISION_LOCK_TTL = 1800.0 # 30 хвилин def _cleanup() -> None: now = time.time() expired = [k for k, v in _VISION_LOCK.items() if now - float(v.get("ts", 0)) > VISION_LOCK_TTL] for k in expired: del _VISION_LOCK[k] # ── Photo key: file_unique_id має пріоритет над file_id ────────────────────── def _photo_key(file_id: str, file_unique_id: str | None = None) -> str: """ Telegram надсилає одне фото у кількох розмірах з різними file_id, але спільним file_unique_id. Lock прив'язуємо до file_unique_id якщо є. """ return (file_unique_id or "").strip() or file_id # ── Regex для витягу культури/діагнозу з vision відповіді ───────────────────── _LABEL_CROP_RE = re.compile( r"\b(кукурудза|пшениця|соняшник|ріпак|соя|ячмінь|горох|буряк|картопля|льон" r"|бур[''ʼ]ян|ґрунт|ґрунту|шкідник" r"|corn|wheat|sunflower|rapeseed|soybean|barley|weed|soil|pest)\b", re.IGNORECASE | re.UNICODE, ) _LABEL_DIAG_RE = re.compile( r"\b(хлороз|некроз|іржа|фузаріоз|борошниста\s+роса|септоріоз|попелиц[яі]|тля" r"|дефіцит\s+\w+|нестача\s+\w+|шкідник|хвороба|гниль)\b", re.IGNORECASE | re.UNICODE, ) # Низька впевненість — коли LLM сам сумнівається _LOW_CONFIDENCE_RE = re.compile( r"\b(можливо|схоже|не впевнений|важко сказати|без точної|не можу визначити" r"|потребує|потрібно перевірити|ймовірно|не однозначно|декілька варіантів)\b", re.IGNORECASE | re.UNICODE, ) # ── User override whitelist ─────────────────────────────────────────────────── # Дозволені лейбли (ті ж культури + агрономічні поняття). _OVERRIDE_WHITELIST = { "кукурудза", "пшениця", "соняшник", "ріпак", "соя", "ячмінь", "горох", "буряк", "картопля", "льон", "бур'ян", "бурʼян", "ґрунт", "шкідник", "хлороз", "некроз", "іржа", "фузаріоз", "corn", "wheat", "sunflower", "rapeseed", "soybean", "barley", "weed", "soil", "pest", } # Заборона: "не X", "то не X" → не записуємо _NEGATION_PREFIX_RE = re.compile( r"^(?:це|то|ось|тут)?\s*не\s+\S", re.IGNORECASE | re.UNICODE, ) # Позитивне підтвердження: "це X" / "то X" / "ось X" / "тепер це X" / просто "X" _USER_OVERRIDE_RE = re.compile( r"^(?:тепер\s+)?(?:це|ось|то|тут|так[,\s]|маю\s+на\s+увазі)?\s*" r"(кукурудза|пшениця|соняшник|ріпак|соя|ячмінь|горох|буряк|картопля|льон" r"|бур[''ʼ]ян|ґрунт|шкідник" r"|хлороз|некроз|іржа|фузаріоз|борошниста\s+роса|попелиця|тля|дефіцит\s+\w+" r"|corn|wheat|sunflower|rapeseed|soybean|weed|soil|pest)[\s!.]*$", re.IGNORECASE | re.UNICODE, ) # Явний запит переоцінки _REEVAL_RE = re.compile( r"переоцін|перевір\s+(?:ще\s+раз|знову)|переглянь|інша\s+думка|не\s+те|" r"помилив(?:ся)?|re[-\s]?eval", re.IGNORECASE | re.UNICODE, ) # ── Public API ──────────────────────────────────────────────────────────────── def extract_label_from_response(answer_text: str) -> tuple[str, str]: """ Витягує (label, confidence) з vision відповіді. label: перша знайдена культура або діагноз, нижній регістр. confidence: "high" | "low" | "unknown" Fail-safe: будь-яка помилка → ("", "unknown"). """ try: t = answer_text.strip() label = "" crop_m = _LABEL_CROP_RE.search(t) if crop_m: label = crop_m.group(0).lower() elif (diag_m := _LABEL_DIAG_RE.search(t)): label = diag_m.group(0).lower() confidence = "low" if _LOW_CONFIDENCE_RE.search(t) else ("high" if label else "unknown") return label, confidence except Exception: return "", "unknown" def get_vision_lock(agent_id: str, chat_id: str) -> dict: """ Повертає поточний vision lock для (agent_id, chat_id). {} якщо нема або протухло. """ try: _cleanup() key = f"{agent_id}:{chat_id}" rec = _VISION_LOCK.get(key) or {} if not rec: return {} age = time.time() - float(rec.get("ts", 0)) return rec if age <= VISION_LOCK_TTL else {} except Exception: return {} def set_vision_lock( agent_id: str, chat_id: str, file_id: str, label: str, confidence: str, file_unique_id: str | None = None, ) -> None: """ Зберігає vision lock після обробки фото. photo_key = file_unique_id якщо є, інакше file_id. Fail-safe: не кидає назовні. """ try: _cleanup() key = f"{agent_id}:{chat_id}" existing = _VISION_LOCK.get(key) or {} pk = _photo_key(file_id, file_unique_id) _VISION_LOCK[key] = { "photo_key": pk, "file_id": file_id, "label": label, "confidence": confidence, "user_label": existing.get("user_label", ""), # зберігаємо user override "ts": time.time(), } except Exception: pass def clear_vision_lock(agent_id: str, chat_id: str) -> None: """ Скидає vision lock (для reeval_request). Fail-safe. """ try: key = f"{agent_id}:{chat_id}" _VISION_LOCK.pop(key, None) except Exception: pass def set_user_label(agent_id: str, chat_id: str, user_label: str) -> None: """ Зберігає user override label (юзер явно підтвердив що це). Fail-safe. """ try: _cleanup() key = f"{agent_id}:{chat_id}" rec = _VISION_LOCK.get(key) or {} rec["user_label"] = user_label.strip().lower() rec["ts"] = time.time() _VISION_LOCK[key] = rec except Exception: pass def detect_user_override(text: str) -> str: """ Перевіряє чи текст є user override ("це соняшник" тощо). Правила (B): - Заборонено: "не X", "то не X" → повертаємо "" - Дозволено: тільки whitelist лейблів - Коротке підтвердження: "це X" / "то X" / просто "X" (<=4 слова) Повертає normalized label або "" якщо не override. """ try: stripped = text.strip() # Відхиляємо негацію ("це не соняшник") if _NEGATION_PREFIX_RE.search(stripped): return "" m = _USER_OVERRIDE_RE.match(stripped) if not m: return "" label = m.group(1).strip().lower() # Нормалізуємо аpostrophes для whitelist check label_norm = label.replace("ʼ", "'").replace("\u2019", "'") # Перевіряємо whitelist (часткове співпадіння для "дефіцит X") in_whitelist = any( label_norm == w or label_norm.startswith(w) for w in _OVERRIDE_WHITELIST ) return label if in_whitelist else "" except Exception: return "" def is_reeval_request(text: str) -> bool: """ Чи явний запит переоцінки ("переоцінити", "перевір ще раз" тощо). """ try: return bool(_REEVAL_RE.search(text.strip())) except Exception: return False def should_skip_reanalysis( agent_id: str, chat_id: str, file_id: str, user_text: str, file_unique_id: str | None = None, ) -> bool: """ Rule 1: Те саме фото + без запиту переоцінки → True (skip). Rule C: reeval_request → clear_lock → False (реаналіз). photo_key = file_unique_id якщо є, інакше file_id. """ try: if is_reeval_request(user_text): # C: очищуємо lock — наступний аналіз буде свіжим clear_vision_lock(agent_id, chat_id) logger.info( "vision_reeval_forced agent=%s chat_id=%s file_id=%s", agent_id, chat_id, file_id, ) return False lock = get_vision_lock(agent_id, chat_id) if not lock: return False pk = _photo_key(file_id, file_unique_id) return lock.get("photo_key") == pk except Exception: return False def build_low_confidence_clarifier(answer_text: str) -> tuple[str, bool]: """ Rule 2: Якщо confidence низький — додати уточнення. Повертає (modified_text, was_added). Fail-safe: повертає (original, False) при будь-якій помилці. """ try: _, conf = extract_label_from_response(answer_text) if conf != "low": return answer_text, False # Якщо LLM вже дав уточнення — не дублюємо if "?" in answer_text[-120:]: return answer_text, False result = ( answer_text.rstrip() + "\n\nЩоб визначити точніше: можеш надіслати фото листя ближче " "або уточнити — це нові ознаки чи давні?" ) return result, True except Exception: return answer_text, False def build_locked_reply(lock: dict, user_text: str) -> str: """ Повертає коротку відповідь якщо фото вже аналізувалось (same photo_key, no reeval). lock — словник з get_vision_lock(). """ try: user_lbl = lock.get("user_label") or "" label = user_lbl or lock.get("label") or "" conf = lock.get("confidence", "unknown") if not label: return "Це фото вже аналізував — повтори питання конкретніше або надішли нове." label_str = label.capitalize() if conf == "high" or user_lbl: src = "ти підтвердив" if user_lbl else "визначено" return ( f"Це фото вже аналізував: {label_str} ({src}). " "Що саме перевірити ще раз?" ) return ( f"Для цього фото раніше визначив: схоже на {label_str} (невисока впевненість). " "Надішли нове фото або уточни — переоцінити?" ) except Exception: return "Це фото вже аналізував. Що саме хочеш перевірити?"