Files
microdao-daarion/services/auth-service/webauthn_utils.py
Apple fca48b3eb0 feat(node2): Complete NODE2 setup - guardian, agents, swapper models
- Node-guardian running on MacBook and updating metrics
- NODE2 agents (Atlas, Greeter, Oracle, Builder Bot) assigned to node-2-macbook-m4max
- Swapper models displaying correctly (8 models)
- DAGI Router agents showing with correct status (3 active, 1 stale)
- Router health check using node_cache for remote nodes
2025-12-02 07:07:58 -08:00

213 lines
6.6 KiB
Python

"""
WebAuthn Utilities for DAARION
Handles challenge generation, credential validation, and attestation
"""
import os
import secrets
import base64
import json
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import hashlib
# WebAuthn library
from webauthn import (
generate_registration_options,
verify_registration_response,
generate_authentication_options,
verify_authentication_response,
options_to_json,
)
from webauthn.helpers.structs import (
PublicKeyCredentialDescriptor,
UserVerificationRequirement,
AuthenticatorSelectionCriteria,
ResidentKeyRequirement,
AuthenticatorAttachment,
)
from webauthn.helpers.cose import COSEAlgorithmIdentifier
# Configuration
RP_ID = os.getenv("RP_ID", "localhost")
RP_NAME = os.getenv("RP_NAME", "DAARION")
ORIGIN = os.getenv("ORIGIN", "http://localhost:3000")
class WebAuthnManager:
"""Manages WebAuthn operations"""
def __init__(self):
self.rp_id = RP_ID
self.rp_name = RP_NAME
self.origin = ORIGIN
def generate_registration_challenge(
self,
user_id: str,
username: str,
display_name: str
) -> Dict[str, Any]:
"""
Generate WebAuthn registration options
Returns PublicKeyCredentialCreationOptions
"""
# Generate options using py_webauthn
options = generate_registration_options(
rp_id=self.rp_id,
rp_name=self.rp_name,
user_id=user_id.encode('utf-8'),
user_name=username,
user_display_name=display_name,
authenticator_selection=AuthenticatorSelectionCriteria(
authenticator_attachment=AuthenticatorAttachment.PLATFORM,
resident_key=ResidentKeyRequirement.PREFERRED,
user_verification=UserVerificationRequirement.PREFERRED,
),
supported_pub_key_algs=[
COSEAlgorithmIdentifier.ECDSA_SHA_256, # -7
COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256, # -257
],
timeout=60000, # 60 seconds
)
# Convert to JSON-serializable dict
options_json = options_to_json(options)
options_dict = json.loads(options_json)
return {
"options": options_dict,
"challenge": base64.urlsafe_b64encode(options.challenge).decode('utf-8').rstrip('=')
}
def verify_registration(
self,
credential: Dict[str, Any],
expected_challenge: bytes,
expected_origin: str,
expected_rp_id: str
) -> Dict[str, Any]:
"""
Verify WebAuthn registration response
Returns verified credential data
"""
try:
verification = verify_registration_response(
credential=credential,
expected_challenge=expected_challenge,
expected_origin=expected_origin,
expected_rp_id=expected_rp_id,
)
return {
"verified": True,
"credential_id": base64.urlsafe_b64encode(verification.credential_id).decode('utf-8').rstrip('='),
"public_key": base64.urlsafe_b64encode(verification.credential_public_key).decode('utf-8').rstrip('='),
"sign_count": verification.sign_count,
"aaguid": verification.aaguid.hex() if verification.aaguid else None,
"attestation_format": verification.fmt,
}
except Exception as e:
return {
"verified": False,
"error": str(e)
}
def generate_authentication_challenge(
self,
credentials: list[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Generate WebAuthn authentication options
credentials: list of user's passkeys with credential_id
"""
# Convert stored credentials to PublicKeyCredentialDescriptor
allow_credentials = [
PublicKeyCredentialDescriptor(
id=base64.urlsafe_b64decode(cred["credential_id"] + "=="),
transports=cred.get("transports", [])
)
for cred in credentials
]
options = generate_authentication_options(
rp_id=self.rp_id,
allow_credentials=allow_credentials if allow_credentials else None,
user_verification=UserVerificationRequirement.PREFERRED,
timeout=60000,
)
# Convert to JSON-serializable dict
options_json = options_to_json(options)
options_dict = json.loads(options_json)
return {
"options": options_dict,
"challenge": base64.urlsafe_b64encode(options.challenge).decode('utf-8').rstrip('=')
}
def verify_authentication(
self,
credential: Dict[str, Any],
expected_challenge: bytes,
credential_public_key: bytes,
credential_current_sign_count: int,
expected_origin: str,
expected_rp_id: str
) -> Dict[str, Any]:
"""
Verify WebAuthn authentication response
Returns verification result with new sign count
"""
try:
verification = verify_authentication_response(
credential=credential,
expected_challenge=expected_challenge,
expected_rp_id=expected_rp_id,
expected_origin=expected_origin,
credential_public_key=credential_public_key,
credential_current_sign_count=credential_current_sign_count,
)
return {
"verified": True,
"new_sign_count": verification.new_sign_count
}
except Exception as e:
return {
"verified": False,
"error": str(e)
}
# Global instance
webauthn_manager = WebAuthnManager()
# ============================================================================
# Helper Functions
# ============================================================================
def generate_challenge() -> str:
"""Generate a cryptographically secure random challenge"""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')
def generate_session_token() -> str:
"""Generate a secure session token"""
return secrets.token_urlsafe(32)
def hash_credential_id(credential_id: str) -> str:
"""Hash credential ID for storage"""
return hashlib.sha256(credential_id.encode()).hexdigest()