agromatrix: harden correction learning and invalidate wrong labels

This commit is contained in:
Apple
2026-02-21 02:25:40 -08:00
parent 2b0b142f95
commit e00e7af1e7

View File

@@ -5,6 +5,7 @@ Handles incoming webhooks from Telegram, Discord, etc.
import asyncio import asyncio
import base64 import base64
import copy import copy
import hashlib
import json import json
import re import re
import logging import logging
@@ -69,6 +70,7 @@ USER_RESPONSE_STYLE_PREF_TTL = 30 * 24 * 3600 # 30 days
# Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts}) # Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts})
RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {} RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {}
RECENT_PHOTO_TTL = 30 * 60 # 30 minutes RECENT_PHOTO_TTL = 30 * 60 # 30 minutes
AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID = "agent:agromatrix:global"
def _cleanup_recent_photo_context() -> None: def _cleanup_recent_photo_context() -> None:
@@ -147,6 +149,248 @@ def _looks_like_photo_followup(text: str) -> bool:
return False return False
def _extract_agromatrix_correction_label(text: str) -> Optional[str]:
"""
Extract corrected plant label from free-form user feedback.
Examples:
- "це соняшник"
- "це не кабачок, а гарбуз"
- "правильна відповідь: кукурудза"
"""
raw = (text or "").strip()
if not raw:
return None
t = re.sub(r"\s+", " ", raw.lower())
patterns = [
r"правильн\w*\s+відповід\w*[:\-]?\s*([a-zаіїєґ0-9'\-\s]{2,60})",
r"це\s+не\s+[a-zаіїєґ0-9'\-\s]{1,60},?\s+а\s+([a-zаіїєґ0-9'\-\s]{2,60})",
]
for pat in patterns:
m = re.search(pat, t)
if not m:
continue
label = re.sub(r"\s+", " ", (m.group(1) or "").strip(" .,!?:;\"'()[]{}"))
if not label:
continue
if len(label.split()) > 4:
continue
if label in {"не знаю", "помилка", "невірно", "не вірно"}:
continue
# Filter imperative/meta phrases that are not plant labels.
bad_prefixes = (
"не ", "в чат", "зробити", "напиши", "потрібно", "навіщо",
"ти ", "він ", "вона ", "це ", "а ", "і ",
)
if label.startswith(bad_prefixes):
continue
if any(x in label for x in ("повідом", "чат", "відповід", "потрібно", "не потрібно")):
continue
if re.search(r"\d", label):
continue
return label
return None
def _is_agromatrix_negative_feedback(text: str) -> bool:
t = (text or "").strip().lower()
if not t:
return False
markers = (
"це помилка",
"це не вірно",
"це невірно",
"неправильно",
"не вірно",
"невірно",
"не так",
"помилка у відповіді",
"відповідь не вірна",
"відповідь невірна",
)
return any(m in t for m in markers)
def _is_agromatrix_correction_only_message(text: str) -> bool:
t = (text or "").strip().lower()
if not t:
return False
# Treat as "correction only" when there is no direct question.
if "?" in t:
return False
markers = (
"це ", "правильна відповідь", "невірно", "не вірно", "це не",
"не так", "неправильно", "виправ",
)
return any(m in t for m in markers)
def _truncate_context_for_prompt(raw: str, *, max_chars: int = 2200, max_lines: int = 28) -> str:
if not raw:
return ""
lines = [ln for ln in raw.splitlines() if ln.strip()]
if len(lines) > max_lines:
lines = lines[-max_lines:]
out = "\n".join(lines)
if len(out) > max_chars:
out = out[-max_chars:]
# try to cut from next line boundary for cleaner prompt
pos = out.find("\n")
if 0 <= pos < 200:
out = out[pos + 1 :]
return out.strip()
def _agromatrix_observation_doc_id(file_id: str, label: str) -> str:
digest = hashlib.sha1(f"{file_id}:{label}".encode("utf-8")).hexdigest()[:16]
return f"agromatrix-photo-{digest}"
async def _save_agromatrix_photo_learning(
*,
file_id: str,
label: str,
source: str,
chat_id: str,
user_id: str,
dao_id: str,
) -> None:
"""
Persist non-private photo learning:
1) fact keyed by file_id for deterministic follow-ups
2) anonymized doc card in agromatrix_docs for semantic reuse
"""
if not file_id or not label:
return
now_iso = datetime.utcnow().isoformat()
try:
await memory_client.upsert_fact(
user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID,
fact_key=f"agromatrix:photo_label:{file_id}",
fact_value=label,
fact_value_json={
"label": label,
"source": source,
"updated_at": now_iso,
},
team_id=dao_id,
)
except Exception as e:
logger.warning(f"AgroMatrix photo learning fact save failed: {e}")
# Best-effort semantic card, no personal data/chat text.
card_text = (
f"AgroMatrix plant observation.\n"
f"Validated label: {label}.\n"
f"Use as a prior hint for similar seedling/leaf photos.\n"
f"Source: {source}. Updated: {now_iso}."
)
try:
router_url = os.getenv("ROUTER_URL", "http://router:8000").rstrip("/")
async with httpx.AsyncClient(timeout=20.0) as client:
await client.post(
f"{router_url}/v1/documents/ingest",
json={
"agent_id": "agromatrix",
"doc_id": _agromatrix_observation_doc_id(file_id, label),
"file_name": f"agromatrix_photo_learning_{label}.txt",
"text": card_text,
"dao_id": dao_id,
"user_id": AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID,
},
)
except Exception as e:
logger.warning(f"AgroMatrix photo learning ingest failed: {e}")
async def _invalidate_agromatrix_photo_learning(
*,
file_id: str,
reason: str,
dao_id: str,
) -> None:
if not file_id:
return
try:
await memory_client.upsert_fact(
user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID,
fact_key=f"agromatrix:photo_label:{file_id}",
fact_value="",
fact_value_json={
"label": "",
"invalidated": True,
"reason": reason,
"updated_at": datetime.utcnow().isoformat(),
},
team_id=dao_id,
)
except Exception as e:
logger.warning(f"AgroMatrix photo learning invalidation failed: {e}")
async def _get_agromatrix_photo_prior(file_id: str, dao_id: str) -> Optional[str]:
if not file_id:
return None
try:
fact = await memory_client.get_fact(
user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID,
fact_key=f"agromatrix:photo_label:{file_id}",
team_id=dao_id,
)
if not fact:
return None
data = fact.get("fact_value_json") if isinstance(fact, dict) else None
if isinstance(data, dict):
if bool(data.get("invalidated")):
return None
label = str(data.get("label") or "").strip()
if label:
return label
label = str(fact.get("fact_value") or "").strip() if isinstance(fact, dict) else ""
return label or None
except Exception as e:
logger.warning(f"AgroMatrix photo prior lookup failed: {e}")
return None
async def _set_agromatrix_last_photo_ref(*, chat_id: str, user_id: str, file_id: str, dao_id: str) -> None:
if not (chat_id and user_id and file_id):
return
try:
await memory_client.upsert_fact(
user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID,
fact_key=f"agromatrix:last_photo:{chat_id}:{user_id}",
fact_value=file_id,
fact_value_json={"file_id": file_id, "updated_at": datetime.utcnow().isoformat()},
team_id=dao_id,
)
except Exception as e:
logger.warning(f"AgroMatrix last photo ref save failed: {e}")
async def _get_agromatrix_last_photo_ref(*, chat_id: str, user_id: str, dao_id: str) -> Optional[str]:
if not (chat_id and user_id):
return None
try:
fact = await memory_client.get_fact(
user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID,
fact_key=f"agromatrix:last_photo:{chat_id}:{user_id}",
team_id=dao_id,
)
if not fact:
return None
data = fact.get("fact_value_json") if isinstance(fact, dict) else None
if isinstance(data, dict):
file_id = str(data.get("file_id") or "").strip()
if file_id:
return file_id
file_id = str(fact.get("fact_value") or "").strip() if isinstance(fact, dict) else ""
return file_id or None
except Exception as e:
logger.warning(f"AgroMatrix last photo ref lookup failed: {e}")
return None
def _needs_photo_only_response(text: str) -> bool: def _needs_photo_only_response(text: str) -> bool:
""" """
Return True only for explicit requests to analyze/describe image content. Return True only for explicit requests to analyze/describe image content.
@@ -1434,6 +1678,13 @@ async def process_photo(
logger.info(f"{agent_config.name}: Photo from {username} (tg:{user_id}), file_id: {file_id}") logger.info(f"{agent_config.name}: Photo from {username} (tg:{user_id}), file_id: {file_id}")
_set_recent_photo_context(agent_config.agent_id, chat_id, user_id, file_id) _set_recent_photo_context(agent_config.agent_id, chat_id, user_id, file_id)
if agent_config.agent_id == "agromatrix":
await _set_agromatrix_last_photo_ref(
chat_id=chat_id,
user_id=user_id,
file_id=file_id,
dao_id=dao_id,
)
# Get caption for media question check # Get caption for media question check
caption = caption_override if caption_override is not None else ((update.message or {}).get("caption") or "") caption = caption_override if caption_override is not None else ((update.message or {}).get("caption") or "")
@@ -1500,6 +1751,14 @@ async def process_photo(
# Send to Router with specialist_vision_8b model (Swapper) # Send to Router with specialist_vision_8b model (Swapper)
# IMPORTANT: Default prompt must request BRIEF description (2-3 sentences max) # IMPORTANT: Default prompt must request BRIEF description (2-3 sentences max)
prompt = caption.strip() if caption else "Коротко (2-3 речення) скажи, що на цьому зображенні та яке його значення." prompt = caption.strip() if caption else "Коротко (2-3 речення) скажи, що на цьому зображенні та яке його значення."
if agent_config.agent_id == "agromatrix":
prior_label = await _get_agromatrix_photo_prior(file_id=file_id, dao_id=dao_id)
if prior_label:
prompt = (
f"{prompt}\n\n"
f"[Контекст навчання AgroMatrix: для цього фото раніше підтверджено мітку: '{prior_label}'. "
"Використай як пріоритетну гіпотезу, але перевір ознаки і коротко поясни.]"
)
router_request = { router_request = {
"message": f"{prompt}\n\n[Зображення передано окремо у context.images]", "message": f"{prompt}\n\n[Зображення передано окремо у context.images]",
"mode": "chat", "mode": "chat",
@@ -1557,7 +1816,7 @@ async def process_photo(
agent_metadata={"context": "photo"}, agent_metadata={"context": "photo"},
username=username, username=username,
) )
return {"ok": True, "agent": agent_config.agent_id, "model": "specialist_vision_8b"} return {"ok": True, "agent": agent_config.agent_id, "model": "specialist_vision_8b"}
else: else:
await send_telegram_message( await send_telegram_message(
@@ -2577,6 +2836,131 @@ async def handle_telegram_webhook(
) )
return {"ok": True, "agent": agent_config.agent_id, "mode": "greeting_fast_path"} return {"ok": True, "agent": agent_config.agent_id, "mode": "greeting_fast_path"}
# AgroMatrix: capture user correction for latest photo and persist anonymized learning.
if agent_config.agent_id == "agromatrix" and text:
corrected_label = _extract_agromatrix_correction_label(text)
negative_feedback = _is_agromatrix_negative_feedback(text)
recent_file_id = _get_recent_photo_file_id(agent_config.agent_id, chat_id, user_id)
if not recent_file_id:
try:
mc = await memory_client.get_context(
user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id,
team_id=dao_id,
channel_id=chat_id,
limit=80,
)
recent_file_id = _extract_recent_photo_file_id_from_memory(mc)
except Exception:
recent_file_id = None
if not recent_file_id:
recent_file_id = await _get_agromatrix_last_photo_ref(
chat_id=chat_id,
user_id=user_id,
dao_id=dao_id,
)
if corrected_label and recent_file_id:
await _save_agromatrix_photo_learning(
file_id=recent_file_id,
label=corrected_label,
source="user_correction",
chat_id=chat_id,
user_id=user_id,
dao_id=dao_id,
)
logger.info(
f"AgroMatrix learning updated: file_id={recent_file_id}, label={corrected_label}"
)
if _is_agromatrix_correction_only_message(text):
ack = (
f"Дякую, зафіксував виправлення: {corrected_label}. "
"Для цього фото надалі використовуватиму саме цю мітку."
)
await send_telegram_message(chat_id, ack, telegram_token)
await memory_client.save_chat_turn(
agent_id=agent_config.agent_id,
team_id=dao_id,
user_id=f"tg:{user_id}",
message=text,
response=ack,
channel_id=chat_id,
scope="short_term",
save_agent_response=True,
agent_metadata={"agromatrix_learning_updated": True},
username=username,
)
return {"ok": True, "agent": agent_config.agent_id, "mode": "learning_updated"}
elif corrected_label and _is_agromatrix_correction_only_message(text):
ack = (
f"Прийняв виправлення: {corrected_label}. "
"Але не бачу останнє фото в контексті, надішли фото ще раз і я зафіксую корекцію саме до нього."
)
await send_telegram_message(chat_id, ack, telegram_token)
await memory_client.save_chat_turn(
agent_id=agent_config.agent_id,
team_id=dao_id,
user_id=f"tg:{user_id}",
message=text,
response=ack,
channel_id=chat_id,
scope="short_term",
save_agent_response=True,
agent_metadata={"agromatrix_learning_no_photo_context": True},
username=username,
)
return {"ok": True, "agent": agent_config.agent_id, "mode": "learning_no_photo_context"}
# If user says answer is wrong but does not provide a replacement label,
# invalidate stale prior so it won't keep forcing repeated wrong guesses.
if negative_feedback and not corrected_label and recent_file_id:
await _invalidate_agromatrix_photo_learning(
file_id=recent_file_id,
reason="user_marked_previous_answer_wrong_without_replacement",
dao_id=dao_id,
)
logger.info(
f"AgroMatrix learning invalidated: file_id={recent_file_id}, reason=negative_feedback_no_label"
)
if _is_agromatrix_correction_only_message(text):
ack = (
"Прийняв. Попередню мітку для цього фото скасовано. "
"Напиши правильну назву культури або попроси: 'перевір фото ще раз'."
)
await send_telegram_message(chat_id, ack, telegram_token)
await memory_client.save_chat_turn(
agent_id=agent_config.agent_id,
team_id=dao_id,
user_id=f"tg:{user_id}",
message=text,
response=ack,
channel_id=chat_id,
scope="short_term",
save_agent_response=True,
agent_metadata={"agromatrix_learning_invalidated": True},
username=username,
)
return {"ok": True, "agent": agent_config.agent_id, "mode": "learning_invalidated"}
elif negative_feedback and not corrected_label and _is_agromatrix_correction_only_message(text):
ack = (
"Прийняв, що попередня відповідь була хибна. "
"Щоб закріпити правильну мітку, напиши у форматі: 'правильна відповідь: <назва культури>'."
)
await send_telegram_message(chat_id, ack, telegram_token)
await memory_client.save_chat_turn(
agent_id=agent_config.agent_id,
team_id=dao_id,
user_id=f"tg:{user_id}",
message=text,
response=ack,
channel_id=chat_id,
scope="short_term",
save_agent_response=True,
agent_metadata={"agromatrix_negative_feedback_no_photo_context": True},
username=username,
)
return {"ok": True, "agent": agent_config.agent_id, "mode": "negative_feedback_ack"}
# Photo/image intent guard: # Photo/image intent guard:
# if text references a photo/image, try to resolve latest file_id and route to vision. # if text references a photo/image, try to resolve latest file_id and route to vision.
photo_intent = False photo_intent = False
@@ -2881,7 +3265,7 @@ async def handle_telegram_webhook(
# Regular chat mode # Regular chat mode
# Fetch memory context (includes local context as fallback) # Fetch memory context (includes local context as fallback)
# Всі агенти мають доступ до однакової історії (80 повідомлень) для контексту # Всі агенти мають доступ до однакової історії (80 повідомлень) для контексту
context_limit = 80 # Однакове для всіх агентів context_limit = 40 if agent_config.agent_id == "agromatrix" else 80
memory_context = await memory_client.get_context( memory_context = await memory_client.get_context(
user_id=f"tg:{user_id}", user_id=f"tg:{user_id}",
agent_id=agent_config.agent_id, agent_id=agent_config.agent_id,
@@ -2891,7 +3275,11 @@ async def handle_telegram_webhook(
) )
# Build message with conversation context # Build message with conversation context
local_history = memory_context.get("local_context_text", "") local_history = _truncate_context_for_prompt(
memory_context.get("local_context_text", ""),
max_chars=2200 if agent_config.agent_id == "agromatrix" else 3800,
max_lines=28 if agent_config.agent_id == "agromatrix" else 48,
)
# Check if this is a training group # Check if this is a training group
is_training_group = str(chat_id) in TRAINING_GROUP_IDS is_training_group = str(chat_id) in TRAINING_GROUP_IDS
@@ -2911,6 +3299,7 @@ async def handle_telegram_webhook(
# Do not duplicate current prompt if it matches one pending message. # Do not duplicate current prompt if it matches one pending message.
unresolved_non_current = [q for q in unresolved_questions if q.strip() != (text or "").strip()] unresolved_non_current = [q for q in unresolved_questions if q.strip() != (text or "").strip()]
if unresolved_non_current: if unresolved_non_current:
unresolved_non_current = unresolved_non_current[-1:] if agent_config.agent_id == "agromatrix" else unresolved_non_current
unresolved_block = ( unresolved_block = (
"[КРИТИЧНО: є невідповідані питання цього користувача. " "[КРИТИЧНО: є невідповідані питання цього користувача. "
"Спочатку коротко відповідай на них, потім на поточне повідомлення. " "Спочатку коротко відповідай на них, потім на поточне повідомлення. "