""" Tests для Humanized Stepan v2.9 — Memory Consolidation. Покриває: 1. consolidate_user_profile: limits, dedup, preferences whitelist, summary cap 2. consolidate_farm_profile: field_ids/crop_ids/active_integrations limits 3. Trigger logic: periodic (%25) і hard_trigger (overflow * 1.5) 4. Idempotency 5. Fail-safe: виняток → профіль повертається без змін, warning у лог 6. Telemetry: memory_consolidated лог присутній з тегом """ import logging import sys from copy import deepcopy from pathlib import Path from unittest.mock import patch, MagicMock root = Path(__file__).resolve().parents[1] sys.path.insert(0, str(root)) sys.path.insert(0, str(root / 'packages' / 'agromatrix-tools')) from crews.agromatrix_crew.memory_manager import ( _default_user_profile, _default_farm_profile, consolidate_user_profile, consolidate_farm_profile, _should_consolidate, _cap_summary, _trim_dedup, _LIMIT_CONTEXT_NOTES, _LIMIT_KNOWN_INTENTS, _LIMIT_FIELD_IDS, _LIMIT_CROP_IDS, _LIMIT_ACTIVE_INTEG, _SUMMARY_MAX_CHARS, _CONSOLIDATION_PERIOD, _PREF_WHITELIST, ) from crews.agromatrix_crew.telemetry import TELEMETRY_TAG # ─── _cap_summary ───────────────────────────────────────────────────────────── def test_cap_summary_short_unchanged(): s = "Короткий текст" assert _cap_summary(s) == s def test_cap_summary_long_cuts_at_word_boundary(): words = ["слово"] * 100 long_text = " ".join(words) result = _cap_summary(long_text) assert len(result) <= _SUMMARY_MAX_CHARS assert not result.endswith("сло") # no mid-word cut def test_cap_summary_no_trailing_space(): long_text = "a " * 200 result = _cap_summary(long_text) assert len(result) <= _SUMMARY_MAX_CHARS assert not result.endswith(" ") def test_cap_summary_exactly_at_limit(): text = "x" * _SUMMARY_MAX_CHARS assert _cap_summary(text) == text def test_cap_summary_over_limit_no_mid_word(): text = "абвгд " * 50 # repeating 6-char + space = 7 chars per word result = _cap_summary(text) assert len(result) <= _SUMMARY_MAX_CHARS # result must end at a space boundary (last char is not a partial word) assert " " not in result or result == result.strip() # ─── _trim_dedup ────────────────────────────────────────────────────────────── def test_trim_dedup_removes_duplicates(): data = ["a", "b", "a", "c", "b"] result = _trim_dedup(data, 10) assert result == ["a", "b", "c"] def test_trim_dedup_preserves_order(): data = list(range(10)) assert _trim_dedup(data, 10) == list(range(10)) def test_trim_dedup_limits_to_max(): data = list(range(50)) result = _trim_dedup(data, 20) assert len(result) == 20 # Keeps the LAST N (most recent) assert result == list(range(30, 50)) def test_trim_dedup_empty_list(): assert _trim_dedup([], 10) == [] # ─── consolidate_user_profile: limits ──────────────────────────────────────── def test_user_context_notes_trimmed(): p = _default_user_profile("u1") p["context_notes"] = [f"note_{i}" for i in range(50)] result = consolidate_user_profile(p) assert len(result["context_notes"]) <= _LIMIT_CONTEXT_NOTES def test_user_context_notes_deduped(): p = _default_user_profile("u2") p["context_notes"] = ["note_A"] * 10 + ["note_B"] * 10 result = consolidate_user_profile(p) notes = result["context_notes"] assert notes.count("note_A") == 1 assert notes.count("note_B") == 1 def test_user_known_intents_trimmed(): p = _default_user_profile("u3") p["known_intents"] = [f"intent_{i}" for i in range(100)] result = consolidate_user_profile(p) assert len(result["known_intents"]) <= _LIMIT_KNOWN_INTENTS def test_user_known_intents_deduped(): p = _default_user_profile("u4") p["known_intents"] = ["plan_day"] * 20 result = consolidate_user_profile(p) assert result["known_intents"].count("plan_day") == 1 # ─── consolidate_user_profile: preferences whitelist ───────────────────────── def test_preferences_whitelist_removes_extra_keys(): p = _default_user_profile("u5") p["preferences"]["arbitrary_key"] = "value" p["preferences"]["another_extra"] = 123 result = consolidate_user_profile(p) prefs = result["preferences"] for k in prefs: assert k in _PREF_WHITELIST, f"Unexpected key in preferences: {k!r}" def test_preferences_whitelist_keeps_valid_keys(): p = _default_user_profile("u6") p["preferences"]["units"] = "ha" p["preferences"]["language"] = "uk" result = consolidate_user_profile(p) assert result["preferences"].get("units") == "ha" assert result["preferences"].get("language") == "uk" def test_preferences_tone_constraints_normalized(): p = _default_user_profile("u7") p["preferences"]["tone_constraints"] = { "no_emojis": 1, # int, not bool "no_exclamations": "yes", # str, not bool "unknown_key": "remove_me", } result = consolidate_user_profile(p) tc = result["preferences"]["tone_constraints"] assert isinstance(tc["no_emojis"], bool) assert isinstance(tc["no_exclamations"], bool) assert "unknown_key" not in tc def test_preferences_tone_constraints_preserved_if_missing(): """Якщо tone_constraints відсутній у префах — залишається без змін.""" p = _default_user_profile("u8") p["preferences"].pop("tone_constraints", None) result = consolidate_user_profile(p) # After consolidation either has defaults or is absent — no crash assert "preferences" in result # ─── consolidate_user_profile: interaction_summary ─────────────────────────── def test_summary_capped_at_220(): p = _default_user_profile("u9") p["interaction_summary"] = "Дуже довгий текст. " * 50 # ~1000 chars result = consolidate_user_profile(p) assert len(result["interaction_summary"]) <= _SUMMARY_MAX_CHARS def test_summary_no_mid_word_cut(): p = _default_user_profile("u10") p["interaction_summary"] = ("слово " * 50).strip() result = consolidate_user_profile(p) s = result["interaction_summary"] # Should not end mid-word (should end at a complete word) assert not s.endswith("сло") assert not s.endswith("сл") def test_summary_whitespace_normalized(): p = _default_user_profile("u11") p["interaction_summary"] = "Іван агроном. Часто питає." result = consolidate_user_profile(p) assert " " not in result["interaction_summary"] def test_summary_none_untouched(): p = _default_user_profile("u12") p["interaction_summary"] = None result = consolidate_user_profile(p) assert result["interaction_summary"] is None # ─── consolidate_user_profile: idempotency ─────────────────────────────────── def test_consolidation_idempotent(): p = _default_user_profile("u_idem") p["context_notes"] = ["note_x"] * 5 p["interaction_summary"] = "Іван агроном, короткі відповіді." p["preferences"]["units"] = "ha" once = consolidate_user_profile(p) twice = consolidate_user_profile(once) assert once == twice, "Consolidation not idempotent" # ─── consolidate_farm_profile ───────────────────────────────────────────────── def test_farm_field_ids_trimmed(): f = _default_farm_profile("chat_f1") f["field_ids"] = [f"field_{i}" for i in range(500)] result = consolidate_farm_profile(f) assert len(result["field_ids"]) <= _LIMIT_FIELD_IDS def test_farm_crop_ids_trimmed(): f = _default_farm_profile("chat_f2") f["crop_ids"] = [f"crop_{i}" for i in range(200)] result = consolidate_farm_profile(f) assert len(result["crop_ids"]) <= _LIMIT_CROP_IDS def test_farm_active_integrations_trimmed(): f = _default_farm_profile("chat_f3") f["active_integrations"] = [f"svc_{i}" for i in range(50)] result = consolidate_farm_profile(f) assert len(result["active_integrations"]) <= _LIMIT_ACTIVE_INTEG def test_farm_crops_legacy_trimmed(): """Legacy 'crops' field also capped.""" f = _default_farm_profile("chat_f4") f["crops"] = [f"crop_{i}" for i in range(200)] result = consolidate_farm_profile(f) assert len(result["crops"]) <= _LIMIT_CROP_IDS def test_farm_consolidation_preserves_chat_id(): f = _default_farm_profile("chat_preserve") f["field_ids"] = [f"x_{i}" for i in range(500)] result = consolidate_farm_profile(f) assert result["chat_id"] == "chat_preserve" def test_farm_consolidation_preserves_version(): f = _default_farm_profile("chat_ver") f["field_ids"] = [f"y_{i}" for i in range(500)] result = consolidate_farm_profile(f) assert result["_version"] == f["_version"] def test_farm_consolidation_idempotent(): f = _default_farm_profile("chat_idem") f["field_ids"] = [f"f_{i}" for i in range(500)] once = consolidate_farm_profile(f) twice = consolidate_farm_profile(once) assert once == twice # ─── _should_consolidate triggers ──────────────────────────────────────────── def test_periodic_trigger_at_25(): run, reason = _should_consolidate(25, {}) assert run is True assert reason == "periodic" def test_periodic_trigger_at_50(): run, reason = _should_consolidate(50, {}) assert run is True assert reason == "periodic" def test_no_trigger_at_24(): run, _ = _should_consolidate(24, {}) assert run is False def test_no_trigger_at_26(): run, _ = _should_consolidate(26, {}) assert run is False def test_no_trigger_at_zero(): run, _ = _should_consolidate(0, {}) assert run is False def test_hard_trigger_on_context_notes_overflow(): profile = {"context_notes": ["n"] * int(_LIMIT_CONTEXT_NOTES * 1.6)} run, reason = _should_consolidate(7, profile) # not a %25 count assert run is True assert reason == "hard_trigger" def test_hard_trigger_on_known_intents_overflow(): profile = {"known_intents": ["x"] * int(_LIMIT_KNOWN_INTENTS * 1.6)} run, reason = _should_consolidate(7, profile) assert run is True assert reason == "hard_trigger" def test_no_hard_trigger_under_1_5x(): profile = {"context_notes": ["n"] * (_LIMIT_CONTEXT_NOTES + 1)} run, _ = _should_consolidate(7, profile) assert run is False # ─── fail-safe ─────────────────────────────────────────────────────────────── def test_consolidate_user_profile_fail_safe(): """Якщо _trim_dedup кидає — повертає profile без змін.""" p = _default_user_profile("u_safe") p["context_notes"] = ["a", "b"] with patch( "crews.agromatrix_crew.memory_manager._trim_dedup", side_effect=RuntimeError("simulated crash"), ): result = consolidate_user_profile(p) # Must not raise; returns original assert result["user_id"] == "u_safe" def test_consolidate_farm_profile_fail_safe(): """Якщо deepcopy кидає — повертає profile без змін (fallback).""" f = _default_farm_profile("chat_safe") with patch( "crews.agromatrix_crew.memory_manager.deepcopy", side_effect=RuntimeError("simulated crash"), ): result = consolidate_farm_profile(f) assert result is not None # ─── telemetry: memory_consolidated tag present ─────────────────────────────── class _CaptureHandler(logging.Handler): def __init__(self): super().__init__() self.records: list[logging.LogRecord] = [] def emit(self, record): self.records.append(record) @property def messages(self): return [r.getMessage() for r in self.records] def test_consolidation_telemetry_tagged(): """ consolidate_user_profile + tlog → лог-рядок містить AGX_STEPAN_METRIC memory_consolidated. Перевіряємо напряму через tlog (не через thread). """ from crews.agromatrix_crew.telemetry import tlog mm_logger = logging.getLogger("crews.agromatrix_crew.memory_manager") mm_logger.setLevel(logging.DEBUG) h = _CaptureHandler() mm_logger.addHandler(h) try: p = _default_user_profile("u_tlog_test") p["context_notes"] = ["a"] * 5 p_before = deepcopy(p) result = consolidate_user_profile(p) changed = (result != p_before) tlog(mm_logger, "memory_consolidated", entity="user_profile", user_id="u_tlog_test", changed=changed, reason="periodic") tagged = [m for m in h.messages if TELEMETRY_TAG in m] assert any("memory_consolidated" in m for m in tagged), \ f"Expected memory_consolidated telemetry. Got: {tagged}" # Verify user_id is anonymized (no raw value) for m in tagged: if "memory_consolidated" in m: assert "u_tlog_test" not in m, f"Raw user_id in telemetry: {m!r}" assert "user_id=h:" in m finally: mm_logger.removeHandler(h) def test_consolidation_period_constant(): """CONSOLIDATION_PERIOD має бути 25.""" assert _CONSOLIDATION_PERIOD == 25 def test_consolidation_summary_limit_constant(): """_SUMMARY_MAX_CHARS має бути 220.""" assert _SUMMARY_MAX_CHARS == 220