# KYC Attestation Endpoints (NO PII to LLM)
# To be appended to memory-service/app/main.py

from datetime import datetime
from typing import Optional

from pydantic import BaseModel

# ============================================================================
# KYC ATTESTATIONS
# ============================================================================


class KYCAttestationUpdate(BaseModel):
    """Request payload used to create or update an account's KYC attestation."""

    account_id: str
    kyc_status: str  # unverified, pending, passed, failed
    kyc_provider: Optional[str] = None
    jurisdiction: Optional[str] = None  # ISO country code
    risk_tier: Optional[str] = "unknown"  # low, medium, high, unknown
    pep_sanctions_flag: bool = False
    wallet_verified: bool = False


class KYCAttestationResponse(BaseModel):
    """Attestation status flags returned by ``GET /kyc/attestation``."""

    account_id: str
    kyc_status: str
    kyc_provider: Optional[str]
    jurisdiction: Optional[str]
    risk_tier: str
    pep_sanctions_flag: bool
    wallet_verified: bool
    attested_at: Optional[datetime]
    created_at: datetime


@app.get("/kyc/attestation")
async def get_kyc_attestation(account_id: str) -> KYCAttestationResponse:
    """
    Get KYC attestation for an account.
    Returns status flags only - NO personal data.

    Args:
        account_id: Account identifier; it is cast to ``uuid`` in the query.

    Returns:
        The stored attestation, or a default "unverified" attestation when
        no row exists for the account.

    Raises:
        HTTPException: 500 if the lookup or response construction fails.
    """
    try:
        row = await db.pool.fetchrow(
            """
            SELECT * FROM kyc_attestations
            WHERE account_id = $1::uuid
            """,
            account_id,
        )

        if not row:
            # No attestation recorded yet: report the default unverified state
            # instead of a 404.
            # NOTE(review): datetime.utcnow() yields a naive timestamp and is
            # deprecated since Python 3.12; switch to an aware UTC timestamp
            # once consumers are confirmed to accept an explicit offset.
            return KYCAttestationResponse(
                account_id=account_id,
                kyc_status="unverified",
                kyc_provider=None,
                jurisdiction=None,
                risk_tier="unknown",
                pep_sanctions_flag=False,
                wallet_verified=False,
                attested_at=None,
                created_at=datetime.utcnow(),
            )

        return KYCAttestationResponse(
            account_id=str(row['account_id']),
            kyc_status=row['kyc_status'],
            kyc_provider=row['kyc_provider'],
            jurisdiction=row['jurisdiction'],
            risk_tier=row['risk_tier'],
            pep_sanctions_flag=row['pep_sanctions_flag'],
            wallet_verified=row['wallet_verified'],
            attested_at=row['attested_at'],
            created_at=row['created_at'],
        )
    except Exception as e:
        logger.error(f"Failed to get KYC attestation: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/kyc/attestation")
async def update_kyc_attestation(attestation: KYCAttestationUpdate):
    """
    Update KYC attestation for an account.
    Called by KYC provider webhook or admin.

    Inserts a new row, or overwrites the existing row for the same
    ``account_id``, and stamps ``attested_at`` with the database clock.

    Args:
        attestation: The attestation values to store.

    Returns:
        A dict with ``success``, ``account_id`` and ``status`` keys.

    Raises:
        HTTPException: 400 if ``kyc_status`` or ``risk_tier`` is not an
            allowed value; 500 if the database write fails.
    """
    try:
        valid_statuses = ['unverified', 'pending', 'passed', 'failed']
        if attestation.kyc_status not in valid_statuses:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid kyc_status. Must be one of: {valid_statuses}"
            )

        valid_tiers = ['low', 'medium', 'high', 'unknown']
        if attestation.risk_tier not in valid_tiers:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid risk_tier. Must be one of: {valid_tiers}"
            )

        # Upsert keyed on account_id: one attestation row per account.
        await db.pool.execute(
            """
            INSERT INTO kyc_attestations (
                account_id, kyc_status, kyc_provider, jurisdiction,
                risk_tier, pep_sanctions_flag, wallet_verified, attested_at
            )
            VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, NOW())
            ON CONFLICT (account_id) DO UPDATE SET
                kyc_status = EXCLUDED.kyc_status,
                kyc_provider = EXCLUDED.kyc_provider,
                jurisdiction = EXCLUDED.jurisdiction,
                risk_tier = EXCLUDED.risk_tier,
                pep_sanctions_flag = EXCLUDED.pep_sanctions_flag,
                wallet_verified = EXCLUDED.wallet_verified,
                attested_at = NOW(),
                updated_at = NOW()
            """,
            attestation.account_id,
            attestation.kyc_status,
            attestation.kyc_provider,
            attestation.jurisdiction,
            attestation.risk_tier,
            attestation.pep_sanctions_flag,
            attestation.wallet_verified,
        )

        logger.info(f"KYC attestation updated for account {attestation.account_id}: {attestation.kyc_status}")

        return {"success": True, "account_id": attestation.account_id, "status": attestation.kyc_status}
    except HTTPException:
        # Validation errors raised above must keep their 400 status.
        raise
    except Exception as e:
        logger.error(f"Failed to update KYC attestation: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/kyc/webhook/provider")
async def kyc_provider_webhook(
    account_id: str,
    status: str,
    provider: str,
    jurisdiction: Optional[str] = None,
    risk_tier: Optional[str] = "unknown",
    pep_flag: bool = False
):
    """
    Webhook endpoint for KYC providers.
    Updates attestation when KYC check completes.

    The provider's status vocabulary is translated to the internal one
    (case-insensitively); unrecognised statuses are lower-cased and passed
    through, so they are rejected by ``update_kyc_attestation`` validation.

    Args:
        account_id: Account the KYC check was run for.
        status: Provider-reported status (e.g. "approved", "rejected").
        provider: Name of the KYC provider, stored as ``kyc_provider``.
        jurisdiction: Optional jurisdiction code to store.
        risk_tier: Risk tier to store; defaults to "unknown".
        pep_flag: Stored as ``pep_sanctions_flag``.

    Returns:
        The result dict of ``update_kyc_attestation``.

    Raises:
        HTTPException: Propagated unchanged from ``update_kyc_attestation``
            (400 for invalid values, 500 for storage failures); 500 for any
            other failure in this handler.
    """
    try:
        # Map provider status to our status
        status_map = {
            'approved': 'passed',
            'verified': 'passed',
            'rejected': 'failed',
            'denied': 'failed',
            'pending': 'pending',
            'review': 'pending'
        }

        normalized = status.lower()
        mapped_status = status_map.get(normalized, normalized)

        attestation = KYCAttestationUpdate(
            account_id=account_id,
            kyc_status=mapped_status,
            kyc_provider=provider,
            jurisdiction=jurisdiction,
            risk_tier=risk_tier,
            pep_sanctions_flag=pep_flag,
            wallet_verified=False  # Wallet verification is separate
        )

        return await update_kyc_attestation(attestation)
    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the
        # 400 raised for an invalid status/risk tier would be re-wrapped
        # below and reported to the provider as a 500.
        raise
    except Exception as e:
        logger.error(f"KYC webhook failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/kyc/stats")
async def get_kyc_stats():
    """
    Get KYC statistics for the platform.

    Returns:
        A dict of row counts from ``kyc_attestations``: one per KYC status
        (``passed``, ``pending``, ``failed``, ``unverified``), plus
        ``wallets_verified``, ``pep_flagged`` and ``total``.

    Raises:
        HTTPException: 500 if the aggregate query fails.
    """
    try:
        stats = await db.pool.fetchrow(
            """
            SELECT
                COUNT(*) FILTER (WHERE kyc_status = 'passed') as passed,
                COUNT(*) FILTER (WHERE kyc_status = 'pending') as pending,
                COUNT(*) FILTER (WHERE kyc_status = 'failed') as failed,
                COUNT(*) FILTER (WHERE kyc_status = 'unverified') as unverified,
                COUNT(*) FILTER (WHERE wallet_verified = true) as wallets_verified,
                COUNT(*) FILTER (WHERE pep_sanctions_flag = true) as pep_flagged,
                COUNT(*) as total
            FROM kyc_attestations
            """
        )

        counters = (
            "passed",
            "pending",
            "failed",
            "unverified",
            "wallets_verified",
            "pep_flagged",
            "total",
        )
        # `or 0` keeps the original fallback for a missing/NULL counter.
        return {name: stats[name] or 0 for name in counters}
    except Exception as e:
        logger.error(f"Failed to get KYC stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))