"""
Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 + M2.1 + M2.2 + M3.0 (control channel)

Architecture (H2):
  Reader task → asyncio.Queue(maxsize) → N Worker tasks
  ─────────────────────────────────────────────────────────
  Reader:
    sync_poll() → extract_room_messages() → rate_limit check (H1) → mark_seen / dedupe
    → queue.put_nowait() or DROP (audit matrix.queue_full + metric)

  Workers (N concurrent):
    queue.get() → measure wait latency (H3) → audit matrix.message.received
    → invoke Router (timed, H3) → send_text() (timed, H3) → audit matrix.agent.replied | matrix.error

Shutdown:
  1. stop_event set → reader exits loop
  2. queue.join() with drain_timeout → workers finish in-flight
  3. worker tasks cancelled

Queue entry: _QueueEntry(event, room_id, agent_id, enqueue_time, routing_reason, is_mixed)
"""

import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

import httpx

from .control import (
    ControlConfig,
    ControlCommand,
    check_authorization,
    parse_command,
    is_control_message,
    not_implemented_reply,
    unknown_command_reply,
    unauthorized_reply,
    help_reply,
    VERB_HELP,
)
from .matrix_client import MatrixClient
from .mixed_routing import (
    MixedRoomConfig,
    route_message,
    reply_prefix,
    REASON_REJECTED_UNKNOWN_AGENT,
    REASON_REJECTED_SLASH_TOO_LONG,
    REASON_REJECTED_NO_MAPPING,
)
from .rate_limit import InMemoryRateLimiter
from .room_mapping import RoomMappingConfig, RoomMapping

logger = logging.getLogger(__name__)

# ── Constants ──────────────────────────────────────────────────────────────────

_MAX_RETRY_BACKOFF = 60.0
_INIT_RETRY_BACKOFF = 2.0
_ROUTER_TIMEOUT_S = 45.0
_AUDIT_TIMEOUT_S = 5.0
_REPLY_TEXT_MAX = 4000
# how long a worker waits on empty queue before re-checking
# NOTE(review): not referenced in the code visible in this module — `_worker`
# blocks on `queue.get()` without a timeout. Kept for backward compatibility.
_WORKER_GET_TIMEOUT_S = 1.0


# ── Queue entry ────────────────────────────────────────────────────────────────

@dataclass
class _QueueEntry:
    """One unit of work handed from the reader to a worker."""

    event: Dict[str, Any]
    room_id: str
    agent_id: str
    enqueue_time: float  # time.monotonic() at enqueue
    routing_reason: str = "direct"
    is_mixed: bool = False  # True for mixed-room entries (reply tagging, session isolation)


# ── Event helpers ──────────────────────────────────────────────────────────────

def _event_text(event: Dict[str, Any]) -> str:
    """Return the stripped ``content.body`` of an event, or ``""``.

    Tolerates a ``content`` that is not a dict and a ``body`` that is not a
    string; in both cases the event is treated as having no text, so callers
    skip it instead of raising and aborting the rest of the sync batch.
    """
    content = event.get("content")
    body = content.get("body") if isinstance(content, dict) else None
    return body.strip() if isinstance(body, str) else ""


# ── Router invoke ──────────────────────────────────────────────────────────────

async def _invoke_router(
    http_client: httpx.AsyncClient,
    router_url: str,
    agent_id: str,
    node_id: str,
    prompt: str,
    session_id: str,
) -> str:
    """POST /v1/agents/{agent_id}/infer → response text.

    The reply text is taken from the first truthy key among ``response``,
    ``text``, ``content`` and ``message``; a non-string value is converted
    with ``str()``. The result is stripped.

    Raises httpx.HTTPError on failure.
    """
    url = f"{router_url.rstrip('/')}/v1/agents/{agent_id}/infer"
    payload = {
        "prompt": prompt,
        "session_id": session_id,
        "user_id": "matrix_bridge",
        "metadata": {"transport": "matrix", "node_id": node_id},
    }
    resp = await http_client.post(url, json=payload, timeout=_ROUTER_TIMEOUT_S)
    resp.raise_for_status()
    data = resp.json()
    text = (
        data.get("response")
        or data.get("text")
        or data.get("content")
        or data.get("message")
        or ""
    )
    return (text if isinstance(text, str) else str(text)).strip()


# ── Audit write ────────────────────────────────────────────────────────────────

async def _write_audit(
    http_client: httpx.AsyncClient,
    console_url: str,
    internal_token: str,
    event: str,
    agent_id: str,
    node_id: str,
    room_id: str,
    event_id: str,
    status: str = "ok",
    error_code: Optional[str] = None,
    duration_ms: Optional[int] = None,
    data: Optional[Dict[str, Any]] = None,
) -> None:
    """Fire-and-forget. Never raises.

    Does nothing when either ``console_url`` or ``internal_token`` is empty.
    Any ``Exception`` from the POST is logged as a warning and swallowed.
    Entries in ``data`` override the default ``matrix_event_id`` /
    ``matrix_room_id`` keys if they collide.
    """
    if not console_url or not internal_token:
        return
    try:
        await http_client.post(
            f"{console_url.rstrip('/')}/api/audit/internal",
            json={
                "event": event,
                "operator_id": "matrix_bridge",
                "node_id": node_id,
                "agent_id": agent_id,
                "chat_id": room_id,
                "status": status,
                "error_code": error_code,
                "duration_ms": duration_ms,
                "data": {"matrix_event_id": event_id, "matrix_room_id": room_id, **(data or {})},
            },
            headers={"X-Internal-Service-Token": internal_token},
            timeout=_AUDIT_TIMEOUT_S,
        )
    except Exception as exc:
        logger.warning("Audit write failed (non-blocking): %s", exc)


# ── Ingress loop (reader + workers) ───────────────────────────────────────────

class MatrixIngressLoop:
    """
    Drives the full Matrix → Router → Matrix pipeline with backpressure.

    Reader task: sync → extract → rate_check → dedupe → queue.put_nowait
    Worker tasks: queue.get → invoke → send → audit

    Metric callbacks (all optional, called synchronously):
      on_message_received(room_id, agent_id)
      on_message_replied(room_id, agent_id, status)
      on_gateway_error(error_type)
      on_rate_limited(room_id, agent_id, limit_type)
      on_queue_dropped(room_id, agent_id)
      on_queue_size(current_size: int)
      on_invoke_latency(agent_id, duration_seconds)
      on_send_latency(agent_id, duration_seconds)
      on_queue_wait(agent_id, wait_seconds)
      on_routed(agent_id, reason)          M2.2: successful routing
      on_route_rejected(room_id, reason)   M2.2: routing rejection
      on_control_command(sender, verb, subcommand)   M3.0: authorized control command
    """

    def __init__(
        self,
        matrix_homeserver_url: str,
        matrix_access_token: str,
        matrix_user_id: str,
        router_url: str,
        node_id: str,
        room_map: RoomMappingConfig,
        sofiia_console_url: str = "",
        sofiia_internal_token: str = "",
        rate_limiter: Optional[InMemoryRateLimiter] = None,
        queue_max_events: int = 100,
        worker_concurrency: int = 2,
        queue_drain_timeout_s: float = 5.0,
        mixed_room_config: Optional[MixedRoomConfig] = None,
        # M2.2: guard rails
        unknown_agent_behavior: str = "ignore",  # "ignore" | "reply_error"
        max_slash_len: int = 32,
        mixed_concurrency_cap: int = 1,  # 0 = unlimited
        # M3.0: control channel
        control_config: Optional[ControlConfig] = None,
        control_unauthorized_behavior: str = "ignore",  # "ignore" | "reply_error"
        # Callbacks
        on_message_received: Optional[Callable[[str, str], None]] = None,
        on_message_replied: Optional[Callable[[str, str, str], None]] = None,
        on_gateway_error: Optional[Callable[[str], None]] = None,
        on_rate_limited: Optional[Callable[[str, str, str], None]] = None,
        on_queue_dropped: Optional[Callable[[str, str], None]] = None,
        on_queue_size: Optional[Callable[[int], None]] = None,
        on_invoke_latency: Optional[Callable[[str, float], None]] = None,
        on_send_latency: Optional[Callable[[str, float], None]] = None,
        on_queue_wait: Optional[Callable[[str, float], None]] = None,
        on_routed: Optional[Callable[[str, str], None]] = None,
        on_route_rejected: Optional[Callable[[str, str], None]] = None,
        on_control_command: Optional[Callable[[str, str, str], None]] = None,
    ) -> None:
        """Store configuration and callbacks; no I/O happens here."""
        self._hs_url = matrix_homeserver_url
        self._token = matrix_access_token
        self._user_id = matrix_user_id
        self._router_url = router_url
        self._node_id = node_id
        self._room_map = room_map
        self._console_url = sofiia_console_url
        self._internal_token = sofiia_internal_token
        self._rate_limiter = rate_limiter
        self._queue_max = queue_max_events
        self._worker_count = worker_concurrency
        self._drain_timeout_s = queue_drain_timeout_s

        # Callbacks
        self._on_message_received = on_message_received
        self._on_message_replied = on_message_replied
        self._on_gateway_error = on_gateway_error
        self._on_rate_limited = on_rate_limited
        self._on_queue_dropped = on_queue_dropped
        self._on_queue_size = on_queue_size
        self._on_invoke_latency = on_invoke_latency
        self._on_send_latency = on_send_latency
        self._on_queue_wait = on_queue_wait

        self._mixed_room_config = mixed_room_config
        self._control_config = control_config
        self._control_unauthorized_behavior = control_unauthorized_behavior
        self._unknown_agent_behavior = unknown_agent_behavior
        self._max_slash_len = max_slash_len
        self._mixed_concurrency_cap = mixed_concurrency_cap
        self._on_routed = on_routed
        self._on_route_rejected = on_route_rejected
        self._on_control_command = on_control_command

        # Lazily populated semaphores keyed by "{room_id}:{agent_id}"
        self._concurrency_locks: Dict[str, asyncio.Semaphore] = {}

        self._next_batch: Optional[str] = None
        self._queue: Optional[asyncio.Queue] = None  # exposed for /health

    @property
    def next_batch(self) -> Optional[str]:
        """Most recent ``next_batch`` value stored by the reader (None before the first sync)."""
        return self._next_batch

    @property
    def queue_size(self) -> int:
        """Current queue depth, or 0 while no queue exists (loop not running)."""
        return self._queue.qsize() if self._queue else 0

    @property
    def worker_count(self) -> int:
        """Configured number of worker tasks."""
        return self._worker_count

    @property
    def active_lock_count(self) -> int:
        """Number of room-agent pairs currently holding a concurrency lock.

        Counts semaphores for which ``locked()`` is true, i.e. those that
        cannot be acquired immediately.
        """
        return sum(1 for lock in self._concurrency_locks.values() if lock.locked())

    def _get_concurrency_lock(self, room_id: str, agent_id: str) -> asyncio.Semaphore:
        """Lazily create and return the semaphore for a (room, agent) pair."""
        key = f"{room_id}:{agent_id}"
        if key not in self._concurrency_locks:
            # A non-positive cap means "unlimited": use an effectively unbounded semaphore.
            cap = self._mixed_concurrency_cap if self._mixed_concurrency_cap > 0 else 2 ** 31
            self._concurrency_locks[key] = asyncio.Semaphore(cap)
        return self._concurrency_locks[key]

    # ── Public run ─────────────────────────────────────────────────────────────

    async def run(self, stop_event: asyncio.Event) -> None:
        """Join configured rooms, start workers, and read until ``stop_event`` is set.

        After the reader returns, waits up to the configured drain timeout for
        the queue to empty, then cancels the workers. Workers are cancelled and
        the queue reference is cleared even if this coroutine is cancelled or
        raises part-way, so no worker outlives the HTTP/Matrix clients it uses.
        """
        mixed_rooms_count = self._mixed_room_config.total_rooms if self._mixed_room_config else 0
        logger.info(
            "Matrix ingress loop started | hs=%s node=%s mappings=%d mixed_rooms=%d "
            "queue_max=%d workers=%d",
            self._hs_url,
            self._node_id,
            self._room_map.total_mappings,
            mixed_rooms_count,
            self._queue_max,
            self._worker_count,
        )

        if self._room_map.total_mappings == 0 and mixed_rooms_count == 0:
            logger.warning("No room mappings — ingress loop is idle")

        queue: asyncio.Queue[Optional[_QueueEntry]] = asyncio.Queue(
            maxsize=self._queue_max
        )
        self._queue = queue

        try:
            async with MatrixClient(self._hs_url, self._token, self._user_id) as client:
                # Join failures are logged and skipped: one bad room must not stop startup.
                for mapping in self._room_map.mappings:
                    if mapping.agent_id in self._room_map.allowed_agents:
                        try:
                            await client.join_room(mapping.room_id)
                        except Exception as exc:
                            logger.warning("Could not join room %s: %s", mapping.room_id, exc)

                if self._mixed_room_config:
                    for room_id in self._mixed_room_config.rooms:
                        try:
                            await client.join_room(room_id)
                        except Exception as exc:
                            logger.warning("Could not join mixed room %s: %s", room_id, exc)

                if self._control_config and self._control_config.is_enabled:
                    for room_id in self._control_config.control_rooms:
                        try:
                            await client.join_room(room_id)
                        except Exception as exc:
                            logger.warning("Could not join control room %s: %s", room_id, exc)
                    logger.info(
                        "Control channel: %d rooms, %d operators",
                        len(self._control_config.control_rooms),
                        len(self._control_config.operator_allowlist),
                    )

                async with httpx.AsyncClient() as http_client:
                    # Start workers
                    worker_tasks = [
                        asyncio.create_task(
                            self._worker(queue, client, http_client),
                            name=f"matrix_worker_{i}",
                        )
                        for i in range(self._worker_count)
                    ]

                    try:
                        # Run reader until stop_event
                        await self._reader(client, queue, http_client, stop_event)

                        # Drain: wait for all enqueued items to be processed
                        logger.info(
                            "Reader stopped. Draining queue (%d items, timeout=%.1fs)...",
                            queue.qsize(),
                            self._drain_timeout_s,
                        )
                        try:
                            await asyncio.wait_for(queue.join(), timeout=self._drain_timeout_s)
                            logger.info("Queue drained successfully")
                        except asyncio.TimeoutError:
                            remaining = queue.qsize()
                            logger.warning(
                                "Drain timeout (%.1fs): %d items not processed",
                                self._drain_timeout_s,
                                remaining,
                            )
                    finally:
                        # Cancel workers — also on cancellation/error, before the
                        # clients they hold are closed by the enclosing contexts.
                        for task in worker_tasks:
                            task.cancel()
                        results = await asyncio.gather(*worker_tasks, return_exceptions=True)
                        cancelled = sum(
                            1 for r in results if isinstance(r, asyncio.CancelledError)
                        )
                        logger.info("Workers stopped (%d cancelled)", cancelled)
        finally:
            self._queue = None

        logger.info("Matrix ingress loop stopped")

    # ── Reader ─────────────────────────────────────────────────────────────────

    async def _reader(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        stop_event: asyncio.Event,
    ) -> None:
        """Poll sync and enqueue messages until ``stop_event`` is set.

        On any ``Exception`` the error is logged, ``on_gateway_error("sync_error")``
        fires, and the loop sleeps with exponential backoff (interruptible by
        ``stop_event``). A ``CancelledError`` ends the loop without re-raising.
        """
        backoff = _INIT_RETRY_BACKOFF
        while not stop_event.is_set():
            try:
                sync_resp = await client.sync_poll(since=self._next_batch)
                # The sync token is advanced before the batch is enqueued, so a
                # failure while enqueueing does not cause the batch to be re-read.
                self._next_batch = sync_resp.get("next_batch")
                backoff = _INIT_RETRY_BACKOFF
                await self._enqueue_from_sync(client, queue, http_client, sync_resp)
            except asyncio.CancelledError:
                break
            except Exception as exc:
                logger.error("Reader error (retry in %.1fs): %s", backoff, exc)
                if self._on_gateway_error:
                    self._on_gateway_error("sync_error")
                try:
                    await asyncio.wait_for(stop_event.wait(), timeout=backoff)
                except asyncio.TimeoutError:
                    pass
                backoff = min(backoff * 2, _MAX_RETRY_BACKOFF)

    async def _enqueue_from_sync(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        sync_resp: Dict[str, Any],
    ) -> None:
        """Dispatch one sync response: control rooms, then regular rooms, then mixed rooms."""
        # M3.0: Control rooms — handled first, not forwarded to agents
        if self._control_config and self._control_config.is_enabled:
            for room_id in self._control_config.control_rooms:
                messages = client.extract_room_messages(sync_resp, room_id)
                for event in messages:
                    await self._try_control(client, http_client, event, room_id)

        # Regular rooms: 1 room → 1 agent (M1 / M2.0)
        for mapping in self._room_map.mappings:
            if mapping.agent_id not in self._room_map.allowed_agents:
                continue
            messages = client.extract_room_messages(sync_resp, mapping.room_id)
            for event in messages:
                await self._try_enqueue(client, queue, http_client, event, mapping)

        # Mixed rooms: 1 room → N agents, routing per message (M2.1)
        if self._mixed_room_config:
            for room_id in self._mixed_room_config.rooms:
                messages = client.extract_room_messages(sync_resp, room_id)
                for event in messages:
                    await self._try_enqueue_mixed(client, queue, http_client, event, room_id)

    async def _try_enqueue(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        event: Dict[str, Any],
        mapping: RoomMapping,
    ) -> None:
        """Rate-check, mark seen, and enqueue one message from a single-agent room.

        Events with no text are ignored. Rate-limited and queue-full events are
        dropped with a warning log, a metric callback, and an audit record.
        """
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = _event_text(event)
        room_id = mapping.room_id
        agent_id = mapping.agent_id

        if not text:
            return

        # H1: Rate limit (before mark_seen — don't charge quota on drop)
        if self._rate_limiter is not None:
            allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender)
            if not allowed:
                logger.warning(
                    "Rate limited: room=%s sender=%s limit_type=%s event=%s",
                    room_id,
                    sender,
                    limit_type,
                    event_id,
                )
                if self._on_rate_limited:
                    self._on_rate_limited(room_id, agent_id, limit_type or "unknown")
                await _write_audit(
                    http_client,
                    self._console_url,
                    self._internal_token,
                    event="matrix.rate_limited",
                    agent_id=agent_id,
                    node_id=self._node_id,
                    room_id=room_id,
                    event_id=event_id,
                    status="error",
                    error_code=f"rate_limit_{limit_type}",
                    data={"sender": sender, "limit_type": limit_type},
                )
                return

        # Dedupe — mark before enqueue (prevents double-enqueue on retry)
        client.mark_seen(event_id)

        # H2: Enqueue or drop
        entry = _QueueEntry(
            event=event,
            room_id=room_id,
            agent_id=agent_id,
            enqueue_time=time.monotonic(),
        )
        try:
            queue.put_nowait(entry)
            qsize = queue.qsize()
            logger.debug("Enqueued event=%s qsize=%d", event_id, qsize)
            if self._on_queue_size:
                self._on_queue_size(qsize)
        except asyncio.QueueFull:
            logger.warning(
                "Queue full (max=%d): dropping event=%s room=%s agent=%s",
                self._queue_max,
                event_id,
                room_id,
                agent_id,
            )
            if self._on_queue_dropped:
                self._on_queue_dropped(room_id, agent_id)
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.queue_full",
                agent_id=agent_id,
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code="queue_full",
                data={"queue_max": self._queue_max, "sender": sender},
            )

    async def _try_enqueue_mixed(
        self,
        client: MatrixClient,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        http_client: httpx.AsyncClient,
        event: Dict[str, Any],
        room_id: str,
    ) -> None:
        """Enqueue a message from a mixed room, routing to the appropriate agent.

        A rejected route is audited and reported via ``on_route_rejected``; when
        configured with ``unknown_agent_behavior == "reply_error"`` an unknown
        agent additionally gets an error reply in the room. A routed message
        then goes through the same rate-limit / mark-seen / enqueue-or-drop
        steps as a single-agent room, with the routed text replacing the body.
        """
        assert self._mixed_room_config is not None
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = _event_text(event)

        if not text:
            return

        # Route message to determine target agent
        agent_id, routing_reason, effective_text = route_message(
            text,
            room_id,
            self._mixed_room_config,
            self._room_map.allowed_agents,
            max_slash_len=self._max_slash_len,
        )

        if agent_id is None:
            # M2.2: routing rejected — audit + metric + optional error reply
            logger.warning(
                "Mixed room %s: routing rejected reason=%s event=%s",
                room_id,
                routing_reason,
                event_id,
            )
            if self._on_route_rejected:
                self._on_route_rejected(room_id, routing_reason)
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.route.rejected",
                agent_id="unknown",
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code=routing_reason,
                data={"routing_reason": routing_reason, "sender": sender, "text_len": len(text)},
            )
            # M2.2: optional user-facing error reply in room
            if (
                self._unknown_agent_behavior == "reply_error"
                and routing_reason == REASON_REJECTED_UNKNOWN_AGENT
            ):
                available = self._mixed_room_config.agents_for_room(room_id)
                # Extract agent name from text (first slash token, if any)
                slash_token = text.strip().split()[0].lstrip("/") if text.strip().startswith("/") else ""
                label = f"`/{slash_token}`" if slash_token else "this command"
                error_msg = (
                    f"⚠️ Unknown agent {label}. "
                    f"Available in this room: {', '.join(available)}"
                )
                txn_id = MatrixClient.make_txn_id(room_id, event_id + "_reject")
                try:
                    await client.send_text(room_id, error_msg, txn_id)
                except Exception as exc:
                    logger.warning("Could not send route-error reply: %s", exc)
            return

        # M2.2: successful route — fire metric callback
        if self._on_routed:
            self._on_routed(agent_id, routing_reason)

        # H1: Rate limit (uses final agent_id for metric tagging)
        if self._rate_limiter is not None:
            allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender)
            if not allowed:
                logger.warning(
                    "Rate limited (mixed): room=%s sender=%s agent=%s limit_type=%s",
                    room_id,
                    sender,
                    agent_id,
                    limit_type,
                )
                if self._on_rate_limited:
                    self._on_rate_limited(room_id, agent_id, limit_type or "unknown")
                await _write_audit(
                    http_client,
                    self._console_url,
                    self._internal_token,
                    event="matrix.rate_limited",
                    agent_id=agent_id,
                    node_id=self._node_id,
                    room_id=room_id,
                    event_id=event_id,
                    status="error",
                    error_code=f"rate_limit_{limit_type}",
                    data={"sender": sender, "limit_type": limit_type, "routing_reason": routing_reason},
                )
                return

        client.mark_seen(event_id)

        # Store effective_text (stripped of routing token) in a patched event copy
        effective_event = dict(event)
        effective_event["content"] = dict(event.get("content", {}))
        effective_event["content"]["body"] = effective_text

        entry = _QueueEntry(
            event=effective_event,
            room_id=room_id,
            agent_id=agent_id,
            enqueue_time=time.monotonic(),
            routing_reason=routing_reason,
            is_mixed=True,
        )
        try:
            queue.put_nowait(entry)
            qsize = queue.qsize()
            logger.debug(
                "Enqueued (mixed): event=%s agent=%s reason=%s qsize=%d",
                event_id,
                agent_id,
                routing_reason,
                qsize,
            )
            if self._on_queue_size:
                self._on_queue_size(qsize)
        except asyncio.QueueFull:
            logger.warning(
                "Queue full (max=%d): dropping mixed event=%s room=%s agent=%s",
                self._queue_max,
                event_id,
                room_id,
                agent_id,
            )
            if self._on_queue_dropped:
                self._on_queue_dropped(room_id, agent_id)
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.queue_full",
                agent_id=agent_id,
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code="queue_full",
                data={"queue_max": self._queue_max, "sender": sender},
            )

    # ── Control command handler ────────────────────────────────────────────────

    async def _try_control(
        self,
        client: MatrixClient,
        http_client: httpx.AsyncClient,
        event: Dict[str, Any],
        room_id: str,
    ) -> None:
        """
        Process a message from a control room.

        Non-command messages (not starting with '!') are silently ignored.
        All command attempts are audited regardless of authorization.
        """
        assert self._control_config is not None
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = _event_text(event)

        if not text or not is_control_message(text):
            return  # not a command, ignore

        client.mark_seen(event_id)

        # Authorization check
        authorized, rejection_reason = check_authorization(sender, room_id, self._control_config)
        if not authorized:
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.control.unauthorized",
                agent_id="control",
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code=rejection_reason,
                data={"sender": sender, "command_preview": text[:80]},
            )
            logger.warning(
                "Unauthorized control command: sender=%s room=%s reason=%s cmd=%r",
                sender,
                room_id,
                rejection_reason,
                text[:60],
            )
            if self._control_unauthorized_behavior == "reply_error":
                try:
                    txn_id = MatrixClient.make_txn_id(room_id, event_id + "_unauth")
                    await client.send_text(room_id, unauthorized_reply(rejection_reason), txn_id)
                except Exception as exc:
                    logger.warning("Could not send unauthorized reply: %s", exc)
            return

        # Parse command
        cmd = parse_command(text)
        if cmd is None:
            logger.warning("Control message from %s could not be parsed: %r", sender, text[:60])
            return

        # Metric callback
        if self._on_control_command:
            self._on_control_command(sender, cmd.verb, cmd.subcommand)

        # Audit every authorized command
        await _write_audit(
            http_client,
            self._console_url,
            self._internal_token,
            event="matrix.control.command",
            agent_id="control",
            node_id=self._node_id,
            room_id=room_id,
            event_id=event_id,
            status="ok",
            data={
                "sender": sender,
                "verb": cmd.verb,
                "subcommand": cmd.subcommand,
                "args": list(cmd.args),
                "kwargs": dict(cmd.kwargs),
                "is_known": cmd.is_known,
            },
        )

        logger.info(
            "Control command: sender=%s verb=%s sub=%s args=%s",
            sender,
            cmd.verb,
            cmd.subcommand,
            cmd.args,
        )

        # Build reply
        txn_id = MatrixClient.make_txn_id(room_id, event_id + "_ctrl")

        if cmd.verb == VERB_HELP:
            reply_text = help_reply()
        elif not cmd.is_known:
            reply_text = unknown_command_reply(cmd)
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.control.unknown_cmd",
                agent_id="control",
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code="unknown_verb",
                data={"verb": cmd.verb, "sender": sender},
            )
        else:
            # M3.1+ will implement actual runbook/status commands
            reply_text = not_implemented_reply(cmd)

        try:
            await client.send_text(room_id, reply_text, txn_id)
        except Exception as exc:
            logger.error("Could not send control reply: %s", exc)

    # ── Worker ─────────────────────────────────────────────────────────────────

    async def _worker(
        self,
        queue: "asyncio.Queue[Optional[_QueueEntry]]",
        client: MatrixClient,
        http_client: httpx.AsyncClient,
    ) -> None:
        """Consume queue entries until cancelled.

        Any ``Exception`` from processing is logged (with traceback) and the
        worker keeps running. ``task_done()`` is always called for a fetched
        entry so that ``queue.join()`` can complete.
        """
        while True:
            entry = await queue.get()  # blocks until item available; raises CancelledError on cancel
            try:
                await self._process_entry(client, http_client, entry)
            except Exception as exc:
                logger.exception("Worker unhandled error: %s", exc)
            finally:
                queue.task_done()
                if self._on_queue_size:
                    self._on_queue_size(queue.qsize())

    # ── Process (invoke + send + audit) ───────────────────────────────────────

    async def _process_entry(
        self,
        client: MatrixClient,
        http_client: httpx.AsyncClient,
        entry: _QueueEntry,
    ) -> None:
        """Record receipt of one entry, derive its session id, and invoke + reply.

        For mixed-room entries with a positive concurrency cap, the invoke/send
        step runs under the per-(room, agent) semaphore.
        """
        event = entry.event
        event_id = event.get("event_id", "")
        sender = event.get("sender", "")
        text = _event_text(event)
        room_id = entry.room_id
        agent_id = entry.agent_id

        # H3: Queue wait latency
        wait_s = time.monotonic() - entry.enqueue_time
        if self._on_queue_wait:
            self._on_queue_wait(agent_id, wait_s)

        routing_reason = entry.routing_reason
        is_mixed = entry.is_mixed

        logger.info(
            "Processing: room=%s agent=%s event=%s len=%d wait=%.3fs mixed=%s reason=%s",
            room_id,
            agent_id,
            event_id,
            len(text),
            wait_s,
            is_mixed,
            routing_reason,
        )

        if self._on_message_received:
            self._on_message_received(room_id, agent_id)

        await _write_audit(
            http_client,
            self._console_url,
            self._internal_token,
            event="matrix.message.received",
            agent_id=agent_id,
            node_id=self._node_id,
            room_id=room_id,
            event_id=event_id,
            status="ok",
            data={
                "sender": sender,
                "text_len": len(text),
                "queue_wait_ms": int(wait_s * 1000),
                "routing_reason": routing_reason,
                "is_mixed": is_mixed,
            },
        )

        # M2.1: session isolation per (room, agent) for mixed rooms
        room_key = room_id.replace("!", "").replace(":", "_")
        if is_mixed:
            session_id = f"matrix:{room_key}:{agent_id}"
        else:
            session_id = f"matrix:{room_key}"

        # M2.2: per-room-agent concurrency cap (only for mixed rooms; single-agent rooms unaffected)
        lock = (
            self._get_concurrency_lock(room_id, agent_id)
            if is_mixed and self._mixed_concurrency_cap > 0
            else None
        )
        if lock is None:
            await self._invoke_and_send(
                client, http_client, entry, session_id, wait_s, is_mixed, routing_reason,
            )
        else:
            async with lock:
                await self._invoke_and_send(
                    client, http_client, entry, session_id, wait_s, is_mixed, routing_reason,
                )

    async def _invoke_and_send(
        self,
        client: MatrixClient,
        http_client: httpx.AsyncClient,
        entry: _QueueEntry,
        session_id: str,
        wait_s: float,
        is_mixed: bool,
        routing_reason: str,
    ) -> None:
        """Inner: invoke Router + send reply (separated for concurrency lock wrapping).

        Router failures are logged, reported via ``on_gateway_error`` and
        audited as ``matrix.error``; nothing is sent in that case or when the
        router returns an empty reply. The reply (with the mixed-room prefix)
        is cut so the sent text does not exceed ``_REPLY_TEXT_MAX`` characters.
        """
        event = entry.event
        event_id = event.get("event_id", "")
        text = _event_text(event)
        room_id = entry.room_id
        agent_id = entry.agent_id

        # H3: Invoke with latency
        t0 = time.monotonic()
        reply_text: Optional[str] = None
        invoke_ok = False
        invoke_duration_s = 0.0

        try:
            reply_text = await _invoke_router(
                http_client,
                self._router_url,
                agent_id=agent_id,
                node_id=self._node_id,
                prompt=text,
                session_id=session_id,
            )
            invoke_ok = True
            invoke_duration_s = time.monotonic() - t0
            if self._on_invoke_latency:
                self._on_invoke_latency(agent_id, invoke_duration_s)
            logger.info(
                "Invoke ok: agent=%s event=%s reply_len=%d duration=%dms",
                agent_id,
                event_id,
                len(reply_text or ""),
                int(invoke_duration_s * 1000),
            )
        except httpx.HTTPStatusError as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error(
                "Router HTTP %d agent=%s event=%s duration=%dms",
                exc.response.status_code,
                agent_id,
                event_id,
                int(invoke_duration_s * 1000),
            )
            if self._on_gateway_error:
                self._on_gateway_error(f"http_{exc.response.status_code}")
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.error",
                agent_id=agent_id,
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code=f"router_http_{exc.response.status_code}",
                duration_ms=int(invoke_duration_s * 1000),
            )
        except (httpx.ConnectError, httpx.TimeoutException) as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error("Router network error agent=%s event=%s: %s", agent_id, event_id, exc)
            if self._on_gateway_error:
                self._on_gateway_error("network_error")
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.error",
                agent_id=agent_id,
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code="router_network_error",
                duration_ms=int(invoke_duration_s * 1000),
            )
        except Exception as exc:
            invoke_duration_s = time.monotonic() - t0
            logger.error("Unexpected invoke error agent=%s event=%s: %s", agent_id, event_id, exc)
            if self._on_gateway_error:
                self._on_gateway_error("unexpected")
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.error",
                agent_id=agent_id,
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code="router_unexpected",
                duration_ms=int(invoke_duration_s * 1000),
            )

        if not invoke_ok or not reply_text:
            if invoke_ok:
                logger.warning("Empty reply from router agent=%s event=%s", agent_id, event_id)
            return

        # H3: Send with latency
        # M2.1: prefix reply with agent identity in mixed rooms ("Sofiia: ...")
        prefix = reply_prefix(agent_id, is_mixed)
        raw_reply = reply_text[:_REPLY_TEXT_MAX - len(prefix)]
        send_text = prefix + raw_reply
        # The cut point depends on the prefix length, so compare the slice that
        # is actually sent against the full router reply.
        was_truncated = len(raw_reply) < len(reply_text)
        txn_id = MatrixClient.make_txn_id(room_id, event_id)
        send_t0 = time.monotonic()

        try:
            await client.send_text(room_id, send_text, txn_id)
            send_duration_s = time.monotonic() - send_t0
            if self._on_send_latency:
                self._on_send_latency(agent_id, send_duration_s)
            if self._on_message_replied:
                self._on_message_replied(room_id, agent_id, "ok")
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.agent.replied",
                agent_id=agent_id,
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="ok",
                duration_ms=int(send_duration_s * 1000),
                data={
                    "reply_len": len(send_text),
                    "truncated": was_truncated,
                    "router_duration_ms": int(invoke_duration_s * 1000),
                    "queue_wait_ms": int(wait_s * 1000),
                    "routing_reason": routing_reason,
                    "is_mixed": is_mixed,
                },
            )
            logger.info(
                "Reply sent: agent=%s event=%s reply_len=%d send_ms=%d",
                agent_id,
                event_id,
                len(send_text),
                int(send_duration_s * 1000),
            )
        except Exception as exc:
            send_duration_s = time.monotonic() - send_t0
            logger.error("Send failed agent=%s event=%s: %s", agent_id, event_id, exc)
            if self._on_message_replied:
                self._on_message_replied(room_id, agent_id, "error")
            if self._on_gateway_error:
                self._on_gateway_error("matrix_send_error")
            await _write_audit(
                http_client,
                self._console_url,
                self._internal_token,
                event="matrix.error",
                agent_id=agent_id,
                node_id=self._node_id,
                room_id=room_id,
                event_id=event_id,
                status="error",
                error_code="matrix_send_failed",
                duration_ms=int(send_duration_s * 1000),
            )