Files
microdao-daarion/services/auth-service/routes_passkey.py
Apple 6bd769ef40 feat(city-map): Add 2D City Map with coordinates and agent presence
- Add migration 013_city_map_coordinates.sql with map coordinates, zones, and agents table
- Add /city/map API endpoint in city-service
- Add /city/agents and /city/agents/online endpoints
- Extend presence aggregator to include agents[] in snapshot
- Add AgentsSource for fetching agent data from DB
- Create CityMap component with interactive room tiles
- Add useCityMap hook for fetching map data
- Update useGlobalPresence to include agents
- Add map/list view toggle on /city page
- Add agent badges to room cards and map tiles
2025-11-27 07:00:47 -08:00

331 lines
9.9 KiB
Python

"""
Passkey Routes (WebAuthn)
4 endpoints: register/start, register/finish, authenticate/start, authenticate/finish
"""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
import base64
import json
from passkey_store import PasskeyStore
from webauthn_utils import webauthn_manager, generate_session_token
from models import ActorIdentity, ActorType
router = APIRouter(prefix="/auth/passkey", tags=["passkey"])
# Global store (injected at startup)
passkey_store: Optional[PasskeyStore] = None
def get_store() -> PasskeyStore:
if not passkey_store:
raise HTTPException(500, "Passkey store not initialized")
return passkey_store
# ============================================================================
# Request/Response Models
# ============================================================================
class RegistrationStartRequest(BaseModel):
email: str = Field(..., min_length=3, max_length=255)
username: Optional[str] = None
display_name: Optional[str] = None
class RegistrationStartResponse(BaseModel):
options: Dict[str, Any]
challenge: str
class RegistrationFinishRequest(BaseModel):
email: str
credential: Dict[str, Any] # WebAuthn credential response
class RegistrationFinishResponse(BaseModel):
success: bool
user_id: str
message: str
class AuthenticationStartRequest(BaseModel):
email: Optional[str] = None # Optional for resident key
class AuthenticationStartResponse(BaseModel):
options: Dict[str, Any]
challenge: str
class AuthenticationFinishRequest(BaseModel):
credential: Dict[str, Any] # WebAuthn assertion response
class AuthenticationFinishResponse(BaseModel):
session_token: str
actor: ActorIdentity
# ============================================================================
# REGISTRATION FLOW
# ============================================================================
@router.post("/register/start", response_model=RegistrationStartResponse)
async def register_start(
request: RegistrationStartRequest,
store: PasskeyStore = Depends(get_store)
):
"""
Step 1 of registration: Generate WebAuthn challenge
Creates or finds user, generates registration options
"""
# Check if user already exists
user = await store.get_user_by_email(request.email)
if not user:
# Create new user
username = request.username or request.email.split('@')[0]
display_name = request.display_name or username
user = await store.create_user(
email=request.email,
username=username,
display_name=display_name
)
print(f"✅ Created new user: {user['id']}")
else:
print(f"✅ Found existing user: {user['id']}")
# Generate registration options
result = webauthn_manager.generate_registration_challenge(
user_id=str(user['id']),
username=user['username'],
display_name=user['display_name']
)
# Store challenge
await store.store_challenge(
challenge=result['challenge'],
challenge_type='register',
user_id=str(user['id']),
email=request.email
)
print(f"✅ Generated registration challenge for {request.email}")
return RegistrationStartResponse(
options=result['options'],
challenge=result['challenge']
)
@router.post("/register/finish", response_model=RegistrationFinishResponse)
async def register_finish(
request: RegistrationFinishRequest,
store: PasskeyStore = Depends(get_store)
):
"""
Step 2 of registration: Verify attestation and store credential
Validates WebAuthn response, stores public key
"""
# Get user
user = await store.get_user_by_email(request.email)
if not user:
raise HTTPException(404, "User not found")
# Extract challenge from credential
client_data_json = base64.urlsafe_b64decode(
request.credential['response']['clientDataJSON'] + "=="
)
client_data = json.loads(client_data_json)
challenge_b64 = client_data['challenge']
# Verify challenge
challenge_record = await store.verify_challenge(
challenge=challenge_b64,
challenge_type='register'
)
if not challenge_record:
raise HTTPException(400, "Invalid or expired challenge")
# Verify registration
expected_challenge = base64.urlsafe_b64decode(challenge_b64 + "==")
verification = webauthn_manager.verify_registration(
credential=request.credential,
expected_challenge=expected_challenge,
expected_origin=webauthn_manager.origin,
expected_rp_id=webauthn_manager.rp_id
)
if not verification['verified']:
raise HTTPException(400, f"Registration verification failed: {verification.get('error')}")
# Store passkey
await store.create_passkey(
user_id=str(user['id']),
credential_id=verification['credential_id'],
public_key=verification['public_key'],
sign_count=verification['sign_count'],
aaguid=verification['aaguid'],
attestation_format=verification['attestation_format']
)
print(f"✅ Registered passkey for user {user['id']}")
return RegistrationFinishResponse(
success=True,
user_id=str(user['id']),
message="Passkey registered successfully"
)
# ============================================================================
# AUTHENTICATION FLOW
# ============================================================================
@router.post("/authenticate/start", response_model=AuthenticationStartResponse)
async def authenticate_start(
request: AuthenticationStartRequest,
store: PasskeyStore = Depends(get_store)
):
"""
Step 1 of authentication: Generate WebAuthn challenge
Finds user's passkeys, generates authentication options
"""
credentials = []
user_id = None
if request.email:
# Email-based authentication
user = await store.get_user_by_email(request.email)
if not user:
raise HTTPException(404, "User not found")
user_id = str(user['id'])
# Get user's passkeys
passkeys = await store.get_passkeys_by_user_id(user_id)
credentials = [
{
"credential_id": pk['credential_id'],
"transports": pk.get('transports', [])
}
for pk in passkeys
]
if not credentials:
raise HTTPException(404, "No passkeys found for this user")
else:
# Resident key authentication (discoverable credential)
# Allow any passkey
pass
# Generate authentication options
result = webauthn_manager.generate_authentication_challenge(credentials)
# Store challenge
await store.store_challenge(
challenge=result['challenge'],
challenge_type='authenticate',
user_id=user_id,
email=request.email
)
print(f"✅ Generated authentication challenge")
return AuthenticationStartResponse(
options=result['options'],
challenge=result['challenge']
)
@router.post("/authenticate/finish", response_model=AuthenticationFinishResponse)
async def authenticate_finish(
request: AuthenticationFinishRequest,
store: PasskeyStore = Depends(get_store)
):
"""
Step 2 of authentication: Verify assertion and create session
Validates WebAuthn response, returns session token
"""
# Extract credential ID and challenge
credential_id_b64 = request.credential['id']
client_data_json = base64.urlsafe_b64decode(
request.credential['response']['clientDataJSON'] + "=="
)
client_data = json.loads(client_data_json)
challenge_b64 = client_data['challenge']
# Verify challenge
challenge_record = await store.verify_challenge(
challenge=challenge_b64,
challenge_type='authenticate'
)
if not challenge_record:
raise HTTPException(400, "Invalid or expired challenge")
# Get passkey
passkey = await store.get_passkey_by_credential_id(credential_id_b64)
if not passkey:
raise HTTPException(404, "Passkey not found")
# Verify authentication
expected_challenge = base64.urlsafe_b64decode(challenge_b64 + "==")
public_key_bytes = base64.urlsafe_b64decode(passkey['public_key'] + "==")
verification = webauthn_manager.verify_authentication(
credential=request.credential,
expected_challenge=expected_challenge,
credential_public_key=public_key_bytes,
credential_current_sign_count=passkey['sign_count'],
expected_origin=webauthn_manager.origin,
expected_rp_id=webauthn_manager.rp_id
)
if not verification['verified']:
raise HTTPException(400, f"Authentication verification failed: {verification.get('error')}")
# Update sign count
await store.update_sign_count(
credential_id=credential_id_b64,
new_sign_count=verification['new_sign_count']
)
# Get user
user = await store.get_user_by_id(str(passkey['user_id']))
if not user:
raise HTTPException(404, "User not found")
# Update last login
await store.update_last_login(str(user['id']))
# Create session
session_token = generate_session_token()
await store.create_session(
token=session_token,
user_id=str(user['id'])
)
# Build ActorIdentity
memberships = await store.get_user_microdao_memberships(str(user['id']))
actor = ActorIdentity(
actor_id=f"user:{user['id']}",
actor_type=ActorType.HUMAN,
microdao_ids=[m['microdao_id'] for m in memberships],
roles=[m['role'] for m in memberships]
)
print(f"✅ Authenticated user {user['id']}")
return AuthenticationFinishResponse(
session_token=session_token,
actor=actor
)