Files
Apple 5290287058 feat: implement TTS, Document processing, and Memory Service /facts API
- TTS: xtts-v2 integration with voice cloning support
- Document: docling integration for PDF/DOCX/PPTX processing
- Memory Service: added /facts/upsert, /facts/{key}, /facts endpoints
- Added required dependencies (TTS, docling)
2026-01-17 08:16:37 -08:00

73 lines
2.5 KiB
Python

"""
JWT authentication for the Memory Service
"""
import os
import jwt
import time
from typing import Optional, Union
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.config import get_settings
# Resolve the application settings once, at import time.
settings = get_settings()

# JWT settings
# Signing key lookup order: the configured settings value, then the
# MEMORY_JWT_SECRET environment variable, then a literal placeholder.
# NOTE(review): the placeholder is a fixed, well-known string; confirm that
# every deployment configures a real secret so it is never used for signing.
JWT_SECRET = settings.jwt_secret
if not JWT_SECRET:
    JWT_SECRET = os.getenv("MEMORY_JWT_SECRET", "change-me-in-production")
JWT_ALGORITHM = settings.jwt_algorithm
# Added to a Unix timestamp (seconds) when the "exp" claim is built.
JWT_EXPIRATION = settings.jwt_expiration

# Bearer-token extractors used as FastAPI dependencies. The optional variant
# is created with auto_error=False for endpoints where credentials may be
# absent; the default variant is used where they are mandatory.
security_optional = HTTPBearer(auto_error=False)
security = HTTPBearer()
def generate_jwt_token(service_name: str, permissions: Optional[list] = None) -> str:
    """Create a signed JWT identifying a calling service.

    Args:
        service_name: Value stored in the token's ``service`` claim.
        permissions: Value stored in the ``permissions`` claim. When omitted
            (``None``) the token is granted ``["read", "write"]``. An explicit
            empty list is honoured and produces a token with no permissions.

    Returns:
        The result of ``jwt.encode`` for the payload, signed with
        ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    # Fall back to the default grant only when nothing was passed; a plain
    # truthiness test would silently upgrade an explicit [] to read/write.
    granted = ["read", "write"] if permissions is None else permissions

    # Read the clock once so "exp" is exactly JWT_EXPIRATION seconds after "iat".
    issued_at = int(time.time())

    payload = {
        "service": service_name,
        "permissions": granted,
        "iat": issued_at,
        "exp": issued_at + JWT_EXPIRATION,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt_token(token: str) -> dict:
    """Decode a JWT and return its claims.

    The token is decoded with ``JWT_SECRET`` and only ``JWT_ALGORITHM`` is
    accepted as the signing algorithm.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload.

    Raises:
        HTTPException: 401 with detail ``"Token expired"`` when decoding
            raises ``jwt.ExpiredSignatureError``; 401 with detail
            ``"Invalid token"`` for any other ``jwt.InvalidTokenError``.
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    # The expiry handler is listed first so an expired token gets its own
    # message instead of the generic one below.
    except jwt.ExpiredSignatureError as exc:
        # Chain the original error so the underlying cause stays visible in
        # tracebacks and logs.
        raise HTTPException(status_code=401, detail="Token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc
async def get_current_service_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional)
) -> Optional[dict]:
    """Dependency returning the calling service's JWT claims, if any.

    Returns ``None`` when no credentials were supplied or when
    ``verify_jwt_token`` rejects the token with an ``HTTPException``;
    otherwise returns the decoded payload.
    """
    if not credentials:
        return None

    try:
        claims = verify_jwt_token(credentials.credentials)
    except HTTPException:
        # A rejected token is treated the same as a missing one.
        claims = None
    return claims
async def get_current_service(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
    """Dependency returning the calling service's JWT claims (mandatory).

    The bearer token from ``credentials`` is passed to ``verify_jwt_token``;
    any ``HTTPException`` it raises propagates to the caller.
    """
    return verify_jwt_token(credentials.credentials)
def require_permission(permission: str):
    """Build a dependency that enforces a single permission.

    Args:
        permission: Name that must appear in the token's ``permissions``
            claim.

    Returns:
        An async callable that resolves ``get_current_service`` and returns
        the service claims when ``permission`` is present, or raises
        ``HTTPException`` with status 403 when it is not.
    """
    async def permission_checker(service: dict = Security(get_current_service)):
        # A payload without a "permissions" claim is treated as having none.
        granted = service.get("permissions", [])
        if permission in granted:
            return service
        raise HTTPException(status_code=403, detail=f"Permission '{permission}' required")

    return permission_checker