""" Memory Manager для Степана — v2.8. Завантажує/зберігає UserProfile і FarmProfile через memory-service. Використовує sync httpx.Client (run.py sync). При недоступності memory-service — деградує до процесного in-memory кешу (TTL 30 хв). Fact-ключі в memory-service: user_profile:agromatrix:{user_id} — per-user (interaction history, style, topics) farm_profile:agromatrix:chat:{chat_id} — per-chat (shared farm context, v2.8+) farm_profile:agromatrix:{user_id} — legacy per-user key (мігрується lazy) v2.8 Multi-user farm model: - Кілька операторів в одному chat_id ділять один FarmProfile. - UserProfile (recent_topics, style, тощо) — per-user. - Lazy migration: якщо нового ключа нема — спробуємо legacy, скопіюємо (write-through). - Conflict policy: перший user задає chat-profile; наступний з відмінним legacy — не перезаписує, лише logить. """ from __future__ import annotations import json import logging import os import re import threading import time from copy import deepcopy from datetime import datetime, timezone from typing import Any from crews.agromatrix_crew.telemetry import tlog logger = logging.getLogger(__name__) MEMORY_SERVICE_URL = os.getenv("AGX_MEMORY_SERVICE_URL", os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")) _HTTP_TIMEOUT = float(os.getenv("AGX_MEMORY_TIMEOUT", "2.0")) # ─── In-memory fallback cache ──────────────────────────────────────────────── _CACHE_TTL = 1800 # 30 хвилин _cache: dict[str, tuple[float, dict]] = {} # key → (ts, data) _cache_lock = threading.Lock() def _cache_get(key: str) -> dict | None: with _cache_lock: entry = _cache.get(key) if entry and (time.monotonic() - entry[0]) < _CACHE_TTL: return deepcopy(entry[1]) return None def _cache_set(key: str, data: dict) -> None: with _cache_lock: _cache[key] = (time.monotonic(), deepcopy(data)) # ─── Defaults ──────────────────────────────────────────────────────────────── _RECENT_TOPICS_MAX = 5 def _default_user_profile(user_id: str) -> dict: return { "_version": 4, "user_id": user_id, "name": None, "role": "unknown", "style": "conversational", "preferred_kpi": [], "interaction_summary": None, # recent_topics: список до 5 останніх deep-тем # Кожен елемент: {"label": str, "intent": str, "ts": str} "recent_topics": [], # last_topic / last_topic_label — derived aliases (backward-compat, оновлюються авто) "last_topic": None, "last_topic_label": None, "interaction_count": 0, "preferences": { "units": "ha", "report_format": "conversational", "tone_constraints": { "no_emojis": False, "no_exclamations": False, }, }, "updated_at": None, } # ─── Topic horizon helpers ──────────────────────────────────────────────────── _STOP_WORDS = frozenset({ "будь", "ласка", "привіт", "дякую", "спасибі", "ок", "добре", "зрозумів", "я", "ти", "він", "вона", "ми", "ви", "що", "як", "де", "коли", "чому", "і", "та", "але", "або", "якщо", "по", "до", "на", "за", "від", "у", "в", "з", }) # Поля/культури/числа — зберігати у label обов'язково _LABEL_PRESERVE_RE = re.compile( r'\b(\d[\d.,]*\s*(?:га|кг|л|т|%)?|поле\s+\w+|поля\s+\w+|культура\s+\w+|' r'пшениця|кукурудза|соняшник|ріпак|соя|ячмінь|жито|завтра|сьогодні|тиждень)\b', re.IGNORECASE | re.UNICODE, ) def summarize_topic_label(text: str) -> str: """ Rule-based: формує 6–10 слів людяний ярлик теми з тексту. Приклад: "зроби план на завтра по полю 12" → "план на завтра, поле 12" "перевір вологість на полі north-01" → "вологість поле north-01" """ # Remove leading action verb (зроби, перевір, etc.) action_re = re.compile( r'^\s*(зроби|зробити|перевір|перевірити|порахуй|підготуй|онови|створи|' r'запиши|зафіксуй|внеси|проаналізуй|покажи|сплануй|заплануй)\s*', re.IGNORECASE | re.UNICODE, ) cleaned = action_re.sub('', text).strip() words = cleaned.split() # Keep words: not stop-words, or matches preserve pattern kept: list[str] = [] for w in words: wl = w.lower().rstrip('.,!?') if wl in _STOP_WORDS: continue kept.append(w.rstrip('.,!?')) if len(kept) >= 8: break label = ' '.join(kept) if kept else text[:50] # Capitalize first letter return label[:1].upper() + label[1:] if label else text[:50] def push_recent_topic(profile: dict, intent: str, label: str) -> None: """ Додає новий topic до recent_topics (max 5). Оновлює last_topic і last_topic_label як aliases. Не дублює якщо останній topic має той самий intent і подібний label. """ now_ts = datetime.now(timezone.utc).isoformat() topics: list[dict] = profile.setdefault("recent_topics", []) # Dedup: не додавати якщо той самий intent і label протягом сесії if topics and topics[-1].get("intent") == intent and topics[-1].get("label") == label: tlog(logger, "topics_push", pushed=False, reason="dedup", intent=intent) return topics.append({"label": label, "intent": intent, "ts": now_ts}) # Keep only last N if len(topics) > _RECENT_TOPICS_MAX: profile["recent_topics"] = topics[-_RECENT_TOPICS_MAX:] # Keep aliases in sync last = profile["recent_topics"][-1] profile["last_topic"] = last["intent"] profile["last_topic_label"] = last["label"] tlog(logger, "topics_push", pushed=True, intent=intent, label=label, horizon=len(profile["recent_topics"])) def migrate_profile_topics(profile: dict) -> bool: """ Backward-compat міграція: якщо profile має last_topic (str) але немає recent_topics → створити recent_topics=[{"label": last_topic, "intent": last_topic, "ts": now}]. Повертає True якщо профіль змінено. """ changed = False # Ensure recent_topics exists if "recent_topics" not in profile: lt = profile.get("last_topic") if lt: now_ts = datetime.now(timezone.utc).isoformat() profile["recent_topics"] = [{"label": lt.replace("_", " "), "intent": lt, "ts": now_ts}] else: profile["recent_topics"] = [] changed = True # Ensure last_topic_label exists if "last_topic_label" not in profile: topics = profile.get("recent_topics", []) profile["last_topic_label"] = topics[-1]["label"] if topics else None changed = True # Ensure preferences.tone_constraints exists (older profiles) prefs = profile.setdefault("preferences", {}) if "tone_constraints" not in prefs: prefs["tone_constraints"] = {"no_emojis": False, "no_exclamations": False} changed = True return changed def _default_farm_profile(chat_id: str) -> dict: return { "_version": 5, "chat_id": chat_id, "farm_name": None, "region": None, "crops": [], "field_ids": [], "fields": [], # backward-compat alias для field_ids "crop_ids": [], # structured list (доповнює crops) "systems": [], "active_integrations": [], "iot_sensors": [], "alert_thresholds": {}, "seasonal_context": {}, "season_state": None, # backward-compat alias "updated_at": None, } # Chat-keyed fact key (v2.8+) def _chat_fact_key(chat_id: str) -> str: return f"farm_profile:agromatrix:chat:{chat_id}" # Legacy per-user fact key (pre-v2.8) def _legacy_farm_fact_key(user_id: str) -> str: return f"farm_profile:agromatrix:{user_id}" def _farm_profiles_differ(a: dict, b: dict) -> bool: """ Перевіряє чи два farm-профілі суттєво відрізняються. Порівнює: crops, field_ids, fields, region, season_state. Ігнорує metadata (updated_at, _version, chat_id). """ compare_keys = ("crops", "field_ids", "fields", "region", "season_state", "active_integrations") for k in compare_keys: if a.get(k) != b.get(k): return True return False # ─── HTTP helpers (sync) ───────────────────────────────────────────────────── def _http_get_fact(user_id: str, fact_key: str) -> dict | None: try: import httpx url = f"{MEMORY_SERVICE_URL}/facts/get" resp = httpx.get(url, params={"user_id": user_id, "fact_key": fact_key}, timeout=_HTTP_TIMEOUT) if resp.status_code == 200: data = resp.json() val = data.get("fact_value_json") or data.get("fact_value") if isinstance(val, str): try: val = json.loads(val) except Exception: pass return val if isinstance(val, dict) else None return None except Exception as exc: logger.debug("memory_manager: get_fact failed key=%s: %s", fact_key, exc) return None def _http_upsert_fact(user_id: str, fact_key: str, data: dict) -> bool: try: import httpx url = f"{MEMORY_SERVICE_URL}/facts/upsert" payload = { "user_id": user_id, "fact_key": fact_key, "fact_value_json": data, } resp = httpx.post(url, json=payload, timeout=_HTTP_TIMEOUT) return resp.status_code in (200, 201) except Exception as exc: logger.debug("memory_manager: upsert_fact failed key=%s: %s", fact_key, exc) return False # ─── Public API ────────────────────────────────────────────────────────────── def load_user_profile(user_id: str) -> dict: """ Завантажити UserProfile з memory-service. Виконує backward-compat міграцію (recent_topics, last_topic_label, tone_constraints). При будь-якій помилці — повертає профіль за замовчуванням. """ if not user_id: return _default_user_profile("") fact_key = f"user_profile:agromatrix:{user_id}" cached = _cache_get(fact_key) if cached: return cached profile = _http_get_fact(user_id, fact_key) if profile: # Apply backward-compat migration; if changed, update cache + persist async if migrate_profile_topics(profile): _cache_set(fact_key, profile) else: _cache_set(fact_key, profile) return profile default = _default_user_profile(user_id) _cache_set(fact_key, default) return default def save_user_profile(user_id: str, profile: dict) -> None: """ Зберегти UserProfile у memory-service і оновити кеш. Не кидає виняток. """ if not user_id: return fact_key = f"user_profile:agromatrix:{user_id}" profile = deepcopy(profile) profile["updated_at"] = datetime.now(timezone.utc).isoformat() _cache_set(fact_key, profile) ok = _http_upsert_fact(user_id, fact_key, profile) if ok: tlog(logger, "memory_save", entity="UserProfile", user_id=user_id, ok=True) else: tlog(logger, "memory_fallback", entity="UserProfile", user_id=user_id, reason="memory_service_unavailable", level_hint="warning") logger.warning("UserProfile NOT saved to memory-service (fallback cache only)") def load_farm_profile(chat_id: str, user_id: str | None = None) -> dict: """ Завантажити FarmProfile з memory-service (v2.8: per-chat key). Стратегія (lazy migration): 1. Спробувати новий chat-key: farm_profile:agromatrix:chat:{chat_id} 2. Якщо нема і є user_id — спробувати legacy key: farm_profile:agromatrix:{user_id} - Якщо legacy знайдено: write-through міграція (зберегти в chat-key, видалити конфлікт) 3. Якщо нічого нема — default profile для chat_id """ if not chat_id: return _default_farm_profile("") chat_key = _chat_fact_key(chat_id) synthetic_uid = f"farm:{chat_id}" # 1. Cache hit (chat-key) cached = _cache_get(chat_key) if cached: return cached # 2. Try chat-key from memory-service profile = _http_get_fact(synthetic_uid, chat_key) if profile: _cache_set(chat_key, profile) return profile # 3. Lazy migration: try legacy per-user key if user_id: legacy_key = _legacy_farm_fact_key(user_id) legacy_cached = _cache_get(legacy_key) legacy_profile = legacy_cached or _http_get_fact(user_id, legacy_key) if legacy_profile: # Write-through: copy to chat-key legacy_profile = deepcopy(legacy_profile) legacy_profile["chat_id"] = chat_id legacy_profile["_migrated_from"] = f"legacy:{user_id}" _cache_set(chat_key, legacy_profile) # Persist to new key async (best-effort) try: _http_upsert_fact(synthetic_uid, chat_key, legacy_profile) tlog(logger, "farm_profile_migrated", chat_id=chat_id, user_id=user_id, ok=True) except Exception: pass return legacy_profile # 4. Default default = _default_farm_profile(chat_id) _cache_set(chat_key, default) return default def save_farm_profile(chat_id: str, profile: dict) -> None: """ Зберегти FarmProfile у memory-service під chat-key (v2.8). Не кидає виняток. """ if not chat_id: return synthetic_uid = f"farm:{chat_id}" chat_key = _chat_fact_key(chat_id) profile = deepcopy(profile) profile["updated_at"] = datetime.now(timezone.utc).isoformat() _cache_set(chat_key, profile) ok = _http_upsert_fact(synthetic_uid, chat_key, profile) if ok: tlog(logger, "memory_save", entity="FarmProfile", chat_id=chat_id, ok=True) else: tlog(logger, "memory_fallback", entity="FarmProfile", chat_id=chat_id, reason="memory_service_unavailable", level_hint="warning") logger.warning("FarmProfile NOT saved to memory-service (fallback cache only)") def migrate_farm_profile_legacy_to_chat( chat_id: str, user_id: str, legacy_profile: dict, ) -> dict: """ Публічна функція явної міграції legacy farm_profile:{user_id} → farm_profile:chat:{chat_id}. Conflict policy: - Якщо chat-profile вже існує і суттєво відрізняється від legacy — НЕ перезаписуємо. - Логуємо telemetry event 'farm_profile_conflict'. - Повертаємо існуючий chat-profile як актуальний. Якщо chat-profile ще нема або не відрізняється — копіюємо legacy у chat-key. """ chat_key = _chat_fact_key(chat_id) synthetic_uid = f"farm:{chat_id}" existing = _http_get_fact(synthetic_uid, chat_key) if existing and _farm_profiles_differ(existing, legacy_profile): # Conflict: log only, do not overwrite tlog(logger, "farm_profile_conflict", chat_id=chat_id, user_id=user_id, reason="legacy_diff") logger.warning( "FarmProfile conflict: chat-profile already exists with different data " "(user=%s chat=%s); keeping existing chat-profile", # user_id та chat_id не логуються сирими — tlog вже містить анонімізовані "***", "***", ) return existing # No conflict or no existing — write-through migrated = deepcopy(legacy_profile) migrated["chat_id"] = chat_id migrated["_migrated_from"] = f"legacy:{user_id}" _cache_set(chat_key, migrated) _http_upsert_fact(synthetic_uid, chat_key, migrated) tlog(logger, "farm_profile_migrated", chat_id=chat_id, user_id=user_id, ok=True) return migrated # ─── Selective update helpers ──────────────────────────────────────────────── _ROLE_HINTS: dict[str, list[str]] = { "owner": ["власник", "господар", "власниця", "засновник"], "agronomist": ["агроном", "агрономка"], "operator": ["оператор"], "mechanic": ["механік", "тракторист", "водій"], } _STYLE_HINTS: dict[str, list[str]] = { "concise": ["коротко", "без деталей", "стисло", "коротку", "коротку відповідь"], "checklist": ["списком", "маркерами", "у списку", "по пунктах"], "analytical": ["аналіз", "причини", "наслідки", "детальний аналіз"], "detailed": ["детально", "докладно", "розгорнуто", "повністю"], } # ─── Interaction summary (rule-based) ──────────────────────────────────────── _ROLE_LABELS: dict[str, str] = { "owner": "власник господарства", "agronomist": "агроном", "operator": "оператор", "mechanic": "механік", "unknown": "оператор", } _STYLE_LABELS: dict[str, str] = { "concise": "надає перевагу стислим відповідям", "checklist": "любить відповіді у вигляді списку", "analytical": "цікавиться аналізом причин і наслідків", "detailed": "воліє розгорнуті пояснення", "conversational": "спілкується в розмовному стилі", } _TOPIC_LABELS_SUMMARY: dict[str, str] = { "plan_day": "плануванні на день", "plan_week": "плануванні на тиждень", "plan_vs_fact": "аналізі план/факт", "show_critical_tomorrow": "критичних задачах", "close_plan": "закритті планів", "iot_status": "стані датчиків", "general": "загальних питаннях", } def build_interaction_summary(profile: dict) -> str: """ Формує коротке (1–2 речення) резюме профілю користувача з наявних полів. Без LLM. Повертає рядок. """ parts: list[str] = [] name = profile.get("name") role = profile.get("role", "unknown") style = profile.get("style", "conversational") last_topic = profile.get("last_topic") count = profile.get("interaction_count", 0) role_label = _ROLE_LABELS.get(role, "оператор") name_part = f"{name} — {role_label}" if name else role_label.capitalize() parts.append(name_part + ".") style_label = _STYLE_LABELS.get(style, "") if style_label: parts.append(style_label.capitalize() + ".") if last_topic and last_topic in _TOPIC_LABELS_SUMMARY: parts.append(f"Частіше питає про {_TOPIC_LABELS_SUMMARY[last_topic]}.") if count > 0: parts.append(f"Взаємодій: {count}.") return " ".join(parts) def _jaccard_similarity(a: str, b: str) -> float: """ Проста word-level Jaccard схожість між двома рядками. Використовується для захисту від 'дрижання' summary. """ if not a or not b: return 0.0 set_a = set(a.lower().split()) set_b = set(b.lower().split()) union = set_a | set_b if not union: return 0.0 return len(set_a & set_b) / len(union) def _should_update_summary(profile: dict, prev_role: str, prev_style: str) -> bool: """Оновлювати summary кожні 10 взаємодій або при зміні role/style.""" count = profile.get("interaction_count", 0) role_changed = profile.get("role") != prev_role style_changed = profile.get("style") != prev_style return count > 0 and (count % 10 == 0 or role_changed or style_changed) def _summary_changed_enough(old_summary: str | None, new_summary: str) -> bool: """ Перезаписувати summary лише якщо зміна суттєва (Jaccard < 0.7). При Jaccard ≥ 0.7 — зміна косметична, summary 'дрижить' — пропускаємо. """ if not old_summary: return True # перший запис — завжди зберігаємо similarity = _jaccard_similarity(old_summary, new_summary) return similarity < 0.7 # ─── Memory Consolidation (v2.9) ───────────────────────────────────────────── # Ліміти для UserProfile _PREF_WHITELIST = frozenset({"units", "report_format", "tone_constraints", "language"}) _TC_BOOL_KEYS = frozenset({"no_emojis", "no_exclamations"}) _LIMIT_CONTEXT_NOTES = 20 _LIMIT_KNOWN_INTENTS = 30 _LIMIT_FIELD_IDS = 200 _LIMIT_CROP_IDS = 100 _LIMIT_ACTIVE_INTEG = 20 _SUMMARY_MAX_CHARS = 220 # Запускати consolidation кожні N взаємодій _CONSOLIDATION_PERIOD = 25 def _trim_dedup(lst: list, limit: int) -> list: """Прибирає дублікати (stable order), обрізає до ліміту.""" seen: set = set() result: list = [] for item in lst: key = item if not isinstance(item, dict) else json.dumps(item, sort_keys=True) if key not in seen: seen.add(key) result.append(item) return result[-limit:] def _cap_summary(text: str, max_chars: int = _SUMMARY_MAX_CHARS) -> str: """Обрізає рядок до max_chars не посередині слова.""" if len(text) <= max_chars: return text truncated = text[:max_chars] last_space = truncated.rfind(' ') if last_space > 0: return truncated[:last_space] return truncated def consolidate_user_profile(profile: dict) -> dict: """ Нормалізує і обрізає поля UserProfile — прибирає шум без зміни семантики. Операції: - Trim/dedup: context_notes (≤20), known_intents (≤30) - Preferences: залишити тільки whitelist ключів; tone_constraints — тільки bool-ключі - interaction_summary: прибрати зайві пробіли; hard cap ≤220 символів (без обрізки слова) - recent_topics: dedup за (intent, label) — вже є horizon 5, dedup для безпеки Deterministic та idempotent: повторний виклик не змінює результат. Fail-safe: помилка → повертає profile як є (без модифікацій). """ try: p = deepcopy(profile) # context_notes notes = p.get("context_notes") if isinstance(notes, list): p["context_notes"] = _trim_dedup(notes, _LIMIT_CONTEXT_NOTES) # known_intents intents = p.get("known_intents") if isinstance(intents, list): p["known_intents"] = _trim_dedup(intents, _LIMIT_KNOWN_INTENTS) # preferences whitelist prefs = p.get("preferences") if isinstance(prefs, dict): cleaned_prefs: dict = {} for k in _PREF_WHITELIST: if k in prefs: cleaned_prefs[k] = prefs[k] # tone_constraints: normalize booleans, remove unknown keys tc = cleaned_prefs.get("tone_constraints") if isinstance(tc, dict): cleaned_tc: dict = {} for bk in _TC_BOOL_KEYS: if bk in tc: cleaned_tc[bk] = bool(tc[bk]) cleaned_prefs["tone_constraints"] = cleaned_tc elif tc is None and "tone_constraints" not in cleaned_prefs: cleaned_prefs["tone_constraints"] = {"no_emojis": False, "no_exclamations": False} p["preferences"] = cleaned_prefs # interaction_summary: normalize whitespace + cap summary = p.get("interaction_summary") if isinstance(summary, str): normalized = " ".join(summary.split()) p["interaction_summary"] = _cap_summary(normalized) # recent_topics: dedup by (intent+label) — safety guard on top of horizon topics = p.get("recent_topics") if isinstance(topics, list): p["recent_topics"] = _trim_dedup(topics, _RECENT_TOPICS_MAX) return p except Exception as exc: logger.warning("consolidate_user_profile error (returning original): %s", exc) return profile def consolidate_farm_profile(profile: dict) -> dict: """ Нормалізує і обрізає поля FarmProfile — запобігає необмеженому зростанню. Операції: - field_ids ≤200, crop_ids ≤100, active_integrations ≤20 (dedup + trim) - Зберігає chat_id і _version без змін Deterministic та idempotent. Fail-safe. """ try: p = deepcopy(profile) for field, limit in ( ("field_ids", _LIMIT_FIELD_IDS), ("crop_ids", _LIMIT_CROP_IDS), ("active_integrations", _LIMIT_ACTIVE_INTEG), ("crops", _LIMIT_CROP_IDS), # legacy alias also capped ("fields", _LIMIT_FIELD_IDS), # legacy alias also capped ): val = p.get(field) if isinstance(val, list): p[field] = _trim_dedup(val, limit) return p except Exception as exc: logger.warning("consolidate_farm_profile error (returning original): %s", exc) return profile def _should_consolidate(interaction_count: int, profile: dict) -> tuple[bool, str]: """ Повертає (should_run, reason). Запускати якщо: - interaction_count % 25 == 0 (periodic) - або будь-який список перевищив soft-ліміт * 1.5 (hard trigger) """ if interaction_count > 0 and interaction_count % _CONSOLIDATION_PERIOD == 0: return True, "periodic" # Hard trigger: list overflows for field, limit in ( ("context_notes", _LIMIT_CONTEXT_NOTES), ("known_intents", _LIMIT_KNOWN_INTENTS), ): lst = profile.get(field) if isinstance(lst, list) and len(lst) > int(limit * 1.5): return True, "hard_trigger" return False, "" def _detect_role(text: str) -> str | None: tl = text.lower() for role, hints in _ROLE_HINTS.items(): if any(h in tl for h in hints): return role return None def _detect_style(text: str) -> str | None: tl = text.lower() for style, hints in _STYLE_HINTS.items(): if any(h in tl for h in hints): return style return None def update_profile_if_needed( user_id: str, chat_id: str, text: str, response: str, intent: str | None = None, depth: str = "deep", # "light" follow-ups не додають новий topic ) -> None: """ Оновлює UserProfile і FarmProfile лише якщо зʼявився новий факт. depth="light" → recent_topics НЕ оновлюється (щоб не шуміло від followup). Запускається в daemon thread — не блокує відповідь. """ def _do_update(): try: user_changed = False farm_changed = False u = load_user_profile(user_id) f = load_farm_profile(chat_id, user_id=user_id) prev_role = u.get("role", "unknown") prev_style = u.get("style", "conversational") # interaction count u["interaction_count"] = u.get("interaction_count", 0) + 1 user_changed = True # ensure preferences field exists (migration for older profiles) if "preferences" not in u: u["preferences"] = {"no_emojis": False, "units": "ha", "report_format": "conversational"} user_changed = True # Ensure migration (recent_topics, last_topic_label) if migrate_profile_topics(u): user_changed = True # last topic + recent_topics horizon # Only deep interactions add to horizon (light follow-ups don't add noise) if intent and intent != "general" and depth == "deep": label = summarize_topic_label(text) prev_last = u.get("last_topic") push_recent_topic(u, intent, label) if u.get("last_topic") != prev_last: user_changed = True elif intent and depth == "light": tlog(logger, "topics_push", pushed=False, reason="light_followup", intent=intent) # role detection new_role = _detect_role(text) if new_role and u.get("role") != new_role: u["role"] = new_role user_changed = True # style detection new_style = _detect_style(text) if new_style and u.get("style") != new_style: u["style"] = new_style user_changed = True # ensure preferences and tone_constraints fields exist (migration) prefs = u.setdefault("preferences", {}) tc = prefs.setdefault("tone_constraints", {"no_emojis": False, "no_exclamations": False}) # Remove legacy flat no_emojis if present (migrate to tone_constraints) if "no_emojis" in prefs and "no_emojis" not in tc: tc["no_emojis"] = prefs.pop("no_emojis") user_changed = True tl = text.lower() # Detect "no_emojis" constraint if any(ph in tl for ph in ["без емодзі", "без смайлів", "без значків"]): if not tc.get("no_emojis"): tc["no_emojis"] = True user_changed = True # Detect "no_exclamations" constraint (стриманий стиль) if any(ph in tl for ph in ["без окликів", "стримано", "офіційно", "без емоцій"]): if not tc.get("no_exclamations"): tc["no_exclamations"] = True user_changed = True # interaction_summary: update every 10 interactions or on role/style change # Jaccard guard: skip if new summary too similar to old (prevents "shimmering") if _should_update_summary(u, prev_role, prev_style): new_summary = build_interaction_summary(u) if _summary_changed_enough(u.get("interaction_summary"), new_summary): u["interaction_summary"] = new_summary user_changed = True tlog(logger, "memory_summary_updated", user_id=user_id) else: logger.debug("UserProfile summary unchanged (Jaccard guard) user_id=%s", user_id) # ── Memory consolidation (v2.9) ───────────────────────────────── # Runs every 25 interactions (or on hard trigger if lists overflow) should_con, con_reason = _should_consolidate( u.get("interaction_count", 0), u ) if should_con: try: u_before = deepcopy(u) u = consolidate_user_profile(u) con_changed = (u != u_before) tlog(logger, "memory_consolidated", entity="user_profile", user_id=user_id, changed=con_changed, reason=con_reason) if con_changed: user_changed = True except Exception as exc: tlog(logger, "memory_consolidation_error", entity="user_profile", user_id=user_id, error=str(exc), level_hint="warning") logger.warning("consolidate_user_profile failed (no-op): %s", exc) if user_changed: save_user_profile(user_id, u) # FarmProfile: accumulate crops from text (minimal keyword heuristic) for word in text.split(): w = word.strip(".,;:!?\"'").lower() if len(w) > 3 and w not in f.get("crops", []): if any(kw in w for kw in ["пшениця", "кукурудза", "соняшник", "ріпак", "соя", "ячмінь", "жито"]): f.setdefault("crops", []).append(w) farm_changed = True # Farm consolidation (hard trigger only — farms change slowly) _, farm_con_reason = _should_consolidate(0, {}) # periodic not used for farm for field, limit in ( ("field_ids", _LIMIT_FIELD_IDS), ("crop_ids", _LIMIT_CROP_IDS), ("active_integrations", _LIMIT_ACTIVE_INTEG), ): lst = f.get(field) if isinstance(lst, list) and len(lst) > int(limit * 1.5): try: f_before = deepcopy(f) f = consolidate_farm_profile(f) tlog(logger, "memory_consolidated", entity="farm_profile", chat_id=chat_id, changed=(f != f_before), reason="hard_trigger") farm_changed = True except Exception as exc: logger.warning("consolidate_farm_profile failed (no-op): %s", exc) break # consolidation done once per update if farm_changed: save_farm_profile(chat_id, f) except Exception as exc: logger.warning("update_profile_if_needed failed: %s", exc) t = threading.Thread(target=_do_update, daemon=True) t.start()