""" Tests for matrix-bridge-dagi H2: Backpressure queue (reader + workers) Coverage: 1. enqueue_and_process — single event enqueued → worker processes exactly once 2. queue_full_drop — queue full → dropped + on_queue_dropped + audit matrix.queue_full 3. concurrency — 2 events processed concurrently (worker_concurrency=2) 4. graceful_shutdown — stop_event set → in-flight items completed, queue drained 5. queue_wait_metric — on_queue_wait called with agent_id + float 6. rate_limit_before_enqueue — rate-limited event never enters queue """ import asyncio import sys import time from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch _BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" if str(_BRIDGE) not in sys.path: sys.path.insert(0, str(_BRIDGE)) from app.ingress import MatrixIngressLoop, _QueueEntry # noqa: E402 from app.rate_limit import InMemoryRateLimiter # noqa: E402 from app.room_mapping import parse_room_map # noqa: E402 def run(coro): return asyncio.run(coro) ALLOWED = frozenset({"sofiia"}) ROOM_ID = "!QwHczWXgefDHBEVkTH:daarion.space" ROOM_MAP_STR = f"sofiia:{ROOM_ID}" ROUTER_URL = "http://dagi-router-node1:8000" HS_URL = "http://dagi-synapse-node1:8008" CONSOLE_URL = "http://dagi-sofiia-console-node1:8002" INTERNAL_TOKEN = "test_tok" BOT_USER = "@dagi_bridge:daarion.space" USER = "@user:daarion.space" def _make_event(event_id: str = "$e1:s", body: str = "Hello") -> dict: return { "type": "m.room.message", "event_id": event_id, "sender": USER, "content": {"msgtype": "m.text", "body": body}, "origin_server_ts": 1000, } def _fake_sync(events: list) -> dict: return { "next_batch": "s_next", "rooms": {"join": {ROOM_ID: {"timeline": {"events": events}}}}, } def _ok_router_resp(text: str = "Reply!") -> MagicMock: r = MagicMock() r.status_code = 200 r.json.return_value = {"response": text} r.raise_for_status = MagicMock() return r def _audit_resp() -> MagicMock: r = MagicMock() r.status_code = 200 r.json.return_value = {"ok": True} r.raise_for_status = MagicMock() return r def _make_loop(**kwargs) -> MatrixIngressLoop: room_map = parse_room_map(ROOM_MAP_STR, ALLOWED) defaults = dict( matrix_homeserver_url=HS_URL, matrix_access_token="tok", matrix_user_id=BOT_USER, router_url=ROUTER_URL, node_id="NODA1", room_map=room_map, sofiia_console_url=CONSOLE_URL, sofiia_internal_token=INTERNAL_TOKEN, queue_max_events=10, worker_concurrency=1, queue_drain_timeout_s=2.0, ) defaults.update(kwargs) return MatrixIngressLoop(**defaults) def _make_mock_client(events_per_sync: list, stop_after: int = 1) -> tuple: """Returns (mock_client_class, mock_client_instance).""" call_count = [0] seen = set() def fake_extract(sync_resp, room_id): evts = (sync_resp.get("rooms", {}).get("join", {}) .get(room_id, {}).get("timeline", {}).get("events", [])) return [e for e in evts if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER and e.get("event_id") not in seen] def fake_mark_seen(eid): seen.add(eid) async def fake_sync_poll(stop_event_ref=None, **kwargs): call_count[0] += 1 if call_count[0] > stop_after: # Block until cancelled await asyncio.sleep(1000) return _fake_sync(events_per_sync) mock_mc = AsyncMock() mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) mock_mc.__aexit__ = AsyncMock(return_value=False) mock_mc.join_room = AsyncMock() mock_mc.mark_seen = MagicMock(side_effect=fake_mark_seen) mock_mc.extract_room_messages = fake_extract mock_mc.send_text = AsyncMock(return_value={"event_id": "$reply"}) mock_mc.sync_poll = fake_sync_poll return mock_mc # ── Test 1: enqueue and process exactly once ─────────────────────────────────── def test_enqueue_and_process_exactly_once(): """One event → router invoked once, send_text called once.""" async def _inner(): invoke_count = [0] send_calls = [] stop = asyncio.Event() loop = _make_loop(worker_concurrency=1) events = [_make_event("$e1:s", "Hello")] mock_mc = _make_mock_client(events, stop_after=1) mock_mc.sync_poll = _stop_after_n_syncs(1, events, stop) async def fake_http_post(url, *, json=None, headers=None, timeout=None): if "/infer" in url: invoke_count[0] += 1 await asyncio.sleep(0.01) return _ok_router_resp("Hi!") return _audit_resp() with patch("app.ingress.MatrixClient") as MC: MC.return_value = mock_mc MC.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MH: mh = AsyncMock() mh.__aenter__ = AsyncMock(return_value=mh) mh.__aexit__ = AsyncMock(return_value=False) mh.post = fake_http_post MH.return_value = mh await asyncio.wait_for(loop.run(stop), timeout=5.0) assert invoke_count[0] == 1 assert mock_mc.send_text.call_count == 1 run(_inner()) def _stop_after_n_syncs(n: int, events: list, stop: asyncio.Event): call_count = [0] async def fake_sync_poll(**kwargs): call_count[0] += 1 if call_count[0] > n: stop.set() await asyncio.sleep(1000) return _fake_sync(events) return fake_sync_poll # ── Test 2: queue full → drop + callback + audit ─────────────────────────────── def test_queue_full_drop_and_audit(): """When queue is full, extra events are dropped with audit matrix.queue_full.""" async def _inner(): dropped = [] audit_events = [] # queue_max=1, worker_concurrency=1 but worker is slow (blocks on barrier) barrier = asyncio.Event() stop = asyncio.Event() loop = _make_loop( queue_max_events=1, worker_concurrency=1, on_queue_dropped=lambda r, a: dropped.append((r, a)), ) # 3 events in one sync — queue holds 1, worker blocked, 2 dropped events = [ _make_event("$e1:s", "msg1"), _make_event("$e2:s", "msg2"), _make_event("$e3:s", "msg3"), ] call_count = [0] async def fake_sync_poll(**kwargs): call_count[0] += 1 if call_count[0] > 1: stop.set() await asyncio.sleep(1000) return _fake_sync(events) async def slow_http_post(url, *, json=None, headers=None, timeout=None): if "/infer" in url: await barrier.wait() # block worker indefinitely return _ok_router_resp() if "/audit/internal" in url: event_name = (json or {}).get("event", "") audit_events.append(event_name) return _audit_resp() mock_mc = AsyncMock() mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) mock_mc.__aexit__ = AsyncMock(return_value=False) mock_mc.join_room = AsyncMock() mock_mc.mark_seen = MagicMock() mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) seen: set = set() def fake_extract(sync_resp, room_id): evts = (sync_resp.get("rooms", {}).get("join", {}) .get(room_id, {}).get("timeline", {}).get("events", [])) return [e for e in evts if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER and e.get("event_id") not in seen] mock_mc.extract_room_messages = fake_extract mock_mc.sync_poll = fake_sync_poll with patch("app.ingress.MatrixClient") as MC: MC.return_value = mock_mc MC.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MH: mh = AsyncMock() mh.__aenter__ = AsyncMock(return_value=mh) mh.__aexit__ = AsyncMock(return_value=False) mh.post = slow_http_post MH.return_value = mh # Run with short timeout — worker will be stuck but reader finishes try: await asyncio.wait_for(loop.run(stop), timeout=2.5) except asyncio.TimeoutError: pass # At least 2 events dropped (queue_max=1, 3 incoming) assert len(dropped) >= 2, f"expected >=2 drops, got {dropped}" assert "matrix.queue_full" in audit_events run(_inner()) # ── Test 3: concurrency — 2 workers process 2 events in parallel ────────────── def test_two_workers_process_concurrently(): """With worker_concurrency=2, two events start processing without waiting for each other.""" async def _inner(): started_order = [] finished_order = [] barrier = asyncio.Barrier(2) # both workers must arrive before either proceeds stop = asyncio.Event() loop = _make_loop(worker_concurrency=2, queue_max_events=10) events = [_make_event("$e1:s", "msg1"), _make_event("$e2:s", "msg2")] call_count = [0] async def fake_sync_poll(**kwargs): call_count[0] += 1 if call_count[0] > 1: stop.set() await asyncio.sleep(1000) return _fake_sync(events) async def concurrent_http_post(url, *, json=None, headers=None, timeout=None): if "/infer" in url: prompt = (json or {}).get("prompt", "") started_order.append(prompt) await barrier.wait() # both workers sync here → proves concurrency finished_order.append(prompt) return _ok_router_resp(f"reply to {prompt}") return _audit_resp() seen: set = set() mock_mc = AsyncMock() mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) mock_mc.__aexit__ = AsyncMock(return_value=False) mock_mc.join_room = AsyncMock() mock_mc.mark_seen = MagicMock(side_effect=lambda e: seen.add(e)) mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) def fake_extract(sync_resp, room_id): evts = (sync_resp.get("rooms", {}).get("join", {}) .get(room_id, {}).get("timeline", {}).get("events", [])) return [e for e in evts if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER and e.get("event_id") not in seen] mock_mc.extract_room_messages = fake_extract mock_mc.sync_poll = fake_sync_poll with patch("app.ingress.MatrixClient") as MC: MC.return_value = mock_mc MC.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MH: mh = AsyncMock() mh.__aenter__ = AsyncMock(return_value=mh) mh.__aexit__ = AsyncMock(return_value=False) mh.post = concurrent_http_post MH.return_value = mh await asyncio.wait_for(loop.run(stop), timeout=5.0) # Both events started before either finished → they ran concurrently assert len(started_order) == 2 assert len(finished_order) == 2 # The barrier ensures both were in-flight at the same time # If sequential, the barrier would deadlock (only 1 worker reaches it) run(_inner()) # ── Test 4: graceful shutdown — in-flight items completed ───────────────────── def test_graceful_shutdown_drains_queue(): """Stop event set mid-flight → worker finishes current item before exit.""" async def _inner(): completed = [] stop = asyncio.Event() loop = _make_loop(worker_concurrency=1, queue_max_events=5, queue_drain_timeout_s=3.0) events = [_make_event("$e1:s", "drain me")] call_count = [0] async def fake_sync_poll(**kwargs): call_count[0] += 1 if call_count[0] == 1: # Set stop after first sync so reader exits but queue has item asyncio.get_event_loop().call_later(0.05, stop.set) if call_count[0] > 1: await asyncio.sleep(1000) return _fake_sync(events) async def fake_http_post(url, *, json=None, headers=None, timeout=None): if "/infer" in url: await asyncio.sleep(0.1) # simulate slow router completed.append("done") return _ok_router_resp("ok") return _audit_resp() seen: set = set() mock_mc = AsyncMock() mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) mock_mc.__aexit__ = AsyncMock(return_value=False) mock_mc.join_room = AsyncMock() mock_mc.mark_seen = MagicMock(side_effect=lambda e: seen.add(e)) mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) def fake_extract(sync_resp, room_id): evts = (sync_resp.get("rooms", {}).get("join", {}) .get(room_id, {}).get("timeline", {}).get("events", [])) return [e for e in evts if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER and e.get("event_id") not in seen] mock_mc.extract_room_messages = fake_extract mock_mc.sync_poll = fake_sync_poll with patch("app.ingress.MatrixClient") as MC: MC.return_value = mock_mc MC.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MH: mh = AsyncMock() mh.__aenter__ = AsyncMock(return_value=mh) mh.__aexit__ = AsyncMock(return_value=False) mh.post = fake_http_post MH.return_value = mh await asyncio.wait_for(loop.run(stop), timeout=5.0) # Worker completed the in-flight invoke before shutdown assert completed == ["done"] run(_inner()) # ── Test 5: queue_wait metric callback ──────────────────────────────────────── def test_queue_wait_metric_fires(): """on_queue_wait must be called with (agent_id, float >= 0).""" async def _inner(): wait_calls = [] stop = asyncio.Event() loop = _make_loop( on_queue_wait=lambda a, w: wait_calls.append((a, w)), ) events = [_make_event()] call_count = [0] async def fake_sync_poll(**kwargs): call_count[0] += 1 if call_count[0] > 1: stop.set() await asyncio.sleep(1000) return _fake_sync(events) async def fake_http_post(url, *, json=None, headers=None, timeout=None): if "/infer" in url: return _ok_router_resp() return _audit_resp() seen: set = set() mock_mc = AsyncMock() mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) mock_mc.__aexit__ = AsyncMock(return_value=False) mock_mc.join_room = AsyncMock() mock_mc.mark_seen = MagicMock(side_effect=lambda e: seen.add(e)) mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) def fake_extract(sync_resp, room_id): evts = (sync_resp.get("rooms", {}).get("join", {}) .get(room_id, {}).get("timeline", {}).get("events", [])) return [e for e in evts if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER and e.get("event_id") not in seen] mock_mc.extract_room_messages = fake_extract mock_mc.sync_poll = fake_sync_poll with patch("app.ingress.MatrixClient") as MC: MC.return_value = mock_mc MC.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MH: mh = AsyncMock() mh.__aenter__ = AsyncMock(return_value=mh) mh.__aexit__ = AsyncMock(return_value=False) mh.post = fake_http_post MH.return_value = mh await asyncio.wait_for(loop.run(stop), timeout=5.0) assert len(wait_calls) == 1 assert wait_calls[0][0] == "sofiia" assert isinstance(wait_calls[0][1], float) assert wait_calls[0][1] >= 0.0 run(_inner()) # ── Test 6: rate-limited event never enters queue ───────────────────────────── def test_rate_limited_event_not_enqueued(): """Rate-limited event must not enter the queue (invoke never called).""" async def _inner(): invoke_count = [0] stop = asyncio.Event() rl = InMemoryRateLimiter(room_rpm=1, sender_rpm=100) loop = _make_loop(rate_limiter=rl, worker_concurrency=1) # 2 events — first passes, second blocked by rate limiter events = [_make_event("$e1:s", "first"), _make_event("$e2:s", "second")] call_count = [0] async def fake_sync_poll(**kwargs): call_count[0] += 1 if call_count[0] > 1: stop.set() await asyncio.sleep(1000) return _fake_sync(events) async def fake_http_post(url, *, json=None, headers=None, timeout=None): if "/infer" in url: invoke_count[0] += 1 return _ok_router_resp() return _audit_resp() seen: set = set() mock_mc = AsyncMock() mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) mock_mc.__aexit__ = AsyncMock(return_value=False) mock_mc.join_room = AsyncMock() mock_mc.mark_seen = MagicMock(side_effect=lambda e: seen.add(e)) mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) def fake_extract(sync_resp, room_id): evts = (sync_resp.get("rooms", {}).get("join", {}) .get(room_id, {}).get("timeline", {}).get("events", [])) return [e for e in evts if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER and e.get("event_id") not in seen] mock_mc.extract_room_messages = fake_extract mock_mc.sync_poll = fake_sync_poll with patch("app.ingress.MatrixClient") as MC: MC.return_value = mock_mc MC.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MH: mh = AsyncMock() mh.__aenter__ = AsyncMock(return_value=mh) mh.__aexit__ = AsyncMock(return_value=False) mh.post = fake_http_post MH.return_value = mh await asyncio.wait_for(loop.run(stop), timeout=5.0) assert invoke_count[0] == 1 # only first event processed run(_inner())