"""JWT authentication for the Memory Service.

Provides helpers to issue and verify service-to-service JWTs, plus FastAPI
dependencies that extract the calling service from a bearer token and
enforce per-permission access.
"""
import os
import time
from typing import Optional, Union

import jwt
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from app.config import get_settings

settings = get_settings()

# JWT settings
# NOTE(review): if neither settings.jwt_secret nor MEMORY_JWT_SECRET is set,
# tokens are signed with the publicly known placeholder below. Behaviour is
# kept as-is here; confirm that production deployments always set a secret.
JWT_SECRET = settings.jwt_secret or os.getenv("MEMORY_JWT_SECRET", "change-me-in-production")
JWT_ALGORITHM = settings.jwt_algorithm
JWT_EXPIRATION = settings.jwt_expiration

# Bearer scheme used when a token is mandatory.
security = HTTPBearer()
# Bearer scheme created with auto_error=False, used by the optional dependency
# which handles absent credentials itself.
security_optional = HTTPBearer(auto_error=False)


def generate_jwt_token(service_name: str, permissions: Optional[list] = None) -> str:
    """Generate a JWT for a service.

    Args:
        service_name: Stored in the token under the ``"service"`` claim.
        permissions: Permissions stored under the ``"permissions"`` claim.
            When omitted (``None``) the token gets ``["read", "write"]``.
            An explicitly empty list is preserved and grants no permissions.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Default only when the caller passed nothing: a falsy-but-explicit value
    # such as [] must not silently escalate to read/write access.
    if permissions is None:
        permissions = ["read", "write"]

    # Take the timestamp once so "exp" is exactly JWT_EXPIRATION after "iat".
    issued_at = int(time.time())
    payload = {
        "service": service_name,
        "permissions": permissions,
        "iat": issued_at,
        "exp": issued_at + JWT_EXPIRATION,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)


def verify_jwt_token(token: str) -> dict:
    """Verify a JWT and return its decoded payload.

    Args:
        token: The encoded JWT string.

    Returns:
        The payload returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 with detail ``"Token expired"`` when decoding
            raises ``jwt.ExpiredSignatureError``, or 401 with detail
            ``"Invalid token"`` for any other ``jwt.InvalidTokenError``.
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Handled before the generic case so expired tokens get their own message.
        raise HTTPException(status_code=401, detail="Token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc


async def get_current_service_optional(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional),
) -> Optional[dict]:
    """Dependency returning the current service from a JWT (optional).

    Returns:
        The decoded token payload, or ``None`` when no credentials were
        supplied or the supplied token fails verification.
    """
    if not credentials:
        return None

    try:
        return verify_jwt_token(credentials.credentials)
    except HTTPException:
        # Deliberate best-effort: an unverifiable token is treated the same
        # as an absent one instead of failing the request.
        return None


async def get_current_service(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> dict:
    """Dependency returning the current service from a JWT (required).

    Returns:
        The decoded token payload.

    Raises:
        HTTPException: 401 propagated from ``verify_jwt_token`` when the
            token is expired or invalid.
    """
    return verify_jwt_token(credentials.credentials)


def require_permission(permission: str):
    """Build a dependency that enforces a permission on the calling service.

    Args:
        permission: The permission that must appear in the token's
            ``"permissions"`` claim.

    Returns:
        An async dependency callable. It resolves the service payload through
        ``get_current_service`` and returns that payload when the permission
        is present; otherwise it raises ``HTTPException`` with status 403.
    """

    async def permission_checker(service: dict = Security(get_current_service)):
        granted = service.get("permissions")
        # Accept only a real collection. A string claim would turn ``in`` into
        # a substring match ("read" in "readwrite"), and a null/number claim
        # would raise TypeError instead of a clean 403.
        has_permission = isinstance(granted, (list, tuple, set)) and permission in granted
        if not has_permission:
            raise HTTPException(status_code=403, detail=f"Permission '{permission}' required")
        return service

    return permission_checker