""" Privacy Gate Middleware ======================= Обов'язковий middleware для контролю приватності на Router рівні. Правила: - public: повний доступ + логування контенту - team: тільки для членів команди + логування metadata - confidential: тільки sanitized або з consent + без логування контенту - e2ee: НІКОЛИ plaintext на сервері """ import re import logging from typing import Optional, Dict, Any, Callable from dataclasses import dataclass from enum import Enum logger = logging.getLogger(__name__) class PrivacyMode(str, Enum): """Режими приватності""" PUBLIC = "public" TEAM = "team" CONFIDENTIAL = "confidential" E2EE = "e2ee" class GateAction(str, Enum): """Дії Privacy Gate""" ALLOW = "allow" SANITIZE = "sanitize" BLOCK = "block" REQUIRE_CONSENT = "require_consent" @dataclass class GateResult: """Результат перевірки Privacy Gate""" action: GateAction allow_content: bool log_content: bool transform: Optional[Callable[[str], str]] = None reason: str = "" consent_required: bool = False # PII Patterns для sanitization PII_PATTERNS = [ (r'\b\d{10,13}\b', '[PHONE]'), # Phone numbers (r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]'), # Emails (r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CARD]'), # Credit cards (r'\b\d{8,10}\b', '[ID]'), # IDs (r'\b(?:\+38)?0\d{9}\b', '[UA_PHONE]'), # UA phone numbers (r'\b\d{10}\b', '[IPN]'), # Ukrainian IPN ] def remove_pii(text: str) -> str: """Видаляє PII з тексту""" result = text for pattern, replacement in PII_PATTERNS: result = re.sub(pattern, replacement, result) return result def sanitize_context(context: str, max_length: int = 200) -> str: """ Sanitizes context for confidential mode. 1. Видаляє PII 2. Обмежує довжину 3. Узагальнює """ if not context: return "[Empty context]" # Step 1: Remove PII sanitized = remove_pii(context) # Step 2: Truncate if len(sanitized) > max_length: sanitized = sanitized[:max_length] + "..." # Step 3: Add marker return f"[Sanitized] {sanitized}" def create_summary(context: str, max_words: int = 20) -> str: """ Створює короткий summary для handoff. """ if not context: return "[No context]" # Remove PII first clean = remove_pii(context) # Take first N words words = clean.split()[:max_words] summary = " ".join(words) if len(clean.split()) > max_words: summary += "..." return summary class PrivacyGate: """ Privacy Gate - контролює доступ до контенту на основі mode. Usage: gate = PrivacyGate() result = gate.check(request) if result.action == GateAction.BLOCK: raise PrivacyViolation(result.reason) if result.transform: request.context = result.transform(request.context) """ def __init__(self, config: Dict[str, Any] = None): self.config = config or {} self._enabled = self.config.get("enabled", True) def check(self, mode: str, user_id: str = None, team_id: str = None, user_consent: bool = False, team_members: list = None) -> GateResult: """ Перевіряє запит і повертає рішення. Args: mode: Режим приватності (public/team/confidential/e2ee) user_id: ID користувача team_id: ID команди user_consent: Чи є згода користувача team_members: Список членів команди (для team mode) Returns: GateResult з рішенням """ if not self._enabled: return GateResult( action=GateAction.ALLOW, allow_content=True, log_content=True, reason="Privacy Gate disabled" ) try: privacy_mode = PrivacyMode(mode.lower()) except ValueError: # Unknown mode -> treat as confidential privacy_mode = PrivacyMode.CONFIDENTIAL logger.warning(f"Unknown privacy mode: {mode}, treating as confidential") # Route to specific handler if privacy_mode == PrivacyMode.PUBLIC: return self._check_public() elif privacy_mode == PrivacyMode.TEAM: return self._check_team(user_id, team_id, team_members) elif privacy_mode == PrivacyMode.CONFIDENTIAL: return self._check_confidential(user_consent) elif privacy_mode == PrivacyMode.E2EE: return self._check_e2ee() # Default: block unknown return GateResult( action=GateAction.BLOCK, allow_content=False, log_content=False, reason="Unknown privacy mode" ) def _check_public(self) -> GateResult: """Public mode: повний доступ""" return GateResult( action=GateAction.ALLOW, allow_content=True, log_content=True, reason="Public mode - full access" ) def _check_team(self, user_id: str, team_id: str, team_members: list = None) -> GateResult: """Team mode: тільки для членів команди""" # Check membership if team_members and user_id and user_id not in team_members: return GateResult( action=GateAction.BLOCK, allow_content=False, log_content=False, reason=f"User {user_id} is not a member of team {team_id}" ) return GateResult( action=GateAction.ALLOW, allow_content=True, log_content=False, # Тільки metadata reason="Team mode - metadata logging only" ) def _check_confidential(self, user_consent: bool) -> GateResult: """Confidential mode: sanitized або з consent""" if user_consent: # З consent - дозволяємо, але без логування контенту return GateResult( action=GateAction.ALLOW, allow_content=True, log_content=False, reason="Confidential mode - user consent given" ) else: # Без consent - тільки sanitized return GateResult( action=GateAction.SANITIZE, allow_content=True, log_content=False, transform=sanitize_context, consent_required=True, reason="Confidential mode - sanitizing content" ) def _check_e2ee(self) -> GateResult: """E2EE mode: НІКОЛИ plaintext на сервері""" return GateResult( action=GateAction.BLOCK, allow_content=False, log_content=False, reason="E2EE mode - server-side processing not allowed" ) def apply_transform(self, content: str, result: GateResult) -> str: """Застосовує transform якщо потрібно""" if result.transform and content: return result.transform(content) return content class PrivacyViolation(Exception): """Виняток при порушенні приватності""" pass # ==================== MIDDLEWARE ==================== async def privacy_gate_middleware(request, call_next, gate: PrivacyGate = None): """ FastAPI/Starlette middleware для Privacy Gate. Usage: from privacy_gate import privacy_gate_middleware, PrivacyGate gate = PrivacyGate() @app.middleware("http") async def privacy_middleware(request, call_next): return await privacy_gate_middleware(request, call_next, gate) """ if gate is None: gate = PrivacyGate() # Extract mode from request mode = "public" # Default user_consent = False # Try to get from headers if hasattr(request, 'headers'): mode = request.headers.get("X-Privacy-Mode", "public") user_consent = request.headers.get("X-User-Consent", "false").lower() == "true" # Check privacy result = gate.check( mode=mode, user_consent=user_consent ) # Handle based on result if result.action == GateAction.BLOCK: from fastapi.responses import JSONResponse return JSONResponse( status_code=403, content={"error": "Privacy violation", "reason": result.reason} ) # Add privacy info to request state if hasattr(request, 'state'): request.state.privacy_result = result request.state.log_content = result.log_content # Continue with request response = await call_next(request) return response # ==================== ROUTER INTEGRATION ==================== def check_privacy_for_handoff( source_agent: str, target_agent: str, context: str, mode: str = "public", user_consent: bool = False ) -> Dict[str, Any]: """ Перевіряє приватність перед handoff між агентами. Returns: { "allowed": bool, "context": str (можливо sanitized), "reason": str, "consent_required": bool } """ gate = PrivacyGate() result = gate.check(mode=mode, user_consent=user_consent) if result.action == GateAction.BLOCK: return { "allowed": False, "context": None, "reason": result.reason, "consent_required": False } # Apply transform if needed final_context = gate.apply_transform(context, result) return { "allowed": True, "context": final_context, "reason": result.reason, "consent_required": result.consent_required, "was_sanitized": result.transform is not None } # ==================== AUDIT HELPERS ==================== def create_privacy_audit_event( action: str, mode: str, user_id: str, result: GateResult, content_hash: str = None ) -> Dict[str, Any]: """ Створює audit event для Privacy Gate. Note: НІКОЛИ не включає контент, тільки hash якщо потрібно. """ from datetime import datetime return { "event_type": "privacy.check", "timestamp": datetime.utcnow().isoformat(), "action": action, "mode": mode, "user_id": user_id, "gate_action": result.action.value, "allow_content": result.allow_content, "log_content": result.log_content, "consent_required": result.consent_required, "reason": result.reason, "content_hash": content_hash # Hash, not content }