🚀 Matrix Gateway: базова реалізація v1

- Matrix Client (підключення та синхронізація)
- RBAC Checker (перевірка прав через Postgres)
- Job Creator (створення jobs з команд)
- NATS Publisher (публікація jobs у streams)
- K8s deployment
- README з документацією

Команди: !embed, !retrieve, !summarize

TODO: Реальна інтеграція з Matrix homeserver, статуси результатів
This commit is contained in:
Apple
2026-01-10 10:40:18 -08:00
parent a001636c11
commit a0c3c0cbb5
10 changed files with 714 additions and 0 deletions

View File

@@ -0,0 +1,88 @@
---
# Matrix Gateway Deployment
apiVersion: v1
kind: Namespace
metadata:
name: matrix-gateway
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: matrix-gateway
namespace: matrix-gateway
labels:
app: matrix-gateway
spec:
replicas: 1
selector:
matchLabels:
app: matrix-gateway
template:
metadata:
labels:
app: matrix-gateway
spec:
containers:
- name: gateway
image: matrix-gateway:latest
imagePullPolicy: Never # Local build
env:
- name: MATRIX_HOMESERVER
value: "https://matrix.org" # TODO: Замінити на свій homeserver
- name: MATRIX_USER
valueFrom:
secretKeyRef:
name: matrix-gateway-secrets
key: matrix_user
- name: MATRIX_PASSWORD
valueFrom:
secretKeyRef:
name: matrix-gateway-secrets
key: matrix_password
- name: MATRIX_ROOM_ID
valueFrom:
secretKeyRef:
name: matrix-gateway-secrets
key: matrix_room_id
- name: NATS_URL
value: "nats://nats-client.nats:4222"
- name: POSTGRES_URL
valueFrom:
secretKeyRef:
name: matrix-gateway-secrets
key: postgres_url
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
exec:
command:
- /bin/sh
- -c
- "ps aux | grep '[p]ython.*gateway.main' || exit 1"
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
exec:
command:
- /bin/sh
- -c
- "ps aux | grep '[p]ython.*gateway.main' || exit 1"
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Secret
metadata:
name: matrix-gateway-secrets
namespace: matrix-gateway
type: Opaque
stringData:
matrix_user: "@user:matrix.org" # TODO: Замінити
matrix_password: "password" # TODO: Замінити
matrix_room_id: "!roomid:matrix.org" # TODO: Замінити
postgres_url: "postgresql://user:password@host:5432/db" # TODO: Замінити

View File

@@ -0,0 +1,13 @@
FROM python:3.11-slim
WORKDIR /app
# Встановлення залежностей
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Копіювання коду
COPY gateway/ ./gateway/
# Entrypoint
ENTRYPOINT ["python3", "-m", "gateway.main"]

View File

@@ -0,0 +1,129 @@
# Matrix Gateway
**Дата:** 2026-01-10
**Версія:** 1.0.0
---
## 📋 Призначення
Matrix Gateway — компонент, який зв'язує Matrix rooms з NATS JetStream jobs:
1. **Приймає команди** з Matrix rooms (формат: `!command args`)
2. **Перевіряє RBAC** через Postgres
3. **Створює jobs** у NATS JetStream
4. **Публікує статуси** назад у Matrix
---
## 🔧 Команди
### Підтримувані команди:
- `!embed <text>` — створює embedding job (online priority)
- `!retrieve <query>` — створює retrieval job (online priority)
- `!summarize <thread_id>` — створює summarization job (offline priority)
### Приклад:
```
!embed Привіт, це тестовий текст для embedding
```
---
## 🚀 Deployment
### K8s
```bash
kubectl apply -f infrastructure/kubernetes/matrix-gateway/deployment.yaml
```
### Docker
```bash
docker build -t matrix-gateway:latest .
docker run -d \
-e MATRIX_HOMESERVER=https://matrix.org \
-e MATRIX_USER=@user:matrix.org \
-e MATRIX_PASSWORD=password \
-e MATRIX_ROOM_ID=!roomid:matrix.org \
-e NATS_URL=nats://nats-client.nats:4222 \
-e POSTGRES_URL=postgresql://... \
matrix-gateway:latest
```
---
## 🔐 RBAC
RBAC перевірка виконується через таблицю `matrix_rbac` в Postgres:
```sql
CREATE TABLE matrix_rbac (
user_id VARCHAR(255) PRIMARY KEY,
permissions TEXT[] NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
```
### Додавання користувача:
```sql
INSERT INTO matrix_rbac (user_id, permissions)
VALUES ('@user:matrix.org', ARRAY['execute_job', 'admin']);
```
---
## 📊 Архітектура
```
Matrix Room
Matrix Gateway
├── RBAC Checker (Postgres)
├── Job Creator
└── NATS Publisher
NATS JetStream
├── MM_ONLINE (embed, retrieve, summarize)
└── MM_OFFLINE (backfill, index)
Worker Daemon
└── Job Executor
Memory Service / Qdrant / PostgreSQL
```
---
## 🔄 Потік виконання
1. Користувач відправляє команду в Matrix room: `!embed текст`
2. Matrix Gateway отримує повідомлення
3. RBAC перевірка: чи має користувач право `execute_job`?
4. Парсинг команди та створення job payload
5. Публікація job у NATS (subject: `mm.embed.online`)
6. Підтвердження в Matrix: `✅ Job створено: <job_id>`
7. Worker Daemon бере job, виконує, пише результат
8. (Опціонально) Matrix Gateway публікує результат назад у Matrix
---
## 📁 Структура коду
```
gateway/
├── main.py # Entry point
├── matrix_client.py # Matrix connection & sync
├── nats_publisher.py # NATS job publishing
├── rbac.py # RBAC checking
└── job_creator.py # Job creation from commands
```
---
*Документ створено: 2026-01-10 19:30 CET*

View File

@@ -0,0 +1 @@
# Matrix Gateway package

View File

@@ -0,0 +1,61 @@
"""
Job Creator — створення jobs з командами Matrix
"""
import uuid
import hashlib
from datetime import datetime
from typing import Dict, Any, Optional
from gateway.nats_publisher import NATSPublisher
class JobCreator:
def __init__(self, nats_publisher: NATSPublisher):
self.nats_publisher = nats_publisher
async def create_job(self, command: Dict[str, Any],
tenant: Optional[Dict[str, Any]] = None,
scope: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Створення job з команди"""
job_id = self._generate_ulid()
idempotency_key = self._generate_idempotency_key(command)
job = {
"job_id": job_id,
"idempotency_key": idempotency_key,
"type": command.get("type", "unknown"),
"priority": command.get("priority", "offline"),
"tenant": tenant or {
"org_id": "550e8400-e29b-41d4-a716-446655440000" # Default DAARION org
},
"scope": scope or {},
"requirements": command.get("requirements", {
"needs_gpu": command.get("type") == "embed",
"min_vram_gb": 8 if command.get("type") == "embed" else 0,
"max_latency_ms": 300 if command.get("priority") == "online" else 60000,
"tier": "A" if command.get("priority") == "online" else "B"
}),
"input": command.get("input", {}),
"refs": command.get("refs", {}),
"trace": {
"trace_id": str(uuid.uuid4()),
"parent_span_id": None
},
"timestamps": {
"created_at": datetime.utcnow().isoformat() + "Z"
}
}
return job
def _generate_ulid(self) -> str:
"""Генерація ULID (спрощена версія)"""
# TODO: Використати бібліотеку ulid-py для правильного ULID
return str(uuid.uuid4()).replace("-", "")[:26]
def _generate_idempotency_key(self, command: Dict[str, Any]) -> str:
"""Генерація idempotency key (SHA256 hash)"""
import json
key_data = json.dumps(command, sort_keys=True)
hash_obj = hashlib.sha256(key_data.encode())
return f"sha256:{hash_obj.hexdigest()}"

View File

@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
Matrix Gateway — зв'язок між Matrix rooms та NATS jobs
"""
import asyncio
import os
import signal
import sys
from typing import Optional
from gateway.matrix_client import MatrixClient
from gateway.nats_publisher import NATSPublisher
from gateway.rbac import RBACChecker
from gateway.job_creator import JobCreator
class MatrixGateway:
def __init__(self):
self.matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org")
self.matrix_user = os.getenv("MATRIX_USER", "")
self.matrix_password = os.getenv("MATRIX_PASSWORD", "")
self.matrix_room_id = os.getenv("MATRIX_ROOM_ID", "")
self.nats_url = os.getenv("NATS_URL", "nats://nats-client.nats:4222")
self.postgres_url = os.getenv("POSTGRES_URL", "")
self.matrix_client: Optional[MatrixClient] = None
self.nats_publisher: Optional[NATSPublisher] = None
self.rbac_checker: Optional[RBACChecker] = None
self.job_creator: Optional[JobCreator] = None
self.running = False
async def start(self):
"""Запуск Matrix Gateway"""
print("🚀 Matrix Gateway запускається...")
print(f" Matrix: {self.matrix_homeserver}")
print(f" NATS: {self.nats_url}")
# Ініціалізація компонентів
self.rbac_checker = RBACChecker(self.postgres_url)
self.nats_publisher = NATSPublisher(self.nats_url)
self.job_creator = JobCreator(self.nats_publisher)
self.matrix_client = MatrixClient(
homeserver=self.matrix_homeserver,
user=self.matrix_user,
password=self.matrix_password,
room_id=self.matrix_room_id,
message_handler=self.handle_matrix_message
)
# Підключення
await self.rbac_checker.connect()
await self.nats_publisher.connect()
await self.matrix_client.connect()
# Синхронізація Matrix
await self.matrix_client.sync()
self.running = True
print("✅ Matrix Gateway запущено")
async def handle_matrix_message(self, event):
"""Обробка повідомлення з Matrix"""
if event.type != "m.room.message":
return
sender = event.sender
content = event.content
body = content.get("body", "").strip()
# Перевірка, чи це команда (починається з !)
if not body.startswith("!"):
return
print(f"📨 Команда від {sender}: {body}")
# RBAC перевірка
if not await self.rbac_checker.check_permission(sender, "execute_job"):
await self.matrix_client.send_message(f"❌ Доступ заборонено для {sender}")
return
# Парсинг команди
command = self._parse_command(body)
if not command:
await self.matrix_client.send_message("❌ Невірна команда. Використання: !embed <text> | !retrieve <query> | !summarize <thread_id>")
return
# Створення job
try:
job = await self.job_creator.create_job(command)
# Публікація в NATS
await self.nats_publisher.publish_job(job)
# Підтвердження в Matrix
await self.matrix_client.send_message(
f"✅ Job створено: `{job['job_id']}`\n"
f"Тип: {job['type']}\n"
f"Пріоритет: {job['priority']}"
)
except Exception as e:
print(f"❌ Помилка створення job: {e}")
await self.matrix_client.send_message(f"❌ Помилка: {e}")
def _parse_command(self, body: str) -> Optional[dict]:
"""Парсинг команди з Matrix повідомлення"""
parts = body.split(None, 2)
if len(parts) < 2:
return None
cmd = parts[0][1:] # Видаляємо !
args = parts[1] if len(parts) > 1 else ""
if cmd == "embed":
return {
"type": "embed",
"priority": "online",
"input": {
"text": [args],
"model": "cohere/embed-multilingual-v3.0",
"dims": 1024
}
}
elif cmd == "retrieve":
return {
"type": "retrieve",
"priority": "online",
"input": {
"query": args
}
}
elif cmd == "summarize":
return {
"type": "summarize",
"priority": "offline",
"input": {
"thread_id": args
}
}
else:
return None
async def stop(self):
"""Зупинка Matrix Gateway"""
print("🛑 Зупинка Matrix Gateway...")
self.running = False
if self.matrix_client:
await self.matrix_client.disconnect()
if self.nats_publisher:
await self.nats_publisher.disconnect()
if self.rbac_checker:
await self.rbac_checker.disconnect()
print("✅ Matrix Gateway зупинено")
def setup_signal_handlers(self):
"""Налаштування обробників сигналів"""
def signal_handler(sig, frame):
print(f"\n📡 Отримано сигнал {sig}")
asyncio.create_task(self.stop())
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
async def main():
gateway = MatrixGateway()
gateway.setup_signal_handlers()
try:
await gateway.start()
# Чекаємо поки працює
while gateway.running:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
finally:
await gateway.stop()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,81 @@
"""
Matrix Client — підключення до Matrix та обробка повідомлень
"""
import asyncio
from typing import Optional, Callable
from nio import AsyncClient, MatrixRoom, RoomMessageText
from nio.events import Event
class MatrixClient:
def __init__(self, homeserver: str, user: str, password: str, room_id: str, message_handler: Callable):
self.homeserver = homeserver
self.user = user
self.password = password
self.room_id = room_id
self.message_handler = message_handler
self.client: Optional[AsyncClient] = None
self.sync_task: Optional[asyncio.Task] = None
async def connect(self):
"""Підключення до Matrix"""
try:
self.client = AsyncClient(self.homeserver, self.user)
await self.client.login(self.password)
print(f"✅ Підключено до Matrix: {self.user}")
except Exception as e:
print(f"❌ Помилка підключення до Matrix: {e}")
raise
async def disconnect(self):
"""Відключення від Matrix"""
if self.sync_task:
self.sync_task.cancel()
if self.client:
await self.client.close()
print("✅ Відключено від Matrix")
async def sync(self):
"""Синхронізація Matrix (слухання повідомлень)"""
if not self.client:
raise RuntimeError("Matrix client не підключено")
# Callback для обробки повідомлень
async def message_callback(room: MatrixRoom, event: RoomMessageText):
await self.message_handler(event)
# Додаємо callback
self.client.add_event_callback(message_callback, RoomMessageText)
# Запускаємо синхронізацію в окремому task
self.sync_task = asyncio.create_task(self._sync_loop())
async def _sync_loop(self):
"""Loop для синхронізації Matrix"""
while True:
try:
await self.client.sync(timeout=30000) # 30 секунд
except asyncio.CancelledError:
break
except Exception as e:
print(f"⚠️ Помилка синхронізації Matrix: {e}")
await asyncio.sleep(5)
async def send_message(self, text: str):
"""Відправка повідомлення в Matrix room"""
if not self.client:
return
try:
await self.client.room_send(
room_id=self.room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": text
}
)
except Exception as e:
print(f"❌ Помилка відправки повідомлення в Matrix: {e}")

View File

@@ -0,0 +1,68 @@
"""
NATS Publisher — публікація jobs у NATS JetStream
"""
from typing import Optional, Dict, Any
from nats.aio.client import Client as NATS
from nats.js import JetStreamContext
class NATSPublisher:
def __init__(self, nats_url: str):
self.nats_url = nats_url
self.nc: Optional[NATS] = None
self.js: Optional[JetStreamContext] = None
async def connect(self):
"""Підключення до NATS"""
try:
self.nc = NATS()
await self.nc.connect(self.nats_url)
self.js = self.nc.jetstream()
print(f"✅ Підключено до NATS: {self.nats_url}")
except Exception as e:
print(f"❌ Помилка підключення до NATS: {e}")
raise
async def disconnect(self):
"""Відключення від NATS"""
if self.nc:
await self.nc.close()
print("✅ Відключено від NATS")
async def publish_job(self, job: Dict[str, Any]):
"""Публікація job у NATS stream"""
if not self.js:
raise RuntimeError("NATS не підключено")
job_type = job.get("type", "unknown")
priority = job.get("priority", "offline")
# Визначення subject залежно від типу та пріоритету
if priority == "online":
if job_type == "embed":
subject = "mm.embed.online"
elif job_type == "retrieve":
subject = "mm.retrieve.online"
elif job_type == "summarize":
subject = "mm.summarize.online"
else:
subject = f"mm.{job_type}.online"
else:
if job_type == "embed":
subject = "mm.embed.offline"
elif job_type == "index":
subject = "mm.index.offline"
elif job_type == "backfill":
subject = "mm.backfill.offline"
else:
subject = f"mm.{job_type}.offline"
# Публікація
import json
job_json = json.dumps(job).encode()
ack = await self.js.publish(subject, job_json)
print(f"📤 Job опубліковано: {subject} (seq: {ack.seq})")
return ack

View File

@@ -0,0 +1,78 @@
"""
RBAC Checker — перевірка прав доступу через Postgres
"""
import asyncpg
from typing import Optional
class RBACChecker:
def __init__(self, postgres_url: str):
self.postgres_url = postgres_url
self.pool: Optional[asyncpg.Pool] = None
async def connect(self):
"""Підключення до Postgres"""
if not self.postgres_url:
print("⚠️ POSTGRES_URL не встановлено, RBAC вимкнено")
return
try:
self.pool = await asyncpg.create_pool(self.postgres_url)
await self._ensure_table()
print("✅ Підключено до Postgres для RBAC")
except Exception as e:
print(f"❌ Помилка підключення до Postgres: {e}")
self.pool = None
async def disconnect(self):
"""Відключення від Postgres"""
if self.pool:
await self.pool.close()
print("✅ Відключено від Postgres")
async def _ensure_table(self):
"""Створення таблиці для RBAC якщо не існує"""
async with self.pool.acquire() as conn:
await conn.execute("""
CREATE TABLE IF NOT EXISTS matrix_rbac (
user_id VARCHAR(255) PRIMARY KEY,
permissions TEXT[] NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_matrix_rbac_user_id
ON matrix_rbac(user_id);
""")
# Додаємо дефолтний користувач (якщо потрібно)
# await conn.execute("""
# INSERT INTO matrix_rbac (user_id, permissions)
# VALUES ($1, $2)
# ON CONFLICT (user_id) DO NOTHING
# """, "@admin:matrix.org", ["execute_job", "admin"])
async def check_permission(self, user_id: str, permission: str) -> bool:
"""Перевірка прав доступу"""
if not self.pool:
# Якщо Postgres не підключено, дозволяємо всім (dev режим)
return True
try:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT permissions FROM matrix_rbac WHERE user_id = $1",
user_id
)
if not row:
# Користувач не знайдено - за замовчуванням заборонено
return False
permissions = row["permissions"]
return permission in permissions or "admin" in permissions
except Exception as e:
print(f"❌ Помилка перевірки RBAC: {e}")
return False

View File

@@ -0,0 +1,8 @@
matrix-nio==0.21.0
nats-py==2.7.0
asyncpg==0.29.0
fastapi==0.109.0
uvicorn==0.27.0.post1
pydantic==2.5.3
python-dotenv==1.0.1
aiohttp==3.9.1