103 lines
2.7 KiB
Python
103 lines
2.7 KiB
Python
"""
Security utilities: password hashing, JWT tokens
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional, Dict, Any
|
|
from uuid import UUID
|
|
from jose import jwt, JWTError
|
|
from passlib.context import CryptContext
|
|
|
|
from config import get_settings
|
|
|
|
settings = get_settings()

# Password hashing: a single shared context restricted to the bcrypt scheme,
# with the number of rounds read from the application settings.
pwd_context = CryptContext(
    bcrypt__rounds=settings.bcrypt_rounds,
    deprecated="auto",
    schemes=["bcrypt"],
)
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string generated by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a previously stored hash.

    Args:
        plain_password: The password supplied by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
|
|
|
|
|
# JWT Token operations
|
|
def create_access_token(
    user_id: UUID,
    email: str,
    display_name: Optional[str],
    roles: list[str]
) -> str:
    """Build and sign a JWT access token for a user.

    Args:
        user_id: Identifier stored (as a string) in the ``sub`` claim.
        email: Value of the ``email`` claim.
        display_name: Value of the ``name`` claim; may be ``None``.
        roles: Value of the ``roles`` claim.

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using
        ``settings.jwt_algorithm``.
    """
    issued_at = datetime.now(timezone.utc)
    # The access-token lifetime is configured in seconds.
    lifetime = timedelta(seconds=settings.access_token_ttl)
    expires_at = issued_at + lifetime

    iat_claim = int(issued_at.timestamp())
    exp_claim = int(expires_at.timestamp())

    # Claim order is kept stable so the serialized payload does not change.
    claims = {
        "sub": str(user_id),
        "email": email,
        "name": display_name,
        "roles": roles,
        "type": "access",
        "iss": "daarion-auth",
        "iat": iat_claim,
        "exp": exp_claim,
    }

    return jwt.encode(
        claims,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
|
|
|
|
|
|
def create_refresh_token(user_id: UUID, session_id: UUID) -> str:
    """Build and sign a JWT refresh token bound to a session.

    Args:
        user_id: Identifier stored (as a string) in the ``sub`` claim.
        session_id: Identifier stored (as a string) in the ``session_id``
            claim.

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using
        ``settings.jwt_algorithm``.
    """
    issued_at = datetime.now(timezone.utc)
    # The refresh-token lifetime is configured in seconds.
    lifetime = timedelta(seconds=settings.refresh_token_ttl)
    expires_at = issued_at + lifetime

    iat_claim = int(issued_at.timestamp())
    exp_claim = int(expires_at.timestamp())

    # Claim order is kept stable so the serialized payload does not change.
    claims = {
        "sub": str(user_id),
        "session_id": str(session_id),
        "type": "refresh",
        "iss": "daarion-auth",
        "iat": iat_claim,
        "exp": exp_claim,
    }

    return jwt.encode(
        claims,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
|
|
|
|
|
|
def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode and validate a JWT token.

    The token is verified against ``settings.jwt_secret``, accepting only the
    algorithm named by ``settings.jwt_algorithm``, with expiry verification
    explicitly enabled.

    Args:
        token: The encoded JWT to decode.

    Returns:
        The decoded claims, or ``None`` when the token is missing/empty or
        the JWT library rejects it with ``JWTError``.
    """
    # A missing or empty token can never be valid. Bail out here so that a
    # ``None`` token does not reach the JWT library and fail with an error
    # other than JWTError, which the handler below would not catch.
    if not token:
        return None

    try:
        return jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            options={"verify_exp": True},
        )
    except JWTError:
        return None
|
|
|
|
|
|
def decode_access_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a token and accept it only if it is an access token.

    Args:
        token: The encoded JWT to decode.

    Returns:
        The decoded claims when ``decode_token`` yields a non-empty payload
        whose ``type`` claim equals ``"access"``; otherwise ``None``.
    """
    claims = decode_token(token)
    if not claims:
        return None
    if claims.get("type") != "access":
        return None
    return claims
|
|
|
|
|
|
def decode_refresh_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a token and accept it only if it is a refresh token.

    Args:
        token: The encoded JWT to decode.

    Returns:
        The decoded claims when ``decode_token`` yields a non-empty payload
        whose ``type`` claim equals ``"refresh"``; otherwise ``None``.
    """
    claims = decode_token(token)
    if not claims:
        return None
    if claims.get("type") != "refresh":
        return None
    return claims
|
|
|