diff --git a/gateway-bot/http_api.py b/gateway-bot/http_api.py index fbdc2a17..8bb526d3 100644 --- a/gateway-bot/http_api.py +++ b/gateway-bot/http_api.py @@ -5,6 +5,7 @@ Handles incoming webhooks from Telegram, Discord, etc. import asyncio import base64 import copy +import hashlib import json import re import logging @@ -69,6 +70,7 @@ USER_RESPONSE_STYLE_PREF_TTL = 30 * 24 * 3600 # 30 days # Recent photo context for follow-up questions in chat (agent:chat:user -> {file_id, ts}) RECENT_PHOTO_CONTEXT: Dict[str, Dict[str, Any]] = {} RECENT_PHOTO_TTL = 30 * 60 # 30 minutes +AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID = "agent:agromatrix:global" def _cleanup_recent_photo_context() -> None: @@ -147,6 +149,248 @@ def _looks_like_photo_followup(text: str) -> bool: return False +def _extract_agromatrix_correction_label(text: str) -> Optional[str]: + """ + Extract corrected plant label from free-form user feedback. + Examples: + - "це соняшник" + - "це не кабачок, а гарбуз" + - "правильна відповідь: кукурудза" + """ + raw = (text or "").strip() + if not raw: + return None + t = re.sub(r"\s+", " ", raw.lower()) + + patterns = [ + r"правильн\w*\s+відповід\w*[:\-]?\s*([a-zа-яіїєґ0-9'’\-\s]{2,60})", + r"це\s+не\s+[a-zа-яіїєґ0-9'’\-\s]{1,60},?\s+а\s+([a-zа-яіїєґ0-9'’\-\s]{2,60})", + ] + for pat in patterns: + m = re.search(pat, t) + if not m: + continue + label = re.sub(r"\s+", " ", (m.group(1) or "").strip(" .,!?:;\"'()[]{}")) + if not label: + continue + if len(label.split()) > 4: + continue + if label in {"не знаю", "помилка", "невірно", "не вірно"}: + continue + # Filter imperative/meta phrases that are not plant labels. + bad_prefixes = ( + "не ", "в чат", "зробити", "напиши", "потрібно", "навіщо", + "ти ", "він ", "вона ", "це ", "а ", "і ", + ) + if label.startswith(bad_prefixes): + continue + if any(x in label for x in ("повідом", "чат", "відповід", "потрібно", "не потрібно")): + continue + if re.search(r"\d", label): + continue + return label + return None + + +def _is_agromatrix_negative_feedback(text: str) -> bool: + t = (text or "").strip().lower() + if not t: + return False + markers = ( + "це помилка", + "це не вірно", + "це невірно", + "неправильно", + "не вірно", + "невірно", + "не так", + "помилка у відповіді", + "відповідь не вірна", + "відповідь невірна", + ) + return any(m in t for m in markers) + + +def _is_agromatrix_correction_only_message(text: str) -> bool: + t = (text or "").strip().lower() + if not t: + return False + # Treat as "correction only" when there is no direct question. + if "?" in t: + return False + markers = ( + "це ", "правильна відповідь", "невірно", "не вірно", "це не", + "не так", "неправильно", "виправ", + ) + return any(m in t for m in markers) + + +def _truncate_context_for_prompt(raw: str, *, max_chars: int = 2200, max_lines: int = 28) -> str: + if not raw: + return "" + lines = [ln for ln in raw.splitlines() if ln.strip()] + if len(lines) > max_lines: + lines = lines[-max_lines:] + out = "\n".join(lines) + if len(out) > max_chars: + out = out[-max_chars:] + # try to cut from next line boundary for cleaner prompt + pos = out.find("\n") + if 0 <= pos < 200: + out = out[pos + 1 :] + return out.strip() + + +def _agromatrix_observation_doc_id(file_id: str, label: str) -> str: + digest = hashlib.sha1(f"{file_id}:{label}".encode("utf-8")).hexdigest()[:16] + return f"agromatrix-photo-{digest}" + + +async def _save_agromatrix_photo_learning( + *, + file_id: str, + label: str, + source: str, + chat_id: str, + user_id: str, + dao_id: str, +) -> None: + """ + Persist non-private photo learning: + 1) fact keyed by file_id for deterministic follow-ups + 2) anonymized doc card in agromatrix_docs for semantic reuse + """ + if not file_id or not label: + return + now_iso = datetime.utcnow().isoformat() + try: + await memory_client.upsert_fact( + user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID, + fact_key=f"agromatrix:photo_label:{file_id}", + fact_value=label, + fact_value_json={ + "label": label, + "source": source, + "updated_at": now_iso, + }, + team_id=dao_id, + ) + except Exception as e: + logger.warning(f"AgroMatrix photo learning fact save failed: {e}") + + # Best-effort semantic card, no personal data/chat text. + card_text = ( + f"AgroMatrix plant observation.\n" + f"Validated label: {label}.\n" + f"Use as a prior hint for similar seedling/leaf photos.\n" + f"Source: {source}. Updated: {now_iso}." + ) + try: + router_url = os.getenv("ROUTER_URL", "http://router:8000").rstrip("/") + async with httpx.AsyncClient(timeout=20.0) as client: + await client.post( + f"{router_url}/v1/documents/ingest", + json={ + "agent_id": "agromatrix", + "doc_id": _agromatrix_observation_doc_id(file_id, label), + "file_name": f"agromatrix_photo_learning_{label}.txt", + "text": card_text, + "dao_id": dao_id, + "user_id": AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID, + }, + ) + except Exception as e: + logger.warning(f"AgroMatrix photo learning ingest failed: {e}") + + +async def _invalidate_agromatrix_photo_learning( + *, + file_id: str, + reason: str, + dao_id: str, +) -> None: + if not file_id: + return + try: + await memory_client.upsert_fact( + user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID, + fact_key=f"agromatrix:photo_label:{file_id}", + fact_value="", + fact_value_json={ + "label": "", + "invalidated": True, + "reason": reason, + "updated_at": datetime.utcnow().isoformat(), + }, + team_id=dao_id, + ) + except Exception as e: + logger.warning(f"AgroMatrix photo learning invalidation failed: {e}") + + +async def _get_agromatrix_photo_prior(file_id: str, dao_id: str) -> Optional[str]: + if not file_id: + return None + try: + fact = await memory_client.get_fact( + user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID, + fact_key=f"agromatrix:photo_label:{file_id}", + team_id=dao_id, + ) + if not fact: + return None + data = fact.get("fact_value_json") if isinstance(fact, dict) else None + if isinstance(data, dict): + if bool(data.get("invalidated")): + return None + label = str(data.get("label") or "").strip() + if label: + return label + label = str(fact.get("fact_value") or "").strip() if isinstance(fact, dict) else "" + return label or None + except Exception as e: + logger.warning(f"AgroMatrix photo prior lookup failed: {e}") + return None + + +async def _set_agromatrix_last_photo_ref(*, chat_id: str, user_id: str, file_id: str, dao_id: str) -> None: + if not (chat_id and user_id and file_id): + return + try: + await memory_client.upsert_fact( + user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID, + fact_key=f"agromatrix:last_photo:{chat_id}:{user_id}", + fact_value=file_id, + fact_value_json={"file_id": file_id, "updated_at": datetime.utcnow().isoformat()}, + team_id=dao_id, + ) + except Exception as e: + logger.warning(f"AgroMatrix last photo ref save failed: {e}") + + +async def _get_agromatrix_last_photo_ref(*, chat_id: str, user_id: str, dao_id: str) -> Optional[str]: + if not (chat_id and user_id): + return None + try: + fact = await memory_client.get_fact( + user_id=AGROMATRIX_GLOBAL_KNOWLEDGE_USER_ID, + fact_key=f"agromatrix:last_photo:{chat_id}:{user_id}", + team_id=dao_id, + ) + if not fact: + return None + data = fact.get("fact_value_json") if isinstance(fact, dict) else None + if isinstance(data, dict): + file_id = str(data.get("file_id") or "").strip() + if file_id: + return file_id + file_id = str(fact.get("fact_value") or "").strip() if isinstance(fact, dict) else "" + return file_id or None + except Exception as e: + logger.warning(f"AgroMatrix last photo ref lookup failed: {e}") + return None + + def _needs_photo_only_response(text: str) -> bool: """ Return True only for explicit requests to analyze/describe image content. @@ -1434,6 +1678,13 @@ async def process_photo( logger.info(f"{agent_config.name}: Photo from {username} (tg:{user_id}), file_id: {file_id}") _set_recent_photo_context(agent_config.agent_id, chat_id, user_id, file_id) + if agent_config.agent_id == "agromatrix": + await _set_agromatrix_last_photo_ref( + chat_id=chat_id, + user_id=user_id, + file_id=file_id, + dao_id=dao_id, + ) # Get caption for media question check caption = caption_override if caption_override is not None else ((update.message or {}).get("caption") or "") @@ -1500,6 +1751,14 @@ async def process_photo( # Send to Router with specialist_vision_8b model (Swapper) # IMPORTANT: Default prompt must request BRIEF description (2-3 sentences max) prompt = caption.strip() if caption else "Коротко (2-3 речення) скажи, що на цьому зображенні та яке його значення." + if agent_config.agent_id == "agromatrix": + prior_label = await _get_agromatrix_photo_prior(file_id=file_id, dao_id=dao_id) + if prior_label: + prompt = ( + f"{prompt}\n\n" + f"[Контекст навчання AgroMatrix: для цього фото раніше підтверджено мітку: '{prior_label}'. " + "Використай як пріоритетну гіпотезу, але перевір ознаки і коротко поясни.]" + ) router_request = { "message": f"{prompt}\n\n[Зображення передано окремо у context.images]", "mode": "chat", @@ -1557,7 +1816,7 @@ async def process_photo( agent_metadata={"context": "photo"}, username=username, ) - + return {"ok": True, "agent": agent_config.agent_id, "model": "specialist_vision_8b"} else: await send_telegram_message( @@ -2577,6 +2836,131 @@ async def handle_telegram_webhook( ) return {"ok": True, "agent": agent_config.agent_id, "mode": "greeting_fast_path"} + # AgroMatrix: capture user correction for latest photo and persist anonymized learning. + if agent_config.agent_id == "agromatrix" and text: + corrected_label = _extract_agromatrix_correction_label(text) + negative_feedback = _is_agromatrix_negative_feedback(text) + recent_file_id = _get_recent_photo_file_id(agent_config.agent_id, chat_id, user_id) + if not recent_file_id: + try: + mc = await memory_client.get_context( + user_id=f"tg:{user_id}", + agent_id=agent_config.agent_id, + team_id=dao_id, + channel_id=chat_id, + limit=80, + ) + recent_file_id = _extract_recent_photo_file_id_from_memory(mc) + except Exception: + recent_file_id = None + if not recent_file_id: + recent_file_id = await _get_agromatrix_last_photo_ref( + chat_id=chat_id, + user_id=user_id, + dao_id=dao_id, + ) + + if corrected_label and recent_file_id: + await _save_agromatrix_photo_learning( + file_id=recent_file_id, + label=corrected_label, + source="user_correction", + chat_id=chat_id, + user_id=user_id, + dao_id=dao_id, + ) + logger.info( + f"AgroMatrix learning updated: file_id={recent_file_id}, label={corrected_label}" + ) + if _is_agromatrix_correction_only_message(text): + ack = ( + f"Дякую, зафіксував виправлення: {corrected_label}. " + "Для цього фото надалі використовуватиму саме цю мітку." + ) + await send_telegram_message(chat_id, ack, telegram_token) + await memory_client.save_chat_turn( + agent_id=agent_config.agent_id, + team_id=dao_id, + user_id=f"tg:{user_id}", + message=text, + response=ack, + channel_id=chat_id, + scope="short_term", + save_agent_response=True, + agent_metadata={"agromatrix_learning_updated": True}, + username=username, + ) + return {"ok": True, "agent": agent_config.agent_id, "mode": "learning_updated"} + elif corrected_label and _is_agromatrix_correction_only_message(text): + ack = ( + f"Прийняв виправлення: {corrected_label}. " + "Але не бачу останнє фото в контексті, надішли фото ще раз і я зафіксую корекцію саме до нього." + ) + await send_telegram_message(chat_id, ack, telegram_token) + await memory_client.save_chat_turn( + agent_id=agent_config.agent_id, + team_id=dao_id, + user_id=f"tg:{user_id}", + message=text, + response=ack, + channel_id=chat_id, + scope="short_term", + save_agent_response=True, + agent_metadata={"agromatrix_learning_no_photo_context": True}, + username=username, + ) + return {"ok": True, "agent": agent_config.agent_id, "mode": "learning_no_photo_context"} + + # If user says answer is wrong but does not provide a replacement label, + # invalidate stale prior so it won't keep forcing repeated wrong guesses. + if negative_feedback and not corrected_label and recent_file_id: + await _invalidate_agromatrix_photo_learning( + file_id=recent_file_id, + reason="user_marked_previous_answer_wrong_without_replacement", + dao_id=dao_id, + ) + logger.info( + f"AgroMatrix learning invalidated: file_id={recent_file_id}, reason=negative_feedback_no_label" + ) + if _is_agromatrix_correction_only_message(text): + ack = ( + "Прийняв. Попередню мітку для цього фото скасовано. " + "Напиши правильну назву культури або попроси: 'перевір фото ще раз'." + ) + await send_telegram_message(chat_id, ack, telegram_token) + await memory_client.save_chat_turn( + agent_id=agent_config.agent_id, + team_id=dao_id, + user_id=f"tg:{user_id}", + message=text, + response=ack, + channel_id=chat_id, + scope="short_term", + save_agent_response=True, + agent_metadata={"agromatrix_learning_invalidated": True}, + username=username, + ) + return {"ok": True, "agent": agent_config.agent_id, "mode": "learning_invalidated"} + elif negative_feedback and not corrected_label and _is_agromatrix_correction_only_message(text): + ack = ( + "Прийняв, що попередня відповідь була хибна. " + "Щоб закріпити правильну мітку, напиши у форматі: 'правильна відповідь: <назва культури>'." + ) + await send_telegram_message(chat_id, ack, telegram_token) + await memory_client.save_chat_turn( + agent_id=agent_config.agent_id, + team_id=dao_id, + user_id=f"tg:{user_id}", + message=text, + response=ack, + channel_id=chat_id, + scope="short_term", + save_agent_response=True, + agent_metadata={"agromatrix_negative_feedback_no_photo_context": True}, + username=username, + ) + return {"ok": True, "agent": agent_config.agent_id, "mode": "negative_feedback_ack"} + # Photo/image intent guard: # if text references a photo/image, try to resolve latest file_id and route to vision. photo_intent = False @@ -2881,7 +3265,7 @@ async def handle_telegram_webhook( # Regular chat mode # Fetch memory context (includes local context as fallback) # Всі агенти мають доступ до однакової історії (80 повідомлень) для контексту - context_limit = 80 # Однакове для всіх агентів + context_limit = 40 if agent_config.agent_id == "agromatrix" else 80 memory_context = await memory_client.get_context( user_id=f"tg:{user_id}", agent_id=agent_config.agent_id, @@ -2891,7 +3275,11 @@ async def handle_telegram_webhook( ) # Build message with conversation context - local_history = memory_context.get("local_context_text", "") + local_history = _truncate_context_for_prompt( + memory_context.get("local_context_text", ""), + max_chars=2200 if agent_config.agent_id == "agromatrix" else 3800, + max_lines=28 if agent_config.agent_id == "agromatrix" else 48, + ) # Check if this is a training group is_training_group = str(chat_id) in TRAINING_GROUP_IDS @@ -2911,6 +3299,7 @@ async def handle_telegram_webhook( # Do not duplicate current prompt if it matches one pending message. unresolved_non_current = [q for q in unresolved_questions if q.strip() != (text or "").strip()] if unresolved_non_current: + unresolved_non_current = unresolved_non_current[-1:] if agent_config.agent_id == "agromatrix" else unresolved_non_current unresolved_block = ( "[КРИТИЧНО: є невідповідані питання цього користувача. " "Спочатку коротко відповідай на них, потім на поточне повідомлення. "