""" Service-to-Service Authentication ================================= JWT-based authentication between internal services. Usage: from service_auth import create_service_token, verify_service_token, require_service_auth # Create token for service token = create_service_token("router", "router") # Verify in endpoint @app.get("/protected") @require_service_auth(allowed_roles=["router", "gateway"]) async def protected_endpoint(): return {"status": "ok"} """ import os import jwt import time from typing import List, Optional, Dict, Any from functools import wraps from fastapi import HTTPException, Header, Request # Configuration JWT_SECRET = os.getenv("JWT_SECRET", "change-me-in-production") JWT_ALGORITHM = "HS256" JWT_AUDIENCE = os.getenv("SERVICE_AUD", "microdao-internal") JWT_ISSUER = os.getenv("SERVICE_ISS", "microdao") # Service roles and permissions SERVICE_ROLES = { "gateway": ["gateway", "router", "worker", "parser"], "router": ["router", "worker"], "worker": ["worker"], "memory": ["memory"], "control-plane": ["control-plane"], "parser": ["parser"], "ingest": ["ingest"] } # Service-to-service access matrix SERVICE_ACCESS = { "gateway": ["memory", "control-plane", "router"], "router": ["memory", "control-plane", "swapper"], "worker": ["memory", "router"], "parser": ["memory"], "ingest": ["memory"] } def create_service_token(service_id: str, service_role: str, expires_in: int = 900) -> str: """ Create JWT token for service-to-service authentication. Args: service_id: Unique service identifier (e.g., "router", "gateway") service_role: Service role (e.g., "router", "gateway") expires_in: Token expiration in seconds (default: 1 hour) Returns: JWT token string """ now = int(time.time()) payload = { "sub": service_id, "role": service_role, "aud": JWT_AUDIENCE, "iss": JWT_ISSUER, "iat": now, "exp": now + expires_in, "service_id": service_id, "service_role": service_role } token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) return token def verify_service_token(token: str) -> Dict[str, Any]: """ Verify service JWT token. Returns: Decoded token payload Raises: HTTPException: If token is invalid """ try: payload = jwt.decode( token, JWT_SECRET, algorithms=[JWT_ALGORITHM], audience=JWT_AUDIENCE, issuer=JWT_ISSUER ) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError as e: raise HTTPException(status_code=401, detail=f"Invalid token: {e}") def require_service_auth(allowed_roles: List[str] = None, allowed_services: List[str] = None): """ Decorator to require service authentication. Args: allowed_roles: List of allowed service roles allowed_services: List of allowed service IDs """ def decorator(func): @wraps(func) async def wrapper(request: Request, *args, **kwargs): # Get Authorization header auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): raise HTTPException( status_code=401, detail="Missing or invalid Authorization header" ) token = auth_header.replace("Bearer ", "") try: payload = verify_service_token(token) service_id = payload.get("service_id") service_role = payload.get("role") # Check if service is allowed if allowed_roles and service_role not in allowed_roles: raise HTTPException( status_code=403, detail=f"Service role '{service_role}' not allowed" ) if allowed_services and service_id not in allowed_services: raise HTTPException( status_code=403, detail=f"Service '{service_id}' not allowed" ) # Add service info to request state request.state.service_id = service_id request.state.service_role = service_role return await func(request, *args, **kwargs) except HTTPException: raise except Exception as e: raise HTTPException(status_code=401, detail=f"Authentication failed: {e}") return wrapper return decorator def get_service_token() -> str: """ Get service token for current service (from environment). """ service_id = os.getenv("SERVICE_ID") service_role = os.getenv("SERVICE_ROLE", service_id) if not service_id: raise ValueError("SERVICE_ID environment variable not set") return create_service_token(service_id, service_role) # FastAPI dependency for service auth async def verify_service(request: Request, authorization: str = Header(None)): """FastAPI dependency for service authentication""" if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing Authorization header") token = authorization.replace("Bearer ", "") payload = verify_service_token(token) request.state.service_id = payload.get("service_id") request.state.service_role = payload.get("role") return payload