Some checks failed
Build and Deploy Docs / build-and-deploy (push) Has been cancelled
- Created logs/ structure (sessions, operations, incidents) - Added session-start/log/end scripts - Installed Git hooks for auto-logging commits/pushes - Added shell integration for zsh - Created CHANGELOG.md - Documented today's session (2026-01-10)
342 lines
10 KiB
Python
342 lines
10 KiB
Python
"""
|
|
Passkey Routes (WebAuthn)
|
|
4 endpoints: register/start, register/finish, authenticate/start, authenticate/finish
|
|
"""
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional, Dict, Any
|
|
import base64
|
|
import json
|
|
|
|
from passkey_store import PasskeyStore
|
|
from webauthn_utils import webauthn_manager, generate_session_token
|
|
from models import ActorIdentity, ActorType
|
|
|
|
router = APIRouter(prefix="/auth/passkey", tags=["passkey"])
|
|
|
|
# Global store (injected at startup)
|
|
passkey_store: Optional[PasskeyStore] = None
|
|
|
|
def get_store() -> PasskeyStore:
|
|
if not passkey_store:
|
|
raise HTTPException(500, "Passkey store not initialized")
|
|
return passkey_store
|
|
|
|
# ============================================================================
|
|
# Request/Response Models
|
|
# ============================================================================
|
|
|
|
class RegistrationStartRequest(BaseModel):
|
|
email: str = Field(..., min_length=3, max_length=255)
|
|
username: Optional[str] = None
|
|
display_name: Optional[str] = None
|
|
|
|
class RegistrationStartResponse(BaseModel):
|
|
options: Dict[str, Any]
|
|
challenge: str
|
|
|
|
class RegistrationFinishRequest(BaseModel):
|
|
email: str
|
|
credential: Dict[str, Any] # WebAuthn credential response
|
|
|
|
class RegistrationFinishResponse(BaseModel):
|
|
success: bool
|
|
user_id: str
|
|
message: str
|
|
|
|
class AuthenticationStartRequest(BaseModel):
|
|
email: Optional[str] = None # Optional for resident key
|
|
|
|
class AuthenticationStartResponse(BaseModel):
|
|
options: Dict[str, Any]
|
|
challenge: str
|
|
|
|
class AuthenticationFinishRequest(BaseModel):
|
|
credential: Dict[str, Any] # WebAuthn assertion response
|
|
|
|
class AuthenticationFinishResponse(BaseModel):
|
|
session_token: str
|
|
actor: ActorIdentity
|
|
|
|
# ============================================================================
|
|
# REGISTRATION FLOW
|
|
# ============================================================================
|
|
|
|
@router.post("/register/start", response_model=RegistrationStartResponse)
|
|
async def register_start(
|
|
request: RegistrationStartRequest,
|
|
store: PasskeyStore = Depends(get_store)
|
|
):
|
|
"""
|
|
Step 1 of registration: Generate WebAuthn challenge
|
|
|
|
Creates or finds user, generates registration options
|
|
"""
|
|
|
|
# Check if user already exists
|
|
user = await store.get_user_by_email(request.email)
|
|
|
|
if not user:
|
|
# Create new user
|
|
username = request.username or request.email.split('@')[0]
|
|
display_name = request.display_name or username
|
|
|
|
user = await store.create_user(
|
|
email=request.email,
|
|
username=username,
|
|
display_name=display_name
|
|
)
|
|
print(f"✅ Created new user: {user['id']}")
|
|
else:
|
|
print(f"✅ Found existing user: {user['id']}")
|
|
|
|
# Generate registration options
|
|
result = webauthn_manager.generate_registration_challenge(
|
|
user_id=str(user['id']),
|
|
username=user['username'],
|
|
display_name=user['display_name']
|
|
)
|
|
|
|
# Store challenge
|
|
await store.store_challenge(
|
|
challenge=result['challenge'],
|
|
challenge_type='register',
|
|
user_id=str(user['id']),
|
|
email=request.email
|
|
)
|
|
|
|
print(f"✅ Generated registration challenge for {request.email}")
|
|
|
|
return RegistrationStartResponse(
|
|
options=result['options'],
|
|
challenge=result['challenge']
|
|
)
|
|
|
|
@router.post("/register/finish", response_model=RegistrationFinishResponse)
|
|
async def register_finish(
|
|
request: RegistrationFinishRequest,
|
|
store: PasskeyStore = Depends(get_store)
|
|
):
|
|
"""
|
|
Step 2 of registration: Verify attestation and store credential
|
|
|
|
Validates WebAuthn response, stores public key
|
|
"""
|
|
|
|
# Get user
|
|
user = await store.get_user_by_email(request.email)
|
|
if not user:
|
|
raise HTTPException(404, "User not found")
|
|
|
|
# Extract challenge from credential
|
|
client_data_json = base64.urlsafe_b64decode(
|
|
request.credential['response']['clientDataJSON'] + "=="
|
|
)
|
|
client_data = json.loads(client_data_json)
|
|
challenge_b64 = client_data['challenge']
|
|
|
|
# Verify challenge
|
|
challenge_record = await store.verify_challenge(
|
|
challenge=challenge_b64,
|
|
challenge_type='register'
|
|
)
|
|
|
|
if not challenge_record:
|
|
raise HTTPException(400, "Invalid or expired challenge")
|
|
|
|
# Verify registration
|
|
expected_challenge = base64.urlsafe_b64decode(challenge_b64 + "==")
|
|
|
|
verification = webauthn_manager.verify_registration(
|
|
credential=request.credential,
|
|
expected_challenge=expected_challenge,
|
|
expected_origin=webauthn_manager.origin,
|
|
expected_rp_id=webauthn_manager.rp_id
|
|
)
|
|
|
|
if not verification['verified']:
|
|
raise HTTPException(400, f"Registration verification failed: {verification.get('error')}")
|
|
|
|
# Store passkey
|
|
await store.create_passkey(
|
|
user_id=str(user['id']),
|
|
credential_id=verification['credential_id'],
|
|
public_key=verification['public_key'],
|
|
sign_count=verification['sign_count'],
|
|
aaguid=verification['aaguid'],
|
|
attestation_format=verification['attestation_format']
|
|
)
|
|
|
|
print(f"✅ Registered passkey for user {user['id']}")
|
|
|
|
return RegistrationFinishResponse(
|
|
success=True,
|
|
user_id=str(user['id']),
|
|
message="Passkey registered successfully"
|
|
)
|
|
|
|
# ============================================================================
|
|
# AUTHENTICATION FLOW
|
|
# ============================================================================
|
|
|
|
@router.post("/authenticate/start", response_model=AuthenticationStartResponse)
|
|
async def authenticate_start(
|
|
request: AuthenticationStartRequest,
|
|
store: PasskeyStore = Depends(get_store)
|
|
):
|
|
"""
|
|
Step 1 of authentication: Generate WebAuthn challenge
|
|
|
|
Finds user's passkeys, generates authentication options
|
|
"""
|
|
|
|
credentials = []
|
|
user_id = None
|
|
|
|
if request.email:
|
|
# Email-based authentication
|
|
user = await store.get_user_by_email(request.email)
|
|
if not user:
|
|
raise HTTPException(404, "User not found")
|
|
|
|
user_id = str(user['id'])
|
|
|
|
# Get user's passkeys
|
|
passkeys = await store.get_passkeys_by_user_id(user_id)
|
|
credentials = [
|
|
{
|
|
"credential_id": pk['credential_id'],
|
|
"transports": pk.get('transports', [])
|
|
}
|
|
for pk in passkeys
|
|
]
|
|
|
|
if not credentials:
|
|
raise HTTPException(404, "No passkeys found for this user")
|
|
else:
|
|
# Resident key authentication (discoverable credential)
|
|
# Allow any passkey
|
|
pass
|
|
|
|
# Generate authentication options
|
|
result = webauthn_manager.generate_authentication_challenge(credentials)
|
|
|
|
# Store challenge
|
|
await store.store_challenge(
|
|
challenge=result['challenge'],
|
|
challenge_type='authenticate',
|
|
user_id=user_id,
|
|
email=request.email
|
|
)
|
|
|
|
print(f"✅ Generated authentication challenge")
|
|
|
|
return AuthenticationStartResponse(
|
|
options=result['options'],
|
|
challenge=result['challenge']
|
|
)
|
|
|
|
@router.post("/authenticate/finish", response_model=AuthenticationFinishResponse)
|
|
async def authenticate_finish(
|
|
request: AuthenticationFinishRequest,
|
|
store: PasskeyStore = Depends(get_store)
|
|
):
|
|
"""
|
|
Step 2 of authentication: Verify assertion and create session
|
|
|
|
Validates WebAuthn response, returns session token
|
|
"""
|
|
|
|
# Extract credential ID and challenge
|
|
credential_id_b64 = request.credential['id']
|
|
|
|
client_data_json = base64.urlsafe_b64decode(
|
|
request.credential['response']['clientDataJSON'] + "=="
|
|
)
|
|
client_data = json.loads(client_data_json)
|
|
challenge_b64 = client_data['challenge']
|
|
|
|
# Verify challenge
|
|
challenge_record = await store.verify_challenge(
|
|
challenge=challenge_b64,
|
|
challenge_type='authenticate'
|
|
)
|
|
|
|
if not challenge_record:
|
|
raise HTTPException(400, "Invalid or expired challenge")
|
|
|
|
# Get passkey
|
|
passkey = await store.get_passkey_by_credential_id(credential_id_b64)
|
|
if not passkey:
|
|
raise HTTPException(404, "Passkey not found")
|
|
|
|
# Verify authentication
|
|
expected_challenge = base64.urlsafe_b64decode(challenge_b64 + "==")
|
|
public_key_bytes = base64.urlsafe_b64decode(passkey['public_key'] + "==")
|
|
|
|
verification = webauthn_manager.verify_authentication(
|
|
credential=request.credential,
|
|
expected_challenge=expected_challenge,
|
|
credential_public_key=public_key_bytes,
|
|
credential_current_sign_count=passkey['sign_count'],
|
|
expected_origin=webauthn_manager.origin,
|
|
expected_rp_id=webauthn_manager.rp_id
|
|
)
|
|
|
|
if not verification['verified']:
|
|
raise HTTPException(400, f"Authentication verification failed: {verification.get('error')}")
|
|
|
|
# Update sign count
|
|
await store.update_sign_count(
|
|
credential_id=credential_id_b64,
|
|
new_sign_count=verification['new_sign_count']
|
|
)
|
|
|
|
# Get user
|
|
user = await store.get_user_by_id(str(passkey['user_id']))
|
|
if not user:
|
|
raise HTTPException(404, "User not found")
|
|
|
|
# Update last login
|
|
await store.update_last_login(str(user['id']))
|
|
|
|
# Create session
|
|
session_token = generate_session_token()
|
|
await store.create_session(
|
|
token=session_token,
|
|
user_id=str(user['id'])
|
|
)
|
|
|
|
# Build ActorIdentity
|
|
memberships = await store.get_user_microdao_memberships(str(user['id']))
|
|
|
|
actor = ActorIdentity(
|
|
actor_id=f"user:{user['id']}",
|
|
actor_type=ActorType.HUMAN,
|
|
microdao_ids=[m['microdao_id'] for m in memberships],
|
|
roles=[m['role'] for m in memberships]
|
|
)
|
|
|
|
print(f"✅ Authenticated user {user['id']}")
|
|
|
|
return AuthenticationFinishResponse(
|
|
session_token=session_token,
|
|
actor=actor
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|