diff --git a/services/matrix-bridge-dagi/app/config.py b/services/matrix-bridge-dagi/app/config.py index 4f594a81..3efb39b9 100644 --- a/services/matrix-bridge-dagi/app/config.py +++ b/services/matrix-bridge-dagi/app/config.py @@ -29,6 +29,11 @@ class BridgeConfig: rate_limit_room_rpm: int # max messages per room per minute rate_limit_sender_rpm: int # max messages per sender per minute + # H2: Backpressure queue + queue_max_events: int # max pending items (drops oldest on full) + worker_concurrency: int # parallel invoke workers + queue_drain_timeout_s: float # graceful shutdown drain timeout + # Service identity node_id: str build_sha: str @@ -62,6 +67,9 @@ def load_config() -> BridgeConfig: bridge_allowed_agents=allowed, rate_limit_room_rpm=int(_optional("RATE_LIMIT_ROOM_RPM", "20")), rate_limit_sender_rpm=int(_optional("RATE_LIMIT_SENDER_RPM", "10")), + queue_max_events=max(1, int(_optional("QUEUE_MAX_EVENTS", "100"))), + worker_concurrency=max(1, int(_optional("WORKER_CONCURRENCY", "2"))), + queue_drain_timeout_s=max(1.0, float(_optional("QUEUE_DRAIN_TIMEOUT_S", "5"))), node_id=_optional("NODE_ID", "NODA1"), build_sha=_optional("BUILD_SHA", "dev"), build_time=_optional("BUILD_TIME", "local"), diff --git a/services/matrix-bridge-dagi/app/ingress.py b/services/matrix-bridge-dagi/app/ingress.py index c6f3ea00..ded027dc 100644 --- a/services/matrix-bridge-dagi/app/ingress.py +++ b/services/matrix-bridge-dagi/app/ingress.py @@ -1,32 +1,41 @@ """ -Matrix Ingress + Egress Loop — Phase M1.4 + H1/H3 +Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 -Polls Matrix /sync for new messages, invokes DAGI Router for mapped rooms, -sends agent replies back to Matrix, writes audit events to sofiia-console. 
+Architecture (H2): + Reader task → asyncio.Queue(maxsize) → N Worker tasks + ───────────────────────────────────────────────────────── + Reader: + sync_poll() → extract_room_messages() + → rate_limit check (H1) + → mark_seen / dedupe + → queue.put_nowait() or DROP (audit matrix.queue_full + metric) -Pipeline: - sync_poll() → extract_room_messages() - → for each message: - 1. rate_limit check (room + sender) ← H1 - 2. dedupe (mark_seen) - 3. audit: matrix.message.received - 4. invoke DAGI Router (timed → on_invoke_latency) ← H3 - 5. send_text() reply (timed → on_send_latency) ← H3 - 6. audit: matrix.agent.replied | matrix.error + Workers (N concurrent): + queue.get() → measure wait latency (H3) + → audit matrix.message.received + → invoke Router (timed, H3) + → send_text() (timed, H3) + → audit matrix.agent.replied | matrix.error -Graceful shutdown via asyncio.Event. +Shutdown: + 1. stop_event set → reader exits loop + 2. queue.join() with drain_timeout → workers finish in-flight + 3. worker tasks cancelled + +Queue entry: _QueueEntry(event, room_id, agent_id, enqueue_time) """ import asyncio import logging import time +from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional import httpx from .matrix_client import MatrixClient from .rate_limit import InMemoryRateLimiter -from .room_mapping import RoomMappingConfig +from .room_mapping import RoomMappingConfig, RoomMapping logger = logging.getLogger(__name__) @@ -37,6 +46,17 @@ _INIT_RETRY_BACKOFF = 2.0 _ROUTER_TIMEOUT_S = 45.0 _AUDIT_TIMEOUT_S = 5.0 _REPLY_TEXT_MAX = 4000 +_WORKER_GET_TIMEOUT_S = 1.0 # how long a worker waits on empty queue before re-checking + + +# ── Queue entry ──────────────────────────────────────────────────────────────── + +@dataclass +class _QueueEntry: + event: Dict[str, Any] + room_id: str + agent_id: str + enqueue_time: float # time.monotonic() at enqueue # ── Router invoke ────────────────────────────────────────────────────────────── @@ -49,20 +69,13 @@ 
async def _invoke_router( prompt: str, session_id: str, ) -> str: - """ - POST /v1/agents/{agent_id}/infer — returns response text string. - Field confirmed as 'response' on NODA1. - Raises httpx.HTTPError on failure. - """ + """POST /v1/agents/{agent_id}/infer → response text. Raises httpx.HTTPError on failure.""" url = f"{router_url.rstrip('/')}/v1/agents/{agent_id}/infer" payload = { "prompt": prompt, "session_id": session_id, "user_id": "matrix_bridge", - "metadata": { - "transport": "matrix", - "node_id": node_id, - }, + "metadata": {"transport": "matrix", "node_id": node_id}, } resp = await http_client.post(url, json=payload, timeout=_ROUTER_TIMEOUT_S) resp.raise_for_status() @@ -74,9 +87,7 @@ async def _invoke_router( or data.get("message") or "" ) - if not isinstance(text, str): - text = str(text) - return text.strip() + return (text if isinstance(text, str) else str(text)).strip() # ── Audit write ──────────────────────────────────────────────────────────────── @@ -95,13 +106,12 @@ async def _write_audit( duration_ms: Optional[int] = None, data: Optional[Dict[str, Any]] = None, ) -> None: - """Fire-and-forget audit write. Never raises.""" + """Fire-and-forget. 
Never raises.""" if not console_url or not internal_token: return try: - url = f"{console_url.rstrip('/')}/api/audit/internal" await http_client.post( - url, + f"{console_url.rstrip('/')}/api/audit/internal", json={ "event": event, "operator_id": "matrix_bridge", @@ -111,11 +121,7 @@ async def _write_audit( "status": status, "error_code": error_code, "duration_ms": duration_ms, - "data": { - "matrix_event_id": event_id, - "matrix_room_id": room_id, - **(data or {}), - }, + "data": {"matrix_event_id": event_id, "matrix_room_id": room_id, **(data or {})}, }, headers={"X-Internal-Service-Token": internal_token}, timeout=_AUDIT_TIMEOUT_S, @@ -124,19 +130,25 @@ async def _write_audit( logger.warning("Audit write failed (non-blocking): %s", exc) -# ── Ingress loop ─────────────────────────────────────────────────────────────── +# ── Ingress loop (reader + workers) ─────────────────────────────────────────── class MatrixIngressLoop: """ - Drives Matrix sync-poll → rate-check → router-invoke → Matrix send_text. + Drives the full Matrix → Router → Matrix pipeline with backpressure. 
+ + Reader task: sync → extract → rate_check → dedupe → queue.put_nowait + Worker tasks: queue.get → invoke → send → audit Metric callbacks (all optional, called synchronously): on_message_received(room_id, agent_id) on_message_replied(room_id, agent_id, status) on_gateway_error(error_type) - on_rate_limited(room_id, agent_id, limit_type) ← H1 - on_invoke_latency(agent_id, duration_seconds) ← H3 - on_send_latency(agent_id, duration_seconds) ← H3 + on_rate_limited(room_id, agent_id, limit_type) + on_queue_dropped(room_id, agent_id) + on_queue_size(current_size: int) + on_invoke_latency(agent_id, duration_seconds) + on_send_latency(agent_id, duration_seconds) + on_queue_wait(agent_id, wait_seconds) """ def __init__( @@ -150,12 +162,19 @@ class MatrixIngressLoop: sofiia_console_url: str = "", sofiia_internal_token: str = "", rate_limiter: Optional[InMemoryRateLimiter] = None, + queue_max_events: int = 100, + worker_concurrency: int = 2, + queue_drain_timeout_s: float = 5.0, + # Callbacks on_message_received: Optional[Callable[[str, str], None]] = None, on_message_replied: Optional[Callable[[str, str, str], None]] = None, on_gateway_error: Optional[Callable[[str], None]] = None, on_rate_limited: Optional[Callable[[str, str, str], None]] = None, + on_queue_dropped: Optional[Callable[[str, str], None]] = None, + on_queue_size: Optional[Callable[[int], None]] = None, on_invoke_latency: Optional[Callable[[str, float], None]] = None, on_send_latency: Optional[Callable[[str, float], None]] = None, + on_queue_wait: Optional[Callable[[str, float], None]] = None, ) -> None: self._hs_url = matrix_homeserver_url self._token = matrix_access_token @@ -166,28 +185,52 @@ class MatrixIngressLoop: self._console_url = sofiia_console_url self._internal_token = sofiia_internal_token self._rate_limiter = rate_limiter + self._queue_max = queue_max_events + self._worker_count = worker_concurrency + self._drain_timeout_s = queue_drain_timeout_s + # Callbacks self._on_message_received = 
on_message_received self._on_message_replied = on_message_replied self._on_gateway_error = on_gateway_error self._on_rate_limited = on_rate_limited + self._on_queue_dropped = on_queue_dropped + self._on_queue_size = on_queue_size self._on_invoke_latency = on_invoke_latency self._on_send_latency = on_send_latency + self._on_queue_wait = on_queue_wait self._next_batch: Optional[str] = None + self._queue: Optional[asyncio.Queue] = None # exposed for /health @property def next_batch(self) -> Optional[str]: return self._next_batch + @property + def queue_size(self) -> int: + return self._queue.qsize() if self._queue else 0 + + @property + def worker_count(self) -> int: + return self._worker_count + + # ── Public run ───────────────────────────────────────────────────────────── + async def run(self, stop_event: asyncio.Event) -> None: - backoff = _INIT_RETRY_BACKOFF logger.info( - "Matrix ingress/egress loop started | hs=%s node=%s mappings=%d", + "Matrix ingress loop started | hs=%s node=%s mappings=%d " + "queue_max=%d workers=%d", self._hs_url, self._node_id, self._room_map.total_mappings, + self._queue_max, self._worker_count, ) if self._room_map.total_mappings == 0: logger.warning("No room mappings — ingress loop is idle") + queue: asyncio.Queue[Optional[_QueueEntry]] = asyncio.Queue( + maxsize=self._queue_max + ) + self._queue = queue + async with MatrixClient(self._hs_url, self._token, self._user_id) as client: for mapping in self._room_map.mappings: if mapping.agent_id in self._room_map.allowed_agents: @@ -197,29 +240,75 @@ class MatrixIngressLoop: logger.warning("Could not join room %s: %s", mapping.room_id, exc) async with httpx.AsyncClient() as http_client: - while not stop_event.is_set(): - try: - sync_resp = await client.sync_poll(since=self._next_batch) - self._next_batch = sync_resp.get("next_batch") - backoff = _INIT_RETRY_BACKOFF - await self._process_sync(client, http_client, sync_resp) - except asyncio.CancelledError: - break - except Exception as exc: 
- logger.error("Ingress loop error (retry in %.1fs): %s", backoff, exc) - if self._on_gateway_error: - self._on_gateway_error("sync_error") - try: - await asyncio.wait_for(stop_event.wait(), timeout=backoff) - except asyncio.TimeoutError: - pass - backoff = min(backoff * 2, _MAX_RETRY_BACKOFF) + # Start workers + worker_tasks = [ + asyncio.create_task( + self._worker(queue, client, http_client), + name=f"matrix_worker_{i}", + ) + for i in range(self._worker_count) + ] - logger.info("Matrix ingress/egress loop stopped") + # Run reader until stop_event + await self._reader(client, queue, http_client, stop_event) - async def _process_sync( + # Drain: wait for all enqueued items to be processed + logger.info( + "Reader stopped. Draining queue (%d items, timeout=%.1fs)...", + queue.qsize(), self._drain_timeout_s, + ) + try: + await asyncio.wait_for(queue.join(), timeout=self._drain_timeout_s) + logger.info("Queue drained successfully") + except asyncio.TimeoutError: + remaining = queue.qsize() + logger.warning( + "Drain timeout (%.1fs): %d items not processed", + self._drain_timeout_s, remaining, + ) + + # Cancel workers + for task in worker_tasks: + task.cancel() + results = await asyncio.gather(*worker_tasks, return_exceptions=True) + cancelled = sum(1 for r in results if isinstance(r, asyncio.CancelledError)) + logger.info("Workers stopped (%d cancelled)", cancelled) + + self._queue = None + logger.info("Matrix ingress loop stopped") + + # ── Reader ───────────────────────────────────────────────────────────────── + + async def _reader( self, client: MatrixClient, + queue: "asyncio.Queue[Optional[_QueueEntry]]", + http_client: httpx.AsyncClient, + stop_event: asyncio.Event, + ) -> None: + backoff = _INIT_RETRY_BACKOFF + while not stop_event.is_set(): + try: + sync_resp = await client.sync_poll(since=self._next_batch) + self._next_batch = sync_resp.get("next_batch") + backoff = _INIT_RETRY_BACKOFF + await self._enqueue_from_sync(client, queue, http_client, sync_resp) 
+ except asyncio.CancelledError: + break + except Exception as exc: + logger.error("Reader error (retry in %.1fs): %s", backoff, exc) + if self._on_gateway_error: + self._on_gateway_error("sync_error") + try: + await asyncio.wait_for(stop_event.wait(), timeout=backoff) + except asyncio.TimeoutError: + pass + backoff = min(backoff * 2, _MAX_RETRY_BACKOFF) + + async def _enqueue_from_sync( + self, + client: MatrixClient, + queue: "asyncio.Queue[Optional[_QueueEntry]]", http_client: httpx.AsyncClient, sync_resp: Dict[str, Any], ) -> None: @@ -228,14 +317,15 @@ class MatrixIngressLoop: continue messages = client.extract_room_messages(sync_resp, mapping.room_id) for event in messages: - await self._handle_message(client, http_client, event, mapping) + await self._try_enqueue(client, queue, http_client, event, mapping) - async def _handle_message( + async def _try_enqueue( self, client: MatrixClient, + queue: "asyncio.Queue[Optional[_QueueEntry]]", http_client: httpx.AsyncClient, event: Dict[str, Any], - mapping, + mapping: RoomMapping, ) -> None: event_id = event.get("event_id", "") sender = event.get("sender", "") @@ -246,7 +336,7 @@ class MatrixIngressLoop: if not text: return - # ── H1: Rate limit check ─────────────────────────────────────────────── + # H1: Rate limit (before mark_seen — don't charge quota on drop) if self._rate_limiter is not None: allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender) if not allowed: @@ -266,12 +356,81 @@ class MatrixIngressLoop: ) return - # Dedupe — mark seen before any IO + # Dedupe — mark before enqueue (prevents double-enqueue on retry) client.mark_seen(event_id) + # H2: Enqueue or drop + entry = _QueueEntry( + event=event, + room_id=room_id, + agent_id=agent_id, + enqueue_time=time.monotonic(), + ) + try: + queue.put_nowait(entry) + qsize = queue.qsize() + logger.debug("Enqueued event=%s qsize=%d", event_id, qsize) + if self._on_queue_size: + self._on_queue_size(qsize) + except asyncio.QueueFull: + 
logger.warning( + "Queue full (max=%d): dropping event=%s room=%s agent=%s", + self._queue_max, event_id, room_id, agent_id, + ) + if self._on_queue_dropped: + self._on_queue_dropped(room_id, agent_id) + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.queue_full", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code="queue_full", + data={"queue_max": self._queue_max, "sender": sender}, + ) + + # ── Worker ───────────────────────────────────────────────────────────────── + + async def _worker( + self, + queue: "asyncio.Queue[Optional[_QueueEntry]]", + client: MatrixClient, + http_client: httpx.AsyncClient, + ) -> None: + """Consume queue entries until cancelled.""" + while True: + entry = await queue.get() # blocks until item available; raises CancelledError on cancel + try: + await self._process_entry(client, http_client, entry) + except Exception as exc: + logger.error("Worker unhandled error: %s", exc) + finally: + queue.task_done() + if self._on_queue_size: + self._on_queue_size(queue.qsize()) + + # ── Process (invoke + send + audit) ─────────────────────────────────────── + + async def _process_entry( + self, + client: MatrixClient, + http_client: httpx.AsyncClient, + entry: _QueueEntry, + ) -> None: + event = entry.event + event_id = event.get("event_id", "") + sender = event.get("sender", "") + text = event.get("content", {}).get("body", "").strip() + room_id = entry.room_id + agent_id = entry.agent_id + + # H3: Queue wait latency + wait_s = time.monotonic() - entry.enqueue_time + if self._on_queue_wait: + self._on_queue_wait(agent_id, wait_s) + logger.info( - "Matrix message: room=%s sender=%s agent=%s event=%s len=%d", - room_id, sender, agent_id, event_id, len(text), + "Processing: room=%s agent=%s event=%s len=%d wait=%.3fs", + room_id, agent_id, event_id, len(text), wait_s, ) if self._on_message_received: @@ -283,148 +442,119 @@ class MatrixIngressLoop: 
agent_id=agent_id, node_id=self._node_id, room_id=room_id, event_id=event_id, status="ok", - data={"sender": sender, "text_len": len(text)}, + data={"sender": sender, "text_len": len(text), "queue_wait_ms": int(wait_s * 1000)}, ) session_id = f"matrix:{room_id.replace('!', '').replace(':', '_')}" - # ── H3: Invoke with latency measurement ─────────────────────────────── + # H3: Invoke with latency t0 = time.monotonic() reply_text: Optional[str] = None invoke_ok = False - invoke_duration_s: float = 0.0 + invoke_duration_s = 0.0 try: reply_text = await _invoke_router( - http_client, - self._router_url, - agent_id=agent_id, - node_id=self._node_id, - prompt=text, - session_id=session_id, + http_client, self._router_url, + agent_id=agent_id, node_id=self._node_id, + prompt=text, session_id=session_id, ) invoke_ok = True invoke_duration_s = time.monotonic() - t0 - duration_ms = int(invoke_duration_s * 1000) - if self._on_invoke_latency: self._on_invoke_latency(agent_id, invoke_duration_s) - logger.info( - "Router invoke ok: agent=%s event=%s reply_len=%d duration=%dms", - agent_id, event_id, len(reply_text or ""), duration_ms, + "Invoke ok: agent=%s event=%s reply_len=%d duration=%dms", + agent_id, event_id, len(reply_text or ""), int(invoke_duration_s * 1000), ) except httpx.HTTPStatusError as exc: invoke_duration_s = time.monotonic() - t0 - duration_ms = int(invoke_duration_s * 1000) logger.error( - "Router HTTP %d for agent=%s event=%s duration=%dms", - exc.response.status_code, agent_id, event_id, duration_ms, + "Router HTTP %d agent=%s event=%s duration=%dms", + exc.response.status_code, agent_id, event_id, int(invoke_duration_s * 1000), ) if self._on_gateway_error: self._on_gateway_error(f"http_{exc.response.status_code}") await _write_audit( http_client, self._console_url, self._internal_token, - event="matrix.error", - agent_id=agent_id, node_id=self._node_id, + event="matrix.error", agent_id=agent_id, node_id=self._node_id, room_id=room_id, event_id=event_id, 
status="error", error_code=f"router_http_{exc.response.status_code}", - duration_ms=duration_ms, + duration_ms=int(invoke_duration_s * 1000), ) except (httpx.ConnectError, httpx.TimeoutException) as exc: invoke_duration_s = time.monotonic() - t0 - duration_ms = int(invoke_duration_s * 1000) - logger.error( - "Router network error agent=%s event=%s: %s duration=%dms", - agent_id, event_id, exc, duration_ms, - ) + logger.error("Router network error agent=%s event=%s: %s", agent_id, event_id, exc) if self._on_gateway_error: self._on_gateway_error("network_error") await _write_audit( http_client, self._console_url, self._internal_token, - event="matrix.error", - agent_id=agent_id, node_id=self._node_id, + event="matrix.error", agent_id=agent_id, node_id=self._node_id, room_id=room_id, event_id=event_id, status="error", error_code="router_network_error", - duration_ms=duration_ms, + duration_ms=int(invoke_duration_s * 1000), ) except Exception as exc: invoke_duration_s = time.monotonic() - t0 - duration_ms = int(invoke_duration_s * 1000) - logger.error( - "Unexpected router error agent=%s event=%s: %s", - agent_id, event_id, exc, - ) + logger.error("Unexpected invoke error agent=%s event=%s: %s", agent_id, event_id, exc) if self._on_gateway_error: self._on_gateway_error("unexpected") await _write_audit( http_client, self._console_url, self._internal_token, - event="matrix.error", - agent_id=agent_id, node_id=self._node_id, + event="matrix.error", agent_id=agent_id, node_id=self._node_id, room_id=room_id, event_id=event_id, status="error", error_code="router_unexpected", - duration_ms=duration_ms, + duration_ms=int(invoke_duration_s * 1000), ) - if not invoke_ok: + if not invoke_ok or not reply_text: + if invoke_ok: + logger.warning("Empty reply from router agent=%s event=%s", agent_id, event_id) return - if not reply_text: - logger.warning("Empty reply from router for agent=%s event=%s", agent_id, event_id) - return - - # ── H3: Send with latency measurement 
───────────────────────────────── + # H3: Send with latency send_text = reply_text[:_REPLY_TEXT_MAX] txn_id = MatrixClient.make_txn_id(room_id, event_id) - send_t0 = time.monotonic() + try: await client.send_text(room_id, send_text, txn_id) send_duration_s = time.monotonic() - send_t0 - send_duration_ms = int(send_duration_s * 1000) - if self._on_send_latency: self._on_send_latency(agent_id, send_duration_s) if self._on_message_replied: self._on_message_replied(room_id, agent_id, "ok") - await _write_audit( http_client, self._console_url, self._internal_token, - event="matrix.agent.replied", - agent_id=agent_id, node_id=self._node_id, - room_id=room_id, event_id=event_id, - status="ok", - duration_ms=send_duration_ms, + event="matrix.agent.replied", agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, status="ok", + duration_ms=int(send_duration_s * 1000), data={ "reply_len": len(send_text), "truncated": len(reply_text) > _REPLY_TEXT_MAX, "router_duration_ms": int(invoke_duration_s * 1000), + "queue_wait_ms": int(wait_s * 1000), }, ) logger.info( "Reply sent: agent=%s event=%s reply_len=%d send_ms=%d", - agent_id, event_id, len(send_text), send_duration_ms, + agent_id, event_id, len(send_text), int(send_duration_s * 1000), ) except Exception as exc: send_duration_s = time.monotonic() - send_t0 - send_duration_ms = int(send_duration_s * 1000) - logger.error( - "Failed to send Matrix reply agent=%s event=%s: %s", - agent_id, event_id, exc, - ) + logger.error("Send failed agent=%s event=%s: %s", agent_id, event_id, exc) if self._on_message_replied: self._on_message_replied(room_id, agent_id, "error") if self._on_gateway_error: self._on_gateway_error("matrix_send_error") await _write_audit( http_client, self._console_url, self._internal_token, - event="matrix.error", - agent_id=agent_id, node_id=self._node_id, + event="matrix.error", agent_id=agent_id, node_id=self._node_id, room_id=room_id, event_id=event_id, status="error", 
error_code="matrix_send_failed", - duration_ms=send_duration_ms, + duration_ms=int(send_duration_s * 1000), ) diff --git a/services/matrix-bridge-dagi/app/main.py b/services/matrix-bridge-dagi/app/main.py index 87352bcc..65989be8 100644 --- a/services/matrix-bridge-dagi/app/main.py +++ b/services/matrix-bridge-dagi/app/main.py @@ -88,6 +88,22 @@ if _PROM_OK: "matrix_bridge_rate_limiter_active_senders", "Senders with activity in the current rate-limit window", ) + # H2: Queue metrics + _queue_size = Gauge( + "matrix_bridge_queue_size", + "Current number of pending items in the work queue", + ) + _queue_dropped = Counter( + "matrix_bridge_queue_dropped_total", + "Messages dropped because queue was full", + ["room_id", "agent_id"], + ) + _queue_wait = Histogram( + "matrix_bridge_queue_wait_seconds", + "Time between enqueue and worker start processing", + ["agent_id"], + buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 30.0], + ) # ── Startup state ───────────────────────────────────────────────────────────── _START_TIME = time.monotonic() @@ -97,6 +113,7 @@ _matrix_reachable: Optional[bool] = None _gateway_reachable: Optional[bool] = None _room_map: Optional[RoomMappingConfig] = None _rate_limiter: Optional[InMemoryRateLimiter] = None +_ingress_loop: Optional["MatrixIngressLoop"] = None # for /health queue_size _ingress_task: Optional[asyncio.Task] = None _ingress_stop: Optional[asyncio.Event] = None @@ -116,7 +133,7 @@ async def _probe_url(url: str, timeout: float = 5.0) -> bool: @asynccontextmanager async def lifespan(app_: Any): global _cfg, _config_error, _matrix_reachable, _gateway_reachable - global _room_map, _rate_limiter + global _room_map, _rate_limiter, _ingress_loop try: _cfg = load_config() @@ -185,7 +202,6 @@ async def lifespan(app_: Any): _messages_rate_limited.labels( room_id=room_id, agent_id=agent_id, limit_type=limit_type ).inc() - # Update active room/sender gauges from limiter stats if _rate_limiter is not None: stats = _rate_limiter.stats() 
_rate_limiter_active_rooms.set(stats["active_rooms"]) @@ -199,6 +215,19 @@ async def lifespan(app_: Any): if _PROM_OK: _send_latency.labels(agent_id=agent_id).observe(duration_s) + # H2 callbacks + def _on_queue_dropped(room_id: str, agent_id: str) -> None: + if _PROM_OK: + _queue_dropped.labels(room_id=room_id, agent_id=agent_id).inc() + + def _on_queue_size(size: int) -> None: + if _PROM_OK: + _queue_size.set(size) + + def _on_queue_wait(agent_id: str, wait_s: float) -> None: + if _PROM_OK: + _queue_wait.labels(agent_id=agent_id).observe(wait_s) + ingress = MatrixIngressLoop( matrix_homeserver_url=_cfg.matrix_homeserver_url, matrix_access_token=_cfg.matrix_access_token, @@ -209,13 +238,24 @@ async def lifespan(app_: Any): sofiia_console_url=_cfg.sofiia_console_url, sofiia_internal_token=_cfg.sofiia_internal_token, rate_limiter=_rate_limiter, + queue_max_events=_cfg.queue_max_events, + worker_concurrency=_cfg.worker_concurrency, + queue_drain_timeout_s=_cfg.queue_drain_timeout_s, on_message_received=_on_msg, on_message_replied=_on_replied, on_gateway_error=_on_gw_error, on_rate_limited=_on_rate_limited, + on_queue_dropped=_on_queue_dropped, + on_queue_size=_on_queue_size, on_invoke_latency=_on_invoke_latency, on_send_latency=_on_send_latency, + on_queue_wait=_on_queue_wait, ) + logger.info( + "✅ Backpressure queue: max=%d workers=%d drain_timeout=%.1fs", + _cfg.queue_max_events, _cfg.worker_concurrency, _cfg.queue_drain_timeout_s, + ) + _ingress_loop = ingress _ingress_task = asyncio.create_task( ingress.run(_ingress_stop), name="matrix_ingress_loop", @@ -290,6 +330,11 @@ async def health() -> Dict[str, Any]: "mappings_count": _room_map.total_mappings if _room_map else 0, "config_ok": True, "rate_limiter": _rate_limiter.stats() if _rate_limiter else None, + "queue": { + "size": _ingress_loop.queue_size if _ingress_loop else 0, + "max": _cfg.queue_max_events, + "workers": _cfg.worker_concurrency, + }, } diff --git a/tests/test_matrix_bridge_queue.py 
b/tests/test_matrix_bridge_queue.py new file mode 100644 index 00000000..5847a2b1 --- /dev/null +++ b/tests/test_matrix_bridge_queue.py @@ -0,0 +1,519 @@ +""" +Tests for matrix-bridge-dagi H2: Backpressure queue (reader + workers) + +Coverage: + 1. enqueue_and_process — single event enqueued → worker processes exactly once + 2. queue_full_drop — queue full → dropped + on_queue_dropped + audit matrix.queue_full + 3. concurrency — 2 events processed concurrently (worker_concurrency=2) + 4. graceful_shutdown — stop_event set → in-flight items completed, queue drained + 5. queue_wait_metric — on_queue_wait called with agent_id + float + 6. rate_limit_before_enqueue — rate-limited event never enters queue +""" + +import asyncio +import sys +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" +if str(_BRIDGE) not in sys.path: + sys.path.insert(0, str(_BRIDGE)) + +from app.ingress import MatrixIngressLoop, _QueueEntry # noqa: E402 +from app.rate_limit import InMemoryRateLimiter # noqa: E402 +from app.room_mapping import parse_room_map # noqa: E402 + + +def run(coro): + return asyncio.run(coro) + + +ALLOWED = frozenset({"sofiia"}) +ROOM_ID = "!QwHczWXgefDHBEVkTH:daarion.space" +ROOM_MAP_STR = f"sofiia:{ROOM_ID}" +ROUTER_URL = "http://dagi-router-node1:8000" +HS_URL = "http://dagi-synapse-node1:8008" +CONSOLE_URL = "http://dagi-sofiia-console-node1:8002" +INTERNAL_TOKEN = "test_tok" +BOT_USER = "@dagi_bridge:daarion.space" +USER = "@user:daarion.space" + + +def _make_event(event_id: str = "$e1:s", body: str = "Hello") -> dict: + return { + "type": "m.room.message", + "event_id": event_id, + "sender": USER, + "content": {"msgtype": "m.text", "body": body}, + "origin_server_ts": 1000, + } + + +def _fake_sync(events: list) -> dict: + return { + "next_batch": "s_next", + "rooms": {"join": {ROOM_ID: {"timeline": {"events": events}}}}, + } + + +def 
_ok_router_resp(text: str = "Reply!") -> MagicMock: + r = MagicMock() + r.status_code = 200 + r.json.return_value = {"response": text} + r.raise_for_status = MagicMock() + return r + + +def _audit_resp() -> MagicMock: + r = MagicMock() + r.status_code = 200 + r.json.return_value = {"ok": True} + r.raise_for_status = MagicMock() + return r + + +def _make_loop(**kwargs) -> MatrixIngressLoop: + room_map = parse_room_map(ROOM_MAP_STR, ALLOWED) + defaults = dict( + matrix_homeserver_url=HS_URL, + matrix_access_token="tok", + matrix_user_id=BOT_USER, + router_url=ROUTER_URL, + node_id="NODA1", + room_map=room_map, + sofiia_console_url=CONSOLE_URL, + sofiia_internal_token=INTERNAL_TOKEN, + queue_max_events=10, + worker_concurrency=1, + queue_drain_timeout_s=2.0, + ) + defaults.update(kwargs) + return MatrixIngressLoop(**defaults) + + +def _make_mock_client(events_per_sync: list, stop_after: int = 1) -> tuple: + """Returns (mock_client_class, mock_client_instance).""" + call_count = [0] + seen = set() + + def fake_extract(sync_resp, room_id): + evts = (sync_resp.get("rooms", {}).get("join", {}) + .get(room_id, {}).get("timeline", {}).get("events", [])) + return [e for e in evts + if e.get("type") == "m.room.message" + and e.get("sender") != BOT_USER + and e.get("event_id") not in seen] + + def fake_mark_seen(eid): + seen.add(eid) + + async def fake_sync_poll(stop_event_ref=None, **kwargs): + call_count[0] += 1 + if call_count[0] > stop_after: + # Block until cancelled + await asyncio.sleep(1000) + return _fake_sync(events_per_sync) + + mock_mc = AsyncMock() + mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) + mock_mc.__aexit__ = AsyncMock(return_value=False) + mock_mc.join_room = AsyncMock() + mock_mc.mark_seen = MagicMock(side_effect=fake_mark_seen) + mock_mc.extract_room_messages = fake_extract + mock_mc.send_text = AsyncMock(return_value={"event_id": "$reply"}) + mock_mc.sync_poll = fake_sync_poll + + return mock_mc + + +# ── Test 1: enqueue and process exactly once 
─────────────────────────────────── + +def test_enqueue_and_process_exactly_once(): + """One event → router invoked once, send_text called once.""" + async def _inner(): + invoke_count = [0] + send_calls = [] + stop = asyncio.Event() + loop = _make_loop(worker_concurrency=1) + events = [_make_event("$e1:s", "Hello")] + mock_mc = _make_mock_client(events, stop_after=1) + mock_mc.sync_poll = _stop_after_n_syncs(1, events, stop) + + async def fake_http_post(url, *, json=None, headers=None, timeout=None): + if "/infer" in url: + invoke_count[0] += 1 + await asyncio.sleep(0.01) + return _ok_router_resp("Hi!") + return _audit_resp() + + with patch("app.ingress.MatrixClient") as MC: + MC.return_value = mock_mc + MC.make_txn_id = lambda r, e: f"txn_{e}" + with patch("app.ingress.httpx.AsyncClient") as MH: + mh = AsyncMock() + mh.__aenter__ = AsyncMock(return_value=mh) + mh.__aexit__ = AsyncMock(return_value=False) + mh.post = fake_http_post + MH.return_value = mh + await asyncio.wait_for(loop.run(stop), timeout=5.0) + + assert invoke_count[0] == 1 + assert mock_mc.send_text.call_count == 1 + + run(_inner()) + + +def _stop_after_n_syncs(n: int, events: list, stop: asyncio.Event): + call_count = [0] + + async def fake_sync_poll(**kwargs): + call_count[0] += 1 + if call_count[0] > n: + stop.set() + await asyncio.sleep(1000) + return _fake_sync(events) + + return fake_sync_poll + + +# ── Test 2: queue full → drop + callback + audit ─────────────────────────────── + +def test_queue_full_drop_and_audit(): + """When queue is full, extra events are dropped with audit matrix.queue_full.""" + async def _inner(): + dropped = [] + audit_events = [] + + # queue_max=1, worker_concurrency=1 but worker is slow (blocks on barrier) + barrier = asyncio.Event() + stop = asyncio.Event() + + loop = _make_loop( + queue_max_events=1, + worker_concurrency=1, + on_queue_dropped=lambda r, a: dropped.append((r, a)), + ) + + # 3 events in one sync — queue holds 1, worker blocked, 2 dropped + events = 
[ + _make_event("$e1:s", "msg1"), + _make_event("$e2:s", "msg2"), + _make_event("$e3:s", "msg3"), + ] + call_count = [0] + + async def fake_sync_poll(**kwargs): + call_count[0] += 1 + if call_count[0] > 1: + stop.set() + await asyncio.sleep(1000) + return _fake_sync(events) + + async def slow_http_post(url, *, json=None, headers=None, timeout=None): + if "/infer" in url: + await barrier.wait() # block worker indefinitely + return _ok_router_resp() + if "/audit/internal" in url: + event_name = (json or {}).get("event", "") + audit_events.append(event_name) + return _audit_resp() + + mock_mc = AsyncMock() + mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) + mock_mc.__aexit__ = AsyncMock(return_value=False) + mock_mc.join_room = AsyncMock() + mock_mc.mark_seen = MagicMock() + mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) + seen: set = set() + + def fake_extract(sync_resp, room_id): + evts = (sync_resp.get("rooms", {}).get("join", {}) + .get(room_id, {}).get("timeline", {}).get("events", [])) + return [e for e in evts + if e.get("type") == "m.room.message" + and e.get("sender") != BOT_USER + and e.get("event_id") not in seen] + + mock_mc.extract_room_messages = fake_extract + mock_mc.sync_poll = fake_sync_poll + + with patch("app.ingress.MatrixClient") as MC: + MC.return_value = mock_mc + MC.make_txn_id = lambda r, e: f"txn_{e}" + with patch("app.ingress.httpx.AsyncClient") as MH: + mh = AsyncMock() + mh.__aenter__ = AsyncMock(return_value=mh) + mh.__aexit__ = AsyncMock(return_value=False) + mh.post = slow_http_post + MH.return_value = mh + # Run with short timeout — worker will be stuck but reader finishes + try: + await asyncio.wait_for(loop.run(stop), timeout=2.5) + except asyncio.TimeoutError: + pass + + # At least 2 events dropped (queue_max=1, 3 incoming) + assert len(dropped) >= 2, f"expected >=2 drops, got {dropped}" + assert "matrix.queue_full" in audit_events + + run(_inner()) + + +# ── Test 3: concurrency — 2 workers process 2 events in 
# ── Shared mock helper ────────────────────────────────────────────────────────

def _make_seen_tracking_client():
    """Build a mock MatrixClient whose mark_seen() feeds a dedupe set.

    extract_room_messages() filters out non-message events, the bot's own
    messages, and any event_id already passed to mark_seen — mirroring the
    real client's dedupe contract so repeated sync responses don't re-deliver
    processed events.
    """
    seen: set = set()
    mock_mc = AsyncMock()
    mock_mc.__aenter__ = AsyncMock(return_value=mock_mc)
    mock_mc.__aexit__ = AsyncMock(return_value=False)
    mock_mc.join_room = AsyncMock()
    mock_mc.mark_seen = MagicMock(side_effect=lambda e: seen.add(e))
    mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"})

    def fake_extract(sync_resp, room_id):
        evts = (sync_resp.get("rooms", {}).get("join", {})
                .get(room_id, {}).get("timeline", {}).get("events", []))
        return [e for e in evts
                if e.get("type") == "m.room.message"
                and e.get("sender") != BOT_USER
                and e.get("event_id") not in seen]

    mock_mc.extract_room_messages = fake_extract
    return mock_mc


# ── Test 3: concurrency — 2 workers process 2 events in parallel ──────────────

def test_two_workers_process_concurrently():
    """With worker_concurrency=2, two events start processing without waiting for each other."""
    async def _inner():
        started_order = []
        finished_order = []
        barrier = asyncio.Barrier(2)  # both workers must arrive before either proceeds
        stop = asyncio.Event()

        loop = _make_loop(worker_concurrency=2, queue_max_events=10)
        events = [_make_event("$e1:s", "msg1"), _make_event("$e2:s", "msg2")]
        call_count = [0]

        async def fake_sync_poll(**kwargs):
            call_count[0] += 1
            if call_count[0] > 1:
                # Second poll: signal shutdown, then park so no more events arrive.
                stop.set()
                await asyncio.sleep(1000)
            return _fake_sync(events)

        async def concurrent_http_post(url, *, json=None, headers=None, timeout=None):
            if "/infer" in url:
                prompt = (json or {}).get("prompt", "")
                started_order.append(prompt)
                await barrier.wait()  # both workers sync here → proves concurrency
                finished_order.append(prompt)
                return _ok_router_resp(f"reply to {prompt}")
            return _audit_resp()

        mock_mc = _make_seen_tracking_client()
        mock_mc.sync_poll = fake_sync_poll

        with patch("app.ingress.MatrixClient") as MC:
            MC.return_value = mock_mc
            MC.make_txn_id = lambda r, e: f"txn_{e}"
            with patch("app.ingress.httpx.AsyncClient") as MH:
                mh = AsyncMock()
                mh.__aenter__ = AsyncMock(return_value=mh)
                mh.__aexit__ = AsyncMock(return_value=False)
                mh.post = concurrent_http_post
                MH.return_value = mh
                await asyncio.wait_for(loop.run(stop), timeout=5.0)

        # Both events started before either finished → they ran concurrently.
        # If processing were sequential, only one worker would reach the
        # barrier and the test would time out instead of completing.
        assert len(started_order) == 2
        assert len(finished_order) == 2

    run(_inner())


# ── Test 4: graceful shutdown — in-flight items completed ─────────────────────

def test_graceful_shutdown_drains_queue():
    """Stop event set mid-flight → worker finishes current item before exit."""
    async def _inner():
        completed = []
        stop = asyncio.Event()

        loop = _make_loop(worker_concurrency=1, queue_max_events=5, queue_drain_timeout_s=3.0)
        events = [_make_event("$e1:s", "drain me")]
        call_count = [0]

        async def fake_sync_poll(**kwargs):
            call_count[0] += 1
            if call_count[0] == 1:
                # Set stop shortly after the first sync so the reader exits
                # while the queue still holds the just-enqueued item.
                # (get_running_loop, not the deprecated get_event_loop.)
                asyncio.get_running_loop().call_later(0.05, stop.set)
            if call_count[0] > 1:
                await asyncio.sleep(1000)
            return _fake_sync(events)

        async def fake_http_post(url, *, json=None, headers=None, timeout=None):
            if "/infer" in url:
                await asyncio.sleep(0.1)  # simulate slow router
                completed.append("done")
                return _ok_router_resp("ok")
            return _audit_resp()

        mock_mc = _make_seen_tracking_client()
        mock_mc.sync_poll = fake_sync_poll

        with patch("app.ingress.MatrixClient") as MC:
            MC.return_value = mock_mc
            MC.make_txn_id = lambda r, e: f"txn_{e}"
            with patch("app.ingress.httpx.AsyncClient") as MH:
                mh = AsyncMock()
                mh.__aenter__ = AsyncMock(return_value=mh)
                mh.__aexit__ = AsyncMock(return_value=False)
                mh.post = fake_http_post
                MH.return_value = mh
                await asyncio.wait_for(loop.run(stop), timeout=5.0)

        # Worker completed the in-flight invoke before shutdown
        assert completed == ["done"]

    run(_inner())


# ── Test 5: queue_wait metric callback ────────────────────────────────────────

def test_queue_wait_metric_fires():
    """on_queue_wait must be called with (agent_id, float >= 0)."""
    async def _inner():
        wait_calls = []
        stop = asyncio.Event()
        loop = _make_loop(
            on_queue_wait=lambda a, w: wait_calls.append((a, w)),
        )
        events = [_make_event()]
        call_count = [0]

        async def fake_sync_poll(**kwargs):
            call_count[0] += 1
            if call_count[0] > 1:
                stop.set()
                await asyncio.sleep(1000)
            return _fake_sync(events)

        async def fake_http_post(url, *, json=None, headers=None, timeout=None):
            if "/infer" in url:
                return _ok_router_resp()
            return _audit_resp()

        mock_mc = _make_seen_tracking_client()
        mock_mc.sync_poll = fake_sync_poll

        with patch("app.ingress.MatrixClient") as MC:
            MC.return_value = mock_mc
            MC.make_txn_id = lambda r, e: f"txn_{e}"
            with patch("app.ingress.httpx.AsyncClient") as MH:
                mh = AsyncMock()
                mh.__aenter__ = AsyncMock(return_value=mh)
                mh.__aexit__ = AsyncMock(return_value=False)
                mh.post = fake_http_post
                MH.return_value = mh
                await asyncio.wait_for(loop.run(stop), timeout=5.0)

        # Exactly one event → exactly one metric sample, tagged with the
        # mapped agent id and a non-negative wall-clock wait.
        assert len(wait_calls) == 1
        assert wait_calls[0][0] == "sofiia"
        assert isinstance(wait_calls[0][1], float)
        assert wait_calls[0][1] >= 0.0

    run(_inner())


# ── Test 6: rate-limited event never enters queue ─────────────────────────────

def test_rate_limited_event_not_enqueued():
    """Rate-limited event must not enter the queue (invoke never called)."""
    async def _inner():
        invoke_count = [0]
        stop = asyncio.Event()

        rl = InMemoryRateLimiter(room_rpm=1, sender_rpm=100)
        loop = _make_loop(rate_limiter=rl, worker_concurrency=1)

        # 2 events — first passes, second blocked by rate limiter
        events = [_make_event("$e1:s", "first"), _make_event("$e2:s", "second")]
        call_count = [0]

        async def fake_sync_poll(**kwargs):
            call_count[0] += 1
            if call_count[0] > 1:
                stop.set()
                await asyncio.sleep(1000)
            return _fake_sync(events)

        async def fake_http_post(url, *, json=None, headers=None, timeout=None):
            if "/infer" in url:
                invoke_count[0] += 1
                return _ok_router_resp()
            return _audit_resp()

        mock_mc = _make_seen_tracking_client()
        mock_mc.sync_poll = fake_sync_poll

        with patch("app.ingress.MatrixClient") as MC:
            MC.return_value = mock_mc
            MC.make_txn_id = lambda r, e: f"txn_{e}"
            with patch("app.ingress.httpx.AsyncClient") as MH:
                mh = AsyncMock()
                mh.__aenter__ = AsyncMock(return_value=mh)
                mh.__aexit__ = AsyncMock(return_value=False)
                mh.post = fake_http_post
                MH.return_value = mh
                await asyncio.wait_for(loop.run(stop), timeout=5.0)

        assert invoke_count[0] == 1  # only first event processed

    run(_inner())