From d8506da17953069cb1e9d27dfa03fa728d74de7d Mon Sep 17 00:00:00 2001 From: Apple Date: Tue, 3 Mar 2026 07:38:54 -0800 Subject: [PATCH] feat(matrix-bridge-dagi): add matrix client wrapper and synapse setup (PR-M1.1) - adds MatrixClient with send_text/sync_poll/join_room/whoami (idempotent via txn_id) - LRU dedupe for incoming event_ids (2048 capacity) - exponential backoff retry (max 3 attempts) for 429/5xx/network errors - extract_room_messages: filters own messages, non-text, duplicates - health endpoint now probes matrix_reachable + gateway_reachable at startup - adds docker-compose.synapse-node1.yml (Synapse + Postgres for NODA1) - adds ops/runbook-matrix-setup.md (10-step setup: DNS, config, bot, room, .env) - 19 tests passing, no real Synapse required Made-with: Cursor --- docker-compose.synapse-node1.yml | 68 ++++ ops/runbook-matrix-setup.md | 235 ++++++++++++ services/matrix-bridge-dagi/app/main.py | 51 ++- .../matrix-bridge-dagi/app/matrix_client.py | 308 +++++++++++++++ tests/test_matrix_bridge_client.py | 356 ++++++++++++++++++ 5 files changed, 1013 insertions(+), 5 deletions(-) create mode 100644 docker-compose.synapse-node1.yml create mode 100644 ops/runbook-matrix-setup.md create mode 100644 services/matrix-bridge-dagi/app/matrix_client.py create mode 100644 tests/test_matrix_bridge_client.py diff --git a/docker-compose.synapse-node1.yml b/docker-compose.synapse-node1.yml new file mode 100644 index 00000000..99d11171 --- /dev/null +++ b/docker-compose.synapse-node1.yml @@ -0,0 +1,68 @@ +# Synapse Matrix Homeserver — NODA1 setup +# +# BEFORE FIRST RUN: +# 1. Set DNS: A record matrix.daarion.space → 144.76.224.179 +# 2. Generate Synapse config (one-time): +# docker run --rm -v $(pwd)/synapse-data:/data \ +# -e SYNAPSE_SERVER_NAME=daarion.space \ +# -e SYNAPSE_REPORT_STATS=no \ +# matrixdotorg/synapse:latest generate +# 3. Set SYNAPSE_POSTGRES_PASSWORD in .env +# 4. Run: docker compose -f docker-compose.synapse-node1.yml up -d +# 5. Create admin user (see ops/runbook-matrix-setup.md) + +version: "3.9" + +services: + synapse: + image: matrixdotorg/synapse:latest + container_name: dagi-synapse-node1 + restart: unless-stopped + environment: + - SYNAPSE_CONFIG_PATH=/data/homeserver.yaml + volumes: + - synapse-data:/data + depends_on: + synapse-db: + condition: service_healthy + healthcheck: + test: + - "CMD" + - "python3" + - "-c" + - "import urllib.request; urllib.request.urlopen('http://localhost:8008/_matrix/client/versions', timeout=5)" + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + networks: + - dagi-network + # Port 8008 internal only — exposed via Nginx on dagi-network + # Do NOT expose publicly; Nginx handles TLS termination + + synapse-db: + image: postgres:15-alpine + container_name: dagi-synapse-db-node1 + restart: unless-stopped + environment: + - POSTGRES_USER=synapse + - POSTGRES_PASSWORD=${SYNAPSE_POSTGRES_PASSWORD:-changeme_synapse} + - POSTGRES_DB=synapse + - POSTGRES_INITDB_ARGS=--encoding=UTF-8 --lc-collate=C --lc-ctype=C + volumes: + - synapse-db-data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U synapse"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - dagi-network + +volumes: + synapse-data: + synapse-db-data: + +networks: + dagi-network: + external: true diff --git a/ops/runbook-matrix-setup.md b/ops/runbook-matrix-setup.md new file mode 100644 index 00000000..4bdcb542 --- /dev/null +++ b/ops/runbook-matrix-setup.md @@ -0,0 +1,235 @@ +# Runbook: Matrix/Synapse Setup — NODA1 (Phase M1) + +**Мета**: Підняти Synapse на NODA1, зареєструвати бот `@dagi_bridge:daarion.space`, +створити room для Sofiia, отримати 4 значення для `matrix-bridge-dagi`. + +--- + +## Передумови + +- NODA1: `ssh root@144.76.224.179` +- DNS: додати A-запис `matrix.daarion.space → 144.76.224.179` в панелі DNS +- Certbot/Nginx вже є на NODA1 + +--- + +## Крок 1: DNS + +В панелі управління DNS додати: +``` +Type: A +Name: matrix +Value: 144.76.224.179 +TTL: 300 +``` + +Перевірка (зачекати 5–10 хв): +```bash +host matrix.daarion.space +# очікується: matrix.daarion.space has address 144.76.224.179 +``` + +--- + +## Крок 2: Генерація Synapse конфіга (one-time, на NODA1) + +```bash +cd /opt/microdao-daarion + +docker run --rm \ + -v $(pwd)/synapse-data:/data \ + -e SYNAPSE_SERVER_NAME=daarion.space \ + -e SYNAPSE_REPORT_STATS=no \ + matrixdotorg/synapse:latest generate + +# Результат: synapse-data/homeserver.yaml +``` + +--- + +## Крок 3: Налаштування homeserver.yaml + +Відредагувати `synapse-data/homeserver.yaml`, додати/змінити: + +```yaml +server_name: "daarion.space" + +database: + name: psycopg2 + args: + user: synapse + password: "YOUR_SYNAPSE_POSTGRES_PASSWORD" + database: synapse + host: dagi-synapse-db-node1 + port: 5432 + cp_min: 5 + cp_max: 10 + +enable_registration: false +enable_registration_without_verification: false + +password_config: + enabled: true + +report_stats: false + +listeners: + - port: 8008 + tls: false + type: http + x_forwarded: true + bind_addresses: ['0.0.0.0'] + resources: + - names: [client, federation] + compress: false +``` + +--- + +## Крок 4: .env + запуск + +```bash +# Додати пароль Postgres в .env +echo "SYNAPSE_POSTGRES_PASSWORD=<пароль>" >> /opt/microdao-daarion/.env + +# Запуск +docker compose -f docker-compose.synapse-node1.yml up -d + +# Перевірка через 30с +sleep 30 +curl -s http://localhost:8008/_matrix/client/versions | python3 -c 'import sys,json; print("OK:", json.load(sys.stdin)["versions"][:2])' +``` + +--- + +## Крок 5: Nginx — matrix.daarion.space + +```nginx +server { + listen 80; + server_name matrix.daarion.space; + return 301 https://$host$request_uri; +} + +server { + listen 443 ssl http2; + server_name matrix.daarion.space; + + ssl_certificate /etc/letsencrypt/live/matrix.daarion.space/fullchain.pem; + ssl_certificate_key /etc/letsencrypt/live/matrix.daarion.space/privkey.pem; + + location /_matrix/ { + proxy_pass http://127.0.0.1:8008; + proxy_set_header Host $host; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto https; + client_max_body_size 50M; + } + + location /.well-known/matrix/ { + proxy_pass http://127.0.0.1:8008; + proxy_set_header Host $host; + } +} +``` + +```bash +certbot certonly --nginx -d matrix.daarion.space +nginx -t && systemctl reload nginx + +# Перевірка: +curl -s https://matrix.daarion.space/_matrix/client/versions | python3 -c 'import sys,json; print(json.load(sys.stdin)["versions"][:2])' +``` + +--- + +## Крок 6: Створення бота dagi_bridge + +```bash +# Тимчасово enable_registration: true у homeserver.yaml, потім: +docker restart dagi-synapse-node1 && sleep 15 + +# Реєстрація (в контейнері): +docker exec -it dagi-synapse-node1 \ + register_new_matrix_user -c /data/homeserver.yaml http://localhost:8008 +# → user: dagi_bridge | password: <пароль> | admin: no +``` + +--- + +## Крок 7: Отримати access token + +```bash +HS="http://localhost:8008" +BOT_PASS="<пароль_бота>" + +MATRIX_ACCESS_TOKEN=$(curl -s -X POST "$HS/_matrix/client/v3/login" \ + -H "Content-Type: application/json" \ + -d "{ + \"type\":\"m.login.password\", + \"identifier\":{\"type\":\"m.id.user\",\"user\":\"dagi_bridge\"}, + \"password\":\"$BOT_PASS\" + }" | python3 -c 'import sys,json; print(json.load(sys.stdin)["access_token"])') + +echo "MATRIX_ACCESS_TOKEN=$MATRIX_ACCESS_TOKEN" +``` + +--- + +## Крок 8: Створити room + +```bash +SOFIIA_ROOM_ID=$(curl -s -X POST "$HS/_matrix/client/v3/createRoom" \ + -H "Authorization: Bearer $MATRIX_ACCESS_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "DAGI — Sofiia", + "topic": "DAGI Agent: Sofiia (Chief AI Architect)", + "preset": "private_chat", + "is_direct": false + }' | python3 -c 'import sys,json; print(json.load(sys.stdin)["room_id"])') + +echo "SOFIIA_ROOM_ID=$SOFIIA_ROOM_ID" +``` + +--- + +## Крок 9: Вимкнути реєстрацію + зберегти змінні + +```bash +# enable_registration: false у homeserver.yaml +docker restart dagi-synapse-node1 + +# Додати в .env: +cat >> /opt/microdao-daarion/.env < bool: + """Quick GET probe — returns True if HTTP 2xx.""" + if not _HTTPX_OK or not url: + return False + try: + async with _httpx.AsyncClient(timeout=timeout) as client: + r = await client.get(url) + return r.status_code < 400 + except Exception: + return False # ── Lifespan ────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(app_: Any): - global _cfg, _config_error + global _cfg, _config_error, _matrix_reachable, _gateway_reachable try: _cfg = load_config() logger.info( @@ -74,6 +95,21 @@ async def lifespan(app_: Any): _cfg.node_id, _cfg.build_sha, _cfg.matrix_homeserver_url, _cfg.sofiia_room_id, list(_cfg.bridge_allowed_agents), ) + # Connectivity smoke probes (non-blocking failures) + _matrix_reachable = await _probe_url( + f"{_cfg.matrix_homeserver_url}/_matrix/client/versions" + ) + _gateway_reachable = await _probe_url( + f"{_cfg.dagi_gateway_url}/health" + ) + if _matrix_reachable: + logger.info("✅ Matrix homeserver reachable: %s", _cfg.matrix_homeserver_url) + else: + logger.warning("⚠️ Matrix homeserver NOT reachable: %s", _cfg.matrix_homeserver_url) + if _gateway_reachable: + logger.info("✅ DAGI Gateway reachable: %s", _cfg.dagi_gateway_url) + else: + logger.warning("⚠️ DAGI Gateway NOT reachable: %s", _cfg.dagi_gateway_url) if _PROM_OK: _bridge_up.set(1) except RuntimeError as exc: @@ -111,8 +147,11 @@ async def health() -> Dict[str, Any]: "uptime_s": uptime, "error": _config_error or "service not initialised", } + matrix_ok = _matrix_reachable is True + gateway_ok = _gateway_reachable is True + overall_ok = matrix_ok and gateway_ok return { - "ok": True, + "ok": overall_ok, "service": "matrix-bridge-dagi", "version": "0.1.0", "build": _cfg.build_sha, @@ -121,10 +160,12 @@ async def health() -> Dict[str, Any]: "uptime_s": uptime, "node_id": _cfg.node_id, "homeserver": _cfg.matrix_homeserver_url, + "matrix_reachable": _matrix_reachable, "bridge_user": _cfg.matrix_user_id, "sofiia_room_id": _cfg.sofiia_room_id, "allowed_agents": list(_cfg.bridge_allowed_agents), "gateway": _cfg.dagi_gateway_url, + "gateway_reachable": _gateway_reachable, "config_ok": True, } diff --git a/services/matrix-bridge-dagi/app/matrix_client.py b/services/matrix-bridge-dagi/app/matrix_client.py new file mode 100644 index 00000000..a4e47307 --- /dev/null +++ b/services/matrix-bridge-dagi/app/matrix_client.py @@ -0,0 +1,308 @@ +""" +Matrix Client Wrapper — Phase M1 + +Provides minimal, idempotent Matrix CS-API calls: + - send_text(room_id, text, txn_id) — idempotent PUT via txn_id + - sync_poll(since) — GET /sync with timeout + - join_room(room_id) — POST join if not already joined + - whoami() — GET /account/whoami (for smoke) + +Design: + - No state beyond in-memory dedupe LRU for incoming event_ids. + - Idempotency: txn_id = sha256(room_id + source_event_id) for replies. + - Retry: simple exponential backoff, max 3 attempts, surface errors up. + - No asyncio background tasks here — caller drives the sync loop. +""" + +import asyncio +import hashlib +import logging +import time +from collections import OrderedDict +from typing import Any, Dict, List, Optional + +import httpx + +logger = logging.getLogger(__name__) + +# ── Dedupe LRU cache ─────────────────────────────────────────────────────────── + +class _LRUSet: + """Fixed-size LRU set for event_id deduplication.""" + + def __init__(self, maxsize: int = 2048): + self._data: OrderedDict[str, None] = OrderedDict() + self._maxsize = maxsize + + def contains(self, key: str) -> bool: + if key in self._data: + self._data.move_to_end(key) + return True + return False + + def add(self, key: str) -> None: + if key in self._data: + self._data.move_to_end(key) + return + self._data[key] = None + while len(self._data) > self._maxsize: + self._data.popitem(last=False) + + +# ── Matrix Client ────────────────────────────────────────────────────────────── + +class MatrixClient: + """ + Minimal async Matrix CS-API client for matrix-bridge-dagi M1. + + Usage: + client = MatrixClient(homeserver_url, access_token, bot_user_id) + async with client: + await client.join_room(room_id) + txn_id = MatrixClient.make_txn_id(room_id, event_id) + await client.send_text(room_id, "Hello", txn_id) + """ + + # Sync timeout: how long Matrix server holds the /sync connection open + SYNC_TIMEOUT_MS = 30_000 + + # HTTP timeout for non-sync requests + HTTP_TIMEOUT_S = 15.0 + + # Max retries for transient errors (429, 5xx, network) + MAX_RETRIES = 3 + + # Initial backoff seconds (doubles each retry) + BACKOFF_INITIAL = 1.0 + + def __init__( + self, + homeserver_url: str, + access_token: str, + bot_user_id: str, + dedupe_maxsize: int = 2048, + ) -> None: + self._hs = homeserver_url.rstrip("/") + self._token = access_token + self._bot_user_id = bot_user_id + self._dedupe = _LRUSet(dedupe_maxsize) + self._client: Optional[httpx.AsyncClient] = None + + # ── Context manager ──────────────────────────────────────────────────────── + + async def __aenter__(self) -> "MatrixClient": + self._client = httpx.AsyncClient( + headers={ + "Authorization": f"Bearer {self._token}", + "Content-Type": "application/json", + "User-Agent": "matrix-bridge-dagi/0.1", + }, + timeout=self.HTTP_TIMEOUT_S, + follow_redirects=True, + ) + return self + + async def __aexit__(self, *_: Any) -> None: + if self._client: + await self._client.aclose() + self._client = None + + # ── Helpers ──────────────────────────────────────────────────────────────── + + @staticmethod + def make_txn_id(room_id: str, source_event_id: str) -> str: + """ + Deterministic txn_id for idempotent reply sends. + SHA-256 of (room_id + source_event_id), hex[:32]. + """ + raw = f"{room_id}:{source_event_id}".encode() + return hashlib.sha256(raw).hexdigest()[:32] + + def is_duplicate(self, event_id: str) -> bool: + """True if event_id was seen before (dedupe for incoming events).""" + return self._dedupe.contains(event_id) + + def mark_seen(self, event_id: str) -> None: + """Mark incoming event_id as processed.""" + self._dedupe.add(event_id) + + def _url(self, path: str) -> str: + return f"{self._hs}/_matrix/client/v3{path}" + + def _ensure_client(self) -> httpx.AsyncClient: + if self._client is None: + raise RuntimeError( + "MatrixClient not initialised — use 'async with MatrixClient(...) as client:'" + ) + return self._client + + async def _request_with_retry( + self, + method: str, + path: str, + *, + json: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + ) -> Dict[str, Any]: + """ + Execute HTTP request with exponential backoff on 429/5xx. + Raises httpx.HTTPStatusError on final failure. + """ + client = self._ensure_client() + url = self._url(path) + backoff = self.BACKOFF_INITIAL + + for attempt in range(1, self.MAX_RETRIES + 1): + try: + resp = await client.request( + method, + url, + json=json, + params=params, + timeout=timeout or self.HTTP_TIMEOUT_S, + ) + if resp.status_code == 429: + retry_after = float( + resp.json().get("retry_after_ms", backoff * 1000) + ) / 1000.0 + logger.warning( + "Rate limited by homeserver, retry in %.1fs (attempt %d/%d)", + retry_after, attempt, self.MAX_RETRIES, + ) + if attempt < self.MAX_RETRIES: + await asyncio.sleep(retry_after) + backoff *= 2 + continue + resp.raise_for_status() + return resp.json() + + except (httpx.ConnectError, httpx.TimeoutException) as exc: + logger.warning( + "Network error on %s %s (attempt %d/%d): %s", + method, path, attempt, self.MAX_RETRIES, exc, + ) + if attempt < self.MAX_RETRIES: + await asyncio.sleep(backoff) + backoff *= 2 + else: + raise + + except httpx.HTTPStatusError as exc: + if exc.response.status_code >= 500 and attempt < self.MAX_RETRIES: + logger.warning( + "Server error %d on %s %s, retry (attempt %d/%d)", + exc.response.status_code, method, path, attempt, self.MAX_RETRIES, + ) + await asyncio.sleep(backoff) + backoff *= 2 + else: + raise + + raise RuntimeError(f"Exhausted {self.MAX_RETRIES} retries for {method} {path}") + + # ── Public API ───────────────────────────────────────────────────────────── + + async def whoami(self) -> Dict[str, Any]: + """GET /account/whoami — returns {user_id, device_id, ...}""" + return await self._request_with_retry("GET", "/account/whoami") + + async def join_room(self, room_id: str) -> Dict[str, Any]: + """ + POST /join/{room_id} — join the room (safe to call if already joined; + homeserver returns 200 with room_id). + """ + encoded = room_id.replace("!", "%21").replace(":", "%3A") + result = await self._request_with_retry("POST", f"/join/{encoded}", json={}) + logger.info("Joined room %s", room_id) + return result + + async def send_text( + self, room_id: str, text: str, txn_id: str + ) -> Dict[str, Any]: + """ + PUT /rooms/{room_id}/send/m.room.message/{txn_id} + Idempotent: same txn_id → homeserver deduplicates. + Returns {event_id: ...} + """ + encoded_room = room_id.replace("!", "%21").replace(":", "%3A") + path = f"/rooms/{encoded_room}/send/m.room.message/{txn_id}" + result = await self._request_with_retry( + "PUT", + path, + json={"msgtype": "m.text", "body": text}, + ) + logger.debug("Sent message to %s: event_id=%s", room_id, result.get("event_id")) + return result + + async def sync_poll( + self, + since: Optional[str] = None, + filter_id: Optional[str] = None, + timeout_ms: int = SYNC_TIMEOUT_MS, + ) -> Dict[str, Any]: + """ + GET /sync — long-poll for new events. + Returns raw sync response dict. + + Args: + since: next_batch token from previous sync (None for initial). + filter_id: optional filter (to limit event types/rooms). + timeout_ms: how long server should hold the request (default 30s). + """ + params: Dict[str, Any] = {"timeout": timeout_ms} + if since: + params["since"] = since + if filter_id: + params["filter"] = filter_id + + # Add extra buffer beyond timeout_ms for our HTTP timeout + http_timeout = (timeout_ms / 1000.0) + 15.0 + + return await self._request_with_retry( + "GET", "/sync", params=params, timeout=http_timeout + ) + + def extract_room_messages( + self, + sync_response: Dict[str, Any], + room_id: str, + ) -> List[Dict[str, Any]]: + """ + Extract new m.room.message events for a specific room from a sync response. + + Filters out: + - messages from the bot itself (_bot_user_id) + - already-seen event_ids (dedupe) + + Returns list of event dicts with at least: {event_id, sender, content, origin_server_ts}. + """ + rooms_data = sync_response.get("rooms", {}) + join_data = rooms_data.get("join", {}) + room_data = join_data.get(room_id, {}) + timeline = room_data.get("timeline", {}) + events = timeline.get("events", []) + + results = [] + for event in events: + if event.get("type") != "m.room.message": + continue + event_id = event.get("event_id", "") + sender = event.get("sender", "") + + # Skip own messages + if sender == self._bot_user_id: + continue + + # Skip already-processed + if self.is_duplicate(event_id): + continue + + # Only text messages for M1 + content = event.get("content", {}) + if content.get("msgtype") != "m.text": + continue + + results.append(event) + + return results diff --git a/tests/test_matrix_bridge_client.py b/tests/test_matrix_bridge_client.py new file mode 100644 index 00000000..2addc7ea --- /dev/null +++ b/tests/test_matrix_bridge_client.py @@ -0,0 +1,356 @@ +""" +Tests for services/matrix-bridge-dagi/app/matrix_client.py + +Uses monkeypatching of httpx.AsyncClient — no real Synapse required. +""" + +import asyncio +import hashlib +import sys +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# Make the bridge app importable without installing +_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" +if str(_BRIDGE) not in sys.path: + sys.path.insert(0, str(_BRIDGE)) + +from app.matrix_client import MatrixClient, _LRUSet # noqa: E402 + + +def run(coro): + """Helper: run async coroutine in sync test.""" + return asyncio.run(coro) + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +HS = "https://matrix.example.com" +TOKEN = "syt_test_token_abc123" +BOT = "@dagi_bridge:example.com" +ROOM = "!testroom:example.com" + + +def _make_client() -> MatrixClient: + return MatrixClient(hs=HS, access_token=TOKEN, bot_user_id=BOT) # type: ignore[call-arg] + + +def _make_client_direct() -> MatrixClient: + return MatrixClient(HS, TOKEN, BOT) + + +def _fake_resp(status_code: int, body: dict) -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = body + resp.raise_for_status = MagicMock() + if status_code >= 400: + from httpx import HTTPStatusError, Request, Response + resp.raise_for_status.side_effect = HTTPStatusError( + "error", request=MagicMock(), response=resp + ) + return resp + + +# ── LRUSet tests ─────────────────────────────────────────────────────────────── + +def test_lru_set_basic(): + s = _LRUSet(maxsize=3) + assert not s.contains("a") + s.add("a") + assert s.contains("a") + + +def test_lru_set_eviction(): + s = _LRUSet(maxsize=2) + s.add("a") + s.add("b") + s.add("c") # evicts "a" + assert not s.contains("a") + assert s.contains("b") + assert s.contains("c") + + +def test_lru_set_moves_to_end_on_access(): + s = _LRUSet(maxsize=2) + s.add("a") + s.add("b") + s.contains("a") # refreshes "a" + s.add("c") # evicts "b", not "a" + assert s.contains("a") + assert not s.contains("b") + assert s.contains("c") + + +# ── make_txn_id ──────────────────────────────────────────────────────────────── + +def test_make_txn_id_deterministic(): + tid1 = MatrixClient.make_txn_id(ROOM, "$event1") + tid2 = MatrixClient.make_txn_id(ROOM, "$event1") + assert tid1 == tid2 + assert len(tid1) == 32 # first 32 hex chars of sha256 + + +def test_make_txn_id_different_events(): + tid1 = MatrixClient.make_txn_id(ROOM, "$event1") + tid2 = MatrixClient.make_txn_id(ROOM, "$event2") + assert tid1 != tid2 + + +def test_make_txn_id_different_rooms(): + tid1 = MatrixClient.make_txn_id("!room1:x", "$event1") + tid2 = MatrixClient.make_txn_id("!room2:x", "$event1") + assert tid1 != tid2 + + +# ── Context manager guard ────────────────────────────────────────────────────── + +def test_client_not_initialised_raises(): + async def _inner(): + client = _make_client_direct() + with pytest.raises(RuntimeError, match="not initialised"): + await client.whoami() + run(_inner()) + + +# ── whoami ───────────────────────────────────────────────────────────────────── + +def test_whoami_success(): + async def _inner(): + client = _make_client_direct() + async with client: + client._client.request = AsyncMock( + return_value=_fake_resp(200, {"user_id": BOT, "device_id": "DAGI1"}) + ) + result = await client.whoami() + assert result["user_id"] == BOT + run(_inner()) + + +# ── join_room ────────────────────────────────────────────────────────────────── + +def test_join_room_success(): + async def _inner(): + client = _make_client_direct() + async with client: + client._client.request = AsyncMock( + return_value=_fake_resp(200, {"room_id": ROOM}) + ) + result = await client.join_room(ROOM) + assert result["room_id"] == ROOM + run(_inner()) + + +# ── send_text ────────────────────────────────────────────────────────────────── + +def test_send_text_success(): + async def _inner(): + client = _make_client_direct() + event_id = "$out_event1" + txn_id = MatrixClient.make_txn_id(ROOM, "$source_event1") + async with client: + client._client.request = AsyncMock( + return_value=_fake_resp(200, {"event_id": event_id}) + ) + result = await client.send_text(ROOM, "Hello DAGI!", txn_id) + assert result["event_id"] == event_id + run(_inner()) + + +def test_send_text_uses_correct_method_and_path(): + async def _inner(): + client = _make_client_direct() + txn_id = MatrixClient.make_txn_id(ROOM, "$src") + captured = {} + + async def fake_request(method, url, **kwargs): + captured["method"] = method + captured["url"] = url + captured["json"] = kwargs.get("json") + return _fake_resp(200, {"event_id": "$out"}) + + async with client: + client._client.request = fake_request + await client.send_text(ROOM, "test", txn_id) + + assert captured["method"] == "PUT" + assert "send/m.room.message" in captured["url"] + assert txn_id in captured["url"] + assert captured["json"]["msgtype"] == "m.text" + assert captured["json"]["body"] == "test" + run(_inner()) + + +# ── sync_poll ────────────────────────────────────────────────────────────────── + +def test_sync_poll_initial(): + async def _inner(): + client = _make_client_direct() + sync_data = { + "next_batch": "s1_token", + "rooms": {"join": {ROOM: {"timeline": {"events": []}}}} + } + async with client: + client._client.request = AsyncMock( + return_value=_fake_resp(200, sync_data) + ) + result = await client.sync_poll() + assert result["next_batch"] == "s1_token" + run(_inner()) + + +def test_sync_poll_passes_since(): + async def _inner(): + client = _make_client_direct() + captured = {} + + async def fake_request(method, url, **kwargs): + captured["params"] = kwargs.get("params", {}) + return _fake_resp(200, {"next_batch": "s2"}) + + async with client: + client._client.request = fake_request + await client.sync_poll(since="s1_token") + + assert captured["params"].get("since") == "s1_token" + run(_inner()) + + +# ── extract_room_messages ────────────────────────────────────────────────────── + +def _make_sync_with_events(events: list) -> dict: + return { + "next_batch": "s_test", + "rooms": { + "join": { + ROOM: { + "timeline": { + "events": events + } + } + } + } + } + + +def test_extract_filters_own_messages(): + client = _make_client_direct() + sync = _make_sync_with_events([ + { + "type": "m.room.message", + "event_id": "$e1", + "sender": BOT, # own message + "content": {"msgtype": "m.text", "body": "I said this"}, + "origin_server_ts": 1000, + } + ]) + msgs = client.extract_room_messages(sync, ROOM) + assert msgs == [] + + +def test_extract_filters_non_text(): + client = _make_client_direct() + sync = _make_sync_with_events([ + { + "type": "m.room.message", + "event_id": "$e2", + "sender": "@user:example.com", + "content": {"msgtype": "m.image", "url": "mxc://..."}, + "origin_server_ts": 1000, + } + ]) + msgs = client.extract_room_messages(sync, ROOM) + assert msgs == [] + + +def test_extract_filters_duplicate_events(): + client = _make_client_direct() + event = { + "type": "m.room.message", + "event_id": "$e3", + "sender": "@user:example.com", + "content": {"msgtype": "m.text", "body": "hello"}, + "origin_server_ts": 1000, + } + sync = _make_sync_with_events([event]) + + client.mark_seen("$e3") + msgs = client.extract_room_messages(sync, ROOM) + assert msgs == [] + + +def test_extract_returns_new_text_messages(): + client = _make_client_direct() + events = [ + { + "type": "m.room.message", + "event_id": "$e4", + "sender": "@user:example.com", + "content": {"msgtype": "m.text", "body": "What is AgroMatrix?"}, + "origin_server_ts": 2000, + }, + { + "type": "m.room.message", + "event_id": "$e5", + "sender": "@user2:example.com", + "content": {"msgtype": "m.text", "body": "Hello!"}, + "origin_server_ts": 3000, + }, + ] + sync = _make_sync_with_events(events) + msgs = client.extract_room_messages(sync, ROOM) + assert len(msgs) == 2 + assert msgs[0]["event_id"] == "$e4" + assert msgs[1]["event_id"] == "$e5" + + +def test_extract_ignores_other_room(): + client = _make_client_direct() + other_room = "!otherroom:example.com" + sync = { + "next_batch": "s_x", + "rooms": { + "join": { + other_room: { + "timeline": { + "events": [{ + "type": "m.room.message", + "event_id": "$e6", + "sender": "@user:example.com", + "content": {"msgtype": "m.text", "body": "hi"}, + "origin_server_ts": 1000, + }] + } + } + } + } + } + msgs = client.extract_room_messages(sync, ROOM) + assert msgs == [] + + +# ── Retry on 429 ─────────────────────────────────────────────────────────────── + +def test_retry_on_rate_limit(): + async def _inner(): + client = _make_client_direct() + call_count = 0 + + async def fake_request(method, url, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 3: + return _fake_resp(429, {"retry_after_ms": 10}) + return _fake_resp(200, {"user_id": BOT}) + + async with client: + client._client.request = fake_request + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await client.whoami() + + assert result["user_id"] == BOT + assert call_count == 3 + run(_inner())