feat(matrix-bridge-dagi): add matrix client wrapper and synapse setup (PR-M1.1)
- adds MatrixClient with send_text/sync_poll/join_room/whoami (idempotent via txn_id) - LRU dedupe for incoming event_ids (2048 capacity) - exponential backoff retry (max 3 attempts) for 429/5xx/network errors - extract_room_messages: filters own messages, non-text, duplicates - health endpoint now probes matrix_reachable + gateway_reachable at startup - adds docker-compose.synapse-node1.yml (Synapse + Postgres for NODE1) - adds ops/runbook-matrix-setup.md (10-step setup: DNS, config, bot, room, .env) - 19 tests passing, no real Synapse required Made-with: Cursor
This commit is contained in:
@@ -8,11 +8,18 @@ import logging
|
||||
import os
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from fastapi import FastAPI, Response
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
try:
|
||||
import httpx as _httpx
|
||||
_HTTPX_OK = True
|
||||
except ImportError: # pragma: no cover
|
||||
_httpx = None # type: ignore
|
||||
_HTTPX_OK = False
|
||||
|
||||
try:
|
||||
from prometheus_client import (
|
||||
Counter, Histogram, Gauge,
|
||||
@@ -60,13 +67,27 @@ if _PROM_OK:
|
||||
|
||||
# ── Startup state ─────────────────────────────────────────────────────────────
|
||||
_START_TIME = time.monotonic()
|
||||
_cfg: BridgeConfig | None = None
|
||||
_config_error: str | None = None
|
||||
_cfg: Optional[BridgeConfig] = None
|
||||
_config_error: Optional[str] = None
|
||||
_matrix_reachable: Optional[bool] = None # probed at startup
|
||||
_gateway_reachable: Optional[bool] = None # probed at startup
|
||||
|
||||
|
||||
async def _probe_url(url: str, timeout: float = 5.0) -> bool:
    """Lightweight GET probe — True iff the endpoint answers with a non-error
    status (< 400); False on any exception, empty URL, or missing httpx."""
    if not url or not _HTTPX_OK:
        return False
    try:
        async with _httpx.AsyncClient(timeout=timeout) as client:
            response = await client.get(url)
    except Exception:
        # Probe failures are expected at startup; never let them propagate.
        return False
    return response.status_code < 400
|
||||
|
||||
# ── Lifespan ──────────────────────────────────────────────────────────────────
|
||||
@asynccontextmanager
|
||||
async def lifespan(app_: Any):
|
||||
global _cfg, _config_error
|
||||
global _cfg, _config_error, _matrix_reachable, _gateway_reachable
|
||||
try:
|
||||
_cfg = load_config()
|
||||
logger.info(
|
||||
@@ -74,6 +95,21 @@ async def lifespan(app_: Any):
|
||||
_cfg.node_id, _cfg.build_sha, _cfg.matrix_homeserver_url,
|
||||
_cfg.sofiia_room_id, list(_cfg.bridge_allowed_agents),
|
||||
)
|
||||
# Connectivity smoke probes (non-blocking failures)
|
||||
_matrix_reachable = await _probe_url(
|
||||
f"{_cfg.matrix_homeserver_url}/_matrix/client/versions"
|
||||
)
|
||||
_gateway_reachable = await _probe_url(
|
||||
f"{_cfg.dagi_gateway_url}/health"
|
||||
)
|
||||
if _matrix_reachable:
|
||||
logger.info("✅ Matrix homeserver reachable: %s", _cfg.matrix_homeserver_url)
|
||||
else:
|
||||
logger.warning("⚠️ Matrix homeserver NOT reachable: %s", _cfg.matrix_homeserver_url)
|
||||
if _gateway_reachable:
|
||||
logger.info("✅ DAGI Gateway reachable: %s", _cfg.dagi_gateway_url)
|
||||
else:
|
||||
logger.warning("⚠️ DAGI Gateway NOT reachable: %s", _cfg.dagi_gateway_url)
|
||||
if _PROM_OK:
|
||||
_bridge_up.set(1)
|
||||
except RuntimeError as exc:
|
||||
@@ -111,8 +147,11 @@ async def health() -> Dict[str, Any]:
|
||||
"uptime_s": uptime,
|
||||
"error": _config_error or "service not initialised",
|
||||
}
|
||||
matrix_ok = _matrix_reachable is True
|
||||
gateway_ok = _gateway_reachable is True
|
||||
overall_ok = matrix_ok and gateway_ok
|
||||
return {
|
||||
"ok": True,
|
||||
"ok": overall_ok,
|
||||
"service": "matrix-bridge-dagi",
|
||||
"version": "0.1.0",
|
||||
"build": _cfg.build_sha,
|
||||
@@ -121,10 +160,12 @@ async def health() -> Dict[str, Any]:
|
||||
"uptime_s": uptime,
|
||||
"node_id": _cfg.node_id,
|
||||
"homeserver": _cfg.matrix_homeserver_url,
|
||||
"matrix_reachable": _matrix_reachable,
|
||||
"bridge_user": _cfg.matrix_user_id,
|
||||
"sofiia_room_id": _cfg.sofiia_room_id,
|
||||
"allowed_agents": list(_cfg.bridge_allowed_agents),
|
||||
"gateway": _cfg.dagi_gateway_url,
|
||||
"gateway_reachable": _gateway_reachable,
|
||||
"config_ok": True,
|
||||
}
|
||||
|
||||
|
||||
308
services/matrix-bridge-dagi/app/matrix_client.py
Normal file
308
services/matrix-bridge-dagi/app/matrix_client.py
Normal file
@@ -0,0 +1,308 @@
|
||||
"""
|
||||
Matrix Client Wrapper — Phase M1
|
||||
|
||||
Provides minimal, idempotent Matrix CS-API calls:
|
||||
- send_text(room_id, text, txn_id) — idempotent PUT via txn_id
|
||||
- sync_poll(since) — GET /sync with timeout
|
||||
- join_room(room_id) — POST join if not already joined
|
||||
- whoami() — GET /account/whoami (for smoke)
|
||||
|
||||
Design:
|
||||
- No state beyond in-memory dedupe LRU for incoming event_ids.
|
||||
- Idempotency: txn_id = sha256(room_id + source_event_id) for replies.
|
||||
- Retry: simple exponential backoff, max 3 attempts, surface errors up.
|
||||
- No asyncio background tasks here — caller drives the sync loop.
|
||||
"""
|
||||
|
||||
import asyncio
import hashlib
import logging
import time
import urllib.parse
from collections import OrderedDict
from typing import Any, Dict, List, Optional

import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Dedupe LRU cache ───────────────────────────────────────────────────────────
|
||||
|
||||
class _LRUSet:
    """Bounded set with least-recently-used eviction, for event_id dedupe."""

    def __init__(self, maxsize: int = 2048):
        # OrderedDict keys act as the set; values are always None.
        self._entries: OrderedDict[str, None] = OrderedDict()
        self._capacity = maxsize

    def contains(self, key: str) -> bool:
        """Membership test; refreshes the key's recency when present."""
        if key not in self._entries:
            return False
        self._entries.move_to_end(key)
        return True

    def add(self, key: str) -> None:
        """Insert (or refresh) *key*, evicting oldest entries past capacity."""
        if key in self._entries:
            self._entries.move_to_end(key)
            return
        self._entries[key] = None
        while len(self._entries) > self._capacity:
            self._entries.popitem(last=False)
|
||||
|
||||
|
||||
# ── Matrix Client ──────────────────────────────────────────────────────────────
|
||||
|
||||
class MatrixClient:
    """
    Minimal async Matrix CS-API (v3) client for matrix-bridge-dagi M1.

    Provides idempotent sends (deterministic txn_id), /sync long-polling,
    room join, whoami, and an in-memory LRU dedupe for incoming event_ids.
    No background tasks are started here — the caller drives the sync loop.

    Usage:
        client = MatrixClient(homeserver_url, access_token, bot_user_id)
        async with client:
            await client.join_room(room_id)
            txn_id = MatrixClient.make_txn_id(room_id, event_id)
            await client.send_text(room_id, "Hello", txn_id)
    """

    # Sync timeout: how long Matrix server holds the /sync connection open
    SYNC_TIMEOUT_MS = 30_000

    # HTTP timeout for non-sync requests
    HTTP_TIMEOUT_S = 15.0

    # Max retries for transient errors (429, 5xx, network)
    MAX_RETRIES = 3

    # Initial backoff seconds (doubles each retry)
    BACKOFF_INITIAL = 1.0

    def __init__(
        self,
        homeserver_url: str,
        access_token: str,
        bot_user_id: str,
        dedupe_maxsize: int = 2048,
    ) -> None:
        """
        Args:
            homeserver_url: base URL of the homeserver (trailing '/' stripped).
            access_token: bot access token, sent as a Bearer header.
            bot_user_id: full MXID of the bot (used to skip its own messages).
            dedupe_maxsize: capacity of the incoming-event LRU dedupe set.
        """
        self._hs = homeserver_url.rstrip("/")
        self._token = access_token
        self._bot_user_id = bot_user_id
        self._dedupe = _LRUSet(dedupe_maxsize)
        # Created in __aenter__, closed in __aexit__.
        self._client: Optional["httpx.AsyncClient"] = None

    # ── Context manager ────────────────────────────────────────────────────────

    async def __aenter__(self) -> "MatrixClient":
        self._client = httpx.AsyncClient(
            headers={
                "Authorization": f"Bearer {self._token}",
                "Content-Type": "application/json",
                "User-Agent": "matrix-bridge-dagi/0.1",
            },
            timeout=self.HTTP_TIMEOUT_S,
            follow_redirects=True,
        )
        return self

    async def __aexit__(self, *_: Any) -> None:
        if self._client:
            await self._client.aclose()
            self._client = None

    # ── Helpers ────────────────────────────────────────────────────────────────

    @staticmethod
    def make_txn_id(room_id: str, source_event_id: str) -> str:
        """
        Deterministic txn_id for idempotent reply sends.
        SHA-256 of (room_id + source_event_id), hex[:32].
        """
        raw = f"{room_id}:{source_event_id}".encode()
        return hashlib.sha256(raw).hexdigest()[:32]

    @staticmethod
    def _encode_segment(value: str) -> str:
        """Percent-encode *value* for use as a single URL path segment."""
        # quote(safe="") escapes every reserved character (incl. '/', '!',
        # ':'); the previous manual replace() handled only '!' and ':'.
        return urllib.parse.quote(value, safe="")

    def is_duplicate(self, event_id: str) -> bool:
        """True if event_id was seen before (dedupe for incoming events)."""
        return self._dedupe.contains(event_id)

    def mark_seen(self, event_id: str) -> None:
        """Mark incoming event_id as processed."""
        self._dedupe.add(event_id)

    def _url(self, path: str) -> str:
        """Absolute CS-API v3 URL for *path* (must start with '/')."""
        return f"{self._hs}/_matrix/client/v3{path}"

    def _ensure_client(self) -> "httpx.AsyncClient":
        """Return the live AsyncClient, or raise if used outside 'async with'."""
        if self._client is None:
            raise RuntimeError(
                "MatrixClient not initialised — use 'async with MatrixClient(...) as client:'"
            )
        return self._client

    async def _request_with_retry(
        self,
        method: str,
        path: str,
        *,
        json: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = None,
    ) -> Dict[str, Any]:
        """
        Execute an HTTP request with exponential backoff on 429 / 5xx /
        transient network errors (max MAX_RETRIES attempts).

        Returns:
            The decoded JSON response body on success.
        Raises:
            httpx.HTTPStatusError: on final non-retryable / exhausted HTTP error.
            httpx.TransportError: on final network failure.
        """
        client = self._ensure_client()
        url = self._url(path)
        backoff = self.BACKOFF_INITIAL

        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                resp = await client.request(
                    method,
                    url,
                    json=json,
                    params=params,
                    timeout=timeout or self.HTTP_TIMEOUT_S,
                )
                if resp.status_code == 429:
                    # Honour the homeserver's retry_after_ms hint when present;
                    # a malformed/non-JSON body must not break the retry loop.
                    try:
                        retry_after = float(
                            resp.json().get("retry_after_ms", backoff * 1000)
                        ) / 1000.0
                    except Exception:
                        retry_after = backoff
                    logger.warning(
                        "Rate limited by homeserver, retry in %.1fs (attempt %d/%d)",
                        retry_after, attempt, self.MAX_RETRIES,
                    )
                    if attempt < self.MAX_RETRIES:
                        await asyncio.sleep(retry_after)
                        backoff *= 2
                        continue
                    # Final attempt: fall through so the 429 surfaces below.
                resp.raise_for_status()
                return resp.json()

            except httpx.TransportError as exc:
                # Parent of ConnectError/TimeoutException/ReadError/... —
                # all transient network-level failures are retried uniformly.
                logger.warning(
                    "Network error on %s %s (attempt %d/%d): %s",
                    method, path, attempt, self.MAX_RETRIES, exc,
                )
                if attempt < self.MAX_RETRIES:
                    await asyncio.sleep(backoff)
                    backoff *= 2
                else:
                    raise

            except httpx.HTTPStatusError as exc:
                # Only 5xx responses are transient; 4xx (other than the 429
                # branch above) surface immediately.
                if exc.response.status_code >= 500 and attempt < self.MAX_RETRIES:
                    logger.warning(
                        "Server error %d on %s %s, retry (attempt %d/%d)",
                        exc.response.status_code, method, path, attempt, self.MAX_RETRIES,
                    )
                    await asyncio.sleep(backoff)
                    backoff *= 2
                else:
                    raise

        raise RuntimeError(f"Exhausted {self.MAX_RETRIES} retries for {method} {path}")

    # ── Public API ─────────────────────────────────────────────────────────────

    async def whoami(self) -> Dict[str, Any]:
        """GET /account/whoami — returns {user_id, device_id, ...}"""
        return await self._request_with_retry("GET", "/account/whoami")

    async def join_room(self, room_id: str) -> Dict[str, Any]:
        """
        POST /join/{room_id} — join the room (safe to call if already joined;
        homeserver returns 200 with room_id).
        """
        result = await self._request_with_retry(
            "POST", f"/join/{self._encode_segment(room_id)}", json={}
        )
        logger.info("Joined room %s", room_id)
        return result

    async def send_text(
        self, room_id: str, text: str, txn_id: str
    ) -> Dict[str, Any]:
        """
        PUT /rooms/{room_id}/send/m.room.message/{txn_id}
        Idempotent: same txn_id → homeserver deduplicates.
        Returns {event_id: ...}
        """
        path = (
            f"/rooms/{self._encode_segment(room_id)}"
            f"/send/m.room.message/{txn_id}"
        )
        result = await self._request_with_retry(
            "PUT",
            path,
            json={"msgtype": "m.text", "body": text},
        )
        logger.debug("Sent message to %s: event_id=%s", room_id, result.get("event_id"))
        return result

    async def sync_poll(
        self,
        since: Optional[str] = None,
        filter_id: Optional[str] = None,
        timeout_ms: int = SYNC_TIMEOUT_MS,
    ) -> Dict[str, Any]:
        """
        GET /sync — long-poll for new events.
        Returns raw sync response dict.

        Args:
            since: next_batch token from previous sync (None for initial).
            filter_id: optional filter (to limit event types/rooms).
            timeout_ms: how long server should hold the request (default 30s).
        """
        params: Dict[str, Any] = {"timeout": timeout_ms}
        if since:
            params["since"] = since
        if filter_id:
            params["filter"] = filter_id

        # Our HTTP timeout must exceed the server-side hold time, with buffer.
        http_timeout = (timeout_ms / 1000.0) + 15.0

        return await self._request_with_retry(
            "GET", "/sync", params=params, timeout=http_timeout
        )

    def extract_room_messages(
        self,
        sync_response: Dict[str, Any],
        room_id: str,
    ) -> List[Dict[str, Any]]:
        """
        Extract new m.room.message events for a specific room from a sync response.

        Filters out:
          - messages from the bot itself (_bot_user_id)
          - already-seen event_ids (dedupe; NOTE: does not mark them seen —
            the caller is expected to mark_seen() after processing)
          - non-text msgtypes (only m.text is handled in M1)

        Returns list of event dicts with at least: {event_id, sender, content, origin_server_ts}.
        """
        rooms_data = sync_response.get("rooms", {})
        join_data = rooms_data.get("join", {})
        room_data = join_data.get(room_id, {})
        timeline = room_data.get("timeline", {})
        events = timeline.get("events", [])

        results = []
        for event in events:
            if event.get("type") != "m.room.message":
                continue
            event_id = event.get("event_id", "")
            sender = event.get("sender", "")

            # Skip own messages
            if sender == self._bot_user_id:
                continue

            # Skip already-processed
            if self.is_duplicate(event_id):
                continue

            # Only text messages for M1
            content = event.get("content", {})
            if content.get("msgtype") != "m.text":
                continue

            results.append(event)

        return results
|
||||
Reference in New Issue
Block a user