- JWT middleware для FastAPI - Генерація/перевірка JWT токенів - Скрипти для генерації Qdrant API keys - Скрипти для генерації NATS operator JWT - План реалізації Auth TODO: Додати JWT до endpoints, NATS nkeys config, Qdrant API key config
59 lines
2.0 KiB
Python
59 lines
2.0 KiB
Python
"""
|
|
JWT Authentication для Memory Service
|
|
"""
|
|
|
|
import os
|
|
import jwt
|
|
import time
|
|
from typing import Optional
|
|
from fastapi import HTTPException, Security
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from app.config import get_settings
|
|
|
|
settings = get_settings()
|
|
|
|
# JWT settings
|
|
JWT_SECRET = settings.jwt_secret or os.getenv("MEMORY_JWT_SECRET", "change-me-in-production")
|
|
JWT_ALGORITHM = settings.jwt_algorithm
|
|
JWT_EXPIRATION = settings.jwt_expiration
|
|
|
|
security = HTTPBearer()
|
|
|
|
|
|
def generate_jwt_token(service_name: str, permissions: list = None) -> str:
|
|
"""Генерація JWT токену для сервісу"""
|
|
payload = {
|
|
"service": service_name,
|
|
"permissions": permissions or ["read", "write"],
|
|
"iat": int(time.time()),
|
|
"exp": int(time.time()) + JWT_EXPIRATION
|
|
}
|
|
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
|
|
|
|
|
def verify_jwt_token(token: str) -> dict:
|
|
"""Перевірка JWT токену"""
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
raise HTTPException(status_code=401, detail="Token expired")
|
|
except jwt.InvalidTokenError:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
|
|
async def get_current_service(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
|
|
"""Dependency для отримання поточного сервісу з JWT"""
|
|
token = credentials.credentials
|
|
payload = verify_jwt_token(token)
|
|
return payload
|
|
|
|
|
|
def require_permission(permission: str):
|
|
"""Decorator для перевірки прав доступу"""
|
|
async def permission_checker(service: dict = Security(get_current_service)):
|
|
if permission not in service.get("permissions", []):
|
|
raise HTTPException(status_code=403, detail=f"Permission '{permission}' required")
|
|
return service
|
|
return permission_checker
|