From 0cfd3619ea31e0fada66c95818319c772e29be96 Mon Sep 17 00:00:00 2001 From: Apple Date: Mon, 9 Feb 2026 10:15:43 -0800 Subject: [PATCH] feat: auto-summarize trigger for agent memory - Memory Service: POST /agents/{agent_id}/summarize endpoint - Fetches recent events by agent_id (new db.list_facts_by_agent) - Generates structured summary via DeepSeek LLM - Saves summary to PostgreSQL facts + Qdrant vector store - Returns structured JSON (summary, goals, decisions, key_facts) - Gateway memory_client: auto-trigger after 30 turns - Turn counter per chat (agent_id:channel_id) - 5-minute debounce between summarize calls - Fire-and-forget via asyncio.ensure_future (non-blocking) - Configurable via SUMMARIZE_TURN_THRESHOLD / SUMMARIZE_DEBOUNCE_SECONDS - Database: list_facts_by_agent() for agent-level queries without user_id Tested on NODE1: Helion summarize returns valid Ukrainian summary with 20 events. Co-authored-by: Cursor --- gateway-bot/memory_client.py | 78 +++++++++++ services/memory-service/app/database.py | 19 +++ services/memory-service/app/main.py | 166 ++++++++++++++++++++++++ 3 files changed, 263 insertions(+) diff --git a/gateway-bot/memory_client.py b/gateway-bot/memory_client.py index df7209e9..d22b9f85 100644 --- a/gateway-bot/memory_client.py +++ b/gateway-bot/memory_client.py @@ -13,6 +13,10 @@ MEMORY_SERVICE_URL = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000 CONTEXT_CACHE_TTL = float(os.getenv("MEMORY_CONTEXT_CACHE_TTL", "5")) LOCAL_CONTEXT_MAX_MESSAGES = int(os.getenv("LOCAL_CONTEXT_MAX_MESSAGES", "50")) +# Auto-summarize trigger configuration +SUMMARIZE_TURN_THRESHOLD = int(os.getenv("SUMMARIZE_TURN_THRESHOLD", "30")) +SUMMARIZE_DEBOUNCE_SECONDS = int(os.getenv("SUMMARIZE_DEBOUNCE_SECONDS", "300")) # 5 min + # ===================================== # LOCAL CONTEXT STORE (fallback when Memory Service unavailable) # ===================================== @@ -69,6 +73,9 @@ class MemoryClient: self.base_url = base_url.rstrip("/") 
self.timeout = 10.0 self._context_cache: Dict[str, Tuple[float, Dict[str, Any]]] = {} + # Auto-summarize state + self._turn_counters: Dict[str, int] = {} + self._last_summarize: Dict[str, float] = {} def _cache_key( self, @@ -258,12 +265,83 @@ class MemoryClient: headers={"Authorization": f"Bearer {user_id}"} ) + # Auto-summarize trigger (fire-and-forget, non-blocking) + try: + asyncio.ensure_future(self._maybe_trigger_summarize( + agent_id=agent_id, + channel_id=channel_id, + user_id=user_id, + team_id=team_id + )) + except Exception as trigger_err: + logger.debug(f"Summarize trigger scheduling failed: {trigger_err}") + return True except Exception as e: # Memory Service недоступний - але локальний контекст вже збережено logger.debug(f"Memory Service unavailable (using local context): {e}") return True # Return True because local context was saved + async def _maybe_trigger_summarize( + self, + agent_id: str, + channel_id, + user_id: str, + team_id=None + ): + """ + Auto-trigger summarize when conversation reaches threshold. + Uses turn counter + debounce to avoid excessive calls. 
+ """ + chat_key = f"{agent_id}:{channel_id or user_id}" + + # Increment turn counter + self._turn_counters[chat_key] = self._turn_counters.get(chat_key, 0) + 1 + turn_count = self._turn_counters[chat_key] + + # Check threshold + if turn_count < SUMMARIZE_TURN_THRESHOLD: + return + + # Check debounce + now = time.monotonic() + last = self._last_summarize.get(chat_key, 0) + if now - last < SUMMARIZE_DEBOUNCE_SECONDS: + logger.debug(f"Summarize debounce active for {chat_key}, skipping") + return + + # Reset counter and update timestamp + self._turn_counters[chat_key] = 0 + self._last_summarize[chat_key] = now + + # Fire-and-forget summarize request + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{self.base_url}/agents/{agent_id}/summarize", + json={ + "channel_id": channel_id, + "user_id": user_id, + "max_events": 60, + "force": False + }, + headers={"Authorization": f"Bearer {user_id}"} + ) + if resp.status_code == 200: + data = resp.json() + logger.info( + f"Auto-summary created for {chat_key}: " + f"events={data.get('events_summarized', '?')}, " + f"summary_len={len(data.get('summary', ''))}" + ) + else: + logger.warning( + f"Auto-summary failed for {chat_key}: " + f"status={resp.status_code}, body={resp.text[:200]}" + ) + except Exception as e: + logger.warning(f"Auto-summary request failed for {chat_key}: {e}") + async def create_dialog_summary( self, team_id: str, diff --git a/services/memory-service/app/database.py b/services/memory-service/app/database.py index b3887403..2898760f 100644 --- a/services/memory-service/app/database.py +++ b/services/memory-service/app/database.py @@ -517,6 +517,25 @@ class Database: return [dict(row) for row in rows] + async def list_facts_by_agent( + self, + agent_id: str, + channel_id: str = None, + limit: int = 60 + ) -> list: + """List facts for an agent (any user), ordered by most recent.""" + async with self.pool.acquire() as conn: + query = "SELECT * FROM user_facts WHERE 
agent_id = $1" + params = [agent_id] + if channel_id: + query += " AND fact_key LIKE '%' || $2 || '%'" + params.append(channel_id) + query += " ORDER BY updated_at DESC" + query += f" LIMIT ${len(params) + 1}" + params.append(limit) + rows = await conn.fetch(query, *params) + return [dict(row) for row in rows] + async def delete_fact( self, user_id: str, diff --git a/services/memory-service/app/main.py b/services/memory-service/app/main.py index 84250a98..c979d6cc 100644 --- a/services/memory-service/app/main.py +++ b/services/memory-service/app/main.py @@ -1011,3 +1011,169 @@ async def get_stats(): if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) + + +# ============================================================================ +# AGENT-LEVEL SUMMARIZE (called by Gateway auto-trigger) +# ============================================================================ + +class AgentSummarizeRequest(BaseModel): + """Request to generate a summary of recent agent memory events""" + channel_id: Optional[str] = None + user_id: Optional[str] = None + max_events: int = 60 # how many recent events to summarize + force: bool = False # bypass debounce check + + +@app.post("/agents/{agent_id}/summarize") +async def summarize_agent_memory(agent_id: str, request: AgentSummarizeRequest): + """ + Generate rolling summary of recent agent memory events. + Called by Gateway when conversation reaches threshold. + + 1. Fetch recent events from facts table (agent-isolated) + 2. Generate structured summary via DeepSeek LLM + 3. Save summary back as a special fact for future context retrieval + 4. Index summary in Qdrant for semantic search + """ + import json as json_lib + from datetime import datetime as dt + from uuid import uuid4 + + try: + # 1. 
Fetch recent events for this agent (all users)
+        facts = await db.list_facts_by_agent(
+            agent_id=agent_id,
+            channel_id=request.channel_id,
+            limit=request.max_events
+        )
+
+        # Filter for chat events (optionally by channel)
+        events = []
+        for fact in facts:
+            if not fact.get("fact_key", "").startswith("chat_event:"):
+                continue
+            event_data = fact.get("fact_value_json", {})
+            if isinstance(event_data, str):
+                try:
+                    event_data = json_lib.loads(event_data)
+                except Exception:
+                    event_data = {}
+            if not isinstance(event_data, dict):
+                event_data = {}
+            if request.channel_id and event_data.get("channel_id") != request.channel_id:
+                continue
+            events.append(event_data)
+
+        if len(events) < 5:
+            return {
+                "status": "skipped",
+                "reason": f"Too few events ({len(events)}), need at least 5",
+                "events_count": len(events)
+            }
+
+        # 2. Format events for LLM
+        formatted_events = []
+        for e in events:
+            formatted_events.append({
+                "role": e.get("role", "user"),
+                "content": e.get("content", ""),
+                "timestamp": e.get("timestamp", "")
+            })
+
+        # 3. Generate summary via DeepSeek LLM
+        llm_result = await _llm_generate_summary(formatted_events)
+
+        # 4. Save summary as a special fact
+        summary_id = str(uuid4())
+        timestamp = dt.utcnow().isoformat()
+        summary_fact = {
+            "type": "dialog_summary",
+            "summary_id": summary_id,
+            "agent_id": agent_id,
+            "channel_id": request.channel_id,
+            "summary": llm_result["summary"],
+            "goals": llm_result["goals"],
+            "decisions": llm_result["decisions"],
+            "open_questions": llm_result["open_questions"],
+            "next_steps": llm_result["next_steps"],
+            "key_facts": llm_result["key_facts"],
+            "events_summarized": len(events),
+            "timestamp": timestamp
+        }
+
+        await db.ensure_facts_table()
+        await db.upsert_fact(
+            user_id=request.user_id or "system",
+            fact_key=f"summary:{agent_id}:{request.channel_id or 'all'}:{timestamp}",
+            fact_value_json=summary_fact,
+            team_id=None,
+            agent_id=agent_id
+        )
+
+        # 5. 
Index in Qdrant for semantic search + summary_text = llm_result["summary"] + if summary_text and len(summary_text) > 20: + try: + from .embedding import get_document_embeddings + from qdrant_client import models as qmodels + + embeddings = await get_document_embeddings([summary_text]) + if embeddings: + vector = embeddings[0] + collection_name = f"{agent_id}_summaries" + + # Ensure collection exists + try: + vector_store.client.get_collection(collection_name) + except Exception: + vector_store.client.create_collection( + collection_name=collection_name, + vectors_config=qmodels.VectorParams( + size=len(vector), + distance=qmodels.Distance.COSINE + ) + ) + logger.info("created_summary_collection", collection=collection_name) + + vector_store.client.upsert( + collection_name=collection_name, + points=[ + qmodels.PointStruct( + id=summary_id, + vector=vector, + payload={ + "type": "dialog_summary", + "agent_id": agent_id, + "channel_id": request.channel_id, + "events_count": len(events), + "summary_text": summary_text, + "timestamp": timestamp + } + ) + ] + ) + except Exception as ve: + logger.warning("summary_qdrant_index_failed", + error=str(ve), agent_id=agent_id) + + logger.info("agent_summary_created", + agent_id=agent_id, + channel_id=request.channel_id, + events_count=len(events), + summary_len=len(summary_text)) + + return { + "status": "ok", + "summary_id": summary_id, + "summary": llm_result["summary"], + "goals": llm_result["goals"], + "decisions": llm_result["decisions"], + "key_facts": llm_result["key_facts"], + "events_summarized": len(events), + "timestamp": timestamp + } + + except Exception as e: + logger.error("agent_summarize_failed", error=str(e), agent_id=agent_id) + raise HTTPException(status_code=500, detail=str(e))