Files
microdao-daarion/crews/agromatrix_crew/memory_manager.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

870 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Memory Manager для Степана — v2.8.
Завантажує/зберігає UserProfile і FarmProfile через memory-service.
Використовує sync httpx.Client (run.py sync).
При недоступності memory-service — деградує до процесного in-memory кешу (TTL 30 хв).
Fact-ключі в memory-service:
user_profile:agromatrix:{user_id} — per-user (interaction history, style, topics)
farm_profile:agromatrix:chat:{chat_id} — per-chat (shared farm context, v2.8+)
farm_profile:agromatrix:{user_id} — legacy per-user key (мігрується lazy)
v2.8 Multi-user farm model:
- Кілька операторів в одному chat_id ділять один FarmProfile.
- UserProfile (recent_topics, style, тощо) — per-user.
- Lazy migration: якщо нового ключа нема — спробуємо legacy, скопіюємо (write-through).
- Conflict policy: перший user задає chat-profile; наступний з відмінним legacy — не перезаписує, лише logить.
"""
from __future__ import annotations
import json
import logging
import os
import re
import threading
import time
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any
from crews.agromatrix_crew.telemetry import tlog
logger = logging.getLogger(__name__)
MEMORY_SERVICE_URL = os.getenv("AGX_MEMORY_SERVICE_URL", os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000"))
_HTTP_TIMEOUT = float(os.getenv("AGX_MEMORY_TIMEOUT", "2.0"))
# ─── In-memory fallback cache ────────────────────────────────────────────────
_CACHE_TTL = 1800 # 30 хвилин
_cache: dict[str, tuple[float, dict]] = {} # key → (ts, data)
_cache_lock = threading.Lock()
def _cache_get(key: str) -> dict | None:
with _cache_lock:
entry = _cache.get(key)
if entry and (time.monotonic() - entry[0]) < _CACHE_TTL:
return deepcopy(entry[1])
return None
def _cache_set(key: str, data: dict) -> None:
with _cache_lock:
_cache[key] = (time.monotonic(), deepcopy(data))
# ─── Defaults ────────────────────────────────────────────────────────────────
_RECENT_TOPICS_MAX = 5
def _default_user_profile(user_id: str) -> dict:
return {
"_version": 4,
"user_id": user_id,
"name": None,
"role": "unknown",
"style": "conversational",
"preferred_kpi": [],
"interaction_summary": None,
# recent_topics: список до 5 останніх deep-тем
# Кожен елемент: {"label": str, "intent": str, "ts": str}
"recent_topics": [],
# last_topic / last_topic_label — derived aliases (backward-compat, оновлюються авто)
"last_topic": None,
"last_topic_label": None,
"interaction_count": 0,
"preferences": {
"units": "ha",
"report_format": "conversational",
"tone_constraints": {
"no_emojis": False,
"no_exclamations": False,
},
},
"updated_at": None,
}
# ─── Topic horizon helpers ────────────────────────────────────────────────────
_STOP_WORDS = frozenset({
"будь", "ласка", "привіт", "дякую", "спасибі", "ок", "добре", "зрозумів",
"я", "ти", "він", "вона", "ми", "ви", "що", "як", "де", "коли", "чому",
"і", "та", "але", "або", "якщо", "по", "до", "на", "за", "від", "у", "в", "з",
})
# Поля/культури/числа — зберігати у label обов'язково
_LABEL_PRESERVE_RE = re.compile(
r'\b(\d[\d.,]*\s*(?:га|кг|л|т|%)?|поле\s+\w+|поля\s+\w+|культура\s+\w+|'
r'пшениця|кукурудза|соняшник|ріпак|соя|ячмінь|жито|завтра|сьогодні|тиждень)\b',
re.IGNORECASE | re.UNICODE,
)
def summarize_topic_label(text: str) -> str:
"""
Rule-based: формує 610 слів людяний ярлик теми з тексту.
Приклад:
"зроби план на завтра по полю 12""план на завтра, поле 12"
"перевір вологість на полі north-01""вологість поле north-01"
"""
# Remove leading action verb (зроби, перевір, etc.)
action_re = re.compile(
r'^\s*(зроби|зробити|перевір|перевірити|порахуй|підготуй|онови|створи|'
r'запиши|зафіксуй|внеси|проаналізуй|покажи|сплануй|заплануй)\s*',
re.IGNORECASE | re.UNICODE,
)
cleaned = action_re.sub('', text).strip()
words = cleaned.split()
# Keep words: not stop-words, or matches preserve pattern
kept: list[str] = []
for w in words:
wl = w.lower().rstrip('.,!?')
if wl in _STOP_WORDS:
continue
kept.append(w.rstrip('.,!?'))
if len(kept) >= 8:
break
label = ' '.join(kept) if kept else text[:50]
# Capitalize first letter
return label[:1].upper() + label[1:] if label else text[:50]
def push_recent_topic(profile: dict, intent: str, label: str) -> None:
"""
Додає новий topic до recent_topics (max 5).
Оновлює last_topic і last_topic_label як aliases.
Не дублює якщо останній topic має той самий intent і подібний label.
"""
now_ts = datetime.now(timezone.utc).isoformat()
topics: list[dict] = profile.setdefault("recent_topics", [])
# Dedup: не додавати якщо той самий intent і label протягом сесії
if topics and topics[-1].get("intent") == intent and topics[-1].get("label") == label:
tlog(logger, "topics_push", pushed=False, reason="dedup", intent=intent)
return
topics.append({"label": label, "intent": intent, "ts": now_ts})
# Keep only last N
if len(topics) > _RECENT_TOPICS_MAX:
profile["recent_topics"] = topics[-_RECENT_TOPICS_MAX:]
# Keep aliases in sync
last = profile["recent_topics"][-1]
profile["last_topic"] = last["intent"]
profile["last_topic_label"] = last["label"]
tlog(logger, "topics_push", pushed=True, intent=intent, label=label,
horizon=len(profile["recent_topics"]))
def migrate_profile_topics(profile: dict) -> bool:
"""
Backward-compat міграція: якщо profile має last_topic (str) але немає recent_topics
→ створити recent_topics=[{"label": last_topic, "intent": last_topic, "ts": now}].
Повертає True якщо профіль змінено.
"""
changed = False
# Ensure recent_topics exists
if "recent_topics" not in profile:
lt = profile.get("last_topic")
if lt:
now_ts = datetime.now(timezone.utc).isoformat()
profile["recent_topics"] = [{"label": lt.replace("_", " "), "intent": lt, "ts": now_ts}]
else:
profile["recent_topics"] = []
changed = True
# Ensure last_topic_label exists
if "last_topic_label" not in profile:
topics = profile.get("recent_topics", [])
profile["last_topic_label"] = topics[-1]["label"] if topics else None
changed = True
# Ensure preferences.tone_constraints exists (older profiles)
prefs = profile.setdefault("preferences", {})
if "tone_constraints" not in prefs:
prefs["tone_constraints"] = {"no_emojis": False, "no_exclamations": False}
changed = True
return changed
def _default_farm_profile(chat_id: str) -> dict:
return {
"_version": 5,
"chat_id": chat_id,
"farm_name": None,
"region": None,
"crops": [],
"field_ids": [],
"fields": [], # backward-compat alias для field_ids
"crop_ids": [], # structured list (доповнює crops)
"systems": [],
"active_integrations": [],
"iot_sensors": [],
"alert_thresholds": {},
"seasonal_context": {},
"season_state": None, # backward-compat alias
"updated_at": None,
}
# Chat-keyed fact key (v2.8+)
def _chat_fact_key(chat_id: str) -> str:
return f"farm_profile:agromatrix:chat:{chat_id}"
# Legacy per-user fact key (pre-v2.8)
def _legacy_farm_fact_key(user_id: str) -> str:
return f"farm_profile:agromatrix:{user_id}"
def _farm_profiles_differ(a: dict, b: dict) -> bool:
"""
Перевіряє чи два farm-профілі суттєво відрізняються.
Порівнює: crops, field_ids, fields, region, season_state.
Ігнорує metadata (updated_at, _version, chat_id).
"""
compare_keys = ("crops", "field_ids", "fields", "region", "season_state", "active_integrations")
for k in compare_keys:
if a.get(k) != b.get(k):
return True
return False
# ─── HTTP helpers (sync) ─────────────────────────────────────────────────────
def _http_get_fact(user_id: str, fact_key: str) -> dict | None:
try:
import httpx
url = f"{MEMORY_SERVICE_URL}/facts/get"
resp = httpx.get(url, params={"user_id": user_id, "fact_key": fact_key}, timeout=_HTTP_TIMEOUT)
if resp.status_code == 200:
data = resp.json()
val = data.get("fact_value_json") or data.get("fact_value")
if isinstance(val, str):
try:
val = json.loads(val)
except Exception:
pass
return val if isinstance(val, dict) else None
return None
except Exception as exc:
logger.debug("memory_manager: get_fact failed key=%s: %s", fact_key, exc)
return None
def _http_upsert_fact(user_id: str, fact_key: str, data: dict) -> bool:
try:
import httpx
url = f"{MEMORY_SERVICE_URL}/facts/upsert"
payload = {
"user_id": user_id,
"fact_key": fact_key,
"fact_value_json": data,
}
resp = httpx.post(url, json=payload, timeout=_HTTP_TIMEOUT)
return resp.status_code in (200, 201)
except Exception as exc:
logger.debug("memory_manager: upsert_fact failed key=%s: %s", fact_key, exc)
return False
# ─── Public API ──────────────────────────────────────────────────────────────
def load_user_profile(user_id: str) -> dict:
"""
Завантажити UserProfile з memory-service.
Виконує backward-compat міграцію (recent_topics, last_topic_label, tone_constraints).
При будь-якій помилці — повертає профіль за замовчуванням.
"""
if not user_id:
return _default_user_profile("")
fact_key = f"user_profile:agromatrix:{user_id}"
cached = _cache_get(fact_key)
if cached:
return cached
profile = _http_get_fact(user_id, fact_key)
if profile:
# Apply backward-compat migration; if changed, update cache + persist async
if migrate_profile_topics(profile):
_cache_set(fact_key, profile)
else:
_cache_set(fact_key, profile)
return profile
default = _default_user_profile(user_id)
_cache_set(fact_key, default)
return default
def save_user_profile(user_id: str, profile: dict) -> None:
"""
Зберегти UserProfile у memory-service і оновити кеш.
Не кидає виняток.
"""
if not user_id:
return
fact_key = f"user_profile:agromatrix:{user_id}"
profile = deepcopy(profile)
profile["updated_at"] = datetime.now(timezone.utc).isoformat()
_cache_set(fact_key, profile)
ok = _http_upsert_fact(user_id, fact_key, profile)
if ok:
tlog(logger, "memory_save", entity="UserProfile", user_id=user_id, ok=True)
else:
tlog(logger, "memory_fallback", entity="UserProfile", user_id=user_id,
reason="memory_service_unavailable", level_hint="warning")
logger.warning("UserProfile NOT saved to memory-service (fallback cache only)")
def load_farm_profile(chat_id: str, user_id: str | None = None) -> dict:
"""
Завантажити FarmProfile з memory-service (v2.8: per-chat key).
Стратегія (lazy migration):
1. Спробувати новий chat-key: farm_profile:agromatrix:chat:{chat_id}
2. Якщо нема і є user_id — спробувати legacy key: farm_profile:agromatrix:{user_id}
- Якщо legacy знайдено: write-through міграція (зберегти в chat-key, видалити конфлікт)
3. Якщо нічого нема — default profile для chat_id
"""
if not chat_id:
return _default_farm_profile("")
chat_key = _chat_fact_key(chat_id)
synthetic_uid = f"farm:{chat_id}"
# 1. Cache hit (chat-key)
cached = _cache_get(chat_key)
if cached:
return cached
# 2. Try chat-key from memory-service
profile = _http_get_fact(synthetic_uid, chat_key)
if profile:
_cache_set(chat_key, profile)
return profile
# 3. Lazy migration: try legacy per-user key
if user_id:
legacy_key = _legacy_farm_fact_key(user_id)
legacy_cached = _cache_get(legacy_key)
legacy_profile = legacy_cached or _http_get_fact(user_id, legacy_key)
if legacy_profile:
# Write-through: copy to chat-key
legacy_profile = deepcopy(legacy_profile)
legacy_profile["chat_id"] = chat_id
legacy_profile["_migrated_from"] = f"legacy:{user_id}"
_cache_set(chat_key, legacy_profile)
# Persist to new key async (best-effort)
try:
_http_upsert_fact(synthetic_uid, chat_key, legacy_profile)
tlog(logger, "farm_profile_migrated", chat_id=chat_id, user_id=user_id, ok=True)
except Exception:
pass
return legacy_profile
# 4. Default
default = _default_farm_profile(chat_id)
_cache_set(chat_key, default)
return default
def save_farm_profile(chat_id: str, profile: dict) -> None:
"""
Зберегти FarmProfile у memory-service під chat-key (v2.8).
Не кидає виняток.
"""
if not chat_id:
return
synthetic_uid = f"farm:{chat_id}"
chat_key = _chat_fact_key(chat_id)
profile = deepcopy(profile)
profile["updated_at"] = datetime.now(timezone.utc).isoformat()
_cache_set(chat_key, profile)
ok = _http_upsert_fact(synthetic_uid, chat_key, profile)
if ok:
tlog(logger, "memory_save", entity="FarmProfile", chat_id=chat_id, ok=True)
else:
tlog(logger, "memory_fallback", entity="FarmProfile", chat_id=chat_id,
reason="memory_service_unavailable", level_hint="warning")
logger.warning("FarmProfile NOT saved to memory-service (fallback cache only)")
def migrate_farm_profile_legacy_to_chat(
chat_id: str,
user_id: str,
legacy_profile: dict,
) -> dict:
"""
Публічна функція явної міграції legacy farm_profile:{user_id} → farm_profile:chat:{chat_id}.
Conflict policy:
- Якщо chat-profile вже існує і суттєво відрізняється від legacy — НЕ перезаписуємо.
- Логуємо telemetry event 'farm_profile_conflict'.
- Повертаємо існуючий chat-profile як актуальний.
Якщо chat-profile ще нема або не відрізняється — копіюємо legacy у chat-key.
"""
chat_key = _chat_fact_key(chat_id)
synthetic_uid = f"farm:{chat_id}"
existing = _http_get_fact(synthetic_uid, chat_key)
if existing and _farm_profiles_differ(existing, legacy_profile):
# Conflict: log only, do not overwrite
tlog(logger, "farm_profile_conflict", chat_id=chat_id, user_id=user_id,
reason="legacy_diff")
logger.warning(
"FarmProfile conflict: chat-profile already exists with different data "
"(user=%s chat=%s); keeping existing chat-profile",
# user_id та chat_id не логуються сирими — tlog вже містить анонімізовані
"***", "***",
)
return existing
# No conflict or no existing — write-through
migrated = deepcopy(legacy_profile)
migrated["chat_id"] = chat_id
migrated["_migrated_from"] = f"legacy:{user_id}"
_cache_set(chat_key, migrated)
_http_upsert_fact(synthetic_uid, chat_key, migrated)
tlog(logger, "farm_profile_migrated", chat_id=chat_id, user_id=user_id, ok=True)
return migrated
# ─── Selective update helpers ────────────────────────────────────────────────
_ROLE_HINTS: dict[str, list[str]] = {
"owner": ["власник", "господар", "власниця", "засновник"],
"agronomist": ["агроном", "агрономка"],
"operator": ["оператор"],
"mechanic": ["механік", "тракторист", "водій"],
}
_STYLE_HINTS: dict[str, list[str]] = {
"concise": ["коротко", "без деталей", "стисло", "коротку", "коротку відповідь"],
"checklist": ["списком", "маркерами", "у списку", "по пунктах"],
"analytical": ["аналіз", "причини", "наслідки", "детальний аналіз"],
"detailed": ["детально", "докладно", "розгорнуто", "повністю"],
}
# ─── Interaction summary (rule-based) ────────────────────────────────────────
_ROLE_LABELS: dict[str, str] = {
"owner": "власник господарства",
"agronomist": "агроном",
"operator": "оператор",
"mechanic": "механік",
"unknown": "оператор",
}
_STYLE_LABELS: dict[str, str] = {
"concise": "надає перевагу стислим відповідям",
"checklist": "любить відповіді у вигляді списку",
"analytical": "цікавиться аналізом причин і наслідків",
"detailed": "воліє розгорнуті пояснення",
"conversational": "спілкується в розмовному стилі",
}
_TOPIC_LABELS_SUMMARY: dict[str, str] = {
"plan_day": "плануванні на день",
"plan_week": "плануванні на тиждень",
"plan_vs_fact": "аналізі план/факт",
"show_critical_tomorrow": "критичних задачах",
"close_plan": "закритті планів",
"iot_status": "стані датчиків",
"general": "загальних питаннях",
}
def build_interaction_summary(profile: dict) -> str:
"""
Формує коротке (12 речення) резюме профілю користувача з наявних полів.
Без LLM. Повертає рядок.
"""
parts: list[str] = []
name = profile.get("name")
role = profile.get("role", "unknown")
style = profile.get("style", "conversational")
last_topic = profile.get("last_topic")
count = profile.get("interaction_count", 0)
role_label = _ROLE_LABELS.get(role, "оператор")
name_part = f"{name}{role_label}" if name else role_label.capitalize()
parts.append(name_part + ".")
style_label = _STYLE_LABELS.get(style, "")
if style_label:
parts.append(style_label.capitalize() + ".")
if last_topic and last_topic in _TOPIC_LABELS_SUMMARY:
parts.append(f"Частіше питає про {_TOPIC_LABELS_SUMMARY[last_topic]}.")
if count > 0:
parts.append(f"Взаємодій: {count}.")
return " ".join(parts)
def _jaccard_similarity(a: str, b: str) -> float:
"""
Проста word-level Jaccard схожість між двома рядками.
Використовується для захисту від 'дрижання' summary.
"""
if not a or not b:
return 0.0
set_a = set(a.lower().split())
set_b = set(b.lower().split())
union = set_a | set_b
if not union:
return 0.0
return len(set_a & set_b) / len(union)
def _should_update_summary(profile: dict, prev_role: str, prev_style: str) -> bool:
"""Оновлювати summary кожні 10 взаємодій або при зміні role/style."""
count = profile.get("interaction_count", 0)
role_changed = profile.get("role") != prev_role
style_changed = profile.get("style") != prev_style
return count > 0 and (count % 10 == 0 or role_changed or style_changed)
def _summary_changed_enough(old_summary: str | None, new_summary: str) -> bool:
"""
Перезаписувати summary лише якщо зміна суттєва (Jaccard < 0.7).
При Jaccard ≥ 0.7 — зміна косметична, summary 'дрижить' — пропускаємо.
"""
if not old_summary:
return True # перший запис — завжди зберігаємо
similarity = _jaccard_similarity(old_summary, new_summary)
return similarity < 0.7
# ─── Memory Consolidation (v2.9) ─────────────────────────────────────────────
# Ліміти для UserProfile
_PREF_WHITELIST = frozenset({"units", "report_format", "tone_constraints", "language"})
_TC_BOOL_KEYS = frozenset({"no_emojis", "no_exclamations"})
_LIMIT_CONTEXT_NOTES = 20
_LIMIT_KNOWN_INTENTS = 30
_LIMIT_FIELD_IDS = 200
_LIMIT_CROP_IDS = 100
_LIMIT_ACTIVE_INTEG = 20
_SUMMARY_MAX_CHARS = 220
# Запускати consolidation кожні N взаємодій
_CONSOLIDATION_PERIOD = 25
def _trim_dedup(lst: list, limit: int) -> list:
"""Прибирає дублікати (stable order), обрізає до ліміту."""
seen: set = set()
result: list = []
for item in lst:
key = item if not isinstance(item, dict) else json.dumps(item, sort_keys=True)
if key not in seen:
seen.add(key)
result.append(item)
return result[-limit:]
def _cap_summary(text: str, max_chars: int = _SUMMARY_MAX_CHARS) -> str:
"""Обрізає рядок до max_chars не посередині слова."""
if len(text) <= max_chars:
return text
truncated = text[:max_chars]
last_space = truncated.rfind(' ')
if last_space > 0:
return truncated[:last_space]
return truncated
def consolidate_user_profile(profile: dict) -> dict:
"""
Нормалізує і обрізає поля UserProfile — прибирає шум без зміни семантики.
Операції:
- Trim/dedup: context_notes (≤20), known_intents (≤30)
- Preferences: залишити тільки whitelist ключів; tone_constraints — тільки bool-ключі
- interaction_summary: прибрати зайві пробіли; hard cap ≤220 символів (без обрізки слова)
- recent_topics: dedup за (intent, label) — вже є horizon 5, dedup для безпеки
Deterministic та idempotent: повторний виклик не змінює результат.
Fail-safe: помилка → повертає profile як є (без модифікацій).
"""
try:
p = deepcopy(profile)
# context_notes
notes = p.get("context_notes")
if isinstance(notes, list):
p["context_notes"] = _trim_dedup(notes, _LIMIT_CONTEXT_NOTES)
# known_intents
intents = p.get("known_intents")
if isinstance(intents, list):
p["known_intents"] = _trim_dedup(intents, _LIMIT_KNOWN_INTENTS)
# preferences whitelist
prefs = p.get("preferences")
if isinstance(prefs, dict):
cleaned_prefs: dict = {}
for k in _PREF_WHITELIST:
if k in prefs:
cleaned_prefs[k] = prefs[k]
# tone_constraints: normalize booleans, remove unknown keys
tc = cleaned_prefs.get("tone_constraints")
if isinstance(tc, dict):
cleaned_tc: dict = {}
for bk in _TC_BOOL_KEYS:
if bk in tc:
cleaned_tc[bk] = bool(tc[bk])
cleaned_prefs["tone_constraints"] = cleaned_tc
elif tc is None and "tone_constraints" not in cleaned_prefs:
cleaned_prefs["tone_constraints"] = {"no_emojis": False, "no_exclamations": False}
p["preferences"] = cleaned_prefs
# interaction_summary: normalize whitespace + cap
summary = p.get("interaction_summary")
if isinstance(summary, str):
normalized = " ".join(summary.split())
p["interaction_summary"] = _cap_summary(normalized)
# recent_topics: dedup by (intent+label) — safety guard on top of horizon
topics = p.get("recent_topics")
if isinstance(topics, list):
p["recent_topics"] = _trim_dedup(topics, _RECENT_TOPICS_MAX)
return p
except Exception as exc:
logger.warning("consolidate_user_profile error (returning original): %s", exc)
return profile
def consolidate_farm_profile(profile: dict) -> dict:
"""
Нормалізує і обрізає поля FarmProfile — запобігає необмеженому зростанню.
Операції:
- field_ids ≤200, crop_ids ≤100, active_integrations ≤20 (dedup + trim)
- Зберігає chat_id і _version без змін
Deterministic та idempotent. Fail-safe.
"""
try:
p = deepcopy(profile)
for field, limit in (
("field_ids", _LIMIT_FIELD_IDS),
("crop_ids", _LIMIT_CROP_IDS),
("active_integrations", _LIMIT_ACTIVE_INTEG),
("crops", _LIMIT_CROP_IDS), # legacy alias also capped
("fields", _LIMIT_FIELD_IDS), # legacy alias also capped
):
val = p.get(field)
if isinstance(val, list):
p[field] = _trim_dedup(val, limit)
return p
except Exception as exc:
logger.warning("consolidate_farm_profile error (returning original): %s", exc)
return profile
def _should_consolidate(interaction_count: int, profile: dict) -> tuple[bool, str]:
"""
Повертає (should_run, reason).
Запускати якщо:
- interaction_count % 25 == 0 (periodic)
- або будь-який список перевищив soft-ліміт * 1.5 (hard trigger)
"""
if interaction_count > 0 and interaction_count % _CONSOLIDATION_PERIOD == 0:
return True, "periodic"
# Hard trigger: list overflows
for field, limit in (
("context_notes", _LIMIT_CONTEXT_NOTES),
("known_intents", _LIMIT_KNOWN_INTENTS),
):
lst = profile.get(field)
if isinstance(lst, list) and len(lst) > int(limit * 1.5):
return True, "hard_trigger"
return False, ""
def _detect_role(text: str) -> str | None:
tl = text.lower()
for role, hints in _ROLE_HINTS.items():
if any(h in tl for h in hints):
return role
return None
def _detect_style(text: str) -> str | None:
tl = text.lower()
for style, hints in _STYLE_HINTS.items():
if any(h in tl for h in hints):
return style
return None
def update_profile_if_needed(
user_id: str,
chat_id: str,
text: str,
response: str,
intent: str | None = None,
depth: str = "deep", # "light" follow-ups не додають новий topic
) -> None:
"""
Оновлює UserProfile і FarmProfile лише якщо зʼявився новий факт.
depth="light" → recent_topics НЕ оновлюється (щоб не шуміло від followup).
Запускається в daemon thread — не блокує відповідь.
"""
def _do_update():
try:
user_changed = False
farm_changed = False
u = load_user_profile(user_id)
f = load_farm_profile(chat_id, user_id=user_id)
prev_role = u.get("role", "unknown")
prev_style = u.get("style", "conversational")
# interaction count
u["interaction_count"] = u.get("interaction_count", 0) + 1
user_changed = True
# ensure preferences field exists (migration for older profiles)
if "preferences" not in u:
u["preferences"] = {"no_emojis": False, "units": "ha", "report_format": "conversational"}
user_changed = True
# Ensure migration (recent_topics, last_topic_label)
if migrate_profile_topics(u):
user_changed = True
# last topic + recent_topics horizon
# Only deep interactions add to horizon (light follow-ups don't add noise)
if intent and intent != "general" and depth == "deep":
label = summarize_topic_label(text)
prev_last = u.get("last_topic")
push_recent_topic(u, intent, label)
if u.get("last_topic") != prev_last:
user_changed = True
elif intent and depth == "light":
tlog(logger, "topics_push", pushed=False, reason="light_followup", intent=intent)
# role detection
new_role = _detect_role(text)
if new_role and u.get("role") != new_role:
u["role"] = new_role
user_changed = True
# style detection
new_style = _detect_style(text)
if new_style and u.get("style") != new_style:
u["style"] = new_style
user_changed = True
# ensure preferences and tone_constraints fields exist (migration)
prefs = u.setdefault("preferences", {})
tc = prefs.setdefault("tone_constraints", {"no_emojis": False, "no_exclamations": False})
# Remove legacy flat no_emojis if present (migrate to tone_constraints)
if "no_emojis" in prefs and "no_emojis" not in tc:
tc["no_emojis"] = prefs.pop("no_emojis")
user_changed = True
tl = text.lower()
# Detect "no_emojis" constraint
if any(ph in tl for ph in ["без емодзі", "без смайлів", "без значків"]):
if not tc.get("no_emojis"):
tc["no_emojis"] = True
user_changed = True
# Detect "no_exclamations" constraint (стриманий стиль)
if any(ph in tl for ph in ["без окликів", "стримано", "офіційно", "без емоцій"]):
if not tc.get("no_exclamations"):
tc["no_exclamations"] = True
user_changed = True
# interaction_summary: update every 10 interactions or on role/style change
# Jaccard guard: skip if new summary too similar to old (prevents "shimmering")
if _should_update_summary(u, prev_role, prev_style):
new_summary = build_interaction_summary(u)
if _summary_changed_enough(u.get("interaction_summary"), new_summary):
u["interaction_summary"] = new_summary
user_changed = True
tlog(logger, "memory_summary_updated", user_id=user_id)
else:
logger.debug("UserProfile summary unchanged (Jaccard guard) user_id=%s", user_id)
# ── Memory consolidation (v2.9) ─────────────────────────────────
# Runs every 25 interactions (or on hard trigger if lists overflow)
should_con, con_reason = _should_consolidate(
u.get("interaction_count", 0), u
)
if should_con:
try:
u_before = deepcopy(u)
u = consolidate_user_profile(u)
con_changed = (u != u_before)
tlog(logger, "memory_consolidated", entity="user_profile",
user_id=user_id, changed=con_changed, reason=con_reason)
if con_changed:
user_changed = True
except Exception as exc:
tlog(logger, "memory_consolidation_error", entity="user_profile",
user_id=user_id, error=str(exc), level_hint="warning")
logger.warning("consolidate_user_profile failed (no-op): %s", exc)
if user_changed:
save_user_profile(user_id, u)
# FarmProfile: accumulate crops from text (minimal keyword heuristic)
for word in text.split():
w = word.strip(".,;:!?\"'").lower()
if len(w) > 3 and w not in f.get("crops", []):
if any(kw in w for kw in ["пшениця", "кукурудза", "соняшник", "ріпак", "соя", "ячмінь", "жито"]):
f.setdefault("crops", []).append(w)
farm_changed = True
# Farm consolidation (hard trigger only — farms change slowly)
_, farm_con_reason = _should_consolidate(0, {}) # periodic not used for farm
for field, limit in (
("field_ids", _LIMIT_FIELD_IDS),
("crop_ids", _LIMIT_CROP_IDS),
("active_integrations", _LIMIT_ACTIVE_INTEG),
):
lst = f.get(field)
if isinstance(lst, list) and len(lst) > int(limit * 1.5):
try:
f_before = deepcopy(f)
f = consolidate_farm_profile(f)
tlog(logger, "memory_consolidated", entity="farm_profile",
chat_id=chat_id, changed=(f != f_before), reason="hard_trigger")
farm_changed = True
except Exception as exc:
logger.warning("consolidate_farm_profile failed (no-op): %s", exc)
break # consolidation done once per update
if farm_changed:
save_farm_profile(chat_id, f)
except Exception as exc:
logger.warning("update_profile_if_needed failed: %s", exc)
t = threading.Thread(target=_do_update, daemon=True)
t.start()