Files
microdao-daarion/services/memory-service/app/auth.py
Apple 38cb96dd68 🔐 Auth: інтеграція JWT в Memory Service + конфігурації
- Опціональна JWT auth в Memory Service endpoints
- get_current_service_optional для backward compatibility
- NATS auth config (nkeys) - шаблони
- Qdrant auth config (API keys) - шаблони
- Тестовий скрипт для повного потоку

TODO: Генерація реальних JWT/ключів та застосування конфігів
2026-01-10 10:46:25 -08:00

72 lines
2.5 KiB
Python

"""
JWT Authentication для Memory Service
"""
import os
import jwt
import time
from typing import Optional, Union
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.config import get_settings
settings = get_settings()
# JWT settings
JWT_SECRET = settings.jwt_secret or os.getenv("MEMORY_JWT_SECRET", "change-me-in-production")
JWT_ALGORITHM = settings.jwt_algorithm
JWT_EXPIRATION = settings.jwt_expiration
security = HTTPBearer()
def generate_jwt_token(service_name: str, permissions: list = None) -> str:
"""Генерація JWT токену для сервісу"""
payload = {
"service": service_name,
"permissions": permissions or ["read", "write"],
"iat": int(time.time()),
"exp": int(time.time()) + JWT_EXPIRATION
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt_token(token: str) -> dict:
"""Перевірка JWT токену"""
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
async def get_current_service_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Security(security, auto_error=False)
) -> Optional[dict]:
"""Dependency для отримання поточного сервісу з JWT (опціонально)"""
if not credentials:
return None
token = credentials.credentials
try:
payload = verify_jwt_token(token)
return payload
except HTTPException:
return None
async def get_current_service(credentials: HTTPAuthorizationCredentials = Security(security)) -> dict:
"""Dependency для отримання поточного сервісу з JWT (обов'язково)"""
token = credentials.credentials
payload = verify_jwt_token(token)
return payload
def require_permission(permission: str):
"""Decorator для перевірки прав доступу"""
async def permission_checker(service: dict = Security(get_current_service)):
if permission not in service.get("permissions", []):
raise HTTPException(status_code=403, detail=f"Permission '{permission}' required")
return service
return permission_checker