"""
Auth for Sofiia Console.

Strategy: cookie-based session (httpOnly, Secure, SameSite=Lax).

Login:  POST /api/auth/login  { "key": "..." }  → sets console_token cookie
Logout: POST /api/auth/logout                   → clears cookie

All protected endpoints check:
  1. Cookie "console_token"  (browser sessions)
  2. X-API-Key header        (backward compat: curl, API clients)

Single operator key: SOFIIA_CONSOLE_API_KEY
Team keys (multi-user): SOFIIA_CONSOLE_TEAM_KEYS = "alice:key1,bob:key2,sergiy:key3"
  - Each user gets a personal key; login stores a per-user cookie token.
  - require_auth returns "user:<name>" for audit identification.

Dev mode (ENV != prod, no key configured): open access.
"""

import hashlib
import logging
import os
import secrets
from typing import Dict, Optional, Tuple

from fastapi import Cookie, HTTPException, Request, Security
from fastapi.security import APIKeyHeader

logger = logging.getLogger(__name__)

# "staging" is treated like production for the purposes of this module.
_IS_PROD = os.getenv("ENV", "dev").strip().lower() in ("prod", "production", "staging")

_COOKIE_NAME = "console_token"
_COOKIE_MAX_AGE = 60 * 60 * 24 * 7  # 7 days

# auto_error=False: a missing header yields None instead of an automatic error,
# so the cookie path can still authenticate the request.
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)


def get_console_api_key() -> str:
    """Return the primary operator key from SOFIIA_CONSOLE_API_KEY ("" if unset)."""
    return os.getenv("SOFIIA_CONSOLE_API_KEY", "").strip()


def _get_team_keys() -> Dict[str, str]:
    """
    Parse SOFIIA_CONSOLE_TEAM_KEYS = "alice:key1,bob:key2".

    Returns a {name: key} mapping. Entries without a ":" separator, or with an
    empty name or key, are skipped. The environment is re-read on every call.
    """
    raw = os.getenv("SOFIIA_CONSOLE_TEAM_KEYS", "").strip()
    if not raw:
        return {}
    result: Dict[str, str] = {}
    for entry in raw.split(","):
        entry = entry.strip()
        if ":" not in entry:
            continue
        # Split on the first ":" only, so keys may themselves contain ":".
        name, _, key = entry.partition(":")
        name = name.strip()
        key = key.strip()
        if name and key:
            result[name] = key
    return result


def _safe_compare(left: str, right: str) -> bool:
    """
    Constant-time equality check for two strings.

    secrets.compare_digest raises TypeError when given str arguments that
    contain non-ASCII characters, so both sides are encoded to UTF-8 bytes
    first. This keeps a malformed client-supplied value from turning into an
    unhandled exception.
    """
    return secrets.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def _key_valid(provided: str) -> bool:
    """
    Check `provided` against the primary key.

    Returns True if no primary key is configured (open).
    """
    configured = get_console_api_key()
    if not configured:
        return True
    return _safe_compare(provided.strip(), configured)


def _team_key_identity(provided: str) -> Optional[str]:
    """
    Check `provided` against the team keys.

    Returns the user name of the first matching entry, or None if nothing matches.
    """
    candidate = provided.strip()
    for name, key in _get_team_keys().items():
        if _safe_compare(candidate, key):
            return name
    return None


def _cookie_token(api_key: str) -> str:
    """Derive a stable session token (SHA-256 hex digest) from the api key,
    so the key itself is never stored directly in the cookie."""
    return hashlib.sha256(api_key.encode()).hexdigest()


def _expected_cookie_token() -> str:
    """Return the cookie token that corresponds to the primary operator key."""
    return _cookie_token(get_console_api_key())


def _expected_team_cookie_tokens() -> Dict[str, str]:
    """Return {cookie_token: user_name} for all team keys."""
    return {_cookie_token(key): name for name, key in _get_team_keys().items()}


def validate_any_key(provided: str) -> Tuple[bool, str]:
    """
    Check `provided` against the primary key AND the team keys.

    Returns (is_valid, identity_string), where identity_string is:
      - "operator"     when the primary key matches,
      - "user:<name>"  when a team key matches,
      - "anonymous"    when neither a primary key nor team keys are configured
                       (open access),
      - ""             when validation fails.
    """
    candidate = (provided or "").strip()
    configured = get_console_api_key()
    team_keys = _get_team_keys()

    # Open access only when NOTHING is configured. Checking the primary key
    # alone would accept any value whenever only team keys are set.
    if not configured and not team_keys:
        return True, "anonymous"

    # Primary key
    if configured and _safe_compare(candidate, configured):
        return True, "operator"

    # Team keys
    name = _team_key_identity(candidate)
    if name:
        return True, f"user:{name}"
    return False, ""


def require_auth(
    request: Request,
    x_api_key: Optional[str] = Security(API_KEY_HEADER),
) -> str:
    """
    Check cookie OR X-API-Key header.

    Localhost (127.0.0.1 / ::1) is ALWAYS allowed — no key needed.
    Without any configured key (primary or team): pass through; in prod a
    warning is logged but the request is still allowed.

    Returns an identity string for audit (e.g. "operator", "user:alice",
    "localhost"), or "" when access is open because nothing is configured.

    Raises:
        HTTPException(401): keys are configured and neither the cookie nor the
            header carries a valid credential.
    """
    # Localhost bypass — always open for local development.
    # NOTE(review): this trusts request.client.host as-is; behind a local
    # reverse proxy every request may appear to come from 127.0.0.1 — confirm
    # the deployment topology.
    client_ip = (request.client.host if request.client else "") or ""
    if client_ip in ("127.0.0.1", "::1", "localhost"):
        return "localhost"

    configured = get_console_api_key()
    team_keys = _get_team_keys()

    if not configured and not team_keys:
        if _IS_PROD:
            logger.warning("SOFIIA_CONSOLE_API_KEY not set in prod — console is OPEN")
        return ""

    # 1) Cookie check
    cookie_val = request.cookies.get(_COOKIE_NAME, "")
    if cookie_val:
        # Primary key cookie
        if configured and _safe_compare(cookie_val, _expected_cookie_token()):
            return "operator"
        # Team key cookies
        team_tokens = _expected_team_cookie_tokens()
        if cookie_val in team_tokens:
            return f"user:{team_tokens[cookie_val]}"

    # 2) X-API-Key header
    if x_api_key:
        valid, identity = validate_any_key(x_api_key)
        if valid:
            return identity

    raise HTTPException(status_code=401, detail="Unauthorized")


def require_auth_strict(
    request: Request,
    x_api_key: Optional[str] = Security(API_KEY_HEADER),
) -> str:
    """
    Like require_auth, but always fails if no primary key is configured.

    Only SOFIIA_CONSOLE_API_KEY is checked here; team keys alone do not satisfy
    the precondition. Once the precondition holds, the request goes through
    require_auth unchanged (including its localhost bypass).

    Raises:
        HTTPException(503): SOFIIA_CONSOLE_API_KEY is not configured.
        HTTPException(401): propagated from require_auth.
    """
    configured = get_console_api_key()
    if not configured:
        raise HTTPException(status_code=503, detail="SOFIIA_CONSOLE_API_KEY not configured")
    return require_auth(request, x_api_key)


def require_audit_auth(
    request: Request,
    x_api_key: Optional[str] = Security(API_KEY_HEADER),
) -> str:
    """
    Strict auth for the sensitive audit endpoint:
      - Requires a configured SOFIIA_CONSOLE_API_KEY
      - Does NOT allow the localhost bypass
      - Accepts cookie or X-API-Key when the key is configured

    Only the primary key (or its cookie token) is compared; team keys are not
    consulted.

    Returns "cookie" or "header", naming the channel that authenticated.

    Raises:
        HTTPException(401): the key is not configured, or no valid credential
            was supplied.
    """
    configured = get_console_api_key()
    if not configured:
        raise HTTPException(status_code=401, detail="Audit endpoint requires SOFIIA_CONSOLE_API_KEY")
    cookie_val = request.cookies.get(_COOKIE_NAME, "")
    if cookie_val and _safe_compare(cookie_val, _expected_cookie_token()):
        return "cookie"
    if x_api_key and _key_valid(x_api_key):
        return "header"
    raise HTTPException(status_code=401, detail="Unauthorized")


# Keep old names as aliases so existing imports don't break
require_api_key = require_auth
require_api_key_strict = require_auth_strict