# Source: microdao-daarion/services/auth-service/security.py
# (repository viewer metadata: 103 lines, 2.7 KiB, Python)

"""
Security utilities: password hashing, JWT tokens
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
from uuid import UUID
from jose import jwt, JWTError
from passlib.context import CryptContext
from config import get_settings
# Resolve the service settings once, at import time; every helper in this
# module reads its secrets, TTLs and hashing cost from this object.
settings = get_settings()

# Password hashing
# A single passlib context is shared by hash_password() and verify_password()
# so both always agree on the scheme (bcrypt) and the configured cost factor.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    bcrypt__rounds=settings.bcrypt_rounds,
    deprecated="auto",
)
def hash_password(password: str) -> str:
    """Hash a plain-text password with the module-level ``pwd_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The encoded hash string produced by ``pwd_context.hash``
        (bcrypt, per the context configuration in this module).
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a previously stored hash.

    Args:
        plain_password: The candidate password supplied by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
# JWT Token operations
def create_access_token(
    user_id: UUID,
    email: str,
    display_name: Optional[str],
    roles: list[str]
) -> str:
    """Build and sign a JWT access token for a user.

    Args:
        user_id: Identifier of the user; stored as the ``sub`` claim (stringified).
        email: Stored as the ``email`` claim.
        display_name: Stored as the ``name`` claim; may be ``None``.
        roles: Stored as the ``roles`` claim.

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using
        ``settings.jwt_algorithm``. It expires ``settings.access_token_ttl``
        seconds after the moment of issue.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(seconds=settings.access_token_ttl)
    expires_at = issued_at + lifetime

    # Claim order is kept stable so the serialized payload does not change.
    claims = {
        "sub": str(user_id),
        "email": email,
        "name": display_name,
        "roles": roles,
        "type": "access",
        "iss": "daarion-auth",
        # iat/exp are whole seconds since the epoch (UTC).
        "iat": int(issued_at.timestamp()),
        "exp": int(expires_at.timestamp()),
    }

    token = jwt.encode(
        claims,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
    return token
def create_refresh_token(user_id: UUID, session_id: UUID) -> str:
    """Build and sign a JWT refresh token bound to a session.

    Args:
        user_id: Identifier of the user; stored as the ``sub`` claim (stringified).
        session_id: Identifier of the session; stored as the ``session_id``
            claim (stringified).

    Returns:
        The encoded token, signed with ``settings.jwt_secret`` using
        ``settings.jwt_algorithm``. It expires ``settings.refresh_token_ttl``
        seconds after the moment of issue.
    """
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(seconds=settings.refresh_token_ttl)

    # Identity claims first, then the registered time claims; the order
    # mirrors the serialized payload layout and must stay stable.
    claims: Dict[str, Any] = {
        "sub": str(user_id),
        "session_id": str(session_id),
        "type": "refresh",
        "iss": "daarion-auth",
    }
    claims["iat"] = int(issued_at.timestamp())
    claims["exp"] = int(expires_at.timestamp())

    return jwt.encode(
        claims,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode and validate a JWT issued by this service.

    The signature is checked against ``settings.jwt_secret`` using only
    ``settings.jwt_algorithm``, expiry is enforced, and the ``iss`` claim must
    equal ``"daarion-auth"`` -- the value that ``create_access_token`` and
    ``create_refresh_token`` stamp into every token. Without the issuer check,
    any token signed with the same secret would be accepted regardless of who
    minted it.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims dict, or ``None`` when the token is malformed,
        has a bad signature, is expired, or carries a missing/different issuer.
    """
    try:
        return jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
            options={"verify_exp": True},
            # Must match the "iss" value written by the token factories above.
            issuer="daarion-auth",
        )
    except JWTError:
        # Expiry and claim-validation failures are JWTError subclasses in
        # python-jose, so every rejection path collapses to None here.
        return None
def decode_access_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a token and accept it only if it is an access token.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims when ``decode_token`` succeeds and the ``type``
        claim equals ``"access"``; otherwise ``None``.
    """
    claims = decode_token(token)
    if not claims:
        # Decoding failed (or produced an empty payload).
        return None
    if claims.get("type") != "access":
        # Valid JWT, but not an access token (e.g. a refresh token).
        return None
    return claims
def decode_refresh_token(token: str) -> Optional[Dict[str, Any]]:
    """Decode a token and accept it only if it is a refresh token.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims when ``decode_token`` succeeds and the ``type``
        claim equals ``"refresh"``; otherwise ``None``.
    """
    claims = decode_token(token)
    is_refresh = bool(claims) and claims.get("type") == "refresh"
    return claims if is_refresh else None