"""
Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3

Architecture (H2):
  Reader task → asyncio.Queue(maxsize) → N Worker tasks
  ─────────────────────────────────────────────────────────
  Reader:
    sync_poll() → extract_room_messages()
    → rate_limit check (H1)
    → mark_seen / dedupe
    → queue.put_nowait() or DROP (audit matrix.queue_full + metric)

  Workers (N concurrent):
    queue.get() → measure wait latency (H3)
    → audit matrix.message.received
    → invoke Router (timed, H3)
    → send_text() (timed, H3)
    → audit matrix.agent.replied | matrix.error

Shutdown:
  1. stop_event set → reader exits loop
  2. queue.join() with drain_timeout → workers finish in-flight
  3. worker tasks cancelled (also on cancellation / unexpected reader failure)

Queue entry: _QueueEntry(event, room_id, agent_id, enqueue_time)
"""

import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

import httpx

from .matrix_client import MatrixClient
from .rate_limit import InMemoryRateLimiter
from .room_mapping import RoomMappingConfig, RoomMapping

logger = logging.getLogger(__name__)

# ── Constants ──────────────────────────────────────────────────────────────────

_MAX_RETRY_BACKOFF = 60.0    # upper bound (seconds) for the reader's retry delay
_INIT_RETRY_BACKOFF = 2.0    # first retry delay (seconds) after a reader error
_ROUTER_TIMEOUT_S = 45.0     # timeout passed to the Router POST
_AUDIT_TIMEOUT_S = 5.0       # timeout passed to the audit POST
_REPLY_TEXT_MAX = 4000       # replies are truncated to this many characters
# How long a worker waits on empty queue before re-checking.
# NOTE(review): not referenced by the code below (workers block on queue.get());
# kept in case other modules import it.
_WORKER_GET_TIMEOUT_S = 1.0


# ── Queue entry ────────────────────────────────────────────────────────────────

@dataclass
class _QueueEntry:
    """One message handed from the reader to a worker."""

    event: Dict[str, Any]   # raw Matrix event as returned by extract_room_messages()
    room_id: str            # room_id of the mapping the event was extracted for
    agent_id: str           # agent_id of that mapping
    enqueue_time: float     # time.monotonic() at enqueue


def _event_text(event: Dict[str, Any]) -> str:
    """Return the stripped ``content.body`` of *event*, or ``""`` if unusable.

    Event content is supplied by remote senders, so ``content`` may not be a
    mapping and ``body`` may not be a string. Treating such events as empty
    (instead of raising) keeps a single malformed event from aborting the
    processing of the rest of a sync batch.
    """
    content = event.get("content")
    if not isinstance(content, dict):
        return ""
    body = content.get("body")
    if not isinstance(body, str):
        return ""
    return body.strip()


# ── Router invoke ──────────────────────────────────────────────────────────────

async def _invoke_router(
    http_client: httpx.AsyncClient,
    router_url: str,
    agent_id: str,
    node_id: str,
    prompt: str,
    session_id: str,
) -> str:
    """POST /v1/agents/{agent_id}/infer → response text.

    The reply is taken from the first truthy key among ``response``, ``text``,
    ``content`` and ``message`` of the JSON body; non-string values are
    converted with ``str()``. The result is stripped and may be empty.

    Raises httpx.HTTPError on failure.
    """
    url = f"{router_url.rstrip('/')}/v1/agents/{agent_id}/infer"
    payload = {
        "prompt": prompt,
        "session_id": session_id,
        "user_id": "matrix_bridge",
        "metadata": {"transport": "matrix", "node_id": node_id},
    }
    resp = await http_client.post(url, json=payload, timeout=_ROUTER_TIMEOUT_S)
    resp.raise_for_status()
    data = resp.json()
    text = (
        data.get("response")
        or data.get("text")
        or data.get("content")
        or data.get("message")
        or ""
    )
    return (text if isinstance(text, str) else str(text)).strip()


# ── Audit write ────────────────────────────────────────────────────────────────

async def _write_audit(
    http_client: httpx.AsyncClient,
    console_url: str,
    internal_token: str,
    event: str,
    agent_id: str,
    node_id: str,
    room_id: str,
    event_id: str,
    status: str = "ok",
    error_code: Optional[str] = None,
    duration_ms: Optional[int] = None,
    data: Optional[Dict[str, Any]] = None,
) -> None:
    """Fire-and-forget. Never raises.

    Does nothing when *console_url* or *internal_token* is empty. Any
    ``Exception`` from the POST is logged as a warning and swallowed; the
    HTTP status of the response is not inspected.
    """
    if not console_url or not internal_token:
        return
    try:
        await http_client.post(
            f"{console_url.rstrip('/')}/api/audit/internal",
            json={
                "event": event,
                "operator_id": "matrix_bridge",
                "node_id": node_id,
                "agent_id": agent_id,
                "chat_id": room_id,
                "status": status,
                "error_code": error_code,
                "duration_ms": duration_ms,
                # Caller-supplied keys in `data` override the two defaults.
                "data": {
                    "matrix_event_id": event_id,
                    "matrix_room_id": room_id,
                    **(data or {}),
                },
            },
            headers={"X-Internal-Service-Token": internal_token},
            timeout=_AUDIT_TIMEOUT_S,
        )
    except Exception as exc:
        logger.warning("Audit write failed (non-blocking): %s", exc)


# ── Ingress loop (reader + workers) ───────────────────────────────────────────

class MatrixIngressLoop:
    """
    Drives the full Matrix → Router → Matrix pipeline with backpressure.

    Reader task:   sync → extract → rate_check → dedupe → queue.put_nowait
    Worker tasks:  queue.get → invoke → send → audit

    Metric callbacks (all optional, called synchronously):
      on_message_received(room_id, agent_id)
      on_message_replied(room_id, agent_id, status)
      on_gateway_error(error_type)
      on_rate_limited(room_id, agent_id, limit_type)
      on_queue_dropped(room_id, agent_id)
      on_queue_size(current_size: int)
      on_invoke_latency(agent_id, duration_seconds)
      on_send_latency(agent_id, duration_seconds)
      on_queue_wait(agent_id, wait_seconds)
    """

    def __init__(
        self,
        matrix_homeserver_url: str,
        matrix_access_token: str,
        matrix_user_id: str,
        router_url: str,
        node_id: str,
        room_map: RoomMappingConfig,
        sofiia_console_url: str = "",
        sofiia_internal_token: str = "",
        rate_limiter: Optional[InMemoryRateLimiter] = None,
        queue_max_events: int = 100,
        worker_concurrency: int = 2,
        queue_drain_timeout_s: float = 5.0,
        # Callbacks
        on_message_received: Optional[Callable[[str, str], None]] = None,
        on_message_replied: Optional[Callable[[str, str, str], None]] = None,
        on_gateway_error: Optional[Callable[[str], None]] = None,
        on_rate_limited: Optional[Callable[[str, str, str], None]] = None,
        on_queue_dropped: Optional[Callable[[str, str], None]] = None,
        on_queue_size: Optional[Callable[[int], None]] = None,
        on_invoke_latency: Optional[Callable[[str, float], None]] = None,
        on_send_latency: Optional[Callable[[str, float], None]] = None,
        on_queue_wait: Optional[Callable[[str, float], None]] = None,
    ) -> None:
        """Store configuration; no I/O happens until :meth:`run` is awaited."""
        self._hs_url = matrix_homeserver_url
        self._token = matrix_access_token
        self._user_id = matrix_user_id
        self._router_url = router_url
        self._node_id = node_id
        self._room_map = room_map
        self._console_url = sofiia_console_url
        self._internal_token = sofiia_internal_token
        self._rate_limiter = rate_limiter
        self._queue_max = queue_max_events
        self._worker_count = worker_concurrency
        self._drain_timeout_s = queue_drain_timeout_s

        # Callbacks
        self._on_message_received = on_message_received
        self._on_message_replied = on_message_replied
        self._on_gateway_error = on_gateway_error
        self._on_rate_limited = on_rate_limited
        self._on_queue_dropped = on_queue_dropped
        self._on_queue_size = on_queue_size
        self._on_invoke_latency = on_invoke_latency
        self._on_send_latency = on_send_latency
        self._on_queue_wait = on_queue_wait

        self._next_batch: Optional[str] = None
        self._queue: Optional[asyncio.Queue] = None  # exposed for /health

    @property
    def next_batch(self) -> Optional[str]:
        """Sync token from the most recent sync response (None before the first)."""
        return self._next_batch

    @property
    def queue_size(self) -> int:
        """Current queue depth, or 0 while the loop is not running."""
        return self._queue.qsize() if self._queue else 0

    @property
    def worker_count(self) -> int:
        """Configured number of worker tasks."""
        return self._worker_count

    # ── Public run ─────────────────────────────────────────────────────────────

    async def run(self, stop_event: asyncio.Event) -> None:
        """Run reader and workers until *stop_event* is set, then drain and stop.

        Worker tasks are always cancelled and awaited before the HTTP and
        Matrix clients are closed — including when this coroutine is cancelled
        or the reader fails unexpectedly — so no worker outlives the clients
        it uses. ``queue_size`` reports 0 again once this method returns.
        """
        logger.info(
            "Matrix ingress loop started | hs=%s node=%s mappings=%d "
            "queue_max=%d workers=%d",
            self._hs_url, self._node_id, self._room_map.total_mappings,
            self._queue_max, self._worker_count,
        )

        if self._room_map.total_mappings == 0:
            logger.warning("No room mappings — ingress loop is idle")

        queue: "asyncio.Queue[Optional[_QueueEntry]]" = asyncio.Queue(
            maxsize=self._queue_max
        )
        self._queue = queue

        try:
            async with MatrixClient(self._hs_url, self._token, self._user_id) as client:
                await self._join_mapped_rooms(client)

                async with httpx.AsyncClient() as http_client:
                    # Start workers
                    worker_tasks = [
                        asyncio.create_task(
                            self._worker(queue, client, http_client),
                            name=f"matrix_worker_{i}",
                        )
                        for i in range(self._worker_count)
                    ]
                    try:
                        # Run reader until stop_event
                        await self._reader(client, queue, http_client, stop_event)
                        # Drain: wait for all enqueued items to be processed
                        await self._drain(queue)
                    finally:
                        # Must run on every exit path: the workers hold
                        # references to both clients closed just below.
                        await self._stop_workers(worker_tasks)
        finally:
            self._queue = None

        logger.info("Matrix ingress loop stopped")

    async def _join_mapped_rooms(self, client: MatrixClient) -> None:
        """Join every mapped room whose agent is allowed; failures are only logged."""
        for mapping in self._room_map.mappings:
            if mapping.agent_id not in self._room_map.allowed_agents:
                continue
            try:
                await client.join_room(mapping.room_id)
            except Exception as exc:
                logger.warning("Could not join room %s: %s", mapping.room_id, exc)

    async def _drain(self, queue: "asyncio.Queue[Optional[_QueueEntry]]") -> None:
        """Wait up to the configured drain timeout for the queue to be fully processed."""
        logger.info(
            "Reader stopped. Draining queue (%d items, timeout=%.1fs)...",
            queue.qsize(), self._drain_timeout_s,
        )
        try:
            await asyncio.wait_for(queue.join(), timeout=self._drain_timeout_s)
            logger.info("Queue drained successfully")
        except asyncio.TimeoutError:
            remaining = queue.qsize()
            logger.warning(
                "Drain timeout (%.1fs): %d items not processed",
                self._drain_timeout_s, remaining,
            )

    @staticmethod
    async def _stop_workers(worker_tasks: "List[asyncio.Task]") -> None:
        """Cancel all worker tasks and wait until each has finished."""
        for task in worker_tasks:
            task.cancel()
        results = await asyncio.gather(*worker_tasks, return_exceptions=True)
        cancelled = sum(1 for r in results if isinstance(r, asyncio.CancelledError))
        logger.info("Workers stopped (%d cancelled)", cancelled)

    # ── Reader ─────────────────────────────────────────────────────────────────

    async def _reader(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        stop_event: asyncio.Event,
    ) -> None:
        """Poll sync and enqueue messages until *stop_event* is set.

        On any ``Exception`` the error is logged, ``on_gateway_error`` is
        called with ``"sync_error"``, and the loop waits for the current
        backoff (interruptible by *stop_event*) before retrying. The backoff
        doubles up to ``_MAX_RETRY_BACKOFF`` and resets after a successful sync.
        """
        backoff = _INIT_RETRY_BACKOFF
        while not stop_event.is_set():
            try:
                sync_resp = await client.sync_poll(since=self._next_batch)
                self._next_batch = sync_resp.get("next_batch")
                backoff = _INIT_RETRY_BACKOFF
                await self._enqueue_from_sync(client, queue, http_client, sync_resp)
            except asyncio.CancelledError:
                # Cancellation is handled like a stop request: leave the loop
                # so run() can still drain the queue and stop the workers.
                break
            except Exception as exc:
                logger.error("Reader error (retry in %.1fs): %s", backoff, exc)
                if self._on_gateway_error:
                    self._on_gateway_error("sync_error")
                try:
                    # Sleep for `backoff`, but wake up early on shutdown.
                    await asyncio.wait_for(stop_event.wait(), timeout=backoff)
                except asyncio.TimeoutError:
                    pass
                backoff = min(backoff * 2, _MAX_RETRY_BACKOFF)

    async def _enqueue_from_sync(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        sync_resp: Dict[str, Any],
    ) -> None:
        """Offer every message of every allowed mapping in *sync_resp* to the queue."""
        for mapping in self._room_map.mappings:
            if mapping.agent_id not in self._room_map.allowed_agents:
                continue
            messages = client.extract_room_messages(sync_resp, mapping.room_id)
            for event in messages:
                await self._try_enqueue(client, queue, http_client, event, mapping)

    async def _try_enqueue(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        event: Dict[str, Any],
        mapping: RoomMapping,
    ) -> None:
        """Rate-check, mark as seen and enqueue one event, or drop it.

        Events without usable text (empty, missing or malformed body) are
        ignored. Rate-limited events and events arriving while the queue is
        full are dropped, with a metric callback and an audit record.
        """
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = _event_text(event)
        room_id = mapping.room_id
        agent_id = mapping.agent_id

        if not text:
            return

        # H1: Rate limit (before mark_seen — don't charge quota on drop)
        if self._rate_limiter is not None:
            allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender)
            if not allowed:
                logger.warning(
                    "Rate limited: room=%s sender=%s limit_type=%s event=%s",
                    room_id, sender, limit_type, event_id,
                )
                if self._on_rate_limited:
                    self._on_rate_limited(room_id, agent_id, limit_type or "unknown")
                await self._audit(
                    http_client, "matrix.rate_limited",
                    agent_id, room_id, event_id,
                    status="error",
                    error_code=f"rate_limit_{limit_type}",
                    data={"sender": sender, "limit_type": limit_type},
                )
                return

        # Dedupe — mark before enqueue (prevents double-enqueue on retry)
        client.mark_seen(event_id)

        # H2: Enqueue or drop
        entry = _QueueEntry(
            event=event,
            room_id=room_id,
            agent_id=agent_id,
            enqueue_time=time.monotonic(),
        )
        try:
            queue.put_nowait(entry)
        except asyncio.QueueFull:
            logger.warning(
                "Queue full (max=%d): dropping event=%s room=%s agent=%s",
                self._queue_max, event_id, room_id, agent_id,
            )
            if self._on_queue_dropped:
                self._on_queue_dropped(room_id, agent_id)
            await self._audit(
                http_client, "matrix.queue_full",
                agent_id, room_id, event_id,
                status="error",
                error_code="queue_full",
                data={"queue_max": self._queue_max, "sender": sender},
            )
            return

        qsize = queue.qsize()
        logger.debug("Enqueued event=%s qsize=%d", event_id, qsize)
        if self._on_queue_size:
            self._on_queue_size(qsize)

    # ── Worker ─────────────────────────────────────────────────────────────────

    async def _worker(
        self,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        client: MatrixClient,
        http_client: httpx.AsyncClient,
    ) -> None:
        """Consume queue entries until cancelled."""
        while True:
            # Blocks until an item is available; raises CancelledError on cancel.
            entry = await queue.get()
            try:
                await self._process_entry(client, http_client, entry)
            except Exception as exc:
                # Keep the worker alive; include the traceback for diagnosis.
                logger.exception("Worker unhandled error: %s", exc)
            finally:
                # Always balance get() so queue.join() in run() can complete.
                queue.task_done()
                if self._on_queue_size:
                    self._on_queue_size(queue.qsize())

    # ── Audit / error helpers ──────────────────────────────────────────────────

    async def _audit(
        self,
        http_client: httpx.AsyncClient,
        event: str,
        agent_id: str,
        room_id: str,
        event_id: str,
        **fields: Any,
    ) -> None:
        """Call ``_write_audit`` with this loop's console URL, token and node id."""
        await _write_audit(
            http_client, self._console_url, self._internal_token,
            event=event,
            agent_id=agent_id, node_id=self._node_id,
            room_id=room_id, event_id=event_id,
            **fields,
        )

    async def _report_error(
        self,
        http_client: httpx.AsyncClient,
        entry: _QueueEntry,
        gateway_error: str,
        error_code: str,
        duration_s: float,
    ) -> None:
        """Emit the gateway-error metric and a ``matrix.error`` audit record."""
        if self._on_gateway_error:
            self._on_gateway_error(gateway_error)
        await self._audit(
            http_client, "matrix.error",
            entry.agent_id, entry.room_id, entry.event.get("event_id", ""),
            status="error",
            error_code=error_code,
            duration_ms=int(duration_s * 1000),
        )

    # ── Process (invoke + send + audit) ───────────────────────────────────────

    async def _process_entry(
        self,
        client: MatrixClient,
        http_client: httpx.AsyncClient,
        entry: _QueueEntry,
    ) -> None:
        """Handle one queued message: audit receipt, invoke Router, send the reply.

        Router and send failures are reported through metrics and audit
        records and are not re-raised. Nothing is sent when the Router call
        fails or returns an empty reply.
        """
        event = entry.event
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = _event_text(event)
        room_id = entry.room_id
        agent_id = entry.agent_id

        # H3: Queue wait latency
        wait_s = time.monotonic() - entry.enqueue_time
        if self._on_queue_wait:
            self._on_queue_wait(agent_id, wait_s)

        logger.info(
            "Processing: room=%s agent=%s event=%s len=%d wait=%.3fs",
            room_id, agent_id, event_id, len(text), wait_s,
        )

        if self._on_message_received:
            self._on_message_received(room_id, agent_id)

        await self._audit(
            http_client, "matrix.message.received",
            agent_id, room_id, event_id,
            status="ok",
            data={
                "sender": sender,
                "text_len": len(text),
                "queue_wait_ms": int(wait_s * 1000),
            },
        )

        # One Router session per room; '!' and ':' are removed/replaced in the id.
        session_id = f"matrix:{room_id.replace('!', '').replace(':', '_')}"

        # H3: Invoke with latency
        t0 = time.monotonic()
        reply_text: Optional[str] = None
        invoke_ok = False
        invoke_duration_s = 0.0

        try:
            reply_text = await _invoke_router(
                http_client, self._router_url,
                agent_id=agent_id, node_id=self._node_id,
                prompt=text, session_id=session_id,
            )
            invoke_ok = True
            invoke_duration_s = time.monotonic() - t0
            if self._on_invoke_latency:
                self._on_invoke_latency(agent_id, invoke_duration_s)
            logger.info(
                "Invoke ok: agent=%s event=%s reply_len=%d duration=%dms",
                agent_id, event_id, len(reply_text or ""),
                int(invoke_duration_s * 1000),
            )
        except httpx.HTTPStatusError as exc:
            invoke_duration_s = time.monotonic() - t0
            status_code = exc.response.status_code
            logger.error(
                "Router HTTP %d agent=%s event=%s duration=%dms",
                status_code, agent_id, event_id,
                int(invoke_duration_s * 1000),
            )
            await self._report_error(
                http_client, entry,
                f"http_{status_code}", f"router_http_{status_code}",
                invoke_duration_s,
            )
        except (httpx.ConnectError, httpx.TimeoutException) as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error(
                "Router network error agent=%s event=%s: %s",
                agent_id, event_id, exc,
            )
            await self._report_error(
                http_client, entry,
                "network_error", "router_network_error",
                invoke_duration_s,
            )
        except Exception as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error(
                "Unexpected invoke error agent=%s event=%s: %s",
                agent_id, event_id, exc,
            )
            await self._report_error(
                http_client, entry,
                "unexpected", "router_unexpected",
                invoke_duration_s,
            )

        if not invoke_ok or not reply_text:
            if invoke_ok:
                logger.warning(
                    "Empty reply from router agent=%s event=%s", agent_id, event_id
                )
            return

        # H3: Send with latency
        outgoing = reply_text[:_REPLY_TEXT_MAX]
        txn_id = MatrixClient.make_txn_id(room_id, event_id)
        send_t0 = time.monotonic()

        try:
            await client.send_text(room_id, outgoing, txn_id)
            send_duration_s = time.monotonic() - send_t0

            if self._on_send_latency:
                self._on_send_latency(agent_id, send_duration_s)
            if self._on_message_replied:
                self._on_message_replied(room_id, agent_id, "ok")

            await self._audit(
                http_client, "matrix.agent.replied",
                agent_id, room_id, event_id,
                status="ok",
                duration_ms=int(send_duration_s * 1000),
                data={
                    "reply_len": len(outgoing),
                    "truncated": len(reply_text) > _REPLY_TEXT_MAX,
                    "router_duration_ms": int(invoke_duration_s * 1000),
                    "queue_wait_ms": int(wait_s * 1000),
                },
            )
            logger.info(
                "Reply sent: agent=%s event=%s reply_len=%d send_ms=%d",
                agent_id, event_id, len(outgoing), int(send_duration_s * 1000),
            )
        except Exception as exc:
            send_duration_s = time.monotonic() - send_t0
            logger.error("Send failed agent=%s event=%s: %s", agent_id, event_id, exc)
            if self._on_message_replied:
                self._on_message_replied(room_id, agent_id, "error")
            await self._report_error(
                http_client, entry,
                "matrix_send_error", "matrix_send_failed",
                send_duration_s,
            )