Files
microdao-daarion/gateway-bot/memory_client.py

452 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import logging
import time
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime
from collections import deque
import httpx
logger = logging.getLogger(__name__)

# Base URL of the Memory Service HTTP API.
MEMORY_SERVICE_URL = os.environ.get("MEMORY_SERVICE_URL", "http://memory-service:8000")
# Seconds a fetched dialog context is reused before MemoryClient asks the service again.
CONTEXT_CACHE_TTL = float(os.environ.get("MEMORY_CONTEXT_CACHE_TTL", "5"))
# Per-chat cap on messages kept by the in-memory fallback store (LocalContextStore).
LOCAL_CONTEXT_MAX_MESSAGES = int(os.environ.get("LOCAL_CONTEXT_MAX_MESSAGES", "50"))

# Auto-summarize trigger configuration:
# number of turns saved in one chat before a summary is requested ...
SUMMARIZE_TURN_THRESHOLD = int(os.environ.get("SUMMARIZE_TURN_THRESHOLD", "30"))
# ... and the minimum number of seconds between two summary requests for that chat.
SUMMARIZE_DEBOUNCE_SECONDS = int(os.environ.get("SUMMARIZE_DEBOUNCE_SECONDS", "300"))  # 5 min
# =====================================
# LOCAL CONTEXT STORE (fallback when Memory Service unavailable)
# =====================================
class LocalContextStore:
    """In-memory, per-chat message history used when the Memory Service is unavailable.

    Each chat keeps at most ``max_messages`` entries; once full, the bounded
    ``deque`` drops the oldest entry on every append. The data lives only in
    this process's memory and is lost on restart.
    """

    def __init__(self, max_messages: int = LOCAL_CONTEXT_MAX_MESSAGES):
        self.max_messages = max_messages
        # {chat_id: deque([{"role": ..., "text": ..., "timestamp": ...}, ...])}
        self._store: Dict[str, deque] = {}

    def add_message(self, chat_id: str, role: str, text: str):
        """Append a message to ``chat_id``'s history, creating the history on first use.

        Args:
            chat_id: Conversation key.
            role: ``"user"`` for user messages; any other value is rendered as
                the assistant by :meth:`format_for_prompt`.
            text: Message body.
        """
        history = self._store.get(chat_id)
        if history is None:
            history = self._store[chat_id] = deque(maxlen=self.max_messages)
        history.append({
            "role": role,
            "text": text,
            # Local wall-clock time (naive, no timezone), ISO-8601 formatted.
            "timestamp": datetime.now().isoformat(),
        })

    def get_context(self, chat_id: str, limit: int = 30) -> List[Dict[str, Any]]:
        """Return the newest ``limit`` messages of a chat, oldest first.

        Args:
            chat_id: Conversation key.
            limit: Maximum number of messages to return. ``0``, ``None`` or a
                negative value means "no limit" (the whole stored history);
                a negative value must not be used as a slice start, because
                ``messages[-(-n):]`` would silently drop the *oldest* n messages.

        Returns:
            A new list of message dicts (empty for an unknown chat).
        """
        messages = list(self._store.get(chat_id, ()))
        if not limit or limit < 0:
            return messages
        return messages[-limit:]

    def clear_chat(self, chat_id: str):
        """Forget the whole history of ``chat_id`` (no-op for an unknown chat)."""
        self._store.pop(chat_id, None)

    def format_for_prompt(self, chat_id: str, limit: int = 30) -> str:
        """Render the chat history as ``"User: ..."`` / ``"Assistant: ..."`` lines.

        Returns an empty string when there is no history.
        """
        return "\n".join(
            f"{'User' if msg['role'] == 'user' else 'Assistant'}: {msg['text']}"
            for msg in self.get_context(chat_id, limit)
        )


# Global local context store
local_context = LocalContextStore()
class MemoryClient:
    """Async client for the Memory Service HTTP API.

    Every call opens a short-lived ``httpx.AsyncClient`` and authenticates with
    ``Authorization: Bearer <user_id>`` (``system`` when no user id is given).
    Dialog contexts are cached in-process for ``CONTEXT_CACHE_TTL`` seconds, and
    every saved chat turn is mirrored into the module-level ``local_context``
    store so a context can still be built when the service is unreachable.
    """

    def __init__(self, base_url: str = MEMORY_SERVICE_URL):
        self.base_url = base_url.rstrip("/")
        self.timeout = 10.0
        # cache_key -> (time.monotonic() taken before the fetch, context dict)
        self._context_cache: Dict[str, Tuple[float, Dict[str, Any]]] = {}
        # cache_key -> chat key the entry was built for; lets save_chat_turn
        # drop a chat's cached contexts once a new turn is written.
        self._cache_owner: Dict[str, str] = {}
        # Auto-summarize state, keyed by chat key.
        self._turn_counters: Dict[str, int] = {}
        self._last_summarize: Dict[str, float] = {}
        # Strong references to fire-and-forget tasks: the event loop keeps only
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set = set()

    # ------------------------------------------------------------------
    # Keys and context cache
    # ------------------------------------------------------------------

    @staticmethod
    def _chat_key(agent_id: str, channel_id: Optional[str], user_id: str) -> str:
        """Key of one conversation: per channel when known, otherwise per user."""
        return f"{agent_id}:{channel_id or user_id}"

    def _cache_key(
        self,
        user_id: str,
        agent_id: str,
        team_id: str,
        channel_id: Optional[str],
        limit: int
    ) -> str:
        return f"{user_id}:{agent_id}:{team_id}:{channel_id}:{limit}"

    def _drop_cache_entry(self, cache_key: str) -> None:
        """Remove one cached context together with its owner record."""
        self._context_cache.pop(cache_key, None)
        self._cache_owner.pop(cache_key, None)

    def _cache_put(
        self,
        chat_key: str,
        cache_key: str,
        now: float,
        result: Dict[str, Any]
    ) -> None:
        """Cache ``result`` after evicting entries whose TTL has already elapsed.

        Without eviction every distinct cache key (user/agent/team/channel/limit)
        would stay in memory for the lifetime of the process.
        """
        expired = [
            key for key, (stamp, _) in self._context_cache.items()
            if now - stamp >= CONTEXT_CACHE_TTL
        ]
        for key in expired:
            self._drop_cache_entry(key)
        self._context_cache[cache_key] = (now, result)
        self._cache_owner[cache_key] = chat_key

    def _invalidate_chat_cache(self, chat_key: str) -> None:
        """Drop every cached context that was built for ``chat_key``."""
        stale = [key for key, owner in self._cache_owner.items() if owner == chat_key]
        for key in stale:
            self._drop_cache_entry(key)

    def _spawn_background(self, coro) -> None:
        """Schedule ``coro`` as a task and keep a reference to it until it is done."""
        task = asyncio.ensure_future(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    # ------------------------------------------------------------------
    # Context builders
    # ------------------------------------------------------------------

    @staticmethod
    def _build_remote_context(
        events: Optional[List[Dict[str, Any]]],
        limit: int
    ) -> Dict[str, Any]:
        """Turn raw Memory Service events into the dict returned by get_context.

        JSON ``null`` for ``events``, ``timestamp`` or ``metadata`` is treated as
        missing, so one malformed event does not make the caller discard the whole
        remote context.
        """
        # Oldest first; events without a timestamp sort to the front.
        events = sorted(events or [], key=lambda e: e.get("timestamp") or "")

        # user_id -> display name, used to resolve "tg:<id>" senders in group
        # chats. In time order, the first event carrying a metadata username
        # (or, failing that, a sender_name that is not "tg:...") wins.
        uid_to_name: Dict[str, str] = {}
        for e in events:
            uid = e.get("user_id") or ""
            if not uid or uid in uid_to_name:
                continue
            name = (e.get("metadata") or {}).get("username") or ""
            if not name:
                sender_name = e.get("sender_name") or ""
                if not sender_name.startswith("tg:"):
                    name = sender_name
            if name:
                uid_to_name[uid] = name

        recent_events: List[Dict[str, Any]] = []
        lines: List[str] = []
        for e in events:
            content = e.get("content", "")
            if not content:
                continue
            recent_events.append({
                "body_text": content,
                "kind": e.get("kind", "message"),
                "type": "user" if e.get("role") == "user" else "agent",
                "role": e.get("role", "unknown"),
                "timestamp": e.get("timestamp"),
                "user_id": e.get("user_id"),
                "sender_name": e.get("sender_name"),
            })
            if e.get("role") == "user":
                # Show the sender's name so group-chat turns are attributable.
                sender = e.get("sender_name") or ""
                if not sender:
                    md = e.get("metadata") or {}
                    sender = md.get("username") or md.get("first_name") or ""
                # Resolve "tg:<id>" placeholders using names seen on other events.
                if not sender or sender.startswith("tg:"):
                    sender = uid_to_name.get(e.get("user_id", ""), sender)
                role = f"[{sender}]" if sender else "User"
            else:
                role = "Assistant"
            lines.append(f"{role}: {content}")

        return {
            "facts": [],
            "recent_events": recent_events,
            "dialog_summaries": [],
            "local_context_text": "\n".join(lines[-limit:]),
        }

    @staticmethod
    def _build_local_context(chat_key: str, limit: int) -> Dict[str, Any]:
        """Build the get_context result from the in-memory fallback store."""
        local_messages = local_context.get_context(chat_key, limit)
        return {
            "facts": [],
            "recent_events": [
                {
                    "body_text": msg["text"],
                    "kind": "message",
                    "type": "user" if msg["role"] == "user" else "agent",
                }
                for msg in local_messages
            ],
            "dialog_summaries": [],
            "local_context_text": local_context.format_for_prompt(chat_key, limit),
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_context(
        self,
        user_id: str,
        agent_id: str,
        team_id: str,
        channel_id: Optional[str] = None,
        limit: int = 80
    ) -> Dict[str, Any]:
        """Return the memory context for a dialog.

        Fetches ``GET /agents/{agent_id}/memory`` and falls back to the local
        in-memory store when the request fails or returns a non-200 status.
        Either result is cached for ``CONTEXT_CACHE_TTL`` seconds.

        Returns:
            Dict with ``facts``, ``recent_events``, ``dialog_summaries`` and
            ``local_context_text`` (prompt-ready transcript).
        """
        cache_key = self._cache_key(user_id, agent_id, team_id, channel_id, limit)
        chat_key = self._chat_key(agent_id, channel_id, user_id)
        now = time.monotonic()
        cached = self._context_cache.get(cache_key)
        if cached and now - cached[0] < CONTEXT_CACHE_TTL:
            return cached[1]

        result: Optional[Dict[str, Any]] = None
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                resp = await client.get(
                    f"{self.base_url}/agents/{agent_id}/memory",
                    params={
                        "user_id": user_id,
                        "channel_id": channel_id,
                        "limit": limit,
                    },
                    headers={"Authorization": f"Bearer {user_id}"},
                )
                if resp.status_code == 200:
                    result = self._build_remote_context(resp.json().get("events"), limit)
                else:
                    logger.debug(
                        f"Memory Service context fetch returned {resp.status_code}, using local"
                    )
        except Exception as e:
            logger.debug(f"Memory Service context fetch failed, using local: {e}")

        if result is None:
            # FALLBACK: local in-memory context
            result = self._build_local_context(chat_key, limit)
        self._cache_put(chat_key, cache_key, now, result)
        return result

    async def save_chat_turn(
        self,
        agent_id: str,
        team_id: str,
        user_id: str,
        message: str,
        response: str,
        channel_id: Optional[str] = None,
        scope: str = "short_term",
        save_agent_response: bool = True,
        agent_metadata: Optional[Dict[str, Any]] = None,
        username: Optional[str] = None
    ) -> bool:
        """Save one dialog turn (user message plus agent response).

        The turn is always written to the local context first, then posted to
        ``POST /agents/{agent_id}/memory`` on a best-effort basis. After a
        successful remote save an auto-summarize check is scheduled in the
        background.

        Returns:
            Always ``True``: the local copy has been saved even when the Memory
            Service is unavailable.
        """
        chat_key = self._chat_key(agent_id, channel_id, user_id)

        # ALWAYS save to the local context
        local_context.add_message(chat_key, "user", message)
        if save_agent_response and response:
            local_context.add_message(chat_key, "assistant", response)
        # Any context cached for this chat no longer includes the newest turn.
        self._invalidate_chat_cache(chat_key)
        logger.info(
            f"💾 Saved to local context: chat={chat_key}, "
            f"messages={len(local_context.get_context(chat_key, 0))}"
        )

        # Try to save to the Memory Service (it may be unavailable)
        headers = {"Authorization": f"Bearer {user_id}"}
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                user_event = {
                    "agent_id": agent_id,
                    "team_id": team_id,
                    "channel_id": channel_id,
                    "user_id": user_id,
                    "scope": scope,
                    "kind": "message",
                    "body_text": message,
                    "body_json": {"type": "user_message", "source": "telegram", "username": username or ""}
                }
                await client.post(
                    f"{self.base_url}/agents/{agent_id}/memory",
                    json=user_event,
                    headers=headers
                )
                if save_agent_response and response:
                    agent_event = {
                        "agent_id": agent_id,
                        "team_id": team_id,
                        "channel_id": channel_id,
                        "user_id": user_id,
                        "scope": scope,
                        "kind": "message",
                        "body_text": response,
                        "body_json": {
                            "type": "agent_response",
                            "source": "telegram",
                            **(agent_metadata or {})
                        }
                    }
                    await client.post(
                        f"{self.base_url}/agents/{agent_id}/memory",
                        json=agent_event,
                        headers=headers
                    )
            # Reads issued while the posts were in flight may have cached remote
            # history that predates this turn.
            self._invalidate_chat_cache(chat_key)

            # Auto-summarize trigger (fire-and-forget, non-blocking)
            try:
                self._spawn_background(self._maybe_trigger_summarize(
                    agent_id=agent_id,
                    channel_id=channel_id,
                    user_id=user_id,
                    team_id=team_id
                ))
            except Exception as trigger_err:
                logger.debug(f"Summarize trigger scheduling failed: {trigger_err}")
            return True
        except Exception as e:
            # Memory Service unavailable, but the local context is already saved
            logger.debug(f"Memory Service unavailable (using local context): {e}")
            return True  # Return True because local context was saved

    async def _maybe_trigger_summarize(
        self,
        agent_id: str,
        channel_id: Optional[str],
        user_id: str,
        team_id: Optional[str] = None
    ):
        """Request a server-side summary once a chat reaches the turn threshold.

        Each call counts one turn for the chat. After ``SUMMARIZE_TURN_THRESHOLD``
        turns, and provided no summary was requested within the last
        ``SUMMARIZE_DEBOUNCE_SECONDS``, ``POST /agents/{agent_id}/summarize`` is
        sent. Failures are logged, never raised. ``team_id`` is currently unused.
        """
        chat_key = self._chat_key(agent_id, channel_id, user_id)

        # Increment turn counter (no await before this point, so it is race-free
        # on a single event loop).
        turn_count = self._turn_counters.get(chat_key, 0) + 1
        self._turn_counters[chat_key] = turn_count

        # Check threshold
        if turn_count < SUMMARIZE_TURN_THRESHOLD:
            return

        # Check debounce. time.monotonic() has an undefined reference point, so
        # "never summarized" is represented by None rather than by 0.
        now = time.monotonic()
        last = self._last_summarize.get(chat_key)
        if last is not None and now - last < SUMMARIZE_DEBOUNCE_SECONDS:
            logger.debug(f"Summarize debounce active for {chat_key}, skipping")
            return

        # Reset counter and update timestamp
        self._turn_counters[chat_key] = 0
        self._last_summarize[chat_key] = now

        # Fire-and-forget summarize request
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                resp = await client.post(
                    f"{self.base_url}/agents/{agent_id}/summarize",
                    json={
                        "channel_id": channel_id,
                        "user_id": user_id,
                        "max_events": 60,
                        "force": False
                    },
                    headers={"Authorization": f"Bearer {user_id}"}
                )
                if resp.status_code == 200:
                    data = resp.json()
                    logger.info(
                        f"Auto-summary created for {chat_key}: "
                        f"events={data.get('events_summarized', '?')}, "
                        f"summary_len={len(data.get('summary', ''))}"
                    )
                else:
                    logger.warning(
                        f"Auto-summary failed for {chat_key}: "
                        f"status={resp.status_code}, body={resp.text[:200]}"
                    )
        except Exception as e:
            logger.warning(f"Auto-summary request failed for {chat_key}: {e}")

    async def create_dialog_summary(
        self,
        team_id: str,
        channel_id: Optional[str],
        agent_id: str,
        user_id: Optional[str],
        period_start: datetime,
        period_end: datetime,
        summary_text: str,
        message_count: int = 0,
        participant_count: int = 0,
        topics: Optional[List[str]] = None,
        summary_json: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Store a dialog summary so long conversations need not fit in the context.

        Posts to ``POST /summaries``; authenticates as ``system`` when
        ``user_id`` is empty.

        Returns:
            ``True`` on HTTP 200/201, ``False`` on any other status or error.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/summaries",
                    json={
                        "team_id": team_id,
                        "channel_id": channel_id,
                        "agent_id": agent_id,
                        "user_id": user_id,
                        "period_start": period_start.isoformat(),
                        "period_end": period_end.isoformat(),
                        "summary_text": summary_text,
                        "summary_json": summary_json,
                        "message_count": message_count,
                        "participant_count": participant_count,
                        "topics": topics or [],
                        "meta": {}
                    },
                    headers={"Authorization": f"Bearer {user_id or 'system'}"}
                )
                return response.status_code in (200, 201)
        except Exception as e:
            logger.warning(f"Failed to create dialog summary: {e}")
            return False

    async def upsert_fact(
        self,
        user_id: str,
        fact_key: str,
        fact_value: Optional[str] = None,
        fact_value_json: Optional[Dict[str, Any]] = None,
        team_id: Optional[str] = None
    ) -> bool:
        """Create or update a user fact via ``POST /facts/upsert``.

        Returns:
            ``True`` on HTTP 200/201, ``False`` on any other status or error.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/facts/upsert",
                    json={
                        "user_id": user_id,
                        "fact_key": fact_key,
                        "fact_value": fact_value,
                        "fact_value_json": fact_value_json,
                        "team_id": team_id
                    },
                    headers={"Authorization": f"Bearer {user_id}"}
                )
                return response.status_code in (200, 201)
        except Exception as e:
            logger.warning(f"Failed to upsert fact: {e}")
            return False

    async def get_fact(
        self,
        user_id: str,
        fact_key: str,
        team_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Fetch a user fact via ``GET /facts/{fact_key}``.

        Returns:
            The decoded JSON body on HTTP 200 (presumably a dict with
            ``fact_value`` and ``fact_value_json``), otherwise ``None``,
            including on request errors.
        """
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.base_url}/facts/{fact_key}",
                    params={
                        "user_id": user_id,
                        "team_id": team_id
                    },
                    headers={"Authorization": f"Bearer {user_id}"}
                )
                if response.status_code == 200:
                    return response.json()
                return None
        except Exception as e:
            logger.warning(f"Failed to get fact: {e}")
            return None


# Global client instance
memory_client = MemoryClient()