feat: add Memory Service for DAARWIZZ

This commit is contained in:
Apple
2025-11-15 10:09:41 -08:00
parent a54a7b078c
commit 9e99c3afe2
7 changed files with 1355 additions and 0 deletions

View File

@@ -0,0 +1,144 @@
# Memory Service
FastAPI сервіс для управління пам'яттю в MicroDAO:
- **user_facts** - довгострокова пам'ять користувачів
- **dialog_summaries** - підсумки діалогів для масштабування
- **agent_memory_events** - події пам'яті агентів
- **token-gate інтеграція** - перевірка доступу через RBAC
## Встановлення
```bash
# Створити віртуальне середовище
python -m venv venv
source venv/bin/activate # Linux/Mac
# або
venv\Scripts\activate # Windows
# Встановити залежності
pip install -r requirements.txt
# Налаштувати .env файл
cp .env.example .env
# Відредагуйте .env з вашими налаштуваннями
```
## Запуск
```bash
# Development
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# Production
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
```
## API Endpoints
### User Facts
- `POST /facts/upsert` - Створити або оновити факт (основний ендпоінт)
- `GET /facts` - Список фактів користувача
- `GET /facts/{fact_key}` - Отримати факт за ключем
- `POST /facts` - Створити новий факт
- `PATCH /facts/{fact_id}` - Оновити факт
- `DELETE /facts/{fact_id}` - Видалити факт
- `GET /facts/token-gated` - Токен-гейт факти
### Dialog Summaries
- `POST /summaries` - Створити підсумок діалогу
- `GET /summaries` - Список підсумків (з cursor pagination)
- `GET /summaries/{summary_id}` - Отримати підсумок
- `DELETE /summaries/{summary_id}` - Видалити підсумок
### Agent Memory Events
- `POST /agents/{agent_id}/memory` - Створити подію пам'яті
- `GET /agents/{agent_id}/memory` - Список подій (з cursor pagination)
- `DELETE /agents/{agent_id}/memory/{event_id}` - Видалити подію
### Token Gate
- `POST /token-gate/check` - Перевірка токен-гейту
## Приклади використання
### Створити/оновити факт користувача
```bash
curl -X POST "http://localhost:8000/facts/upsert" \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"user_id": "u_123",
"fact_key": "language",
"fact_value": "uk-UA",
"metadata": {"source": "onboarding"}
}'
```
### Створити токен-гейт факт
```bash
curl -X POST "http://localhost:8000/facts/upsert" \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"user_id": "u_123",
"fact_key": "is_donor",
"fact_value": "true",
"token_gated": true,
"token_requirements": {
"token": "DAAR",
"min_balance": 1
}
}'
```
### Створити підсумок діалогу
```bash
curl -X POST "http://localhost:8000/summaries" \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"team_id": "t_123",
"channel_id": "c_456",
"period_start": "2025-01-01T00:00:00Z",
"period_end": "2025-01-01T23:59:59Z",
"summary_text": "Обговорювали новий проєкт, вирішили використати React",
"message_count": 42,
"participant_count": 5,
"topics": ["project-planning", "tech-stack"]
}'
```
## Структура бази даних
Сервіс використовує такі таблиці:
- `user_facts` - факти користувачів
- `dialog_summaries` - підсумки діалогів
- `agent_memory_events` - події пам'яті агентів
- `agent_memory_facts_vector` - векторні представлення фактів (для RAG)
## Інтеграція з іншими сервісами
### PDP Service
Для перевірки токен-гейту використовується PDP Service (Policy Decision Point).
### Wallet Service
Для перевірки балансів токенів використовується Wallet Service.
### Auth Service
Для авторизації використовується JWT токени з Auth Service.
## TODO
- [ ] Реалізувати реальну перевірку JWT токенів
- [ ] Інтегрувати з PDP Service для token-gate
- [ ] Інтегрувати з Wallet Service для перевірки балансів
- [ ] Додати кешування для часто використовуваних фактів
- [ ] Додати метрики та моніторинг
- [ ] Додати тести

View File

@@ -0,0 +1,329 @@
"""
CRUD операції для Memory Service
"""
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc
from datetime import datetime
from models import UserFact, DialogSummary, AgentMemoryEvent, AgentMemoryFactsVector
from schemas import (
UserFactCreate, UserFactUpdate, UserFactUpsertRequest,
DialogSummaryCreate, AgentMemoryEventCreate
)
# ========== User Facts CRUD ==========
def get_user_fact(
db: Session,
user_id: str,
fact_key: str,
team_id: Optional[str] = None
) -> Optional[UserFact]:
"""Отримати факт користувача"""
query = db.query(UserFact).filter(
and_(
UserFact.user_id == user_id,
UserFact.fact_key == fact_key
)
)
if team_id:
query = query.filter(UserFact.team_id == team_id)
else:
query = query.filter(UserFact.team_id.is_(None))
return query.first()
def get_user_facts(
db: Session,
user_id: str,
team_id: Optional[str] = None,
fact_keys: Optional[List[str]] = None,
skip: int = 0,
limit: int = 100
) -> List[UserFact]:
"""Отримати список фактів користувача"""
query = db.query(UserFact).filter(UserFact.user_id == user_id)
if team_id:
query = query.filter(
or_(
UserFact.team_id == team_id,
UserFact.team_id.is_(None) # Глобальні факти
)
)
if fact_keys:
query = query.filter(UserFact.fact_key.in_(fact_keys))
# Фільтр за терміном дії
query = query.filter(
or_(
UserFact.expires_at.is_(None),
UserFact.expires_at > datetime.utcnow()
)
)
return query.offset(skip).limit(limit).all()
def create_user_fact(
db: Session,
fact: UserFactCreate
) -> UserFact:
"""Створити новий факт"""
db_fact = UserFact(**fact.dict())
db.add(db_fact)
db.commit()
db.refresh(db_fact)
return db_fact
def update_user_fact(
db: Session,
fact_id: str,
fact_update: UserFactUpdate
) -> Optional[UserFact]:
"""Оновити факт"""
db_fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not db_fact:
return None
update_data = fact_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_fact, field, value)
db.commit()
db.refresh(db_fact)
return db_fact
def upsert_user_fact(
db: Session,
fact_request: UserFactUpsertRequest
) -> Tuple[UserFact, bool]:
"""
Створити або оновити факт (upsert)
Повертає (fact, created) де created = True якщо створено новий
"""
# Шукаємо існуючий факт
existing = get_user_fact(
db,
fact_request.user_id,
fact_request.fact_key,
fact_request.team_id
)
if existing:
# Оновлюємо існуючий
update_data = fact_request.dict(exclude={"user_id", "fact_key", "team_id"})
for field, value in update_data.items():
if value is not None:
setattr(existing, field, value)
db.commit()
db.refresh(existing)
return existing, False
else:
# Створюємо новий
new_fact = UserFact(**fact_request.dict())
db.add(new_fact)
db.commit()
db.refresh(new_fact)
return new_fact, True
def delete_user_fact(
db: Session,
fact_id: str
) -> bool:
"""Видалити факт"""
db_fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not db_fact:
return False
db.delete(db_fact)
db.commit()
return True
def get_user_facts_by_token_gate(
db: Session,
user_id: str,
team_id: Optional[str] = None
) -> List[UserFact]:
"""Отримати токен-гейт факти користувача"""
query = db.query(UserFact).filter(
and_(
UserFact.user_id == user_id,
UserFact.token_gated == True
)
)
if team_id:
query = query.filter(
or_(
UserFact.team_id == team_id,
UserFact.team_id.is_(None)
)
)
return query.all()
# ========== Dialog Summary CRUD ==========
def create_dialog_summary(
db: Session,
summary: DialogSummaryCreate
) -> DialogSummary:
"""Створити підсумок діалогу"""
db_summary = DialogSummary(**summary.dict())
db.add(db_summary)
db.commit()
db.refresh(db_summary)
return db_summary
def get_dialog_summaries(
db: Session,
team_id: Optional[str] = None,
channel_id: Optional[str] = None,
agent_id: Optional[str] = None,
user_id: Optional[str] = None,
skip: int = 0,
limit: int = 50,
cursor: Optional[str] = None
) -> Tuple[List[DialogSummary], Optional[str]]:
"""
Отримати список підсумків діалогів з cursor pagination
Повертає (summaries, next_cursor)
"""
query = db.query(DialogSummary)
if team_id:
query = query.filter(DialogSummary.team_id == team_id)
if channel_id:
query = query.filter(DialogSummary.channel_id == channel_id)
if agent_id:
query = query.filter(DialogSummary.agent_id == agent_id)
if user_id:
query = query.filter(DialogSummary.user_id == user_id)
if cursor:
# Cursor-based pagination (використовуємо created_at)
try:
cursor_time = datetime.fromisoformat(cursor)
query = query.filter(DialogSummary.created_at < cursor_time)
except ValueError:
pass
query = query.order_by(desc(DialogSummary.created_at))
results = query.offset(skip).limit(limit + 1).all()
# Перевіряємо чи є наступна сторінка
next_cursor = None
if len(results) > limit:
results = results[:limit]
next_cursor = results[-1].created_at.isoformat()
return results, next_cursor
def get_dialog_summary(
db: Session,
summary_id: str
) -> Optional[DialogSummary]:
"""Отримати підсумок за ID"""
return db.query(DialogSummary).filter(DialogSummary.id == summary_id).first()
def delete_dialog_summary(
db: Session,
summary_id: str
) -> bool:
"""Видалити підсумок"""
db_summary = db.query(DialogSummary).filter(DialogSummary.id == summary_id).first()
if not db_summary:
return False
db.delete(db_summary)
db.commit()
return True
# ========== Agent Memory Event CRUD ==========
def create_agent_memory_event(
db: Session,
event: AgentMemoryEventCreate
) -> AgentMemoryEvent:
"""Створити подію пам'яті агента"""
db_event = AgentMemoryEvent(**event.dict())
db.add(db_event)
db.commit()
db.refresh(db_event)
return db_event
def get_agent_memory_events(
db: Session,
agent_id: str,
team_id: Optional[str] = None,
channel_id: Optional[str] = None,
scope: Optional[str] = None,
kind: Optional[str] = None,
skip: int = 0,
limit: int = 50,
cursor: Optional[str] = None
) -> Tuple[List[AgentMemoryEvent], Optional[str]]:
"""
Отримати список подій пам'яті агента з cursor pagination
"""
query = db.query(AgentMemoryEvent).filter(AgentMemoryEvent.agent_id == agent_id)
if team_id:
query = query.filter(AgentMemoryEvent.team_id == team_id)
if channel_id:
query = query.filter(AgentMemoryEvent.channel_id == channel_id)
if scope:
query = query.filter(AgentMemoryEvent.scope == scope)
if kind:
query = query.filter(AgentMemoryEvent.kind == kind)
if cursor:
try:
cursor_time = datetime.fromisoformat(cursor)
query = query.filter(AgentMemoryEvent.created_at < cursor_time)
except ValueError:
pass
query = query.order_by(desc(AgentMemoryEvent.created_at))
results = query.offset(skip).limit(limit + 1).all()
next_cursor = None
if len(results) > limit:
results = results[:limit]
next_cursor = results[-1].created_at.isoformat()
return results, next_cursor
def delete_agent_memory_event(
db: Session,
event_id: str
) -> bool:
"""Видалити подію пам'яті"""
db_event = db.query(AgentMemoryEvent).filter(AgentMemoryEvent.id == event_id).first()
if not db_event:
return False
db.delete(db_event)
db.commit()
return True

View File

@@ -0,0 +1,414 @@
"""
Memory Service - FastAPI додаток
Підтримує: user_facts, dialog_summaries, agent_memory_events
Інтеграція з token-gate через RBAC
"""
import os
from typing import Optional, List
from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException, Query, Header
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base, UserFact, DialogSummary, AgentMemoryEvent
from schemas import (
UserFactCreate, UserFactUpdate, UserFactResponse, UserFactUpsertRequest, UserFactUpsertResponse,
DialogSummaryCreate, DialogSummaryResponse, DialogSummaryListResponse,
AgentMemoryEventCreate, AgentMemoryEventResponse, AgentMemoryEventListResponse,
TokenGateCheck, TokenGateCheckResponse
)
from crud import (
get_user_fact, get_user_facts, create_user_fact, update_user_fact,
upsert_user_fact, delete_user_fact, get_user_facts_by_token_gate,
create_dialog_summary, get_dialog_summaries, get_dialog_summary, delete_dialog_summary,
create_agent_memory_event, get_agent_memory_events, delete_agent_memory_event
)
# ========== Configuration ==========
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql://user:password@localhost:5432/microdao"
)
# Створюємо engine та sessionmaker
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Створюємо таблиці (для dev, в продакшені використовуйте міграції)
Base.metadata.create_all(bind=engine)
# ========== FastAPI App ==========
app = FastAPI(
title="Memory Service",
description="Сервіс пам'яті для MicroDAO: user_facts, dialog_summaries, agent_memory_events",
version="1.0.0"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # В продакшені обмежте це
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ========== Dependencies ==========
def get_db():
"""Dependency для отримання DB сесії"""
db = SessionLocal()
try:
yield db
finally:
db.close()
async def verify_token(authorization: Optional[str] = Header(None)) -> Optional[str]:
"""
Перевірка JWT токену (заглушка)
В продакшені інтегруйте з вашою системою авторизації
"""
if not authorization:
raise HTTPException(status_code=401, detail="Missing authorization header")
# Заглушка: в реальності перевіряйте JWT
# token = authorization.replace("Bearer ", "")
# user_id = verify_jwt_token(token)
# return user_id
# Для тестування повертаємо user_id з заголовка
return "u_test" # TODO: реалізувати реальну перевірку
async def check_token_gate(
user_id: str,
token_requirements: dict,
db: Session
) -> TokenGateCheckResponse:
"""
Перевірка токен-гейту (інтеграція з RBAC/Wallet Service)
Заглушка - в продакшені викликайте ваш PDP/Wallet Service
"""
# TODO: Інтегрувати з:
# - PDP Service для перевірки capabilities
# - Wallet Service для перевірки балансів
# - RBAC для перевірки ролей
# Приклад логіки:
# if "token" in token_requirements:
# token_type = token_requirements["token"]
# min_balance = token_requirements.get("min_balance", 0)
# balance = await wallet_service.get_balance(user_id, token_type)
# if balance < min_balance:
# return TokenGateCheckResponse(
# allowed=False,
# reason=f"Insufficient {token_type} balance",
# missing_requirements={"token": token_type, "required": min_balance, "current": balance}
# )
# Заглушка: завжди дозволяємо
return TokenGateCheckResponse(allowed=True)
# ========== Health Check ==========
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "ok", "service": "memory-service"}
# ========== User Facts Endpoints ==========
@app.post("/facts/upsert", response_model=UserFactUpsertResponse)
async def upsert_fact(
fact_request: UserFactUpsertRequest,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""
Створити або оновити факт користувача (upsert)
Це основний ендпоінт для контрольованої довгострокової пам'яті.
Підтримує токен-гейт інтеграцію.
"""
# Перевірка токен-гейту якщо потрібно
if fact_request.token_gated and fact_request.token_requirements:
gate_check = await check_token_gate(
fact_request.user_id,
fact_request.token_requirements,
db
)
if not gate_check.allowed:
raise HTTPException(
status_code=403,
detail=f"Token gate check failed: {gate_check.reason}"
)
# Перевірка прав доступу (користувач може змінювати тільки свої факти)
if fact_request.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot modify other user's facts")
fact, created = upsert_user_fact(db, fact_request)
return UserFactUpsertResponse(
fact=UserFactResponse.model_validate(fact),
created=created
)
@app.get("/facts", response_model=List[UserFactResponse])
async def list_facts(
team_id: Optional[str] = Query(None),
fact_keys: Optional[str] = Query(None, description="Comma-separated list of fact keys"),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати список фактів користувача"""
fact_keys_list = None
if fact_keys:
fact_keys_list = [k.strip() for k in fact_keys.split(",")]
facts = get_user_facts(db, user_id, team_id, fact_keys_list, skip, limit)
return [UserFactResponse.model_validate(f) for f in facts]
@app.get("/facts/{fact_key}", response_model=UserFactResponse)
async def get_fact(
fact_key: str,
team_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати конкретний факт за ключем"""
fact = get_user_fact(db, user_id, fact_key, team_id)
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
return UserFactResponse.model_validate(fact)
@app.post("/facts", response_model=UserFactResponse)
async def create_fact(
fact: UserFactCreate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Створити новий факт"""
if fact.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot create fact for other user")
db_fact = create_user_fact(db, fact)
return UserFactResponse.model_validate(db_fact)
@app.patch("/facts/{fact_id}", response_model=UserFactResponse)
async def update_fact(
fact_id: str,
fact_update: UserFactUpdate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Оновити факт"""
fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
if fact.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot modify other user's fact")
updated_fact = update_user_fact(db, fact_id, fact_update)
if not updated_fact:
raise HTTPException(status_code=404, detail="Fact not found")
return UserFactResponse.model_validate(updated_fact)
@app.delete("/facts/{fact_id}")
async def delete_fact(
fact_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Видалити факт"""
fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
if fact.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot delete other user's fact")
success = delete_user_fact(db, fact_id)
if not success:
raise HTTPException(status_code=404, detail="Fact not found")
return {"success": True}
@app.get("/facts/token-gated", response_model=List[UserFactResponse])
async def list_token_gated_facts(
team_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати токен-гейт факти користувача"""
facts = get_user_facts_by_token_gate(db, user_id, team_id)
return [UserFactResponse.model_validate(f) for f in facts]
# ========== Dialog Summary Endpoints ==========
@app.post("/summaries", response_model=DialogSummaryResponse)
async def create_summary(
summary: DialogSummaryCreate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""
Створити підсумок діалогу
Використовується для масштабування без переповнення контексту.
Агрегує інформацію про сесії/діалоги.
"""
db_summary = create_dialog_summary(db, summary)
return DialogSummaryResponse.model_validate(db_summary)
@app.get("/summaries", response_model=DialogSummaryListResponse)
async def list_summaries(
team_id: Optional[str] = Query(None),
channel_id: Optional[str] = Query(None),
agent_id: Optional[str] = Query(None),
user_id_param: Optional[str] = Query(None, alias="user_id"),
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
cursor: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати список підсумків діалогів"""
summaries, next_cursor = get_dialog_summaries(
db, team_id, channel_id, agent_id, user_id_param, skip, limit, cursor
)
return DialogSummaryListResponse(
items=[DialogSummaryResponse.model_validate(s) for s in summaries],
total=len(summaries),
cursor=next_cursor
)
@app.get("/summaries/{summary_id}", response_model=DialogSummaryResponse)
async def get_summary(
summary_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати підсумок за ID"""
summary = get_dialog_summary(db, summary_id)
if not summary:
raise HTTPException(status_code=404, detail="Summary not found")
return DialogSummaryResponse.model_validate(summary)
@app.delete("/summaries/{summary_id}")
async def delete_summary(
summary_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Видалити підсумок"""
success = delete_dialog_summary(db, summary_id)
if not success:
raise HTTPException(status_code=404, detail="Summary not found")
return {"success": True}
# ========== Agent Memory Event Endpoints ==========
@app.post("/agents/{agent_id}/memory", response_model=AgentMemoryEventResponse)
async def create_memory_event(
agent_id: str,
event: AgentMemoryEventCreate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Створити подію пам'яті агента"""
# Перевірка що agent_id збігається
if event.agent_id != agent_id:
raise HTTPException(status_code=400, detail="agent_id mismatch")
db_event = create_agent_memory_event(db, event)
return AgentMemoryEventResponse.model_validate(db_event)
@app.get("/agents/{agent_id}/memory", response_model=AgentMemoryEventListResponse)
async def list_memory_events(
agent_id: str,
team_id: Optional[str] = Query(None),
channel_id: Optional[str] = Query(None),
scope: Optional[str] = Query(None, description="short_term | mid_term | long_term"),
kind: Optional[str] = Query(None, description="message | fact | summary | note"),
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
cursor: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати список подій пам'яті агента"""
events, next_cursor = get_agent_memory_events(
db, agent_id, team_id, channel_id, scope, kind, skip, limit, cursor
)
return AgentMemoryEventListResponse(
items=[AgentMemoryEventResponse.model_validate(e) for e in events],
total=len(events),
cursor=next_cursor
)
@app.delete("/agents/{agent_id}/memory/{event_id}")
async def delete_memory_event(
agent_id: str,
event_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Видалити подію пам'яті"""
success = delete_agent_memory_event(db, event_id)
if not success:
raise HTTPException(status_code=404, detail="Memory event not found")
return {"success": True}
# ========== Token Gate Integration Endpoint ==========
@app.post("/token-gate/check", response_model=TokenGateCheckResponse)
async def check_token_gate_endpoint(
check: TokenGateCheck,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""
Перевірка токен-гейту для факту
Інтеграція з RBAC/Wallet Service для перевірки доступу
"""
if check.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot check token gate for other user")
return await check_token_gate(user_id, check.token_requirements, db)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,132 @@
-- Міграція для Memory Service
-- Створює таблиці: user_facts, dialog_summaries, agent_memory_events, agent_memory_facts_vector
-- Розширення для UUID та vector
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "vector";
-- ========== User Facts ==========
CREATE TABLE IF NOT EXISTS user_facts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
team_id TEXT REFERENCES teams(id) ON DELETE CASCADE,
-- Ключ факту (наприклад: "language", "is_donor", "is_validator", "top_contributor")
fact_key TEXT NOT NULL,
-- Значення факту
fact_value TEXT,
fact_value_json JSONB,
-- Метадані
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
-- Токен-гейт
token_gated BOOLEAN NOT NULL DEFAULT false,
token_requirements JSONB,
-- Таймстемпи
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_user_facts_user_id ON user_facts(user_id);
CREATE INDEX IF NOT EXISTS idx_user_facts_team_id ON user_facts(team_id);
CREATE INDEX IF NOT EXISTS idx_user_facts_user_key ON user_facts(user_id, fact_key);
CREATE INDEX IF NOT EXISTS idx_user_facts_token_gated ON user_facts(token_gated);
CREATE INDEX IF NOT EXISTS idx_user_facts_expires_at ON user_facts(expires_at) WHERE expires_at IS NOT NULL;
-- ========== Dialog Summaries ==========
CREATE TABLE IF NOT EXISTS dialog_summaries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Контекст діалогу
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
channel_id TEXT REFERENCES channels(id) ON DELETE CASCADE,
agent_id TEXT REFERENCES agents(id) ON DELETE CASCADE,
user_id TEXT REFERENCES users(id) ON DELETE CASCADE,
-- Період
period_start TIMESTAMPTZ NOT NULL,
period_end TIMESTAMPTZ NOT NULL,
-- Підсумок
summary_text TEXT NOT NULL,
summary_json JSONB,
-- Статистика
message_count INTEGER NOT NULL DEFAULT 0,
participant_count INTEGER NOT NULL DEFAULT 0,
-- Ключові теми
topics JSONB,
-- Метадані
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dialog_summaries_team_id ON dialog_summaries(team_id);
CREATE INDEX IF NOT EXISTS idx_dialog_summaries_channel_id ON dialog_summaries(channel_id);
CREATE INDEX IF NOT EXISTS idx_dialog_summaries_agent_id ON dialog_summaries(agent_id);
CREATE INDEX IF NOT EXISTS idx_dialog_summaries_user_id ON dialog_summaries(user_id);
CREATE INDEX IF NOT EXISTS idx_dialog_summaries_team_period ON dialog_summaries(team_id, period_start, period_end);
CREATE INDEX IF NOT EXISTS idx_dialog_summaries_created_at ON dialog_summaries(created_at);
-- ========== Agent Memory Events ==========
CREATE TABLE IF NOT EXISTS agent_memory_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
channel_id TEXT REFERENCES channels(id) ON DELETE CASCADE,
user_id TEXT REFERENCES users(id) ON DELETE CASCADE,
-- Scope: short_term, mid_term, long_term
scope TEXT NOT NULL CHECK (scope IN ('short_term', 'mid_term', 'long_term')),
-- Kind: message, fact, summary, note
kind TEXT NOT NULL CHECK (kind IN ('message', 'fact', 'summary', 'note')),
-- Тіло події
body_text TEXT,
body_json JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_agent_memory_events_agent_id ON agent_memory_events(agent_id);
CREATE INDEX IF NOT EXISTS idx_agent_memory_events_team_id ON agent_memory_events(team_id);
CREATE INDEX IF NOT EXISTS idx_agent_memory_events_channel_id ON agent_memory_events(channel_id);
CREATE INDEX IF NOT EXISTS idx_agent_memory_events_user_id ON agent_memory_events(user_id);
CREATE INDEX IF NOT EXISTS idx_agent_memory_events_agent_team_scope ON agent_memory_events(agent_id, team_id, scope);
CREATE INDEX IF NOT EXISTS idx_agent_memory_events_channel ON agent_memory_events(agent_id, channel_id);
CREATE INDEX IF NOT EXISTS idx_agent_memory_events_created_at ON agent_memory_events(created_at);
-- ========== Agent Memory Facts Vector (для RAG) ==========
CREATE TABLE IF NOT EXISTS agent_memory_facts_vector (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
team_id TEXT NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
fact_text TEXT NOT NULL,
embedding vector(1536), -- OpenAI ada-002 embedding size
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_agent_memory_facts_vector_agent_id ON agent_memory_facts_vector(agent_id);
CREATE INDEX IF NOT EXISTS idx_agent_memory_facts_vector_team_id ON agent_memory_facts_vector(team_id);
CREATE INDEX IF NOT EXISTS idx_agent_memory_facts_vector_agent_team ON agent_memory_facts_vector(agent_id, team_id);
-- Індекс для векторного пошуку (використовується pgvector)
-- Примітка: Створіть цей індекс після того, як у вас буде достатньо даних
-- CREATE INDEX IF NOT EXISTS idx_agent_memory_facts_vector_embedding
-- ON agent_memory_facts_vector USING ivfflat (embedding vector_cosine_ops)
-- WITH (lists = 100);

View File

@@ -0,0 +1,159 @@
"""
SQLAlchemy моделі для Memory Service
Підтримує: user_facts, dialog_summaries, agent_memory_events
"""
from sqlalchemy import (
Column, String, Text, JSON, TIMESTAMP, ForeignKey,
CheckConstraint, Index, Boolean, Integer
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
from pgvector.sqlalchemy import Vector
Base = declarative_base()
class UserFact(Base):
"""
Довгострокові факти про користувача
Використовується для контрольованої довгострокової пам'яті
(мови, вподобання, тип користувача, токен-статуси)
"""
__tablename__ = "user_facts"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True)
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=True, index=True)
# Ключ факту (наприклад: "language", "is_donor", "is_validator", "top_contributor")
fact_key = Column(String, nullable=False, index=True)
# Значення факту (може бути текст, число, boolean, JSON)
fact_value = Column(Text, nullable=True)
fact_value_json = Column(JSONB, nullable=True)
# Метадані: джерело, впевненість, термін дії
metadata = Column(JSONB, nullable=False, server_default="{}")
# Токен-гейт: чи залежить факт від токенів/активності
token_gated = Column(Boolean, nullable=False, server_default="false")
token_requirements = Column(JSONB, nullable=True) # {"token": "DAAR", "min_balance": 1}
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
updated_at = Column(TIMESTAMP(timezone=True), nullable=True, onupdate=func.now())
expires_at = Column(TIMESTAMP(timezone=True), nullable=True) # Для тимчасових фактів
__table_args__ = (
Index("idx_user_facts_user_key", "user_id", "fact_key"),
Index("idx_user_facts_team", "team_id"),
Index("idx_user_facts_token_gated", "token_gated"),
)
class DialogSummary(Base):
"""
Підсумки діалогів для масштабування без переповнення контексту
Зберігає агреговану інформацію про сесії/діалоги
"""
__tablename__ = "dialog_summaries"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
# Контекст діалогу
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=False, index=True)
channel_id = Column(String, ForeignKey("channels.id", ondelete="CASCADE"), nullable=True, index=True)
agent_id = Column(String, ForeignKey("agents.id", ondelete="CASCADE"), nullable=True, index=True)
user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True)
# Період, який охоплює підсумок
period_start = Column(TIMESTAMP(timezone=True), nullable=False)
period_end = Column(TIMESTAMP(timezone=True), nullable=False)
# Підсумок
summary_text = Column(Text, nullable=False)
summary_json = Column(JSONB, nullable=True) # Структуровані дані
# Статистика
message_count = Column(Integer, nullable=False, server_default="0")
participant_count = Column(Integer, nullable=False, server_default="0")
# Ключові теми/теги
topics = Column(JSONB, nullable=True) # ["project-planning", "bug-fix", ...]
# Метадані
metadata = Column(JSONB, nullable=False, server_default="{}")
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
__table_args__ = (
Index("idx_dialog_summaries_team_period", "team_id", "period_start", "period_end"),
Index("idx_dialog_summaries_channel", "channel_id"),
Index("idx_dialog_summaries_agent", "agent_id"),
)
class AgentMemoryEvent(Base):
"""
Події пам'яті агентів (short-term, mid-term, long-term)
Базується на документації: docs/cursor/13_agent_memory_system.md
"""
__tablename__ = "agent_memory_events"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
agent_id = Column(String, ForeignKey("agents.id", ondelete="CASCADE"), nullable=False, index=True)
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=False, index=True)
channel_id = Column(String, ForeignKey("channels.id", ondelete="CASCADE"), nullable=True, index=True)
user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True)
# Scope: short_term, mid_term, long_term
scope = Column(
String,
nullable=False,
CheckConstraint("scope IN ('short_term', 'mid_term', 'long_term')")
)
# Kind: message, fact, summary, note
kind = Column(
String,
nullable=False,
CheckConstraint("kind IN ('message', 'fact', 'summary', 'note')")
)
# Тіло події
body_text = Column(Text, nullable=True)
body_json = Column(JSONB, nullable=True)
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
__table_args__ = (
Index("idx_agent_memory_events_agent_team_scope", "agent_id", "team_id", "scope"),
Index("idx_agent_memory_events_channel", "agent_id", "channel_id"),
Index("idx_agent_memory_events_created_at", "created_at"),
)
class AgentMemoryFactsVector(Base):
"""
Векторні представлення фактів для RAG (Retrieval-Augmented Generation)
"""
__tablename__ = "agent_memory_facts_vector"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
agent_id = Column(String, ForeignKey("agents.id", ondelete="CASCADE"), nullable=False, index=True)
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=False, index=True)
fact_text = Column(Text, nullable=False)
embedding = Column(Vector(1536), nullable=True) # OpenAI ada-002 embedding size
metadata = Column(JSONB, nullable=False, server_default="{}")
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
__table_args__ = (
Index("idx_agent_memory_facts_vector_agent_team", "agent_id", "team_id"),
)

View File

@@ -0,0 +1,8 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
pgvector==0.2.4
pydantic==2.5.0
python-dotenv==1.0.0

View File

@@ -0,0 +1,169 @@
"""
Pydantic схеми для Memory Service API
"""
from datetime import datetime
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field, field_validator, ConfigDict
from uuid import UUID
# ========== User Facts Schemas ==========
class UserFactBase(BaseModel):
"""Базова схема для user fact"""
fact_key: str = Field(..., description="Ключ факту (наприклад: 'language', 'is_donor')")
fact_value: Optional[str] = Field(None, description="Текстове значення")
fact_value_json: Optional[Dict[str, Any]] = Field(None, description="JSON значення")
team_id: Optional[str] = Field(None, description="ID команди (якщо факт командно-специфічний)")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Метадані")
token_gated: bool = Field(False, description="Чи залежить факт від токенів")
token_requirements: Optional[Dict[str, Any]] = Field(None, description="Вимоги до токенів")
expires_at: Optional[datetime] = Field(None, description="Термін дії факту")
class UserFactCreate(UserFactBase):
"""Схема для створення/оновлення факту"""
user_id: str = Field(..., description="ID користувача")
class UserFactUpdate(BaseModel):
"""Схема для часткового оновлення факту"""
fact_value: Optional[str] = None
fact_value_json: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
token_gated: Optional[bool] = None
token_requirements: Optional[Dict[str, Any]] = None
expires_at: Optional[datetime] = None
class UserFactResponse(UserFactBase):
"""Схема відповіді для user fact"""
id: str
user_id: str
created_at: datetime
updated_at: Optional[datetime]
model_config = ConfigDict(from_attributes=True)
class UserFactUpsertRequest(BaseModel):
"""Схема для upsert операції (створення або оновлення)"""
user_id: str
fact_key: str
fact_value: Optional[str] = None
fact_value_json: Optional[Dict[str, Any]] = None
team_id: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
token_gated: bool = False
token_requirements: Optional[Dict[str, Any]] = None
expires_at: Optional[datetime] = None
class UserFactUpsertResponse(BaseModel):
"""Відповідь на upsert"""
fact: UserFactResponse
created: bool = Field(..., description="Чи був створений новий факт")
# ========== Dialog Summary Schemas ==========
class DialogSummaryBase(BaseModel):
"""Базова схема для dialog summary"""
team_id: str
channel_id: Optional[str] = None
agent_id: Optional[str] = None
user_id: Optional[str] = None
period_start: datetime
period_end: datetime
summary_text: str
summary_json: Optional[Dict[str, Any]] = None
message_count: int = 0
participant_count: int = 0
topics: Optional[List[str]] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class DialogSummaryCreate(DialogSummaryBase):
"""Схема для створення summary"""
pass
class DialogSummaryResponse(DialogSummaryBase):
"""Схема відповіді для summary"""
id: str
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class DialogSummaryListResponse(BaseModel):
"""Схема для списку summaries"""
items: List[DialogSummaryResponse]
total: int
cursor: Optional[str] = None
# ========== Agent Memory Event Schemas ==========
class AgentMemoryEventBase(BaseModel):
"""Базова схема для memory event"""
agent_id: str
team_id: str
channel_id: Optional[str] = None
user_id: Optional[str] = None
scope: str = Field(..., description="short_term | mid_term | long_term")
kind: str = Field(..., description="message | fact | summary | note")
body_text: Optional[str] = None
body_json: Optional[Dict[str, Any]] = None
@field_validator("scope")
@classmethod
def validate_scope(cls, v):
if v not in ["short_term", "mid_term", "long_term"]:
raise ValueError("scope must be one of: short_term, mid_term, long_term")
return v
@field_validator("kind")
@classmethod
def validate_kind(cls, v):
if v not in ["message", "fact", "summary", "note"]:
raise ValueError("kind must be one of: message, fact, summary, note")
return v
class AgentMemoryEventCreate(AgentMemoryEventBase):
"""Схема для створення memory event"""
pass
class AgentMemoryEventResponse(AgentMemoryEventBase):
"""Схема відповіді для memory event"""
id: str
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class AgentMemoryEventListResponse(BaseModel):
"""Схема для списку memory events"""
items: List[AgentMemoryEventResponse]
total: int
cursor: Optional[str] = None
# ========== Token Gate Integration ==========
class TokenGateCheck(BaseModel):
"""Перевірка токен-гейту для факту"""
user_id: str
fact_key: str
token_requirements: Dict[str, Any]
class TokenGateCheckResponse(BaseModel):
"""Відповідь на перевірку токен-гейту"""
allowed: bool
reason: Optional[str] = None
missing_requirements: Optional[Dict[str, Any]] = None