refactor: reorganize memory-service into app/ directory structure

- Move models.py, schemas.py, crud.py, main.py to app/
- Update imports to use app.* prefix
- Update README with new structure
- Fix uvicorn run command for new structure
This commit is contained in:
Apple
2025-11-15 10:14:26 -08:00
parent 6d5d83c347
commit 7aa0745877
5 changed files with 9 additions and 9 deletions

View File

@@ -0,0 +1,329 @@
"""
CRUD операції для Memory Service
"""
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc
from datetime import datetime
from app.models import UserFact, DialogSummary, AgentMemoryEvent, AgentMemoryFactsVector
from app.schemas import (
UserFactCreate, UserFactUpdate, UserFactUpsertRequest,
DialogSummaryCreate, AgentMemoryEventCreate
)
# ========== User Facts CRUD ==========
def get_user_fact(
db: Session,
user_id: str,
fact_key: str,
team_id: Optional[str] = None
) -> Optional[UserFact]:
"""Отримати факт користувача"""
query = db.query(UserFact).filter(
and_(
UserFact.user_id == user_id,
UserFact.fact_key == fact_key
)
)
if team_id:
query = query.filter(UserFact.team_id == team_id)
else:
query = query.filter(UserFact.team_id.is_(None))
return query.first()
def get_user_facts(
db: Session,
user_id: str,
team_id: Optional[str] = None,
fact_keys: Optional[List[str]] = None,
skip: int = 0,
limit: int = 100
) -> List[UserFact]:
"""Отримати список фактів користувача"""
query = db.query(UserFact).filter(UserFact.user_id == user_id)
if team_id:
query = query.filter(
or_(
UserFact.team_id == team_id,
UserFact.team_id.is_(None) # Глобальні факти
)
)
if fact_keys:
query = query.filter(UserFact.fact_key.in_(fact_keys))
# Фільтр за терміном дії
query = query.filter(
or_(
UserFact.expires_at.is_(None),
UserFact.expires_at > datetime.utcnow()
)
)
return query.offset(skip).limit(limit).all()
def create_user_fact(
db: Session,
fact: UserFactCreate
) -> UserFact:
"""Створити новий факт"""
db_fact = UserFact(**fact.dict())
db.add(db_fact)
db.commit()
db.refresh(db_fact)
return db_fact
def update_user_fact(
db: Session,
fact_id: str,
fact_update: UserFactUpdate
) -> Optional[UserFact]:
"""Оновити факт"""
db_fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not db_fact:
return None
update_data = fact_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_fact, field, value)
db.commit()
db.refresh(db_fact)
return db_fact
def upsert_user_fact(
db: Session,
fact_request: UserFactUpsertRequest
) -> Tuple[UserFact, bool]:
"""
Створити або оновити факт (upsert)
Повертає (fact, created) де created = True якщо створено новий
"""
# Шукаємо існуючий факт
existing = get_user_fact(
db,
fact_request.user_id,
fact_request.fact_key,
fact_request.team_id
)
if existing:
# Оновлюємо існуючий
update_data = fact_request.dict(exclude={"user_id", "fact_key", "team_id"})
for field, value in update_data.items():
if value is not None:
setattr(existing, field, value)
db.commit()
db.refresh(existing)
return existing, False
else:
# Створюємо новий
new_fact = UserFact(**fact_request.dict())
db.add(new_fact)
db.commit()
db.refresh(new_fact)
return new_fact, True
def delete_user_fact(
db: Session,
fact_id: str
) -> bool:
"""Видалити факт"""
db_fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not db_fact:
return False
db.delete(db_fact)
db.commit()
return True
def get_user_facts_by_token_gate(
db: Session,
user_id: str,
team_id: Optional[str] = None
) -> List[UserFact]:
"""Отримати токен-гейт факти користувача"""
query = db.query(UserFact).filter(
and_(
UserFact.user_id == user_id,
UserFact.token_gated == True
)
)
if team_id:
query = query.filter(
or_(
UserFact.team_id == team_id,
UserFact.team_id.is_(None)
)
)
return query.all()
# ========== Dialog Summary CRUD ==========
def create_dialog_summary(
db: Session,
summary: DialogSummaryCreate
) -> DialogSummary:
"""Створити підсумок діалогу"""
db_summary = DialogSummary(**summary.dict())
db.add(db_summary)
db.commit()
db.refresh(db_summary)
return db_summary
def get_dialog_summaries(
db: Session,
team_id: Optional[str] = None,
channel_id: Optional[str] = None,
agent_id: Optional[str] = None,
user_id: Optional[str] = None,
skip: int = 0,
limit: int = 50,
cursor: Optional[str] = None
) -> Tuple[List[DialogSummary], Optional[str]]:
"""
Отримати список підсумків діалогів з cursor pagination
Повертає (summaries, next_cursor)
"""
query = db.query(DialogSummary)
if team_id:
query = query.filter(DialogSummary.team_id == team_id)
if channel_id:
query = query.filter(DialogSummary.channel_id == channel_id)
if agent_id:
query = query.filter(DialogSummary.agent_id == agent_id)
if user_id:
query = query.filter(DialogSummary.user_id == user_id)
if cursor:
# Cursor-based pagination (використовуємо created_at)
try:
cursor_time = datetime.fromisoformat(cursor)
query = query.filter(DialogSummary.created_at < cursor_time)
except ValueError:
pass
query = query.order_by(desc(DialogSummary.created_at))
results = query.offset(skip).limit(limit + 1).all()
# Перевіряємо чи є наступна сторінка
next_cursor = None
if len(results) > limit:
results = results[:limit]
next_cursor = results[-1].created_at.isoformat()
return results, next_cursor
def get_dialog_summary(
db: Session,
summary_id: str
) -> Optional[DialogSummary]:
"""Отримати підсумок за ID"""
return db.query(DialogSummary).filter(DialogSummary.id == summary_id).first()
def delete_dialog_summary(
db: Session,
summary_id: str
) -> bool:
"""Видалити підсумок"""
db_summary = db.query(DialogSummary).filter(DialogSummary.id == summary_id).first()
if not db_summary:
return False
db.delete(db_summary)
db.commit()
return True
# ========== Agent Memory Event CRUD ==========
def create_agent_memory_event(
db: Session,
event: AgentMemoryEventCreate
) -> AgentMemoryEvent:
"""Створити подію пам'яті агента"""
db_event = AgentMemoryEvent(**event.dict())
db.add(db_event)
db.commit()
db.refresh(db_event)
return db_event
def get_agent_memory_events(
db: Session,
agent_id: str,
team_id: Optional[str] = None,
channel_id: Optional[str] = None,
scope: Optional[str] = None,
kind: Optional[str] = None,
skip: int = 0,
limit: int = 50,
cursor: Optional[str] = None
) -> Tuple[List[AgentMemoryEvent], Optional[str]]:
"""
Отримати список подій пам'яті агента з cursor pagination
"""
query = db.query(AgentMemoryEvent).filter(AgentMemoryEvent.agent_id == agent_id)
if team_id:
query = query.filter(AgentMemoryEvent.team_id == team_id)
if channel_id:
query = query.filter(AgentMemoryEvent.channel_id == channel_id)
if scope:
query = query.filter(AgentMemoryEvent.scope == scope)
if kind:
query = query.filter(AgentMemoryEvent.kind == kind)
if cursor:
try:
cursor_time = datetime.fromisoformat(cursor)
query = query.filter(AgentMemoryEvent.created_at < cursor_time)
except ValueError:
pass
query = query.order_by(desc(AgentMemoryEvent.created_at))
results = query.offset(skip).limit(limit + 1).all()
next_cursor = None
if len(results) > limit:
results = results[:limit]
next_cursor = results[-1].created_at.isoformat()
return results, next_cursor
def delete_agent_memory_event(
db: Session,
event_id: str
) -> bool:
"""Видалити подію пам'яті"""
db_event = db.query(AgentMemoryEvent).filter(AgentMemoryEvent.id == event_id).first()
if not db_event:
return False
db.delete(db_event)
db.commit()
return True

View File

@@ -0,0 +1,414 @@
"""
Memory Service - FastAPI додаток
Підтримує: user_facts, dialog_summaries, agent_memory_events
Інтеграція з token-gate через RBAC
"""
import os
from typing import Optional, List
from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException, Query, Header
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.models import Base, UserFact, DialogSummary, AgentMemoryEvent
from app.schemas import (
UserFactCreate, UserFactUpdate, UserFactResponse, UserFactUpsertRequest, UserFactUpsertResponse,
DialogSummaryCreate, DialogSummaryResponse, DialogSummaryListResponse,
AgentMemoryEventCreate, AgentMemoryEventResponse, AgentMemoryEventListResponse,
TokenGateCheck, TokenGateCheckResponse
)
from app.crud import (
get_user_fact, get_user_facts, create_user_fact, update_user_fact,
upsert_user_fact, delete_user_fact, get_user_facts_by_token_gate,
create_dialog_summary, get_dialog_summaries, get_dialog_summary, delete_dialog_summary,
create_agent_memory_event, get_agent_memory_events, delete_agent_memory_event
)
# ========== Configuration ==========
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql://user:password@localhost:5432/microdao"
)
# Створюємо engine та sessionmaker
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Створюємо таблиці (для dev, в продакшені використовуйте міграції)
Base.metadata.create_all(bind=engine)
# ========== FastAPI App ==========
app = FastAPI(
title="Memory Service",
description="Сервіс пам'яті для MicroDAO: user_facts, dialog_summaries, agent_memory_events",
version="1.0.0"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # В продакшені обмежте це
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ========== Dependencies ==========
def get_db():
"""Dependency для отримання DB сесії"""
db = SessionLocal()
try:
yield db
finally:
db.close()
async def verify_token(authorization: Optional[str] = Header(None)) -> Optional[str]:
"""
Перевірка JWT токену (заглушка)
В продакшені інтегруйте з вашою системою авторизації
"""
if not authorization:
raise HTTPException(status_code=401, detail="Missing authorization header")
# Заглушка: в реальності перевіряйте JWT
# token = authorization.replace("Bearer ", "")
# user_id = verify_jwt_token(token)
# return user_id
# Для тестування повертаємо user_id з заголовка
return "u_test" # TODO: реалізувати реальну перевірку
async def check_token_gate(
user_id: str,
token_requirements: dict,
db: Session
) -> TokenGateCheckResponse:
"""
Перевірка токен-гейту (інтеграція з RBAC/Wallet Service)
Заглушка - в продакшені викликайте ваш PDP/Wallet Service
"""
# TODO: Інтегрувати з:
# - PDP Service для перевірки capabilities
# - Wallet Service для перевірки балансів
# - RBAC для перевірки ролей
# Приклад логіки:
# if "token" in token_requirements:
# token_type = token_requirements["token"]
# min_balance = token_requirements.get("min_balance", 0)
# balance = await wallet_service.get_balance(user_id, token_type)
# if balance < min_balance:
# return TokenGateCheckResponse(
# allowed=False,
# reason=f"Insufficient {token_type} balance",
# missing_requirements={"token": token_type, "required": min_balance, "current": balance}
# )
# Заглушка: завжди дозволяємо
return TokenGateCheckResponse(allowed=True)
# ========== Health Check ==========
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "ok", "service": "memory-service"}
# ========== User Facts Endpoints ==========
@app.post("/facts/upsert", response_model=UserFactUpsertResponse)
async def upsert_fact(
fact_request: UserFactUpsertRequest,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""
Створити або оновити факт користувача (upsert)
Це основний ендпоінт для контрольованої довгострокової пам'яті.
Підтримує токен-гейт інтеграцію.
"""
# Перевірка токен-гейту якщо потрібно
if fact_request.token_gated and fact_request.token_requirements:
gate_check = await check_token_gate(
fact_request.user_id,
fact_request.token_requirements,
db
)
if not gate_check.allowed:
raise HTTPException(
status_code=403,
detail=f"Token gate check failed: {gate_check.reason}"
)
# Перевірка прав доступу (користувач може змінювати тільки свої факти)
if fact_request.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot modify other user's facts")
fact, created = upsert_user_fact(db, fact_request)
return UserFactUpsertResponse(
fact=UserFactResponse.model_validate(fact),
created=created
)
@app.get("/facts", response_model=List[UserFactResponse])
async def list_facts(
team_id: Optional[str] = Query(None),
fact_keys: Optional[str] = Query(None, description="Comma-separated list of fact keys"),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати список фактів користувача"""
fact_keys_list = None
if fact_keys:
fact_keys_list = [k.strip() for k in fact_keys.split(",")]
facts = get_user_facts(db, user_id, team_id, fact_keys_list, skip, limit)
return [UserFactResponse.model_validate(f) for f in facts]
@app.get("/facts/{fact_key}", response_model=UserFactResponse)
async def get_fact(
fact_key: str,
team_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати конкретний факт за ключем"""
fact = get_user_fact(db, user_id, fact_key, team_id)
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
return UserFactResponse.model_validate(fact)
@app.post("/facts", response_model=UserFactResponse)
async def create_fact(
fact: UserFactCreate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Створити новий факт"""
if fact.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot create fact for other user")
db_fact = create_user_fact(db, fact)
return UserFactResponse.model_validate(db_fact)
@app.patch("/facts/{fact_id}", response_model=UserFactResponse)
async def update_fact(
fact_id: str,
fact_update: UserFactUpdate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Оновити факт"""
fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
if fact.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot modify other user's fact")
updated_fact = update_user_fact(db, fact_id, fact_update)
if not updated_fact:
raise HTTPException(status_code=404, detail="Fact not found")
return UserFactResponse.model_validate(updated_fact)
@app.delete("/facts/{fact_id}")
async def delete_fact(
fact_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Видалити факт"""
fact = db.query(UserFact).filter(UserFact.id == fact_id).first()
if not fact:
raise HTTPException(status_code=404, detail="Fact not found")
if fact.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot delete other user's fact")
success = delete_user_fact(db, fact_id)
if not success:
raise HTTPException(status_code=404, detail="Fact not found")
return {"success": True}
@app.get("/facts/token-gated", response_model=List[UserFactResponse])
async def list_token_gated_facts(
team_id: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати токен-гейт факти користувача"""
facts = get_user_facts_by_token_gate(db, user_id, team_id)
return [UserFactResponse.model_validate(f) for f in facts]
# ========== Dialog Summary Endpoints ==========
@app.post("/summaries", response_model=DialogSummaryResponse)
async def create_summary(
summary: DialogSummaryCreate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""
Створити підсумок діалогу
Використовується для масштабування без переповнення контексту.
Агрегує інформацію про сесії/діалоги.
"""
db_summary = create_dialog_summary(db, summary)
return DialogSummaryResponse.model_validate(db_summary)
@app.get("/summaries", response_model=DialogSummaryListResponse)
async def list_summaries(
team_id: Optional[str] = Query(None),
channel_id: Optional[str] = Query(None),
agent_id: Optional[str] = Query(None),
user_id_param: Optional[str] = Query(None, alias="user_id"),
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
cursor: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати список підсумків діалогів"""
summaries, next_cursor = get_dialog_summaries(
db, team_id, channel_id, agent_id, user_id_param, skip, limit, cursor
)
return DialogSummaryListResponse(
items=[DialogSummaryResponse.model_validate(s) for s in summaries],
total=len(summaries),
cursor=next_cursor
)
@app.get("/summaries/{summary_id}", response_model=DialogSummaryResponse)
async def get_summary(
summary_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати підсумок за ID"""
summary = get_dialog_summary(db, summary_id)
if not summary:
raise HTTPException(status_code=404, detail="Summary not found")
return DialogSummaryResponse.model_validate(summary)
@app.delete("/summaries/{summary_id}")
async def delete_summary(
summary_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Видалити підсумок"""
success = delete_dialog_summary(db, summary_id)
if not success:
raise HTTPException(status_code=404, detail="Summary not found")
return {"success": True}
# ========== Agent Memory Event Endpoints ==========
@app.post("/agents/{agent_id}/memory", response_model=AgentMemoryEventResponse)
async def create_memory_event(
agent_id: str,
event: AgentMemoryEventCreate,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Створити подію пам'яті агента"""
# Перевірка що agent_id збігається
if event.agent_id != agent_id:
raise HTTPException(status_code=400, detail="agent_id mismatch")
db_event = create_agent_memory_event(db, event)
return AgentMemoryEventResponse.model_validate(db_event)
@app.get("/agents/{agent_id}/memory", response_model=AgentMemoryEventListResponse)
async def list_memory_events(
agent_id: str,
team_id: Optional[str] = Query(None),
channel_id: Optional[str] = Query(None),
scope: Optional[str] = Query(None, description="short_term | mid_term | long_term"),
kind: Optional[str] = Query(None, description="message | fact | summary | note"),
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
cursor: Optional[str] = Query(None),
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Отримати список подій пам'яті агента"""
events, next_cursor = get_agent_memory_events(
db, agent_id, team_id, channel_id, scope, kind, skip, limit, cursor
)
return AgentMemoryEventListResponse(
items=[AgentMemoryEventResponse.model_validate(e) for e in events],
total=len(events),
cursor=next_cursor
)
@app.delete("/agents/{agent_id}/memory/{event_id}")
async def delete_memory_event(
agent_id: str,
event_id: str,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""Видалити подію пам'яті"""
success = delete_agent_memory_event(db, event_id)
if not success:
raise HTTPException(status_code=404, detail="Memory event not found")
return {"success": True}
# ========== Token Gate Integration Endpoint ==========
@app.post("/token-gate/check", response_model=TokenGateCheckResponse)
async def check_token_gate_endpoint(
check: TokenGateCheck,
db: Session = Depends(get_db),
user_id: str = Depends(verify_token)
):
"""
Перевірка токен-гейту для факту
Інтеграція з RBAC/Wallet Service для перевірки доступу
"""
if check.user_id != user_id:
raise HTTPException(status_code=403, detail="Cannot check token gate for other user")
return await check_token_gate(user_id, check.token_requirements, db)
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)

View File

@@ -0,0 +1,159 @@
"""
SQLAlchemy моделі для Memory Service
Підтримує: user_facts, dialog_summaries, agent_memory_events
"""
from sqlalchemy import (
Column, String, Text, JSON, TIMESTAMP, ForeignKey,
CheckConstraint, Index, Boolean, Integer
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
from pgvector.sqlalchemy import Vector
Base = declarative_base()
class UserFact(Base):
"""
Довгострокові факти про користувача
Використовується для контрольованої довгострокової пам'яті
(мови, вподобання, тип користувача, токен-статуси)
"""
__tablename__ = "user_facts"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True)
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=True, index=True)
# Ключ факту (наприклад: "language", "is_donor", "is_validator", "top_contributor")
fact_key = Column(String, nullable=False, index=True)
# Значення факту (може бути текст, число, boolean, JSON)
fact_value = Column(Text, nullable=True)
fact_value_json = Column(JSONB, nullable=True)
# Метадані: джерело, впевненість, термін дії
metadata = Column(JSONB, nullable=False, server_default="{}")
# Токен-гейт: чи залежить факт від токенів/активності
token_gated = Column(Boolean, nullable=False, server_default="false")
token_requirements = Column(JSONB, nullable=True) # {"token": "DAAR", "min_balance": 1}
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
updated_at = Column(TIMESTAMP(timezone=True), nullable=True, onupdate=func.now())
expires_at = Column(TIMESTAMP(timezone=True), nullable=True) # Для тимчасових фактів
__table_args__ = (
Index("idx_user_facts_user_key", "user_id", "fact_key"),
Index("idx_user_facts_team", "team_id"),
Index("idx_user_facts_token_gated", "token_gated"),
)
class DialogSummary(Base):
"""
Підсумки діалогів для масштабування без переповнення контексту
Зберігає агреговану інформацію про сесії/діалоги
"""
__tablename__ = "dialog_summaries"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
# Контекст діалогу
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=False, index=True)
channel_id = Column(String, ForeignKey("channels.id", ondelete="CASCADE"), nullable=True, index=True)
agent_id = Column(String, ForeignKey("agents.id", ondelete="CASCADE"), nullable=True, index=True)
user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True)
# Період, який охоплює підсумок
period_start = Column(TIMESTAMP(timezone=True), nullable=False)
period_end = Column(TIMESTAMP(timezone=True), nullable=False)
# Підсумок
summary_text = Column(Text, nullable=False)
summary_json = Column(JSONB, nullable=True) # Структуровані дані
# Статистика
message_count = Column(Integer, nullable=False, server_default="0")
participant_count = Column(Integer, nullable=False, server_default="0")
# Ключові теми/теги
topics = Column(JSONB, nullable=True) # ["project-planning", "bug-fix", ...]
# Метадані
metadata = Column(JSONB, nullable=False, server_default="{}")
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
__table_args__ = (
Index("idx_dialog_summaries_team_period", "team_id", "period_start", "period_end"),
Index("idx_dialog_summaries_channel", "channel_id"),
Index("idx_dialog_summaries_agent", "agent_id"),
)
class AgentMemoryEvent(Base):
"""
Події пам'яті агентів (short-term, mid-term, long-term)
Базується на документації: docs/cursor/13_agent_memory_system.md
"""
__tablename__ = "agent_memory_events"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
agent_id = Column(String, ForeignKey("agents.id", ondelete="CASCADE"), nullable=False, index=True)
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=False, index=True)
channel_id = Column(String, ForeignKey("channels.id", ondelete="CASCADE"), nullable=True, index=True)
user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True)
# Scope: short_term, mid_term, long_term
scope = Column(
String,
nullable=False,
CheckConstraint("scope IN ('short_term', 'mid_term', 'long_term')")
)
# Kind: message, fact, summary, note
kind = Column(
String,
nullable=False,
CheckConstraint("kind IN ('message', 'fact', 'summary', 'note')")
)
# Тіло події
body_text = Column(Text, nullable=True)
body_json = Column(JSONB, nullable=True)
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
__table_args__ = (
Index("idx_agent_memory_events_agent_team_scope", "agent_id", "team_id", "scope"),
Index("idx_agent_memory_events_channel", "agent_id", "channel_id"),
Index("idx_agent_memory_events_created_at", "created_at"),
)
class AgentMemoryFactsVector(Base):
"""
Векторні представлення фактів для RAG (Retrieval-Augmented Generation)
"""
__tablename__ = "agent_memory_facts_vector"
id = Column(UUID(as_uuid=False), primary_key=True, server_default=func.gen_random_uuid())
agent_id = Column(String, ForeignKey("agents.id", ondelete="CASCADE"), nullable=False, index=True)
team_id = Column(String, ForeignKey("teams.id", ondelete="CASCADE"), nullable=False, index=True)
fact_text = Column(Text, nullable=False)
embedding = Column(Vector(1536), nullable=True) # OpenAI ada-002 embedding size
metadata = Column(JSONB, nullable=False, server_default="{}")
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
__table_args__ = (
Index("idx_agent_memory_facts_vector_agent_team", "agent_id", "team_id"),
)

View File

@@ -0,0 +1,169 @@
"""
Pydantic схеми для Memory Service API
"""
from datetime import datetime
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field, field_validator, ConfigDict
from uuid import UUID
# ========== User Facts Schemas ==========
class UserFactBase(BaseModel):
"""Базова схема для user fact"""
fact_key: str = Field(..., description="Ключ факту (наприклад: 'language', 'is_donor')")
fact_value: Optional[str] = Field(None, description="Текстове значення")
fact_value_json: Optional[Dict[str, Any]] = Field(None, description="JSON значення")
team_id: Optional[str] = Field(None, description="ID команди (якщо факт командно-специфічний)")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Метадані")
token_gated: bool = Field(False, description="Чи залежить факт від токенів")
token_requirements: Optional[Dict[str, Any]] = Field(None, description="Вимоги до токенів")
expires_at: Optional[datetime] = Field(None, description="Термін дії факту")
class UserFactCreate(UserFactBase):
"""Схема для створення/оновлення факту"""
user_id: str = Field(..., description="ID користувача")
class UserFactUpdate(BaseModel):
"""Схема для часткового оновлення факту"""
fact_value: Optional[str] = None
fact_value_json: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
token_gated: Optional[bool] = None
token_requirements: Optional[Dict[str, Any]] = None
expires_at: Optional[datetime] = None
class UserFactResponse(UserFactBase):
"""Схема відповіді для user fact"""
id: str
user_id: str
created_at: datetime
updated_at: Optional[datetime]
model_config = ConfigDict(from_attributes=True)
class UserFactUpsertRequest(BaseModel):
"""Схема для upsert операції (створення або оновлення)"""
user_id: str
fact_key: str
fact_value: Optional[str] = None
fact_value_json: Optional[Dict[str, Any]] = None
team_id: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
token_gated: bool = False
token_requirements: Optional[Dict[str, Any]] = None
expires_at: Optional[datetime] = None
class UserFactUpsertResponse(BaseModel):
"""Відповідь на upsert"""
fact: UserFactResponse
created: bool = Field(..., description="Чи був створений новий факт")
# ========== Dialog Summary Schemas ==========
class DialogSummaryBase(BaseModel):
"""Базова схема для dialog summary"""
team_id: str
channel_id: Optional[str] = None
agent_id: Optional[str] = None
user_id: Optional[str] = None
period_start: datetime
period_end: datetime
summary_text: str
summary_json: Optional[Dict[str, Any]] = None
message_count: int = 0
participant_count: int = 0
topics: Optional[List[str]] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class DialogSummaryCreate(DialogSummaryBase):
"""Схема для створення summary"""
pass
class DialogSummaryResponse(DialogSummaryBase):
"""Схема відповіді для summary"""
id: str
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class DialogSummaryListResponse(BaseModel):
"""Схема для списку summaries"""
items: List[DialogSummaryResponse]
total: int
cursor: Optional[str] = None
# ========== Agent Memory Event Schemas ==========
class AgentMemoryEventBase(BaseModel):
"""Базова схема для memory event"""
agent_id: str
team_id: str
channel_id: Optional[str] = None
user_id: Optional[str] = None
scope: str = Field(..., description="short_term | mid_term | long_term")
kind: str = Field(..., description="message | fact | summary | note")
body_text: Optional[str] = None
body_json: Optional[Dict[str, Any]] = None
@field_validator("scope")
@classmethod
def validate_scope(cls, v):
if v not in ["short_term", "mid_term", "long_term"]:
raise ValueError("scope must be one of: short_term, mid_term, long_term")
return v
@field_validator("kind")
@classmethod
def validate_kind(cls, v):
if v not in ["message", "fact", "summary", "note"]:
raise ValueError("kind must be one of: message, fact, summary, note")
return v
class AgentMemoryEventCreate(AgentMemoryEventBase):
"""Схема для створення memory event"""
pass
class AgentMemoryEventResponse(AgentMemoryEventBase):
"""Схема відповіді для memory event"""
id: str
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class AgentMemoryEventListResponse(BaseModel):
"""Схема для списку memory events"""
items: List[AgentMemoryEventResponse]
total: int
cursor: Optional[str] = None
# ========== Token Gate Integration ==========
class TokenGateCheck(BaseModel):
"""Перевірка токен-гейту для факту"""
user_id: str
fact_key: str
token_requirements: Dict[str, Any]
class TokenGateCheckResponse(BaseModel):
"""Відповідь на перевірку токен-гейту"""
allowed: bool
reason: Optional[str] = None
missing_requirements: Optional[Dict[str, Any]] = None