feat: Matrix Gateway integration for Rooms Layer

Matrix Gateway:
- Add POST /internal/matrix/room/join endpoint
- Add POST /internal/matrix/message/send endpoint
- Add GET /internal/matrix/rooms/{room_id}/messages endpoint

City Service:
- Add POST /rooms/sync/matrix endpoint for bulk sync
- Update get_agent_chat_room to auto-create Matrix rooms
- Update get_node_chat_room to auto-create Matrix rooms
- Update get_microdao_chat_room to auto-create Matrix rooms
- Add join_user_to_room, send_message_to_room, get_room_messages to matrix_client
- Add ensure_room_has_matrix helper function

This enables:
- Automatic Matrix room creation for all entity types
- chat_available = true when Matrix room exists
- Real-time messaging via Matrix
This commit is contained in:
Apple
2025-11-30 10:12:27 -08:00
parent 361d114a43
commit 7108985b55
4 changed files with 771 additions and 26 deletions

View File

@@ -0,0 +1,269 @@
# TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1
Version: 1.0
Status: Ready
Priority: Critical (Rooms & Chat Layer)
---
## 1. МЕТА
Повністю інтегрувати Matrix у DAARION.city так, щоб:
- усі кімнати (City / MicroDAO / Node / Agent) мали `matrix_room_id`;
- gateway умів створювати кімнати та додавати агентів;
- чат-виджети на /agents /nodes /microdao працювали з реальними повідомленнями;
- з'явилися online-статуси та події для Agent Presence.
Цей таск спирається на вже виконаний:
- `TASK_PHASE_ROOMS_LAYER_RESTORE_AND_MATRIX_INTEGRATION.md`
(rooms seeded у БД, citizens працюють, але Matrix ще не прив'язаний).
---
## 2. ОБСЯГ РОБОТ (SCOPE)
1. **Matrix Room Creation API** у `matrix-gateway`
2. **Room ↔ Matrix синхронізація** у `city-service`
3. **Agent / Node / MicroDAO join-логіка** через gateway
4. **Message flow**: frontend → city-service → gateway → Matrix → back
5. **Статуси чату**: `chat_available`, online/offline
6. **Мінімальні тести + smoke-перевірки**
---
## 3. ВИМОГИ ТА ІНВАРІАНТИ
### 3.1. Для кожної кімнати в `rooms`:
- `rooms.id` існує
- `rooms.matrix_room_id` НЕ NULL
- запис у `matrix_rooms` з посиланням на Matrix room
### 3.2. Для кожного публічного агента:
- є хоча б одна кімната, де агент учасник (через `room_agents` або аналог)
- чат-виджет показує `chat_available = true` (якщо Matrix OK)
### 3.3. Для кожної ноди:
- є support room `node-{slug}-support`
- у ній учасники: guardian + steward + технічні агенти
---
## 4. МОДУЛЬ 1 — MATRIX-GATEWAY
### 4.1. Нові endpoints (internal)
У сервісі `matrix-gateway` (Python/FastAPI або Node, залежно від репо):
1. `POST /internal/matrix/room/create`
Вхід:
```json
{
"room_alias": "string", // наприклад "city-general"
"room_name": "string", // "General"
"visibility": "public|private",
"topic": "string|null"
}
```
Вихід:
```json
{
"room_id": "!abcdefg:matrix.daarion.city",
"room_alias": "#city-general:matrix.daarion.city"
}
```
Дії:
- викликає Matrix API `/_matrix/client/v3/createRoom`
- задає canonical alias, name, topic
- повертає `room_id` для збереження в БД
2. `POST /internal/matrix/room/join`
Вхід:
```json
{
"room_id": "!room:matrix.daarion.city",
"user_id": "@agent_daarwizz:matrix.daarion.city"
}
```
Дії:
- викликає Matrix API `join` для `user_id`
- обробляє помилки (already joined / banned / unknown)
3. `POST /internal/matrix/message/send`
Вхід:
```json
{
"room_id": "!room:matrix.daarion.city",
"sender": "@agent_x:matrix.daarion.city",
"body": "string"
}
```
Дії:
- відправляє текстове повідомлення від імені бота/агента
- повертає event_id
### 4.2. Конфігурація
- додати ENV:
- `MATRIX_HOMESERVER_URL`
- `MATRIX_ACCESS_TOKEN` (service/bot user)
- `MATRIX_DEFAULT_SENDER` (наприклад, `@daarion-bot:matrix.daarion.city`)
- healthcheck:
- `GET /health` → перевірка `/versions` на Matrix HS
---
## 5. МОДУЛЬ 2 — CITY-SERVICE (ROOM SYNC)
У `services/city-service` додати:
### 5.1. Repository-логіку
- новий модуль, наприклад `repo_rooms_matrix.py`:
- `get_rooms_without_matrix_id()`
- `update_room_matrix_id(room_id, matrix_room_id)`
- `get_agent_matrix_user(agent_id)`
- `get_node_guardian_and_steward(node_id)`
### 5.2. Сервіс-логіку синхронізації
1. `POST /api/v1/rooms/sync/matrix`
Задача:
- знайти всі кімнати, де `matrix_room_id IS NULL`
- викликати `matrix-gateway /room/create`
- зберегти `matrix_room_id`
- (для city/microdao rooms) додати публічних агентів як учасників:
- City: DARIO, DARIA, Atlas, Greeter
- MicroDAO: orchestrator + core team
- Node: guardian + steward
- Agent: сам агент
2. Авто-виклик:
- опційно: при створенні нової кімнати (event-based)
- але для MVP достатньо ручного endpoint + одноразового запуску
---
## 6. МОДУЛЬ 3 — AGENT CHAT FLOW
### 6.1. Backend API
У `city-service`:
- `GET /api/v1/agents/{agent_id}/chat-room`
Доповнити:
- якщо `matrix_room_id IS NULL` → створити кімнату через gateway
- повернути:
```json
{
"room_id": "<uuid>",
"matrix_room_id": "!room:matrix...",
"agent_id": "<uuid>",
"chat_available": true|false
}
```
- опційно: `POST /api/v1/agents/{agent_id}/message`
(для проксінгу повідомлень з frontend до gateway)
### 6.2. Frontend (apps/web)
Компонент `AgentChatWidget`:
- якщо `chat_available === false` → показувати "тимчасово недоступний"
- якщо `chat_available === true`:
- використовувати Matrix JS SDK АБО власний lightweight client:
- `GET /api/v1/agents/{id}/chat-room`
- `WS` або `long-poll` на gateway
- `POST /internal/matrix/message/send` через city-service
Для MVP можна обмежитись:
- односторонній відправлення повідомлень (log history локально)
- завантаження останніх N повідомлень із Matrix через gateway
---
## 7. МОДУЛЬ 4 — NODE / MICRODAO CHAT FLOW
Аналогічно до агента:
- `GET /api/v1/nodes/{node_id}/chat-room`
- `GET /api/v1/microdaos/{slug}/chat-room`
Логіка:
- якщо нема Matrix room → створити
- додати відповідних агентів:
- Node: guardian + steward
- MicroDAO: orchestrator + core team
Frontend вже має `AgentChatWidget`, достатньо змінити пропси:
- `entityType: "agent" | "node" | "microdao"`
- `entityId` / `slug`
---
## 8. MESSAGE FLOW & NATS (опціонально)
Якщо є час у межах таска:
- `matrix-gateway` → NATS subject `integration.matrix.message`
- `city-service` підписаний і логує `event_outbox`
- Agents можуть реагувати на події.
Якщо ні — відкласти у окремий таск.
---
## 9. TESTS / SMOKE CHECKLIST
Після завершення:
1. `POST /api/v1/rooms/sync/matrix` → всі `rooms.matrix_room_id` заповнені.
2. `GET /city`:
- список кімнат завантажується у режимі "Мапа" і "Список".
3. `/agents/daarwizz`:
- чат-виджет відкривається,
- можна надіслати повідомлення,
- у Matrix-клієнті видно це повідомлення.
4. `/nodes/node-1-hetzner-gex44`:
- чат з guardian/steward доступний.
5. `/microdao/daarion`:
- чат MicroDAO працює.
---
## 10. PROMPT ДЛЯ CURSOR
```text
Виконай TASK_PHASE_MATRIX_GATEWAY_INTEGRATION_v1.md.
Фокус:
1) matrix-gateway (internal API: room/create, room/join, message/send)
2) city-service (room sync, agent/node/microdao chat-room API)
3) apps/web (AgentChatWidget інтеграція з Matrix)
4) мінімальний smoke-тест, як описано в розділі 9.
Використовуй існуючі foundation-документи у docs/foundation/ як джерело істини.
Після кожного завершеного модуля роби окремий git commit.
Не змінюй інші підсистеми, які не стосуються Rooms/Matrix/Chat.
```
---
**Target Date**: Immediate
**Priority**: Critical
**Dependencies**: Rooms Layer seeded, Matrix Synapse running

View File

@@ -4,14 +4,14 @@ Matrix Gateway Client for City Service
import os
import httpx
import logging
from typing import Optional, Tuple
from typing import Optional, Tuple, List, Dict, Any
logger = logging.getLogger(__name__)
MATRIX_GATEWAY_URL = os.getenv("MATRIX_GATEWAY_URL", "http://daarion-matrix-gateway:7025")
async def create_matrix_room(slug: str, name: str, visibility: str = "public") -> Tuple[Optional[str], Optional[str]]:
async def create_matrix_room(slug: str, name: str, visibility: str = "public", topic: str = None) -> Tuple[Optional[str], Optional[str]]:
"""
Create a Matrix room via Matrix Gateway.
@@ -20,13 +20,17 @@ async def create_matrix_room(slug: str, name: str, visibility: str = "public") -
"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
payload = {
"slug": slug,
"name": name,
"visibility": visibility
}
if topic:
payload["topic"] = topic
response = await client.post(
f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms/create",
json={
"slug": slug,
"name": name,
"visibility": visibility
}
json=payload
)
if response.status_code == 200:
@@ -73,6 +77,108 @@ async def find_matrix_room_by_alias(alias: str) -> Tuple[Optional[str], Optional
return None, None
async def join_user_to_room(room_id: str, user_id: str) -> bool:
"""
Join a Matrix user to a room.
Args:
room_id: Matrix room ID (!abc:daarion.space)
user_id: Matrix user ID (@user:daarion.space)
Returns:
True if joined successfully, False otherwise
"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.post(
f"{MATRIX_GATEWAY_URL}/internal/matrix/room/join",
json={
"room_id": room_id,
"user_id": user_id
}
)
if response.status_code == 200:
data = response.json()
logger.info(f"User {user_id} joined room {room_id}")
return data.get("ok", False)
else:
logger.error(f"Failed to join room: {response.text}")
return False
except httpx.RequestError as e:
logger.error(f"Matrix Gateway request error: {e}")
return False
async def send_message_to_room(room_id: str, body: str, sender: str = None) -> Optional[str]:
"""
Send a message to a Matrix room.
Args:
room_id: Matrix room ID
body: Message text
sender: Optional sender Matrix user ID
Returns:
Event ID if sent successfully, None otherwise
"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
payload = {
"room_id": room_id,
"body": body,
"sender": sender or "@daarion-bot:daarion.space"
}
response = await client.post(
f"{MATRIX_GATEWAY_URL}/internal/matrix/message/send",
json=payload
)
if response.status_code == 200:
data = response.json()
logger.info(f"Message sent to {room_id}: {data.get('event_id')}")
return data.get("event_id")
else:
logger.error(f"Failed to send message: {response.text}")
return None
except httpx.RequestError as e:
logger.error(f"Matrix Gateway request error: {e}")
return None
async def get_room_messages(room_id: str, limit: int = 50) -> List[Dict[str, Any]]:
"""
Get recent messages from a Matrix room.
Args:
room_id: Matrix room ID
limit: Max number of messages to fetch
Returns:
List of message dicts with event_id, sender, body, timestamp
"""
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.get(
f"{MATRIX_GATEWAY_URL}/internal/matrix/rooms/{room_id}/messages",
params={"limit": limit}
)
if response.status_code == 200:
data = response.json()
return data.get("messages", [])
else:
logger.error(f"Failed to get messages: {response.text}")
return []
except httpx.RequestError as e:
logger.error(f"Matrix Gateway request error: {e}")
return []
async def check_matrix_gateway_health() -> bool:
"""Check if Matrix Gateway is available."""
async with httpx.AsyncClient(timeout=5.0) as client:
@@ -82,3 +188,21 @@ async def check_matrix_gateway_health() -> bool:
except Exception:
return False
async def ensure_room_has_matrix(room_slug: str, room_name: str, visibility: str = "public") -> Tuple[Optional[str], Optional[str]]:
"""
Ensure a room has a Matrix room ID. Creates one if it doesn't exist.
Returns:
Tuple of (matrix_room_id, matrix_room_alias) or (None, None) on failure
"""
# First try to find existing room
alias = f"#city_{room_slug}:daarion.space"
matrix_room_id, matrix_room_alias = await find_matrix_room_by_alias(alias)
if matrix_room_id:
return matrix_room_id, matrix_room_alias
# Create new room
return await create_matrix_room(room_slug, room_name, visibility)

View File

@@ -48,7 +48,15 @@ from models_city import (
)
import repo_city
from common.redis_client import PresenceRedis, get_redis
from matrix_client import create_matrix_room, find_matrix_room_by_alias
from matrix_client import (
create_matrix_room,
find_matrix_room_by_alias,
join_user_to_room,
send_message_to_room,
get_room_messages,
check_matrix_gateway_health,
ensure_room_has_matrix
)
from dagi_router_client import get_dagi_router_client, DagiRouterClient
logger = logging.getLogger(__name__)
@@ -876,6 +884,89 @@ async def delete_agent_microdao_membership(
# City Rooms API
# =============================================================================
@router.post("/rooms/sync/matrix")
async def sync_rooms_with_matrix():
"""
Sync all rooms with Matrix.
Creates Matrix rooms for any room without matrix_room_id.
This is an admin endpoint for initial setup or recovery.
"""
try:
# Check Matrix Gateway health
gateway_ok = await check_matrix_gateway_health()
if not gateway_ok:
raise HTTPException(status_code=503, detail="Matrix Gateway unavailable")
# Get all rooms without Matrix ID
rooms = await repo_city.get_rooms_without_matrix()
results = {
"synced": [],
"failed": [],
"skipped": []
}
for room in rooms:
room_slug = room.get("slug")
room_name = room.get("name")
if not room_slug or not room_name:
results["skipped"].append({"id": room.get("id"), "reason": "missing slug or name"})
continue
try:
# Create Matrix room
matrix_room_id, matrix_room_alias = await create_matrix_room(
slug=room_slug,
name=room_name,
visibility="public" if room.get("is_public", True) else "private"
)
if matrix_room_id:
# Update room with Matrix ID
await repo_city.update_room_matrix(
room_id=room.get("id"),
matrix_room_id=matrix_room_id,
matrix_room_alias=matrix_room_alias
)
results["synced"].append({
"id": room.get("id"),
"slug": room_slug,
"matrix_room_id": matrix_room_id
})
logger.info(f"Synced room {room_slug} with Matrix: {matrix_room_id}")
else:
results["failed"].append({
"id": room.get("id"),
"slug": room_slug,
"reason": "Matrix room creation failed"
})
except Exception as e:
logger.error(f"Failed to sync room {room_slug}: {e}")
results["failed"].append({
"id": room.get("id"),
"slug": room_slug,
"reason": str(e)
})
return {
"status": "completed",
"total_rooms": len(rooms),
"synced_count": len(results["synced"]),
"failed_count": len(results["failed"]),
"skipped_count": len(results["skipped"]),
"details": results
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to sync rooms with Matrix: {e}")
raise HTTPException(status_code=500, detail=f"Failed to sync rooms: {str(e)}")
@router.get("/rooms", response_model=List[CityRoomRead])
async def get_city_rooms(limit: int = 100, offset: int = 0):
"""
@@ -1263,7 +1354,7 @@ async def chat_bootstrap(
async def get_agent_chat_room(agent_id: str):
"""
Отримати інформацію про кімнату чату для агента.
Повертає room_id, agent info для ініціалізації чату.
Автоматично створює Matrix кімнату якщо її немає.
"""
try:
agent = await repo_city.get_agent_by_id(agent_id)
@@ -1271,15 +1362,42 @@ async def get_agent_chat_room(agent_id: str):
raise HTTPException(status_code=404, detail=f"Agent not found: {agent_id}")
# Get agent's primary room or create room slug
room_slug = f"agent-console-{agent.get('public_slug') or agent_id}"
agent_slug = agent.get('public_slug') or agent_id.replace('ag_', '').replace('-', '_')
room_slug = f"agent-console-{agent_slug}"
room = await repo_city.get_room_by_slug(room_slug)
# If room doesn't exist, try to get agent's primary room from dashboard
# If room doesn't exist in DB, create it
if not room:
rooms = await repo_city.get_agent_rooms(agent_id)
if rooms and len(rooms) > 0:
room = rooms[0]
room_slug = room.get("slug", room_slug)
# Create room in DB
room = await repo_city.create_room(
slug=room_slug,
name=f"{agent.get('display_name')} Console",
description=f"Chat room for agent {agent.get('display_name')}",
created_by="system"
)
logger.info(f"Created agent chat room in DB: {room_slug}")
# Check if room has Matrix ID
matrix_room_id = room.get("matrix_room_id") if room else None
# If no Matrix room, try to create one
if not matrix_room_id:
try:
matrix_room_id, matrix_room_alias = await ensure_room_has_matrix(
room_slug=room_slug,
room_name=f"{agent.get('display_name')} Console",
visibility="private"
)
if matrix_room_id and room:
await repo_city.update_room_matrix(
room_id=room.get("id"),
matrix_room_id=matrix_room_id,
matrix_room_alias=matrix_room_alias
)
logger.info(f"Created Matrix room for agent {agent_id}: {matrix_room_id}")
except Exception as e:
logger.warning(f"Could not create Matrix room for agent {agent_id}: {e}")
return {
"agent_id": agent_id,
@@ -1289,8 +1407,8 @@ async def get_agent_chat_room(agent_id: str):
"agent_kind": agent.get("kind"),
"room_slug": room_slug,
"room_id": room.get("id") if room else None,
"matrix_room_id": room.get("matrix_room_id") if room else None,
"chat_available": room is not None and room.get("matrix_room_id") is not None
"matrix_room_id": matrix_room_id,
"chat_available": matrix_room_id is not None
}
except HTTPException:
raise
@@ -1303,7 +1421,7 @@ async def get_agent_chat_room(agent_id: str):
async def get_node_chat_room(node_id: str):
"""
Отримати інформацію про кімнату чату для ноди.
Повертає room_id, guardian/steward agents info.
Автоматично створює Matrix кімнату якщо її немає.
"""
try:
node = await repo_city.get_node_by_id(node_id)
@@ -1315,6 +1433,38 @@ async def get_node_chat_room(node_id: str):
room_slug = f"node-support-{node_slug}"
room = await repo_city.get_room_by_slug(room_slug)
# If room doesn't exist in DB, create it
if not room:
room = await repo_city.create_room(
slug=room_slug,
name=f"{node.get('name', node_id)} Support",
description=f"Support room for node {node.get('name', node_id)}",
created_by="system"
)
logger.info(f"Created node support room in DB: {room_slug}")
# Check if room has Matrix ID
matrix_room_id = room.get("matrix_room_id") if room else None
# If no Matrix room, try to create one
if not matrix_room_id:
try:
matrix_room_id, matrix_room_alias = await ensure_room_has_matrix(
room_slug=room_slug,
room_name=f"{node.get('name', node_id)} Support",
visibility="private"
)
if matrix_room_id and room:
await repo_city.update_room_matrix(
room_id=room.get("id"),
matrix_room_id=matrix_room_id,
matrix_room_alias=matrix_room_alias
)
logger.info(f"Created Matrix room for node {node_id}: {matrix_room_id}")
except Exception as e:
logger.warning(f"Could not create Matrix room for node {node_id}: {e}")
# Get guardian and steward agents
guardian_agent = None
steward_agent = None
@@ -1341,8 +1491,8 @@ async def get_node_chat_room(node_id: str):
"node_status": node.get("status", "offline"),
"room_slug": room_slug,
"room_id": room.get("id") if room else None,
"matrix_room_id": room.get("matrix_room_id") if room else None,
"chat_available": room is not None and room.get("matrix_room_id") is not None,
"matrix_room_id": matrix_room_id,
"chat_available": matrix_room_id is not None,
"agents": [a for a in [guardian_agent, steward_agent] if a is not None]
}
except HTTPException:
@@ -1356,18 +1506,18 @@ async def get_node_chat_room(node_id: str):
async def get_microdao_chat_room(slug: str):
"""
Отримати інформацію про кімнату чату для MicroDAO.
Повертає room_id, orchestrator agent info.
Автоматично створює Matrix кімнату якщо її немає.
"""
try:
dao = await repo_city.get_microdao_by_slug(slug)
if not dao:
raise HTTPException(status_code=404, detail=f"MicroDAO not found: {slug}")
# Get MicroDAO lobby room
room_slug = f"microdao-lobby-{slug}"
# Get MicroDAO lobby room (prefer {slug}-lobby pattern)
room_slug = f"{slug}-lobby"
room = await repo_city.get_room_by_slug(room_slug)
# If no lobby room, try to get primary room
# If no lobby room, try to get primary room from existing rooms
if not room:
rooms = await repo_city.get_microdao_rooms(dao["id"])
if rooms and len(rooms) > 0:
@@ -1376,6 +1526,38 @@ async def get_microdao_chat_room(slug: str):
room = primary
room_slug = room.get("slug", room_slug)
# If still no room, create one
if not room:
room = await repo_city.create_room(
slug=room_slug,
name=f"{dao.get('name', slug)} Lobby",
description=f"Lobby for {dao.get('name', slug)} MicroDAO",
created_by="system"
)
logger.info(f"Created MicroDAO lobby room in DB: {room_slug}")
# Check if room has Matrix ID
matrix_room_id = room.get("matrix_room_id") if room else None
# If no Matrix room, try to create one
if not matrix_room_id:
try:
matrix_room_id, matrix_room_alias = await ensure_room_has_matrix(
room_slug=room_slug,
room_name=f"{dao.get('name', slug)} Lobby",
visibility="public"
)
if matrix_room_id and room:
await repo_city.update_room_matrix(
room_id=room.get("id"),
matrix_room_id=matrix_room_id,
matrix_room_alias=matrix_room_alias
)
logger.info(f"Created Matrix room for MicroDAO {slug}: {matrix_room_id}")
except Exception as e:
logger.warning(f"Could not create Matrix room for MicroDAO {slug}: {e}")
# Get orchestrator agent
orchestrator = None
orchestrator_id = dao.get("orchestrator_agent_id")
@@ -1396,8 +1578,8 @@ async def get_microdao_chat_room(slug: str):
"microdao_name": dao.get("name"),
"room_slug": room_slug,
"room_id": room.get("id") if room else None,
"matrix_room_id": room.get("matrix_room_id") if room else None,
"chat_available": room is not None and room.get("matrix_room_id") is not None,
"matrix_room_id": matrix_room_id,
"chat_available": matrix_room_id is not None,
"orchestrator": orchestrator
}
except HTTPException:

View File

@@ -91,6 +91,32 @@ class SetPresenceResponse(BaseModel):
status: str
# NEW: Room Join Request/Response
class RoomJoinRequest(BaseModel):
room_id: str # Matrix room ID (!abc:daarion.space)
user_id: str # Matrix user ID (@user:daarion.space)
class RoomJoinResponse(BaseModel):
ok: bool
room_id: str
user_id: str
# NEW: Send Message Request/Response
class SendMessageRequest(BaseModel):
room_id: str # Matrix room ID
sender: str # Matrix user ID (must be bot or have permissions)
body: str # Message text
msgtype: str = "m.text" # Message type
class SendMessageResponse(BaseModel):
ok: bool
event_id: str
room_id: str
async def get_admin_token() -> str:
"""Get or create admin access token for Matrix operations."""
global _admin_token
@@ -441,6 +467,150 @@ async def get_user_token(request: UserTokenRequest):
raise HTTPException(status_code=503, detail="Matrix unavailable")
@app.post("/internal/matrix/room/join", response_model=RoomJoinResponse)
async def join_room(request: RoomJoinRequest):
"""
Join a user to a Matrix room.
Uses admin token to invite and join the user.
"""
admin_token = await get_admin_token()
async with httpx.AsyncClient(timeout=30.0) as client:
try:
# First, invite the user to the room (admin action)
invite_resp = await client.post(
f"{settings.synapse_url}/_matrix/client/v3/rooms/{request.room_id}/invite",
headers={"Authorization": f"Bearer {admin_token}"},
json={"user_id": request.user_id}
)
# 200 = invited, 403 = already member (OK), 400 = already invited (OK)
if invite_resp.status_code not in (200, 403, 400):
logger.warning(f"Invite response: {invite_resp.status_code} - {invite_resp.text}")
# Now join the user to the room via admin API
# Use the synapse admin API to force-join
join_resp = await client.post(
f"{settings.synapse_url}/_synapse/admin/v1/join/{request.room_id}",
headers={"Authorization": f"Bearer {admin_token}"},
json={"user_id": request.user_id}
)
if join_resp.status_code == 200:
logger.info(f"User {request.user_id} joined room {request.room_id}")
return RoomJoinResponse(
ok=True,
room_id=request.room_id,
user_id=request.user_id
)
elif join_resp.status_code == 400:
# Already in room
logger.info(f"User {request.user_id} already in room {request.room_id}")
return RoomJoinResponse(
ok=True,
room_id=request.room_id,
user_id=request.user_id
)
else:
error = join_resp.json() if join_resp.text else {}
logger.error(f"Failed to join room: {join_resp.status_code} - {join_resp.text}")
raise HTTPException(
status_code=500,
detail=f"Failed to join room: {error.get('error', 'Unknown')}"
)
except httpx.RequestError as e:
logger.error(f"Matrix request error: {e}")
raise HTTPException(status_code=503, detail="Matrix unavailable")
@app.post("/internal/matrix/message/send", response_model=SendMessageResponse)
async def send_message(request: SendMessageRequest):
"""
Send a message to a Matrix room.
Uses admin token to send on behalf of the bot.
"""
admin_token = await get_admin_token()
import time
txn_id = f"daarion_{int(time.time() * 1000)}"
async with httpx.AsyncClient(timeout=30.0) as client:
try:
# Send message via admin token (as the bot user)
send_resp = await client.put(
f"{settings.synapse_url}/_matrix/client/v3/rooms/{request.room_id}/send/m.room.message/{txn_id}",
headers={"Authorization": f"Bearer {admin_token}"},
json={
"msgtype": request.msgtype,
"body": request.body
}
)
if send_resp.status_code == 200:
result = send_resp.json()
event_id = result.get("event_id", "")
logger.info(f"Message sent to {request.room_id}: {event_id}")
return SendMessageResponse(
ok=True,
event_id=event_id,
room_id=request.room_id
)
else:
error = send_resp.json() if send_resp.text else {}
logger.error(f"Failed to send message: {send_resp.status_code} - {send_resp.text}")
raise HTTPException(
status_code=500,
detail=f"Failed to send message: {error.get('error', 'Unknown')}"
)
except httpx.RequestError as e:
logger.error(f"Matrix request error: {e}")
raise HTTPException(status_code=503, detail="Matrix unavailable")
@app.get("/internal/matrix/rooms/{room_id}/messages")
async def get_room_messages(room_id: str, limit: int = 50):
"""
Get recent messages from a Matrix room.
"""
admin_token = await get_admin_token()
async with httpx.AsyncClient(timeout=30.0) as client:
try:
resp = await client.get(
f"{settings.synapse_url}/_matrix/client/v3/rooms/{room_id}/messages",
headers={"Authorization": f"Bearer {admin_token}"},
params={
"dir": "b", # backwards from end
"limit": limit
}
)
if resp.status_code == 200:
data = resp.json()
messages = []
for event in data.get("chunk", []):
if event.get("type") == "m.room.message":
content = event.get("content", {})
messages.append({
"event_id": event.get("event_id"),
"sender": event.get("sender"),
"body": content.get("body", ""),
"msgtype": content.get("msgtype", "m.text"),
"timestamp": event.get("origin_server_ts", 0)
})
return {"messages": messages, "room_id": room_id}
else:
raise HTTPException(status_code=resp.status_code, detail="Failed to get messages")
except httpx.RequestError as e:
logger.error(f"Matrix request error: {e}")
raise HTTPException(status_code=503, detail="Matrix unavailable")
@app.post("/internal/matrix/presence/online", response_model=SetPresenceResponse)
async def set_presence_online(request: SetPresenceRequest):
"""