""" RBAC Checker — перевірка прав доступу через Postgres """ import asyncpg from typing import Optional class RBACChecker: def __init__(self, postgres_url: str): self.postgres_url = postgres_url self.pool: Optional[asyncpg.Pool] = None async def connect(self): """Підключення до Postgres""" if not self.postgres_url: print("⚠️ POSTGRES_URL не встановлено, RBAC вимкнено") return try: self.pool = await asyncpg.create_pool(self.postgres_url) await self._ensure_table() print("✅ Підключено до Postgres для RBAC") except Exception as e: print(f"❌ Помилка підключення до Postgres: {e}") self.pool = None async def disconnect(self): """Відключення від Postgres""" if self.pool: await self.pool.close() print("✅ Відключено від Postgres") async def _ensure_table(self): """Створення таблиці для RBAC якщо не існує""" async with self.pool.acquire() as conn: await conn.execute(""" CREATE TABLE IF NOT EXISTS matrix_rbac ( user_id VARCHAR(255) PRIMARY KEY, permissions TEXT[] NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_matrix_rbac_user_id ON matrix_rbac(user_id); """) # Додаємо дефолтний користувач (якщо потрібно) # await conn.execute(""" # INSERT INTO matrix_rbac (user_id, permissions) # VALUES ($1, $2) # ON CONFLICT (user_id) DO NOTHING # """, "@admin:matrix.org", ["execute_job", "admin"]) async def check_permission(self, user_id: str, permission: str) -> bool: """Перевірка прав доступу""" if not self.pool: # Якщо Postgres не підключено, дозволяємо всім (dev режим) return True try: async with self.pool.acquire() as conn: row = await conn.fetchrow( "SELECT permissions FROM matrix_rbac WHERE user_id = $1", user_id ) if not row: # Користувач не знайдено - за замовчуванням заборонено return False permissions = row["permissions"] return permission in permissions or "admin" in permissions except Exception as e: print(f"❌ Помилка перевірки RBAC: {e}") return False