- TTS: xtts-v2 integration with voice cloning support
- Document: docling integration for PDF/DOCX/PPTX processing
- Memory Service: added /facts/upsert, /facts/{key}, /facts endpoints
- Added required dependencies (TTS, docling)
73 lines
2.5 KiB
Python
73 lines
2.5 KiB
Python
"""
|
|
JWT authentication for the Memory Service.
|
|
"""
|
|
|
|
import os
|
|
import jwt
|
|
import time
|
|
from typing import Optional, Union
|
|
from fastapi import HTTPException, Security, Depends
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from app.config import get_settings
|
|
|
|
settings = get_settings()

# JWT configuration, resolved once at import time.
# Secret lookup order: settings.jwt_secret, then the MEMORY_JWT_SECRET
# environment variable, then a hard-coded placeholder.
# NOTE(review): the placeholder is a publicly known value; confirm that every
# deployment supplies a real secret through one of the first two sources.
JWT_SECRET = settings.jwt_secret
if not JWT_SECRET:
    JWT_SECRET = os.getenv("MEMORY_JWT_SECRET", "change-me-in-production")
JWT_ALGORITHM = settings.jwt_algorithm
JWT_EXPIRATION = settings.jwt_expiration

# Bearer-token schemes used by the dependencies in this module.
# `security` is the mandatory variant; `security_optional` is built with
# auto_error=False so that its consumer can receive no credentials at all.
security = HTTPBearer()
security_optional = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def generate_jwt_token(service_name: str, permissions: Optional[list] = None) -> str:
    """Issue a signed JWT identifying a calling service.

    Args:
        service_name: Value stored in the token's ``service`` claim.
        permissions: Permission names stored in the ``permissions`` claim.
            ``None`` (the default) grants ``["read", "write"]``. An explicit
            empty list is kept as-is and yields a token with no permissions.

    Returns:
        The token encoded with ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    # Default only when the caller passed nothing: `permissions or [...]`
    # would silently upgrade an explicit empty list to read/write access.
    if permissions is None:
        permissions = ["read", "write"]

    # Read the clock once so "exp" is exactly JWT_EXPIRATION after "iat".
    issued_at = int(time.time())
    payload = {
        "service": service_name,
        "permissions": permissions,
        "iat": issued_at,
        "exp": issued_at + JWT_EXPIRATION,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
|
|
|
|
|
def verify_jwt_token(token: str) -> dict:
    """Decode a JWT and return its claims.

    The token is decoded with ``JWT_SECRET`` and only ``JWT_ALGORITHM`` is
    accepted as the signing algorithm.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload.

    Raises:
        HTTPException: 401 with detail "Token expired" when the token has
            expired, or 401 with detail "Invalid token" for any other
            ``jwt.InvalidTokenError``.
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    # The expiry handler must stay first: in PyJWT, ExpiredSignatureError is a
    # subclass of InvalidTokenError and would otherwise be reported as
    # "Invalid token".
    except jwt.ExpiredSignatureError as exc:
        # Chain explicitly so the original JWT error stays visible as the cause.
        raise HTTPException(status_code=401, detail="Token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc
|
|
|
|
|
|
async def get_current_service_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional)
) -> Optional[dict]:
    """Dependency that returns the caller's JWT claims when available.

    Returns:
        The decoded payload, or ``None`` when no credentials were supplied or
        when token verification raised ``HTTPException``.
    """
    if not credentials:
        return None

    try:
        claims = verify_jwt_token(credentials.credentials)
    except HTTPException:
        # A token that fails verification is treated the same as no token.
        return None
    return claims
|
|
|
|
async def get_current_service(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
    """Dependency that returns the caller's JWT claims (token required).

    Returns:
        The payload decoded by ``verify_jwt_token``.

    Raises:
        HTTPException: Propagated from ``verify_jwt_token`` when the token is
            rejected.
    """
    return verify_jwt_token(credentials.credentials)
|
|
|
|
|
|
def require_permission(permission: str):
    """Build a dependency that enforces a single permission.

    Despite reading like a decorator, this is a factory: it returns an async
    dependency callable, which resolves the calling service through
    ``get_current_service`` and checks its ``permissions`` claim.

    Args:
        permission: Permission name that must be present in the claim.

    Returns:
        An async callable that returns the service payload when the
        permission is present and raises ``HTTPException`` (403) otherwise.
    """
    async def permission_checker(service: dict = Security(get_current_service)):
        granted = service.get("permissions")
        # A malformed claim grants nothing: a null claim would make `in` raise
        # TypeError, and a bare string would be substring-matched
        # (e.g. "read" in "readwrite").
        if not isinstance(granted, (list, tuple)):
            granted = ()
        if permission not in granted:
            raise HTTPException(status_code=403, detail=f"Permission '{permission}' required")
        return service

    return permission_checker
|