Files
microdao-daarion/infrastructure/matrix-gateway/gateway/main.py
Apple a0c3c0cbb5 🚀 Matrix Gateway: базова реалізація v1
- Matrix Client (підключення та синхронізація)
- RBAC Checker (перевірка прав через Postgres)
- Job Creator (створення jobs з команд)
- NATS Publisher (публікація jobs у streams)
- K8s deployment
- README з документацією

Команди: !embed, !retrieve, !summarize

TODO: Реальна інтеграція з Matrix homeserver, статуси результатів
2026-01-10 10:40:18 -08:00

188 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""
Matrix Gateway — зв'язок між Matrix rooms та NATS jobs
"""
import asyncio
import os
import signal
import sys
from typing import Optional
from gateway.matrix_client import MatrixClient
from gateway.nats_publisher import NATSPublisher
from gateway.rbac import RBACChecker
from gateway.job_creator import JobCreator
class MatrixGateway:
def __init__(self):
self.matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "https://matrix.org")
self.matrix_user = os.getenv("MATRIX_USER", "")
self.matrix_password = os.getenv("MATRIX_PASSWORD", "")
self.matrix_room_id = os.getenv("MATRIX_ROOM_ID", "")
self.nats_url = os.getenv("NATS_URL", "nats://nats-client.nats:4222")
self.postgres_url = os.getenv("POSTGRES_URL", "")
self.matrix_client: Optional[MatrixClient] = None
self.nats_publisher: Optional[NATSPublisher] = None
self.rbac_checker: Optional[RBACChecker] = None
self.job_creator: Optional[JobCreator] = None
self.running = False
async def start(self):
"""Запуск Matrix Gateway"""
print("🚀 Matrix Gateway запускається...")
print(f" Matrix: {self.matrix_homeserver}")
print(f" NATS: {self.nats_url}")
# Ініціалізація компонентів
self.rbac_checker = RBACChecker(self.postgres_url)
self.nats_publisher = NATSPublisher(self.nats_url)
self.job_creator = JobCreator(self.nats_publisher)
self.matrix_client = MatrixClient(
homeserver=self.matrix_homeserver,
user=self.matrix_user,
password=self.matrix_password,
room_id=self.matrix_room_id,
message_handler=self.handle_matrix_message
)
# Підключення
await self.rbac_checker.connect()
await self.nats_publisher.connect()
await self.matrix_client.connect()
# Синхронізація Matrix
await self.matrix_client.sync()
self.running = True
print("✅ Matrix Gateway запущено")
async def handle_matrix_message(self, event):
"""Обробка повідомлення з Matrix"""
if event.type != "m.room.message":
return
sender = event.sender
content = event.content
body = content.get("body", "").strip()
# Перевірка, чи це команда (починається з !)
if not body.startswith("!"):
return
print(f"📨 Команда від {sender}: {body}")
# RBAC перевірка
if not await self.rbac_checker.check_permission(sender, "execute_job"):
await self.matrix_client.send_message(f"❌ Доступ заборонено для {sender}")
return
# Парсинг команди
command = self._parse_command(body)
if not command:
await self.matrix_client.send_message("❌ Невірна команда. Використання: !embed <text> | !retrieve <query> | !summarize <thread_id>")
return
# Створення job
try:
job = await self.job_creator.create_job(command)
# Публікація в NATS
await self.nats_publisher.publish_job(job)
# Підтвердження в Matrix
await self.matrix_client.send_message(
f"✅ Job створено: `{job['job_id']}`\n"
f"Тип: {job['type']}\n"
f"Пріоритет: {job['priority']}"
)
except Exception as e:
print(f"❌ Помилка створення job: {e}")
await self.matrix_client.send_message(f"❌ Помилка: {e}")
def _parse_command(self, body: str) -> Optional[dict]:
"""Парсинг команди з Matrix повідомлення"""
parts = body.split(None, 2)
if len(parts) < 2:
return None
cmd = parts[0][1:] # Видаляємо !
args = parts[1] if len(parts) > 1 else ""
if cmd == "embed":
return {
"type": "embed",
"priority": "online",
"input": {
"text": [args],
"model": "cohere/embed-multilingual-v3.0",
"dims": 1024
}
}
elif cmd == "retrieve":
return {
"type": "retrieve",
"priority": "online",
"input": {
"query": args
}
}
elif cmd == "summarize":
return {
"type": "summarize",
"priority": "offline",
"input": {
"thread_id": args
}
}
else:
return None
async def stop(self):
"""Зупинка Matrix Gateway"""
print("🛑 Зупинка Matrix Gateway...")
self.running = False
if self.matrix_client:
await self.matrix_client.disconnect()
if self.nats_publisher:
await self.nats_publisher.disconnect()
if self.rbac_checker:
await self.rbac_checker.disconnect()
print("✅ Matrix Gateway зупинено")
def setup_signal_handlers(self):
"""Налаштування обробників сигналів"""
def signal_handler(sig, frame):
print(f"\n📡 Отримано сигнал {sig}")
asyncio.create_task(self.stop())
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
async def main():
gateway = MatrixGateway()
gateway.setup_signal_handlers()
try:
await gateway.start()
# Чекаємо поки працює
while gateway.running:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
finally:
await gateway.stop()
if __name__ == "__main__":
asyncio.run(main())