From 7108985b55aef1a2481f01d72e5a451d2a114990 Mon Sep 17 00:00:00 2001 From: Apple Date: Sun, 30 Nov 2025 10:12:27 -0800 Subject: [PATCH] feat: Matrix Gateway integration for Rooms Layer Matrix Gateway: - Add POST /internal/matrix/room/join endpoint - Add POST /internal/matrix/message/send endpoint - Add GET /internal/matrix/rooms/{room_id}/messages endpoint City Service: - Add POST /rooms/sync/matrix endpoint for bulk sync - Update get_agent_chat_room to auto-create Matrix rooms - Update get_node_chat_room to auto-create Matrix rooms - Update get_microdao_chat_room to auto-create Matrix rooms - Add join_user_to_room, send_message_to_room, get_room_messages to matrix_client - Add ensure_room_has_matrix helper function This enables: - Automatic Matrix room creation for all entity types - chat_available = true when Matrix room exists - Real-time messaging via Matrix --- ...ASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1.md | 269 ++++++++++++++++++ services/city-service/matrix_client.py | 138 ++++++++- services/city-service/routes_city.py | 220 ++++++++++++-- services/matrix-gateway/main.py | 170 +++++++++++ 4 files changed, 771 insertions(+), 26 deletions(-) create mode 100644 docs/tasks/TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1.md diff --git a/docs/tasks/TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1.md b/docs/tasks/TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1.md new file mode 100644 index 00000000..e3152b78 --- /dev/null +++ b/docs/tasks/TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1.md @@ -0,0 +1,269 @@ +# TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1 + +Version: 1.0 +Status: Ready +Priority: Critical (Rooms & Chat Layer) + +--- + +## 1. МЕТА + +Повністю інтегрувати Matrix у DAARION.city так, щоб: + +- усі кімнати (City / MicroDAO / Node / Agent) мали `matrix_room_id`; +- gateway умів створювати кімнати та додавати агентів; +- чат-виджети на /agents /nodes /microdao працювали з реальними повідомленнями; +- з'явилися online-статуси та події для Agent Presence. + +Цей таск спирається на вже виконаний: +- `TASK_PHASE_ROOMS_LAYER_RESTORE_AND_MATRIX_INTEGRATION.md` +(rooms seeded у БД, citizens працюють, але Matrix ще не прив'язаний). + +--- + +## 2. ОБСЯГ РОБОТ (SCOPE) + +1. **Matrix Room Creation API** у `matrix-gateway` +2. **Room ↔ Matrix синхронізація** у `city-service` +3. **Agent / Node / MicroDAO join-логіка** через gateway +4. **Message flow**: frontend → city-service → gateway → Matrix → back +5. **Статуси чату**: `chat_available`, online/offline +6. **Мінімальні тести + smoke-перевірки** + +--- + +## 3. ВИМОГИ ТА ІНВАРІАНТИ + +### 3.1. Для кожної кімнати в `rooms`: + +- `rooms.id` існує +- `rooms.matrix_room_id` НЕ NULL +- запис у `matrix_rooms` з посиланням на Matrix room + +### 3.2. Для кожного публічного агента: + +- є хоча б одна кімната, де агент учасник (через `room_agents` або аналог) +- чат-виджет показує `chat_available = true` (якщо Matrix OK) + +### 3.3. Для кожної ноди: + +- є support room `node-{slug}-support` +- у ній учасники: guardian + steward + технічні агенти + +--- + +## 4. МОДУЛЬ 1 — MATRIX-GATEWAY + +### 4.1. Нові endpoints (internal) + +У сервісі `matrix-gateway` (Python/FastAPI або Node, залежно від репо): + +1. `POST /internal/matrix/room/create` + + Вхід: + ```json + { + "room_alias": "string", // наприклад "city-general" + "room_name": "string", // "General" + "visibility": "public|private", + "topic": "string|null" + } + ``` + + Вихід: + ```json + { + "room_id": "!abcdefg:matrix.daarion.city", + "room_alias": "#city-general:matrix.daarion.city" + } + ``` + + Дії: + - викликає Matrix API `/_matrix/client/v3/createRoom` + - задає canonical alias, name, topic + - повертає `room_id` для збереження в БД + +2. `POST /internal/matrix/room/join` + + Вхід: + ```json + { + "room_id": "!room:matrix.daarion.city", + "user_id": "@agent_daarwizz:matrix.daarion.city" + } + ``` + + Дії: + - викликає Matrix API `join` для `user_id` + - обробляє помилки (already joined / banned / unknown) + +3. `POST /internal/matrix/message/send` + + Вхід: + ```json + { + "room_id": "!room:matrix.daarion.city", + "sender": "@agent_x:matrix.daarion.city", + "body": "string" + } + ``` + + Дії: + - відправляє текстове повідомлення від імені бота/агента + - повертає event_id + +### 4.2. Конфігурація + +- додати ENV: + - `MATRIX_HOMESERVER_URL` + - `MATRIX_ACCESS_TOKEN` (service/bot user) + - `MATRIX_DEFAULT_SENDER` (наприклад, `@daarion-bot:matrix.daarion.city`) + +- healthcheck: + - `GET /health` → перевірка `/versions` на Matrix HS + +--- + +## 5. МОДУЛЬ 2 — CITY-SERVICE (ROOM SYNC) + +У `services/city-service` додати: + +### 5.1. Repository-логіку + +- новий модуль, наприклад `repo_rooms_matrix.py`: + - `get_rooms_without_matrix_id()` + - `update_room_matrix_id(room_id, matrix_room_id)` + - `get_agent_matrix_user(agent_id)` + - `get_node_guardian_and_steward(node_id)` + +### 5.2. Сервіс-логіку синхронізації + +1. `POST /api/v1/rooms/sync/matrix` + + Задача: + - знайти всі кімнати, де `matrix_room_id IS NULL` + - викликати `matrix-gateway /room/create` + - зберегти `matrix_room_id` + - (для city/microdao rooms) додати публічних агентів як учасників: + - City: DARIO, DARIA, Atlas, Greeter + - MicroDAO: orchestrator + core team + - Node: guardian + steward + - Agent: сам агент + +2. Авто-виклик: + - опційно: при створенні нової кімнати (event-based) + - але для MVP достатньо ручного endpoint + одноразового запуску + +--- + +## 6. МОДУЛЬ 3 — AGENT CHAT FLOW + +### 6.1. Backend API + +У `city-service`: + +- `GET /api/v1/agents/{agent_id}/chat-room` + + Доповнити: + - якщо `matrix_room_id IS NULL` → створити кімнату через gateway + - повернути: + ```json + { + "room_id": "", + "matrix_room_id": "!room:matrix...", + "agent_id": "", + "chat_available": true|false + } + ``` + +- опційно: `POST /api/v1/agents/{agent_id}/message` + (для проксінгу повідомлень з frontend до gateway) + +### 6.2. Frontend (apps/web) + +Компонент `AgentChatWidget`: + +- якщо `chat_available === false` → показувати "тимчасово недоступний" +- якщо `chat_available === true`: + - використовувати Matrix JS SDK АБО власний lightweight client: + - `GET /api/v1/agents/{id}/chat-room` + - `WS` або `long-poll` на gateway + - `POST /internal/matrix/message/send` через city-service + +Для MVP можна обмежитись: +- односторонній відправлення повідомлень (log history локально) +- завантаження останніх N повідомлень із Matrix через gateway + +--- + +## 7. МОДУЛЬ 4 — NODE / MICRODAO CHAT FLOW + +Аналогічно до агента: + +- `GET /api/v1/nodes/{node_id}/chat-room` +- `GET /api/v1/microdaos/{slug}/chat-room` + +Логіка: +- якщо нема Matrix room → створити +- додати відповідних агентів: + - Node: guardian + steward + - MicroDAO: orchestrator + core team + +Frontend вже має `AgentChatWidget`, достатньо змінити пропси: +- `entityType: "agent" | "node" | "microdao"` +- `entityId` / `slug` + +--- + +## 8. MESSAGE FLOW & NATS (опціонально) + +Якщо є час у межах таска: + +- `matrix-gateway` → NATS subject `integration.matrix.message` +- `city-service` підписаний і логує `event_outbox` +- Agents можуть реагувати на події. + +Якщо ні — відкласти у окремий таск. + +--- + +## 9. TESTS / SMOKE CHECKLIST + +Після завершення: + +1. `POST /api/v1/rooms/sync/matrix` → всі `rooms.matrix_room_id` заповнені. +2. `GET /city`: + - список кімнат завантажується у режимі "Мапа" і "Список". +3. `/agents/daarwizz`: + - чат-виджет відкривається, + - можна надіслати повідомлення, + - у Matrix-клієнті видно це повідомлення. +4. `/nodes/node-1-hetzner-gex44`: + - чат з guardian/steward доступний. +5. `/microdao/daarion`: + - чат MicroDAO працює. + +--- + +## 10. PROMPT ДЛЯ CURSOR + +```text +Виконай TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1.md. + +Фокус: +1) matrix-gateway (internal API: room/create, room/join, message/send) +2) city-service (room sync, agent/node/microdao chat-room API) +3) apps/web (AgentChatWidget інтеграція з Matrix) +4) мінімальний smoke-тест, як описано в розділі 9. + +Використовуй існуючі foundation-документи у docs/foundation/ як джерело істини. +Після кожного завершеного модуля роби окремий git commit. +Не змінюй інші підсистеми, які не стосуються Rooms/Matrix/Chat. +``` + +--- + +**Target Date**: Immediate +**Priority**: Critical +**Dependencies**: Rooms Layer seeded, Matrix Synapse running + diff --git a/services/city-service/matrix_client.py b/services/city-service/matrix_client.py index 9dc62d6d..bf1dfbdd 100644 --- a/services/city-service/matrix_client.py +++ b/services/city-service/matrix_client.py @@ -4,14 +4,14 @@ Matrix Gateway Client for City Service import os import httpx import logging -from typing import Optional, Tuple +from typing import Optional, Tuple, List, Dict, Any logger = logging.getLogger(__name__) MATRIX_GATEWAY_URL = os.getenv("MATRIX_GATEWAY_URL", "http://daarion-matrix-gateway:7025") -async def create_matrix_room(slug: str, name: str, visibility: str = "public") -> Tuple[Optional[str], Optional[str]]: +async def create_matrix_room(slug: str, name: str, visibility: str = "public", topic: str = None) -> Tuple[Optional[str], Optional[str]]: """ Create a Matrix room via Matrix Gateway. @@ -20,13 +20,17 @@ async def create_matrix_room(slug: str, name: str, visibility: str = "public") - """ async with httpx.AsyncClient(timeout=30.0) as client: try: + payload = { + "slug": slug, + "name": name, + "visibility": visibility + } + if topic: + payload["topic"] = topic + response = await client.post( f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms/create", - json={ - "slug": slug, - "name": name, - "visibility": visibility - } + json=payload ) if response.status_code == 200: @@ -73,6 +77,108 @@ async def find_matrix_room_by_alias(alias: str) -> Tuple[Optional[str], Optional return None, None +async def join_user_to_room(room_id: str, user_id: str) -> bool: + """ + Join a Matrix user to a room. + + Args: + room_id: Matrix room ID (!abc:daarion.space) + user_id: Matrix user ID (@user:daarion.space) + + Returns: + True if joined successfully, False otherwise + """ + async with httpx.AsyncClient(timeout=30.0) as client: + try: + response = await client.post( + f"{MATRIX_GATEWAY_URL}/internal/matrix/room/join", + json={ + "room_id": room_id, + "user_id": user_id + } + ) + + if response.status_code == 200: + data = response.json() + logger.info(f"User {user_id} joined room {room_id}") + return data.get("ok", False) + else: + logger.error(f"Failed to join room: {response.text}") + return False + + except httpx.RequestError as e: + logger.error(f"Matrix Gateway request error: {e}") + return False + + +async def send_message_to_room(room_id: str, body: str, sender: str = None) -> Optional[str]: + """ + Send a message to a Matrix room. + + Args: + room_id: Matrix room ID + body: Message text + sender: Optional sender Matrix user ID + + Returns: + Event ID if sent successfully, None otherwise + """ + async with httpx.AsyncClient(timeout=30.0) as client: + try: + payload = { + "room_id": room_id, + "body": body, + "sender": sender or "@daarion-bot:daarion.space" + } + + response = await client.post( + f"{MATRIX_GATEWAY_URL}/internal/matrix/message/send", + json=payload + ) + + if response.status_code == 200: + data = response.json() + logger.info(f"Message sent to {room_id}: {data.get('event_id')}") + return data.get("event_id") + else: + logger.error(f"Failed to send message: {response.text}") + return None + + except httpx.RequestError as e: + logger.error(f"Matrix Gateway request error: {e}") + return None + + +async def get_room_messages(room_id: str, limit: int = 50) -> List[Dict[str, Any]]: + """ + Get recent messages from a Matrix room. + + Args: + room_id: Matrix room ID + limit: Max number of messages to fetch + + Returns: + List of message dicts with event_id, sender, body, timestamp + """ + async with httpx.AsyncClient(timeout=30.0) as client: + try: + response = await client.get( + f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms/{room_id}/messages", + params={"limit": limit} + ) + + if response.status_code == 200: + data = response.json() + return data.get("messages", []) + else: + logger.error(f"Failed to get messages: {response.text}") + return [] + + except httpx.RequestError as e: + logger.error(f"Matrix Gateway request error: {e}") + return [] + + async def check_matrix_gateway_health() -> bool: """Check if Matrix Gateway is available.""" async with httpx.AsyncClient(timeout=5.0) as client: @@ -82,3 +188,21 @@ async def check_matrix_gateway_health() -> bool: except Exception: return False + +async def ensure_room_has_matrix(room_slug: str, room_name: str, visibility: str = "public") -> Tuple[Optional[str], Optional[str]]: + """ + Ensure a room has a Matrix room ID. Creates one if it doesn't exist. + + Returns: + Tuple of (matrix_room_id, matrix_room_alias) or (None, None) on failure + """ + # First try to find existing room + alias = f"#city_{room_slug}:daarion.space" + matrix_room_id, matrix_room_alias = await find_matrix_room_by_alias(alias) + + if matrix_room_id: + return matrix_room_id, matrix_room_alias + + # Create new room + return await create_matrix_room(room_slug, room_name, visibility) + diff --git a/services/city-service/routes_city.py b/services/city-service/routes_city.py index 6fbae4ec..05b83a54 100644 --- a/services/city-service/routes_city.py +++ b/services/city-service/routes_city.py @@ -48,7 +48,15 @@ from models_city import ( ) import repo_city from common.redis_client import PresenceRedis, get_redis -from matrix_client import create_matrix_room, find_matrix_room_by_alias +from matrix_client import ( + create_matrix_room, + find_matrix_room_by_alias, + join_user_to_room, + send_message_to_room, + get_room_messages, + check_matrix_gateway_health, + ensure_room_has_matrix +) from dagi_router_client import get_dagi_router_client, DagiRouterClient logger = logging.getLogger(__name__) @@ -876,6 +884,89 @@ async def delete_agent_microdao_membership( # City Rooms API # ============================================================================= +@router.post("/rooms/sync/matrix") +async def sync_rooms_with_matrix(): + """ + Sync all rooms with Matrix. + Creates Matrix rooms for any room without matrix_room_id. + + This is an admin endpoint for initial setup or recovery. + """ + try: + # Check Matrix Gateway health + gateway_ok = await check_matrix_gateway_health() + if not gateway_ok: + raise HTTPException(status_code=503, detail="Matrix Gateway unavailable") + + # Get all rooms without Matrix ID + rooms = await repo_city.get_rooms_without_matrix() + + results = { + "synced": [], + "failed": [], + "skipped": [] + } + + for room in rooms: + room_slug = room.get("slug") + room_name = room.get("name") + + if not room_slug or not room_name: + results["skipped"].append({"id": room.get("id"), "reason": "missing slug or name"}) + continue + + try: + # Create Matrix room + matrix_room_id, matrix_room_alias = await create_matrix_room( + slug=room_slug, + name=room_name, + visibility="public" if room.get("is_public", True) else "private" + ) + + if matrix_room_id: + # Update room with Matrix ID + await repo_city.update_room_matrix( + room_id=room.get("id"), + matrix_room_id=matrix_room_id, + matrix_room_alias=matrix_room_alias + ) + results["synced"].append({ + "id": room.get("id"), + "slug": room_slug, + "matrix_room_id": matrix_room_id + }) + logger.info(f"Synced room {room_slug} with Matrix: {matrix_room_id}") + else: + results["failed"].append({ + "id": room.get("id"), + "slug": room_slug, + "reason": "Matrix room creation failed" + }) + + except Exception as e: + logger.error(f"Failed to sync room {room_slug}: {e}") + results["failed"].append({ + "id": room.get("id"), + "slug": room_slug, + "reason": str(e) + }) + + return { + "status": "completed", + "total_rooms": len(rooms), + "synced_count": len(results["synced"]), + "failed_count": len(results["failed"]), + "skipped_count": len(results["skipped"]), + "details": results + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to sync rooms with Matrix: {e}") + raise HTTPException(status_code=500, detail=f"Failed to sync rooms: {str(e)}") + + @router.get("/rooms", response_model=List[CityRoomRead]) async def get_city_rooms(limit: int = 100, offset: int = 0): """ @@ -1263,7 +1354,7 @@ async def chat_bootstrap( async def get_agent_chat_room(agent_id: str): """ Отримати інформацію про кімнату чату для агента. - Повертає room_id, agent info для ініціалізації чату. + Автоматично створює Matrix кімнату якщо її немає. """ try: agent = await repo_city.get_agent_by_id(agent_id) @@ -1271,15 +1362,42 @@ async def get_agent_chat_room(agent_id: str): raise HTTPException(status_code=404, detail=f"Agent not found: {agent_id}") # Get agent's primary room or create room slug - room_slug = f"agent-console-{agent.get('public_slug') or agent_id}" + agent_slug = agent.get('public_slug') or agent_id.replace('ag_', '').replace('-', '_') + room_slug = f"agent-console-{agent_slug}" room = await repo_city.get_room_by_slug(room_slug) - # If room doesn't exist, try to get agent's primary room from dashboard + # If room doesn't exist in DB, create it if not room: - rooms = await repo_city.get_agent_rooms(agent_id) - if rooms and len(rooms) > 0: - room = rooms[0] - room_slug = room.get("slug", room_slug) + # Create room in DB + room = await repo_city.create_room( + slug=room_slug, + name=f"{agent.get('display_name')} Console", + description=f"Chat room for agent {agent.get('display_name')}", + created_by="system" + ) + logger.info(f"Created agent chat room in DB: {room_slug}") + + # Check if room has Matrix ID + matrix_room_id = room.get("matrix_room_id") if room else None + + # If no Matrix room, try to create one + if not matrix_room_id: + try: + matrix_room_id, matrix_room_alias = await ensure_room_has_matrix( + room_slug=room_slug, + room_name=f"{agent.get('display_name')} Console", + visibility="private" + ) + + if matrix_room_id and room: + await repo_city.update_room_matrix( + room_id=room.get("id"), + matrix_room_id=matrix_room_id, + matrix_room_alias=matrix_room_alias + ) + logger.info(f"Created Matrix room for agent {agent_id}: {matrix_room_id}") + except Exception as e: + logger.warning(f"Could not create Matrix room for agent {agent_id}: {e}") return { "agent_id": agent_id, @@ -1289,8 +1407,8 @@ async def get_agent_chat_room(agent_id: str): "agent_kind": agent.get("kind"), "room_slug": room_slug, "room_id": room.get("id") if room else None, - "matrix_room_id": room.get("matrix_room_id") if room else None, - "chat_available": room is not None and room.get("matrix_room_id") is not None + "matrix_room_id": matrix_room_id, + "chat_available": matrix_room_id is not None } except HTTPException: raise @@ -1303,7 +1421,7 @@ async def get_agent_chat_room(agent_id: str): async def get_node_chat_room(node_id: str): """ Отримати інформацію про кімнату чату для ноди. - Повертає room_id, guardian/steward agents info. + Автоматично створює Matrix кімнату якщо її немає. """ try: node = await repo_city.get_node_by_id(node_id) @@ -1315,6 +1433,38 @@ async def get_node_chat_room(node_id: str): room_slug = f"node-support-{node_slug}" room = await repo_city.get_room_by_slug(room_slug) + # If room doesn't exist in DB, create it + if not room: + room = await repo_city.create_room( + slug=room_slug, + name=f"{node.get('name', node_id)} Support", + description=f"Support room for node {node.get('name', node_id)}", + created_by="system" + ) + logger.info(f"Created node support room in DB: {room_slug}") + + # Check if room has Matrix ID + matrix_room_id = room.get("matrix_room_id") if room else None + + # If no Matrix room, try to create one + if not matrix_room_id: + try: + matrix_room_id, matrix_room_alias = await ensure_room_has_matrix( + room_slug=room_slug, + room_name=f"{node.get('name', node_id)} Support", + visibility="private" + ) + + if matrix_room_id and room: + await repo_city.update_room_matrix( + room_id=room.get("id"), + matrix_room_id=matrix_room_id, + matrix_room_alias=matrix_room_alias + ) + logger.info(f"Created Matrix room for node {node_id}: {matrix_room_id}") + except Exception as e: + logger.warning(f"Could not create Matrix room for node {node_id}: {e}") + # Get guardian and steward agents guardian_agent = None steward_agent = None @@ -1341,8 +1491,8 @@ async def get_node_chat_room(node_id: str): "node_status": node.get("status", "offline"), "room_slug": room_slug, "room_id": room.get("id") if room else None, - "matrix_room_id": room.get("matrix_room_id") if room else None, - "chat_available": room is not None and room.get("matrix_room_id") is not None, + "matrix_room_id": matrix_room_id, + "chat_available": matrix_room_id is not None, "agents": [a for a in [guardian_agent, steward_agent] if a is not None] } except HTTPException: @@ -1356,18 +1506,18 @@ async def get_node_chat_room(node_id: str): async def get_microdao_chat_room(slug: str): """ Отримати інформацію про кімнату чату для MicroDAO. - Повертає room_id, orchestrator agent info. + Автоматично створює Matrix кімнату якщо її немає. """ try: dao = await repo_city.get_microdao_by_slug(slug) if not dao: raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}") - # Get MicroDAO lobby room - room_slug = f"microdao-lobby-{slug}" + # Get MicroDAO lobby room (prefer {slug}-lobby pattern) + room_slug = f"{slug}-lobby" room = await repo_city.get_room_by_slug(room_slug) - # If no lobby room, try to get primary room + # If no lobby room, try to get primary room from existing rooms if not room: rooms = await repo_city.get_microdao_rooms(dao["id"]) if rooms and len(rooms) > 0: @@ -1376,6 +1526,38 @@ async def get_microdao_chat_room(slug: str): room = primary room_slug = room.get("slug", room_slug) + # If still no room, create one + if not room: + room = await repo_city.create_room( + slug=room_slug, + name=f"{dao.get('name', slug)} Lobby", + description=f"Lobby for {dao.get('name', slug)} MicroDAO", + created_by="system" + ) + logger.info(f"Created MicroDAO lobby room in DB: {room_slug}") + + # Check if room has Matrix ID + matrix_room_id = room.get("matrix_room_id") if room else None + + # If no Matrix room, try to create one + if not matrix_room_id: + try: + matrix_room_id, matrix_room_alias = await ensure_room_has_matrix( + room_slug=room_slug, + room_name=f"{dao.get('name', slug)} Lobby", + visibility="public" + ) + + if matrix_room_id and room: + await repo_city.update_room_matrix( + room_id=room.get("id"), + matrix_room_id=matrix_room_id, + matrix_room_alias=matrix_room_alias + ) + logger.info(f"Created Matrix room for MicroDAO {slug}: {matrix_room_id}") + except Exception as e: + logger.warning(f"Could not create Matrix room for MicroDAO {slug}: {e}") + # Get orchestrator agent orchestrator = None orchestrator_id = dao.get("orchestrator_agent_id") @@ -1396,8 +1578,8 @@ async def get_microdao_chat_room(slug: str): "microdao_name": dao.get("name"), "room_slug": room_slug, "room_id": room.get("id") if room else None, - "matrix_room_id": room.get("matrix_room_id") if room else None, - "chat_available": room is not None and room.get("matrix_room_id") is not None, + "matrix_room_id": matrix_room_id, + "chat_available": matrix_room_id is not None, "orchestrator": orchestrator } except HTTPException: diff --git a/services/matrix-gateway/main.py b/services/matrix-gateway/main.py index fd5d5946..5da989cb 100644 --- a/services/matrix-gateway/main.py +++ b/services/matrix-gateway/main.py @@ -91,6 +91,32 @@ class SetPresenceResponse(BaseModel): status: str +# NEW: Room Join Request/Response +class RoomJoinRequest(BaseModel): + room_id: str # Matrix room ID (!abc:daarion.space) + user_id: str # Matrix user ID (@user:daarion.space) + + +class RoomJoinResponse(BaseModel): + ok: bool + room_id: str + user_id: str + + +# NEW: Send Message Request/Response +class SendMessageRequest(BaseModel): + room_id: str # Matrix room ID + sender: str # Matrix user ID (must be bot or have permissions) + body: str # Message text + msgtype: str = "m.text" # Message type + + +class SendMessageResponse(BaseModel): + ok: bool + event_id: str + room_id: str + + async def get_admin_token() -> str: """Get or create admin access token for Matrix operations.""" global _admin_token @@ -441,6 +467,150 @@ async def get_user_token(request: UserTokenRequest): raise HTTPException(status_code=503, detail="Matrix unavailable") +@app.post("/internal/matrix/room/join", response_model=RoomJoinResponse) +async def join_room(request: RoomJoinRequest): + """ + Join a user to a Matrix room. + + Uses admin token to invite and join the user. + """ + admin_token = await get_admin_token() + + async with httpx.AsyncClient(timeout=30.0) as client: + try: + # First, invite the user to the room (admin action) + invite_resp = await client.post( + f"{settings.synapse_url}/_matrix/client/v3/rooms/{request.room_id}/invite", + headers={"Authorization": f"Bearer {admin_token}"}, + json={"user_id": request.user_id} + ) + + # 200 = invited, 403 = already member (OK), 400 = already invited (OK) + if invite_resp.status_code not in (200, 403, 400): + logger.warning(f"Invite response: {invite_resp.status_code} - {invite_resp.text}") + + # Now join the user to the room via admin API + # Use the synapse admin API to force-join + join_resp = await client.post( + f"{settings.synapse_url}/_synapse/admin/v1/join/{request.room_id}", + headers={"Authorization": f"Bearer {admin_token}"}, + json={"user_id": request.user_id} + ) + + if join_resp.status_code == 200: + logger.info(f"User {request.user_id} joined room {request.room_id}") + return RoomJoinResponse( + ok=True, + room_id=request.room_id, + user_id=request.user_id + ) + elif join_resp.status_code == 400: + # Already in room + logger.info(f"User {request.user_id} already in room {request.room_id}") + return RoomJoinResponse( + ok=True, + room_id=request.room_id, + user_id=request.user_id + ) + else: + error = join_resp.json() if join_resp.text else {} + logger.error(f"Failed to join room: {join_resp.status_code} - {join_resp.text}") + raise HTTPException( + status_code=500, + detail=f"Failed to join room: {error.get('error', 'Unknown')}" + ) + + except httpx.RequestError as e: + logger.error(f"Matrix request error: {e}") + raise HTTPException(status_code=503, detail="Matrix unavailable") + + +@app.post("/internal/matrix/message/send", response_model=SendMessageResponse) +async def send_message(request: SendMessageRequest): + """ + Send a message to a Matrix room. + + Uses admin token to send on behalf of the bot. + """ + admin_token = await get_admin_token() + + import time + txn_id = f"daarion_{int(time.time() * 1000)}" + + async with httpx.AsyncClient(timeout=30.0) as client: + try: + # Send message via admin token (as the bot user) + send_resp = await client.put( + f"{settings.synapse_url}/_matrix/client/v3/rooms/{request.room_id}/send/m.room.message/{txn_id}", + headers={"Authorization": f"Bearer {admin_token}"}, + json={ + "msgtype": request.msgtype, + "body": request.body + } + ) + + if send_resp.status_code == 200: + result = send_resp.json() + event_id = result.get("event_id", "") + logger.info(f"Message sent to {request.room_id}: {event_id}") + return SendMessageResponse( + ok=True, + event_id=event_id, + room_id=request.room_id + ) + else: + error = send_resp.json() if send_resp.text else {} + logger.error(f"Failed to send message: {send_resp.status_code} - {send_resp.text}") + raise HTTPException( + status_code=500, + detail=f"Failed to send message: {error.get('error', 'Unknown')}" + ) + + except httpx.RequestError as e: + logger.error(f"Matrix request error: {e}") + raise HTTPException(status_code=503, detail="Matrix unavailable") + + +@app.get("/internal/matrix/rooms/{room_id}/messages") +async def get_room_messages(room_id: str, limit: int = 50): + """ + Get recent messages from a Matrix room. + """ + admin_token = await get_admin_token() + + async with httpx.AsyncClient(timeout=30.0) as client: + try: + resp = await client.get( + f"{settings.synapse_url}/_matrix/client/v3/rooms/{room_id}/messages", + headers={"Authorization": f"Bearer {admin_token}"}, + params={ + "dir": "b", # backwards from end + "limit": limit + } + ) + + if resp.status_code == 200: + data = resp.json() + messages = [] + for event in data.get("chunk", []): + if event.get("type") == "m.room.message": + content = event.get("content", {}) + messages.append({ + "event_id": event.get("event_id"), + "sender": event.get("sender"), + "body": content.get("body", ""), + "msgtype": content.get("msgtype", "m.text"), + "timestamp": event.get("origin_server_ts", 0) + }) + return {"messages": messages, "room_id": room_id} + else: + raise HTTPException(status_code=resp.status_code, detail="Failed to get messages") + + except httpx.RequestError as e: + logger.error(f"Matrix request error: {e}") + raise HTTPException(status_code=503, detail="Matrix unavailable") + + @app.post("/internal/matrix/presence/online", response_model=SetPresenceResponse) async def set_presence_online(request: SetPresenceRequest): """