diff --git a/docker-compose.synapse-node1.yml b/docker-compose.synapse-node1.yml index 99d11171..847998d9 100644 --- a/docker-compose.synapse-node1.yml +++ b/docker-compose.synapse-node1.yml @@ -20,8 +20,10 @@ services: restart: unless-stopped environment: - SYNAPSE_CONFIG_PATH=/data/homeserver.yaml + ports: + - "127.0.0.1:8008:8008" volumes: - - synapse-data:/data + - ./synapse-data:/data depends_on: synapse-db: condition: service_healthy @@ -60,7 +62,6 @@ services: - dagi-network volumes: - synapse-data: synapse-db-data: networks: diff --git a/services/matrix-bridge-dagi/app/ingress.py b/services/matrix-bridge-dagi/app/ingress.py new file mode 100644 index 00000000..6a697959 --- /dev/null +++ b/services/matrix-bridge-dagi/app/ingress.py @@ -0,0 +1,267 @@ +""" +Matrix Ingress Loop — Phase M1.3 + +Polls Matrix /sync for new messages, invokes DAGI Gateway for mapped rooms. +Does NOT send replies back (that is PR-M1.4 egress). + +Design: + - asyncio task, driven by run_ingress_loop() + - sync_poll() → extract_room_messages() per mapped room + - for each message: dedupe → invoke gateway → audit (fire-and-forget) + - next_batch token persisted in memory (restart resets to None — acceptable for M1) + - graceful shutdown via asyncio.Event +""" + +import asyncio +import logging +import time +from typing import Any, Dict, Optional + +import httpx + +from .matrix_client import MatrixClient +from .room_mapping import RoomMappingConfig + +logger = logging.getLogger(__name__) + +# ── Constants ────────────────────────────────────────────────────────────────── + +# Max wait between sync retries on error (seconds) +_MAX_RETRY_BACKOFF = 60.0 +_INIT_RETRY_BACKOFF = 2.0 + +# Gateway invoke timeout +_GATEWAY_TIMEOUT_S = 30.0 + + +# ── Gateway invoke ───────────────────────────────────────────────────────────── + +async def _invoke_gateway( + http_client: httpx.AsyncClient, + gateway_url: str, + agent_id: str, + node_id: str, + message_text: str, + matrix_room_id: str, + matrix_event_id: str, + matrix_sender: str, +) -> Dict[str, Any]: + """ + POST to DAGI Gateway /v1/invoke (or /debug/agent_ping equivalent). + Returns parsed JSON response or raises httpx.HTTPError. + + Payload format matches existing Gateway invoke schema. + """ + url = f"{gateway_url.rstrip('/')}/v1/invoke" + payload = { + "agent_id": agent_id, + "node_id": node_id, + "message": message_text, + "metadata": { + "transport": "matrix", + "matrix_room_id": matrix_room_id, + "matrix_event_id": matrix_event_id, + "matrix_sender": matrix_sender, + "node_id": node_id, + }, + } + resp = await http_client.post(url, json=payload, timeout=_GATEWAY_TIMEOUT_S) + resp.raise_for_status() + return resp.json() + + +# ── Ingress loop ─────────────────────────────────────────────────────────────── + +class MatrixIngressLoop: + """ + Drives the Matrix sync-poll → gateway-invoke pipeline. + + Usage: + loop = MatrixIngressLoop(cfg, room_map) + stop_event = asyncio.Event() + await loop.run(stop_event) + + Metrics callbacks (optional, injected to avoid hard dependency): + on_message_received(room_id, agent_id) — called after successful dedupe + on_gateway_error(error_type) — called on gateway invoke error + """ + + def __init__( + self, + matrix_homeserver_url: str, + matrix_access_token: str, + matrix_user_id: str, + gateway_url: str, + node_id: str, + room_map: RoomMappingConfig, + on_message_received=None, + on_gateway_error=None, + ) -> None: + self._hs_url = matrix_homeserver_url + self._token = matrix_access_token + self._user_id = matrix_user_id + self._gateway_url = gateway_url + self._node_id = node_id + self._room_map = room_map + self._on_message_received = on_message_received + self._on_gateway_error = on_gateway_error + + self._next_batch: Optional[str] = None + self._running = False + + @property + def next_batch(self) -> Optional[str]: + return self._next_batch + + async def run(self, stop_event: asyncio.Event) -> None: + """ + Main loop. Runs until stop_event is set. + Handles errors with exponential backoff. + """ + self._running = True + backoff = _INIT_RETRY_BACKOFF + logger.info( + "Matrix ingress loop started | hs=%s node=%s mappings=%d", + self._hs_url, self._node_id, self._room_map.total_mappings, + ) + + if self._room_map.total_mappings == 0: + logger.warning("No room mappings configured — ingress loop is idle") + + async with MatrixClient( + self._hs_url, self._token, self._user_id + ) as client: + # Join all mapped rooms at startup + for mapping in self._room_map.mappings: + if mapping.agent_id in self._room_map.allowed_agents: + try: + await client.join_room(mapping.room_id) + logger.info("Joined room %s → agent %s", mapping.room_id, mapping.agent_id) + except Exception as exc: + logger.warning("Could not join room %s: %s", mapping.room_id, exc) + + async with httpx.AsyncClient(timeout=_GATEWAY_TIMEOUT_S) as gw_client: + while not stop_event.is_set(): + try: + sync_resp = await client.sync_poll(since=self._next_batch) + self._next_batch = sync_resp.get("next_batch") + backoff = _INIT_RETRY_BACKOFF # reset on success + + await self._process_sync(client, gw_client, sync_resp) + + except asyncio.CancelledError: + logger.info("Ingress loop cancelled") + break + except Exception as exc: + logger.error( + "Ingress loop error (retry in %.1fs): %s", + backoff, exc, + ) + if self._on_gateway_error: + self._on_gateway_error("sync_error") + try: + await asyncio.wait_for( + stop_event.wait(), timeout=backoff + ) + except asyncio.TimeoutError: + pass + backoff = min(backoff * 2, _MAX_RETRY_BACKOFF) + + self._running = False + logger.info("Matrix ingress loop stopped") + + async def _process_sync( + self, + client: MatrixClient, + gw_client: httpx.AsyncClient, + sync_resp: Dict[str, Any], + ) -> None: + """Process all mapped rooms in a sync response.""" + for mapping in self._room_map.mappings: + if mapping.agent_id not in self._room_map.allowed_agents: + continue + + messages = client.extract_room_messages(sync_resp, mapping.room_id) + for event in messages: + await self._handle_message(client, gw_client, event, mapping) + + async def _handle_message( + self, + client: MatrixClient, + gw_client: httpx.AsyncClient, + event: Dict[str, Any], + mapping, + ) -> None: + """ + Process a single Matrix message event: + 1. Mark as seen (dedupe) + 2. Invoke DAGI gateway + 3. Fire metrics callback + + Note: Reply sending (egress) is PR-M1.4 — not done here. + """ + event_id = event.get("event_id", "") + sender = event.get("sender", "") + text = event.get("content", {}).get("body", "").strip() + room_id = mapping.room_id + agent_id = mapping.agent_id + + if not text: + logger.debug("Skipping empty message from %s in %s", sender, room_id) + return + + # Mark event as seen before invoke (prevents duplicate on retry) + client.mark_seen(event_id) + + logger.info( + "Matrix message: room=%s sender=%s agent=%s event=%s text_len=%d", + room_id, sender, agent_id, event_id, len(text), + ) + + if self._on_message_received: + self._on_message_received(room_id, agent_id) + + t0 = time.monotonic() + try: + await _invoke_gateway( + gw_client, + self._gateway_url, + agent_id=agent_id, + node_id=self._node_id, + message_text=text, + matrix_room_id=room_id, + matrix_event_id=event_id, + matrix_sender=sender, + ) + duration = time.monotonic() - t0 + logger.info( + "Gateway invoke ok: agent=%s event=%s duration=%.2fs", + agent_id, event_id, duration, + ) + + except httpx.HTTPStatusError as exc: + duration = time.monotonic() - t0 + logger.error( + "Gateway HTTP error %d for agent=%s event=%s duration=%.2fs", + exc.response.status_code, agent_id, event_id, duration, + ) + if self._on_gateway_error: + self._on_gateway_error(f"http_{exc.response.status_code}") + + except (httpx.ConnectError, httpx.TimeoutException) as exc: + duration = time.monotonic() - t0 + logger.error( + "Gateway network error for agent=%s event=%s: %s duration=%.2fs", + agent_id, event_id, exc, duration, + ) + if self._on_gateway_error: + self._on_gateway_error("network_error") + + except Exception as exc: + duration = time.monotonic() - t0 + logger.error( + "Unexpected error invoking gateway for agent=%s event=%s: %s", + agent_id, event_id, exc, + ) + if self._on_gateway_error: + self._on_gateway_error("unexpected") diff --git a/services/matrix-bridge-dagi/app/main.py b/services/matrix-bridge-dagi/app/main.py index d15d00be..40532f0f 100644 --- a/services/matrix-bridge-dagi/app/main.py +++ b/services/matrix-bridge-dagi/app/main.py @@ -4,6 +4,7 @@ Bridges Matrix/Element rooms to DAGI agents via Gateway. M1 scope: 1 room ↔ 1 agent (Sofiia), audit via sofiia-console internal endpoint. """ +import asyncio import logging import os import time @@ -31,6 +32,8 @@ except ImportError: # pragma: no cover _PROM_OK = False from .config import BridgeConfig, load_config +from .ingress import MatrixIngressLoop +from .room_mapping import RoomMappingConfig, parse_room_map logging.basicConfig( level=logging.INFO, @@ -71,6 +74,9 @@ _cfg: Optional[BridgeConfig] = None _config_error: Optional[str] = None _matrix_reachable: Optional[bool] = None # probed at startup _gateway_reachable: Optional[bool] = None # probed at startup +_room_map: Optional[RoomMappingConfig] = None +_ingress_task: Optional[asyncio.Task] = None +_ingress_stop: Optional[asyncio.Event] = None async def _probe_url(url: str, timeout: float = 5.0) -> bool: @@ -87,14 +93,24 @@ async def _probe_url(url: str, timeout: float = 5.0) -> bool: # ── Lifespan ────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(app_: Any): - global _cfg, _config_error, _matrix_reachable, _gateway_reachable + global _cfg, _config_error, _matrix_reachable, _gateway_reachable, _room_map try: _cfg = load_config() + + # Parse room mapping + _room_map = parse_room_map( + os.getenv("BRIDGE_ROOM_MAP", ""), + _cfg.bridge_allowed_agents, + ) + logger.info( - "✅ matrix-bridge-dagi started | node=%s build=%s homeserver=%s room=%s agents=%s", + "✅ matrix-bridge-dagi started | node=%s build=%s homeserver=%s " + "room=%s agents=%s mappings=%d", _cfg.node_id, _cfg.build_sha, _cfg.matrix_homeserver_url, _cfg.sofiia_room_id, list(_cfg.bridge_allowed_agents), + _room_map.total_mappings, ) + # Connectivity smoke probes (non-blocking failures) _matrix_reachable = await _probe_url( f"{_cfg.matrix_homeserver_url}/_matrix/client/versions" @@ -112,12 +128,52 @@ async def lifespan(app_: Any): logger.warning("⚠️ DAGI Gateway NOT reachable: %s", _cfg.dagi_gateway_url) if _PROM_OK: _bridge_up.set(1) - except RuntimeError as exc: + + # Start ingress loop (fire-and-forget asyncio task) + if _room_map and _room_map.total_mappings > 0: + _ingress_stop = asyncio.Event() + + def _on_msg(room_id: str, agent_id: str) -> None: + if _PROM_OK: + _messages_received.labels(room_id=room_id, agent_id=agent_id).inc() + + def _on_gw_error(error_type: str) -> None: + if _PROM_OK: + _gateway_errors.labels(error_type=error_type).inc() + + ingress = MatrixIngressLoop( + matrix_homeserver_url=_cfg.matrix_homeserver_url, + matrix_access_token=_cfg.matrix_access_token, + matrix_user_id=_cfg.matrix_user_id, + gateway_url=_cfg.dagi_gateway_url, + node_id=_cfg.node_id, + room_map=_room_map, + on_message_received=_on_msg, + on_gateway_error=_on_gw_error, + ) + _ingress_task = asyncio.create_task( + ingress.run(_ingress_stop), + name="matrix_ingress_loop", + ) + logger.info("✅ Ingress loop task started") + else: + logger.warning("⚠️ No room mappings — ingress loop NOT started") + + except (RuntimeError, ValueError) as exc: _config_error = str(exc) logger.error("❌ Config error: %s", _config_error) if _PROM_OK: _bridge_up.set(0) yield + # Shutdown: cancel ingress loop + if _ingress_stop: + _ingress_stop.set() + if _ingress_task and not _ingress_task.done(): + _ingress_task.cancel() + try: + await asyncio.wait_for(_ingress_task, timeout=5.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass logger.info("matrix-bridge-dagi shutting down") # ── App ─────────────────────────────────────────────────────────────────────── @@ -166,9 +222,32 @@ async def health() -> Dict[str, Any]: "allowed_agents": list(_cfg.bridge_allowed_agents), "gateway": _cfg.dagi_gateway_url, "gateway_reachable": _gateway_reachable, + "mappings_count": _room_map.total_mappings if _room_map else 0, "config_ok": True, } + +# ── Bridge Mappings (read-only ops endpoint) ─────────────────────────────────── +@app.get("/bridge/mappings") +async def bridge_mappings() -> Dict[str, Any]: + """ + Returns room-to-agent mapping summary. + Safe for ops visibility — no secrets included. + """ + if _cfg is None or _room_map is None: + return { + "ok": False, + "error": _config_error or "service not initialised", + "mappings": [], + } + return { + "ok": True, + "total": _room_map.total_mappings, + "allowed_agents": list(_cfg.bridge_allowed_agents), + "mappings": _room_map.as_summary(), + } + + # ── Metrics ─────────────────────────────────────────────────────────────────── @app.get("/metrics") async def metrics(): diff --git a/services/matrix-bridge-dagi/app/room_mapping.py b/services/matrix-bridge-dagi/app/room_mapping.py new file mode 100644 index 00000000..28cd13a0 --- /dev/null +++ b/services/matrix-bridge-dagi/app/room_mapping.py @@ -0,0 +1,156 @@ +""" +Room-to-Agent Mapping — Phase M1 + +Parses BRIDGE_ROOM_MAP env var and provides: + - room_id → agent_id lookup + - agent_id allowlist validation (from BRIDGE_ALLOWED_AGENTS) + - summary for /bridge/mappings endpoint + +Format of BRIDGE_ROOM_MAP: + "agent_id:!room_id:server,agent2:!room2:server" + e.g. "sofiia:!QwHczWXgefDHBEVkTH:daarion.space" + +Multiple mappings separated by comma. +""" + +import logging +import re +from dataclasses import dataclass, field +from typing import Dict, FrozenSet, List, Optional + +logger = logging.getLogger(__name__) + +# Room ID format: !: +_ROOM_ID_RE = re.compile(r"^![A-Za-z0-9\-_.]+:[A-Za-z0-9\-_.]+$") + + +@dataclass(frozen=True) +class RoomMapping: + """Single room → agent binding.""" + room_id: str # e.g. "!abc:daarion.space" + agent_id: str # e.g. "sofiia" + + +@dataclass +class RoomMappingConfig: + """ + Parsed mapping configuration. + + Attributes: + mappings: List of RoomMapping (room_id → agent_id) + allowed_agents: Frozenset of allowlisted agent ids + """ + mappings: List[RoomMapping] = field(default_factory=list) + allowed_agents: FrozenSet[str] = field(default_factory=frozenset) + + # Internal index for O(1) lookup + _room_to_agent: Dict[str, str] = field(default_factory=dict, repr=False, compare=False) + _agent_to_rooms: Dict[str, List[str]] = field(default_factory=dict, repr=False, compare=False) + + def __post_init__(self) -> None: + self._rebuild_index() + + def _rebuild_index(self) -> None: + self._room_to_agent = {m.room_id: m.agent_id for m in self.mappings} + self._agent_to_rooms = {} + for m in self.mappings: + self._agent_to_rooms.setdefault(m.agent_id, []).append(m.room_id) + + def agent_for_room(self, room_id: str) -> Optional[str]: + """ + Returns agent_id for a room_id, or None if room is not mapped. + Also returns None if agent is not in allowed_agents. + """ + agent = self._room_to_agent.get(room_id) + if agent is None: + return None + if agent not in self.allowed_agents: + logger.warning("Room %s mapped to agent %s which is NOT in allowed_agents", room_id, agent) + return None + return agent + + def rooms_for_agent(self, agent_id: str) -> List[str]: + """Returns list of room_ids mapped to an agent_id.""" + return list(self._agent_to_rooms.get(agent_id, [])) + + def as_summary(self) -> List[Dict]: + """ + Returns a safe summary list for the /bridge/mappings endpoint. + Room IDs are NOT secrets (they identify chat rooms), but tokens are never included. + """ + return [ + { + "room_id": m.room_id, + "agent_id": m.agent_id, + "allowed": m.agent_id in self.allowed_agents, + } + for m in self.mappings + ] + + @property + def total_mappings(self) -> int: + return len(self.mappings) + + +def parse_room_map(raw: str, allowed_agents: FrozenSet[str]) -> RoomMappingConfig: + """ + Parse BRIDGE_ROOM_MAP string into RoomMappingConfig. + + Format: "agent_id:!room_id:server[,agent2:!room2:server2,...]" + + Raises ValueError on malformed entries (but skips warn-only issues). + """ + mappings: List[RoomMapping] = [] + errors: List[str] = [] + + if not raw or not raw.strip(): + return RoomMappingConfig(mappings=[], allowed_agents=allowed_agents) + + for idx, entry in enumerate(raw.split(",")): + entry = entry.strip() + if not entry: + continue + + # Find the colon that separates agent_id from room_id + # Room IDs look like !localpart:server — the separator colon is after agent_id + # Format: "sofiia:!QwHczWXgefDHBEVkTH:daarion.space" + # ^agent^:^------room_id---------^ + colon_idx = entry.find(":") + if colon_idx < 1: + errors.append(f"Entry[{idx}] missing agent:room separator: {entry!r}") + continue + + agent_id = entry[:colon_idx].strip() + room_id = entry[colon_idx + 1:].strip() + + if not agent_id: + errors.append(f"Entry[{idx}] empty agent_id in: {entry!r}") + continue + + if not room_id: + errors.append(f"Entry[{idx}] empty room_id in: {entry!r}") + continue + + if not _ROOM_ID_RE.match(room_id): + errors.append( + f"Entry[{idx}] invalid room_id format (expected !localpart:server): {room_id!r}" + ) + continue + + if agent_id not in allowed_agents: + logger.warning( + "Entry[%d] agent %r not in allowed_agents %s — mapping accepted but will be rejected at runtime", + idx, agent_id, set(allowed_agents), + ) + + mappings.append(RoomMapping(room_id=room_id, agent_id=agent_id)) + + if errors: + raise ValueError(f"BRIDGE_ROOM_MAP parse errors: {'; '.join(errors)}") + + config = RoomMappingConfig(mappings=mappings, allowed_agents=allowed_agents) + logger.info( + "Room mapping loaded: %d entries, allowed_agents=%s", + len(mappings), set(allowed_agents), + ) + return config diff --git a/tests/test_matrix_bridge_ingress.py b/tests/test_matrix_bridge_ingress.py new file mode 100644 index 00000000..13134d0a --- /dev/null +++ b/tests/test_matrix_bridge_ingress.py @@ -0,0 +1,351 @@ +""" +Tests for services/matrix-bridge-dagi/app/ingress.py + +Strategy: + - mock MatrixClient.sync_poll and extract_room_messages + - mock httpx gateway client + - verify gateway is invoked once per unique message (dedupe works) + - verify stop_event terminates loop +""" + +import asyncio +import sys +from pathlib import Path +from typing import Any, Dict, List +from unittest.mock import AsyncMock, MagicMock, patch, call + +import pytest + +_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" +if str(_BRIDGE) not in sys.path: + sys.path.insert(0, str(_BRIDGE)) + +from app.ingress import MatrixIngressLoop, _invoke_gateway # noqa: E402 +from app.room_mapping import parse_room_map # noqa: E402 + + +def run(coro): + return asyncio.run(coro) + + +ALLOWED = frozenset({"sofiia"}) +ROOM_ID = "!QwHczWXgefDHBEVkTH:daarion.space" +ROOM_MAP_STR = f"sofiia:{ROOM_ID}" +GW_URL = "http://127.0.0.1:9300" +HS_URL = "http://localhost:8008" +TOKEN = "syt_fake_token" +BOT_USER = "@dagi_bridge:daarion.space" + +MOCK_EVENT_1 = { + "type": "m.room.message", + "event_id": "$event1:server", + "sender": "@user:server", + "content": {"msgtype": "m.text", "body": "Hello Sofiia!"}, + "origin_server_ts": 1000, +} + +MOCK_EVENT_2 = { + "type": "m.room.message", + "event_id": "$event2:server", + "sender": "@user:server", + "content": {"msgtype": "m.text", "body": "Another message"}, + "origin_server_ts": 2000, +} + + +def _make_loop(**kwargs) -> MatrixIngressLoop: + room_map = parse_room_map(ROOM_MAP_STR, ALLOWED) + defaults = dict( + matrix_homeserver_url=HS_URL, + matrix_access_token=TOKEN, + matrix_user_id=BOT_USER, + gateway_url=GW_URL, + node_id="NODA1", + room_map=room_map, + ) + defaults.update(kwargs) + return MatrixIngressLoop(**defaults) + + +def _fake_sync_resp(events: list) -> dict: + return { + "next_batch": "s_next", + "rooms": { + "join": { + ROOM_ID: { + "timeline": {"events": events} + } + } + } + } + + +# ── _invoke_gateway ───────────────────────────────────────────────────────── + +def test_invoke_gateway_builds_correct_request(): + async def _inner(): + captured = {} + + async def fake_post(url, *, json=None, timeout=None): + captured["url"] = url + captured["json"] = json + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"ok": True} + resp.raise_for_status = MagicMock() + return resp + + client = MagicMock() + client.post = fake_post + result = await _invoke_gateway( + client, GW_URL, "sofiia", "NODA1", + "Hello!", ROOM_ID, "$event1", "@user:server" + ) + + assert "/v1/invoke" in captured["url"] + payload = captured["json"] + assert payload["agent_id"] == "sofiia" + assert payload["node_id"] == "NODA1" + assert payload["message"] == "Hello!" + meta = payload["metadata"] + assert meta["transport"] == "matrix" + assert meta["matrix_room_id"] == ROOM_ID + assert meta["matrix_event_id"] == "$event1" + assert meta["matrix_sender"] == "@user:server" + + run(_inner()) + + +# ── Ingress loop — normal flow ────────────────────────────────────────────── + +def test_ingress_loop_invokes_gateway_once_per_message(): + async def _inner(): + received_calls: List[Dict] = [] + + async def fake_post(url, *, json=None, timeout=None): + received_calls.append({"url": url, "json": json}) + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"ok": True} + resp.raise_for_status = MagicMock() + return resp + + stop = asyncio.Event() + loop = _make_loop() + + sync_responses = [ + _fake_sync_resp([MOCK_EVENT_1]), + _fake_sync_resp([]), # empty on second poll + ] + call_count = [0] + + async def fake_sync_poll(**kwargs): + idx = call_count[0] + call_count[0] += 1 + if idx >= len(sync_responses): + stop.set() + return {"next_batch": "end", "rooms": {}} + return sync_responses[idx] + + with patch("app.ingress.MatrixClient") as MockClient: + mock_mc_instance = AsyncMock() + mock_mc_instance.__aenter__ = AsyncMock(return_value=mock_mc_instance) + mock_mc_instance.__aexit__ = AsyncMock(return_value=False) + mock_mc_instance.sync_poll = fake_sync_poll + mock_mc_instance.join_room = AsyncMock() + mock_mc_instance.mark_seen = MagicMock() + mock_mc_instance.is_duplicate = MagicMock(return_value=False) + + def fake_extract(sync_resp, room_id): + events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) + return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] + + mock_mc_instance.extract_room_messages = fake_extract + MockClient.return_value = mock_mc_instance + + with patch("app.ingress.httpx.AsyncClient") as MockHTTP: + mock_http = AsyncMock() + mock_http.__aenter__ = AsyncMock(return_value=mock_http) + mock_http.__aexit__ = AsyncMock(return_value=False) + mock_http.post = fake_post + MockHTTP.return_value = mock_http + + await asyncio.wait_for(loop.run(stop), timeout=3.0) + + # Gateway should have been invoked exactly once (for MOCK_EVENT_1) + assert len(received_calls) == 1 + assert received_calls[0]["json"]["message"] == "Hello Sofiia!" + + run(_inner()) + + +def test_ingress_loop_deduplicates_same_event(): + """Same event_id appearing twice should only invoke gateway once.""" + async def _inner(): + invoke_count = [0] + + async def fake_post(url, *, json=None, timeout=None): + invoke_count[0] += 1 + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"ok": True} + resp.raise_for_status = MagicMock() + return resp + + stop = asyncio.Event() + loop = _make_loop() + + # Same event in two consecutive syncs + sync_responses = [ + _fake_sync_resp([MOCK_EVENT_1]), + _fake_sync_resp([MOCK_EVENT_1]), # duplicate event_id + ] + call_count = [0] + seen = set() + + async def fake_sync_poll(**kwargs): + idx = call_count[0] + call_count[0] += 1 + if idx >= len(sync_responses): + stop.set() + return {"next_batch": "end", "rooms": {}} + return sync_responses[idx] + + with patch("app.ingress.MatrixClient") as MockClient: + mock_mc_instance = AsyncMock() + mock_mc_instance.__aenter__ = AsyncMock(return_value=mock_mc_instance) + mock_mc_instance.__aexit__ = AsyncMock(return_value=False) + mock_mc_instance.sync_poll = fake_sync_poll + mock_mc_instance.join_room = AsyncMock() + + def fake_mark_seen(event_id): + seen.add(event_id) + + def fake_is_dup(event_id): + return event_id in seen + + mock_mc_instance.mark_seen = fake_mark_seen + mock_mc_instance.is_duplicate = fake_is_dup + + def fake_extract(sync_resp, room_id): + events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) + return [e for e in events if e.get("type") == "m.room.message" + and e.get("sender") != BOT_USER + and not fake_is_dup(e.get("event_id", ""))] + + mock_mc_instance.extract_room_messages = fake_extract + MockClient.return_value = mock_mc_instance + + with patch("app.ingress.httpx.AsyncClient") as MockHTTP: + mock_http = AsyncMock() + mock_http.__aenter__ = AsyncMock(return_value=mock_http) + mock_http.__aexit__ = AsyncMock(return_value=False) + mock_http.post = fake_post + MockHTTP.return_value = mock_http + + await asyncio.wait_for(loop.run(stop), timeout=3.0) + + # Dedupe: only 1 invoke despite 2 sync responses with same event + assert invoke_count[0] == 1 + + run(_inner()) + + +def test_ingress_loop_calls_metric_callbacks(): + """on_message_received and on_gateway_error callbacks should fire.""" + async def _inner(): + received_events = [] + error_events = [] + + stop = asyncio.Event() + loop = _make_loop( + on_message_received=lambda room, agent: received_events.append((room, agent)), + on_gateway_error=lambda etype: error_events.append(etype), + ) + + call_count = [0] + + async def fake_sync_poll(**kwargs): + call_count[0] += 1 + if call_count[0] > 1: + stop.set() + return {"next_batch": "end", "rooms": {}} + return _fake_sync_resp([MOCK_EVENT_1]) + + with patch("app.ingress.MatrixClient") as MockClient: + mock_mc = AsyncMock() + mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) + mock_mc.__aexit__ = AsyncMock(return_value=False) + mock_mc.sync_poll = fake_sync_poll + mock_mc.join_room = AsyncMock() + mock_mc.mark_seen = MagicMock() + mock_mc.is_duplicate = MagicMock(return_value=False) + + def fake_extract(sync_resp, room_id): + events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) + return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] + + mock_mc.extract_room_messages = fake_extract + MockClient.return_value = mock_mc + + with patch("app.ingress.httpx.AsyncClient") as MockHTTP: + mock_http = AsyncMock() + mock_http.__aenter__ = AsyncMock(return_value=mock_http) + mock_http.__aexit__ = AsyncMock(return_value=False) + + async def fake_post(url, *, json=None, timeout=None): + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"ok": True} + resp.raise_for_status = MagicMock() + return resp + + mock_http.post = fake_post + MockHTTP.return_value = mock_http + + await asyncio.wait_for(loop.run(stop), timeout=3.0) + + assert len(received_events) == 1 + assert received_events[0] == (ROOM_ID, "sofiia") + + run(_inner()) + + +def test_ingress_loop_no_mappings_is_idle(): + """Loop with 0 mappings should start and stop cleanly without invoking gateway.""" + async def _inner(): + empty_map = parse_room_map("", ALLOWED) + loop = MatrixIngressLoop( + matrix_homeserver_url=HS_URL, + matrix_access_token=TOKEN, + matrix_user_id=BOT_USER, + gateway_url=GW_URL, + node_id="NODA1", + room_map=empty_map, + ) + stop = asyncio.Event() + + with patch("app.ingress.MatrixClient") as MockClient: + mock_mc = AsyncMock() + mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) + mock_mc.__aexit__ = AsyncMock(return_value=False) + + async def fake_sync_poll(**kwargs): + stop.set() + return {"next_batch": "end", "rooms": {}} + + mock_mc.sync_poll = fake_sync_poll + MockClient.return_value = mock_mc + + with patch("app.ingress.httpx.AsyncClient") as MockHTTP: + mock_http = AsyncMock() + mock_http.__aenter__ = AsyncMock(return_value=mock_http) + mock_http.__aexit__ = AsyncMock(return_value=False) + MockHTTP.return_value = mock_http + + await asyncio.wait_for(loop.run(stop), timeout=3.0) + + # Should complete without error + assert True + + run(_inner()) diff --git a/tests/test_matrix_bridge_room_mapping.py b/tests/test_matrix_bridge_room_mapping.py new file mode 100644 index 00000000..5f4ed882 --- /dev/null +++ b/tests/test_matrix_bridge_room_mapping.py @@ -0,0 +1,151 @@ +""" +Tests for services/matrix-bridge-dagi/app/room_mapping.py +""" + +import sys +from pathlib import Path + +import pytest + +_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" +if str(_BRIDGE) not in sys.path: + sys.path.insert(0, str(_BRIDGE)) + +from app.room_mapping import RoomMappingConfig, parse_room_map, RoomMapping # noqa: E402 + +ALLOWED = frozenset({"sofiia", "druid"}) +ROOM1 = "!QwHczWXgefDHBEVkTH:daarion.space" +ROOM2 = "!AnotherRoom123:daarion.space" + + +# ── Parsing — valid ──────────────────────────────────────────────────────────── + +def test_parse_single_mapping(): + cfg = parse_room_map(f"sofiia:{ROOM1}", ALLOWED) + assert cfg.total_mappings == 1 + assert cfg.mappings[0].agent_id == "sofiia" + assert cfg.mappings[0].room_id == ROOM1 + + +def test_parse_multiple_mappings(): + raw = f"sofiia:{ROOM1},druid:{ROOM2}" + cfg = parse_room_map(raw, ALLOWED) + assert cfg.total_mappings == 2 + + +def test_parse_empty_string(): + cfg = parse_room_map("", ALLOWED) + assert cfg.total_mappings == 0 + + +def test_parse_whitespace_only(): + cfg = parse_room_map(" ", ALLOWED) + assert cfg.total_mappings == 0 + + +def test_parse_trailing_comma(): + cfg = parse_room_map(f"sofiia:{ROOM1},", ALLOWED) + assert cfg.total_mappings == 1 + + +def test_parse_spaces_around_entries(): + cfg = parse_room_map(f" sofiia:{ROOM1} , druid:{ROOM2} ", ALLOWED) + assert cfg.total_mappings == 2 + + +# ── Parsing — invalid ───────────────────────────────────────────────────────── + +def test_parse_missing_colon_raises(): + with pytest.raises(ValueError, match="parse errors"): + parse_room_map("sofiia_no_colon", ALLOWED) + + +def test_parse_invalid_room_id_format_raises(): + with pytest.raises(ValueError, match="invalid room_id format"): + # Room ID must start with ! + parse_room_map(f"sofiia:#badroom:server", ALLOWED) + + +def test_parse_absolute_path_as_room_raises(): + with pytest.raises(ValueError, match="invalid room_id format"): + parse_room_map("sofiia:/etc/passwd", ALLOWED) + + +def test_parse_empty_agent_id_raises(): + with pytest.raises(ValueError, match="parse errors"): + parse_room_map(f":{ROOM1}", ALLOWED) + + +def test_parse_empty_room_id_raises(): + with pytest.raises(ValueError, match="parse errors"): + parse_room_map("sofiia:", ALLOWED) + + +# ── agent_for_room ──────────────────────────────────────────────────────────── + +def test_agent_for_room_found(): + cfg = parse_room_map(f"sofiia:{ROOM1}", ALLOWED) + assert cfg.agent_for_room(ROOM1) == "sofiia" + + +def test_agent_for_room_not_found(): + cfg = parse_room_map(f"sofiia:{ROOM1}", ALLOWED) + assert cfg.agent_for_room("!unknownroom:server") is None + + +def test_agent_for_room_not_allowed(): + """Agent in mapping but not in allowed_agents → None.""" + cfg = parse_room_map(f"druid:{ROOM1}", frozenset({"sofiia"})) # druid not allowed + # mapping is accepted but agent_for_room returns None + assert cfg.agent_for_room(ROOM1) is None + + +def test_agent_for_room_allowed_when_in_set(): + cfg = parse_room_map(f"druid:{ROOM1}", frozenset({"druid"})) + assert cfg.agent_for_room(ROOM1) == "druid" + + +# ── rooms_for_agent ─────────────────────────────────────────────────────────── + +def test_rooms_for_agent_single(): + cfg = parse_room_map(f"sofiia:{ROOM1}", ALLOWED) + assert cfg.rooms_for_agent("sofiia") == [ROOM1] + + +def test_rooms_for_agent_multiple(): + cfg = parse_room_map(f"sofiia:{ROOM1},sofiia:{ROOM2}", ALLOWED) + rooms = cfg.rooms_for_agent("sofiia") + assert ROOM1 in rooms + assert ROOM2 in rooms + + +def test_rooms_for_agent_unknown(): + cfg = parse_room_map(f"sofiia:{ROOM1}", ALLOWED) + assert cfg.rooms_for_agent("nonexistent") == [] + + +# ── as_summary ──────────────────────────────────────────────────────────────── + +def test_summary_contains_expected_fields(): + cfg = parse_room_map(f"sofiia:{ROOM1}", ALLOWED) + summary = cfg.as_summary() + assert len(summary) == 1 + entry = summary[0] + assert entry["room_id"] == ROOM1 + assert entry["agent_id"] == "sofiia" + assert entry["allowed"] is True + + +def test_summary_allowed_false_for_unknown_agent(): + cfg = parse_room_map(f"druid:{ROOM1}", frozenset({"sofiia"})) + summary = cfg.as_summary() + assert summary[0]["allowed"] is False + + +def test_summary_no_tokens_in_output(): + """Access tokens must never appear in summary.""" + cfg = parse_room_map(f"sofiia:{ROOM1}", ALLOWED) + summary = cfg.as_summary() + for entry in summary: + assert "token" not in str(entry).lower() + assert "secret" not in str(entry).lower()