diff --git a/docker-compose.matrix-bridge-node1.yml b/docker-compose.matrix-bridge-node1.yml index 913b4e47..9b9cccf5 100644 --- a/docker-compose.matrix-bridge-node1.yml +++ b/docker-compose.matrix-bridge-node1.yml @@ -31,8 +31,9 @@ services: # Create the room manually, then paste the room_id here - SOFIIA_ROOM_ID=${SOFIIA_ROOM_ID:-} - # ── DAGI backend ───────────────────────────────────────────────────── - - DAGI_GATEWAY_URL=http://dagi-gateway-node1:9300 + # ── DAGI backend — Router for /v1/agents/{id}/infer ───────────────── + # Router internal port 8000 on dagi-network (ext port 9102 on host) + - DAGI_GATEWAY_URL=http://dagi-router-node1:8000 - DEFAULT_NODE_ID=NODA1 # ── Sofiia Console (audit write) ───────────────────────────────────── diff --git a/services/matrix-bridge-dagi/app/ingress.py b/services/matrix-bridge-dagi/app/ingress.py index 6a697959..efdac56b 100644 --- a/services/matrix-bridge-dagi/app/ingress.py +++ b/services/matrix-bridge-dagi/app/ingress.py @@ -1,21 +1,25 @@ """ -Matrix Ingress Loop — Phase M1.3 +Matrix Ingress + Egress Loop — Phase M1.4 -Polls Matrix /sync for new messages, invokes DAGI Gateway for mapped rooms. -Does NOT send replies back (that is PR-M1.4 egress). +Polls Matrix /sync for new messages, invokes DAGI Router for mapped rooms, +sends agent replies back to Matrix, writes audit events to sofiia-console. -Design: - - asyncio task, driven by run_ingress_loop() - - sync_poll() → extract_room_messages() per mapped room - - for each message: dedupe → invoke gateway → audit (fire-and-forget) - - next_batch token persisted in memory (restart resets to None — acceptable for M1) - - graceful shutdown via asyncio.Event +Pipeline: + sync_poll() → extract_room_messages() + → for each message: + 1. dedupe (mark_seen) + 2. audit: matrix.message.received + 3. invoke DAGI Router (/v1/agents/{agent_id}/infer) + 4. send_text() reply to Matrix room + 5. audit: matrix.agent.replied | matrix.error + +Graceful shutdown via asyncio.Event. """ import asyncio import logging import time -from typing import Any, Dict, Optional +from typing import Any, Callable, Dict, List, Optional import httpx @@ -26,64 +30,113 @@ logger = logging.getLogger(__name__) # ── Constants ────────────────────────────────────────────────────────────────── -# Max wait between sync retries on error (seconds) _MAX_RETRY_BACKOFF = 60.0 _INIT_RETRY_BACKOFF = 2.0 - -# Gateway invoke timeout -_GATEWAY_TIMEOUT_S = 30.0 +_ROUTER_TIMEOUT_S = 45.0 # Router may call DeepSeek/Mistral +_AUDIT_TIMEOUT_S = 5.0 +_REPLY_TEXT_MAX = 4000 # Matrix message cap (chars) +_ERROR_REPLY_TEXT = "⚠️ Тимчасова помилка. Спробуйте ще раз." -# ── Gateway invoke ───────────────────────────────────────────────────────────── +# ── Router invoke ────────────────────────────────────────────────────────────── -async def _invoke_gateway( +async def _invoke_router( http_client: httpx.AsyncClient, - gateway_url: str, + router_url: str, agent_id: str, node_id: str, - message_text: str, - matrix_room_id: str, - matrix_event_id: str, - matrix_sender: str, -) -> Dict[str, Any]: + prompt: str, + session_id: str, +) -> str: """ - POST to DAGI Gateway /v1/invoke (or /debug/agent_ping equivalent). - Returns parsed JSON response or raises httpx.HTTPError. - - Payload format matches existing Gateway invoke schema. + POST /v1/agents/{agent_id}/infer — returns response text string. + Field: response['response'] (confirmed from NODA1 test). + Raises httpx.HTTPError on failure. """ - url = f"{gateway_url.rstrip('/')}/v1/invoke" + url = f"{router_url.rstrip('/')}/v1/agents/{agent_id}/infer" payload = { - "agent_id": agent_id, - "node_id": node_id, - "message": message_text, + "prompt": prompt, + "session_id": session_id, + "user_id": "matrix_bridge", "metadata": { "transport": "matrix", - "matrix_room_id": matrix_room_id, - "matrix_event_id": matrix_event_id, - "matrix_sender": matrix_sender, "node_id": node_id, }, } - resp = await http_client.post(url, json=payload, timeout=_GATEWAY_TIMEOUT_S) + resp = await http_client.post(url, json=payload, timeout=_ROUTER_TIMEOUT_S) resp.raise_for_status() - return resp.json() + data = resp.json() + # Extract text — field confirmed as 'response' + text = ( + data.get("response") + or data.get("text") + or data.get("content") + or data.get("message") + or "" + ) + if not isinstance(text, str): + text = str(text) + return text.strip() + + +# ── Audit write ──────────────────────────────────────────────────────────────── + +async def _write_audit( + http_client: httpx.AsyncClient, + console_url: str, + internal_token: str, + event: str, + agent_id: str, + node_id: str, + room_id: str, + event_id: str, + status: str = "ok", + error_code: Optional[str] = None, + duration_ms: Optional[int] = None, + data: Optional[Dict[str, Any]] = None, +) -> None: + """ + Fire-and-forget audit write to sofiia-console internal endpoint. + Never raises — logs warning on failure. + """ + if not console_url or not internal_token: + return + try: + url = f"{console_url.rstrip('/')}/api/audit/internal" + await http_client.post( + url, + json={ + "event": event, + "operator_id": "matrix_bridge", + "node_id": node_id, + "agent_id": agent_id, + "chat_id": room_id, + "status": status, + "error_code": error_code, + "duration_ms": duration_ms, + "data": { + "matrix_event_id": event_id, + "matrix_room_id": room_id, + **(data or {}), + }, + }, + headers={"X-Internal-Service-Token": internal_token}, + timeout=_AUDIT_TIMEOUT_S, + ) + except Exception as exc: + logger.warning("Audit write failed (non-blocking): %s", exc) # ── Ingress loop ─────────────────────────────────────────────────────────────── class MatrixIngressLoop: """ - Drives the Matrix sync-poll → gateway-invoke pipeline. + Drives Matrix sync-poll → router-invoke → Matrix send_text pipeline. Usage: - loop = MatrixIngressLoop(cfg, room_map) + loop = MatrixIngressLoop(...) stop_event = asyncio.Event() await loop.run(stop_event) - - Metrics callbacks (optional, injected to avoid hard dependency): - on_message_received(room_id, agent_id) — called after successful dedupe - on_gateway_error(error_type) — called on gateway invoke error """ def __init__( @@ -91,115 +144,92 @@ class MatrixIngressLoop: matrix_homeserver_url: str, matrix_access_token: str, matrix_user_id: str, - gateway_url: str, + router_url: str, node_id: str, room_map: RoomMappingConfig, - on_message_received=None, - on_gateway_error=None, + sofiia_console_url: str = "", + sofiia_internal_token: str = "", + on_message_received: Optional[Callable[[str, str], None]] = None, + on_message_replied: Optional[Callable[[str, str, str], None]] = None, + on_gateway_error: Optional[Callable[[str], None]] = None, ) -> None: self._hs_url = matrix_homeserver_url self._token = matrix_access_token self._user_id = matrix_user_id - self._gateway_url = gateway_url + self._router_url = router_url self._node_id = node_id self._room_map = room_map + self._console_url = sofiia_console_url + self._internal_token = sofiia_internal_token self._on_message_received = on_message_received + self._on_message_replied = on_message_replied self._on_gateway_error = on_gateway_error - self._next_batch: Optional[str] = None - self._running = False @property def next_batch(self) -> Optional[str]: return self._next_batch async def run(self, stop_event: asyncio.Event) -> None: - """ - Main loop. Runs until stop_event is set. - Handles errors with exponential backoff. - """ - self._running = True + """Main loop until stop_event is set.""" backoff = _INIT_RETRY_BACKOFF logger.info( - "Matrix ingress loop started | hs=%s node=%s mappings=%d", + "Matrix ingress/egress loop started | hs=%s node=%s mappings=%d", self._hs_url, self._node_id, self._room_map.total_mappings, ) if self._room_map.total_mappings == 0: - logger.warning("No room mappings configured — ingress loop is idle") + logger.warning("No room mappings — ingress loop is idle") - async with MatrixClient( - self._hs_url, self._token, self._user_id - ) as client: - # Join all mapped rooms at startup + async with MatrixClient(self._hs_url, self._token, self._user_id) as client: for mapping in self._room_map.mappings: if mapping.agent_id in self._room_map.allowed_agents: try: await client.join_room(mapping.room_id) - logger.info("Joined room %s → agent %s", mapping.room_id, mapping.agent_id) except Exception as exc: logger.warning("Could not join room %s: %s", mapping.room_id, exc) - async with httpx.AsyncClient(timeout=_GATEWAY_TIMEOUT_S) as gw_client: + async with httpx.AsyncClient() as http_client: while not stop_event.is_set(): try: sync_resp = await client.sync_poll(since=self._next_batch) self._next_batch = sync_resp.get("next_batch") - backoff = _INIT_RETRY_BACKOFF # reset on success - - await self._process_sync(client, gw_client, sync_resp) - + backoff = _INIT_RETRY_BACKOFF + await self._process_sync(client, http_client, sync_resp) except asyncio.CancelledError: - logger.info("Ingress loop cancelled") break except Exception as exc: - logger.error( - "Ingress loop error (retry in %.1fs): %s", - backoff, exc, - ) + logger.error("Ingress loop error (retry in %.1fs): %s", backoff, exc) if self._on_gateway_error: self._on_gateway_error("sync_error") try: - await asyncio.wait_for( - stop_event.wait(), timeout=backoff - ) + await asyncio.wait_for(stop_event.wait(), timeout=backoff) except asyncio.TimeoutError: pass backoff = min(backoff * 2, _MAX_RETRY_BACKOFF) - self._running = False - logger.info("Matrix ingress loop stopped") + logger.info("Matrix ingress/egress loop stopped") async def _process_sync( self, client: MatrixClient, - gw_client: httpx.AsyncClient, + http_client: httpx.AsyncClient, sync_resp: Dict[str, Any], ) -> None: - """Process all mapped rooms in a sync response.""" for mapping in self._room_map.mappings: if mapping.agent_id not in self._room_map.allowed_agents: continue - messages = client.extract_room_messages(sync_resp, mapping.room_id) for event in messages: - await self._handle_message(client, gw_client, event, mapping) + await self._handle_message(client, http_client, event, mapping) async def _handle_message( self, client: MatrixClient, - gw_client: httpx.AsyncClient, + http_client: httpx.AsyncClient, event: Dict[str, Any], mapping, ) -> None: - """ - Process a single Matrix message event: - 1. Mark as seen (dedupe) - 2. Invoke DAGI gateway - 3. Fire metrics callback - - Note: Reply sending (egress) is PR-M1.4 — not done here. - """ event_id = event.get("event_id", "") sender = event.get("sender", "") text = event.get("content", {}).get("body", "").strip() @@ -207,61 +237,157 @@ class MatrixIngressLoop: agent_id = mapping.agent_id if not text: - logger.debug("Skipping empty message from %s in %s", sender, room_id) return - # Mark event as seen before invoke (prevents duplicate on retry) + # Dedupe — mark seen before any IO (prevents double-process on retry) client.mark_seen(event_id) logger.info( - "Matrix message: room=%s sender=%s agent=%s event=%s text_len=%d", + "Matrix message: room=%s sender=%s agent=%s event=%s len=%d", room_id, sender, agent_id, event_id, len(text), ) if self._on_message_received: self._on_message_received(room_id, agent_id) + # Audit: received + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.message.received", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="ok", + data={"sender": sender, "text_len": len(text)}, + ) + + # Session ID: stable per room (allows memory context across messages) + session_id = f"matrix:{room_id.replace('!', '').replace(':', '_')}" + t0 = time.monotonic() + reply_text: Optional[str] = None + invoke_ok = False + try: - await _invoke_gateway( - gw_client, - self._gateway_url, + reply_text = await _invoke_router( + http_client, + self._router_url, agent_id=agent_id, node_id=self._node_id, - message_text=text, - matrix_room_id=room_id, - matrix_event_id=event_id, - matrix_sender=sender, + prompt=text, + session_id=session_id, ) - duration = time.monotonic() - t0 + invoke_ok = True + duration_ms = int((time.monotonic() - t0) * 1000) logger.info( - "Gateway invoke ok: agent=%s event=%s duration=%.2fs", - agent_id, event_id, duration, + "Router invoke ok: agent=%s event=%s reply_len=%d duration=%dms", + agent_id, event_id, len(reply_text or ""), duration_ms, ) except httpx.HTTPStatusError as exc: - duration = time.monotonic() - t0 + duration_ms = int((time.monotonic() - t0) * 1000) logger.error( - "Gateway HTTP error %d for agent=%s event=%s duration=%.2fs", - exc.response.status_code, agent_id, event_id, duration, + "Router HTTP %d for agent=%s event=%s duration=%dms", + exc.response.status_code, agent_id, event_id, duration_ms, ) if self._on_gateway_error: self._on_gateway_error(f"http_{exc.response.status_code}") + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.error", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code=f"router_http_{exc.response.status_code}", + duration_ms=duration_ms, + ) except (httpx.ConnectError, httpx.TimeoutException) as exc: - duration = time.monotonic() - t0 + duration_ms = int((time.monotonic() - t0) * 1000) logger.error( - "Gateway network error for agent=%s event=%s: %s duration=%.2fs", - agent_id, event_id, exc, duration, + "Router network error agent=%s event=%s: %s duration=%dms", + agent_id, event_id, exc, duration_ms, ) if self._on_gateway_error: self._on_gateway_error("network_error") + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.error", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code="router_network_error", + duration_ms=duration_ms, + ) except Exception as exc: - duration = time.monotonic() - t0 + duration_ms = int((time.monotonic() - t0) * 1000) logger.error( - "Unexpected error invoking gateway for agent=%s event=%s: %s", + "Unexpected router error agent=%s event=%s: %s", agent_id, event_id, exc, ) if self._on_gateway_error: self._on_gateway_error("unexpected") + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.error", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code="router_unexpected", + duration_ms=duration_ms, + ) + + # ── Egress: send reply back to Matrix ────────────────────────────────── + if not invoke_ok: + # No reply on error in M1 — just audit (avoids spam in room) + return + + if not reply_text: + logger.warning("Empty reply from router for agent=%s event=%s", agent_id, event_id) + return + + # Truncate if needed + send_text = reply_text[:_REPLY_TEXT_MAX] + txn_id = MatrixClient.make_txn_id(room_id, event_id) + + send_t0 = time.monotonic() + try: + await client.send_text(room_id, send_text, txn_id) + send_duration_ms = int((time.monotonic() - send_t0) * 1000) + + if self._on_message_replied: + self._on_message_replied(room_id, agent_id, "ok") + + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.agent.replied", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="ok", + duration_ms=send_duration_ms, + data={ + "reply_len": len(send_text), + "truncated": len(reply_text) > _REPLY_TEXT_MAX, + "router_duration_ms": duration_ms, + }, + ) + logger.info( + "Reply sent: agent=%s event=%s reply_len=%d send_ms=%d", + agent_id, event_id, len(send_text), send_duration_ms, + ) + + except Exception as exc: + send_duration_ms = int((time.monotonic() - send_t0) * 1000) + logger.error( + "Failed to send Matrix reply agent=%s event=%s: %s", + agent_id, event_id, exc, + ) + if self._on_message_replied: + self._on_message_replied(room_id, agent_id, "error") + if self._on_gateway_error: + self._on_gateway_error("matrix_send_error") + await _write_audit( + http_client, self._console_url, self._internal_token, + event="matrix.error", + agent_id=agent_id, node_id=self._node_id, + room_id=room_id, event_id=event_id, + status="error", error_code="matrix_send_failed", + duration_ms=send_duration_ms, + ) diff --git a/services/matrix-bridge-dagi/app/main.py b/services/matrix-bridge-dagi/app/main.py index 40532f0f..587e726a 100644 --- a/services/matrix-bridge-dagi/app/main.py +++ b/services/matrix-bridge-dagi/app/main.py @@ -141,14 +141,23 @@ async def lifespan(app_: Any): if _PROM_OK: _gateway_errors.labels(error_type=error_type).inc() + def _on_replied(room_id: str, agent_id: str, status: str) -> None: + if _PROM_OK: + _messages_replied.labels( + room_id=room_id, agent_id=agent_id, status=status + ).inc() + ingress = MatrixIngressLoop( matrix_homeserver_url=_cfg.matrix_homeserver_url, matrix_access_token=_cfg.matrix_access_token, matrix_user_id=_cfg.matrix_user_id, - gateway_url=_cfg.dagi_gateway_url, + router_url=_cfg.dagi_gateway_url, node_id=_cfg.node_id, room_map=_room_map, + sofiia_console_url=_cfg.sofiia_console_url, + sofiia_internal_token=_cfg.sofiia_internal_token, on_message_received=_on_msg, + on_message_replied=_on_replied, on_gateway_error=_on_gw_error, ) _ingress_task = asyncio.create_task( diff --git a/tests/test_matrix_bridge_ingress.py b/tests/test_matrix_bridge_ingress.py index 13134d0a..01a5220a 100644 --- a/tests/test_matrix_bridge_ingress.py +++ b/tests/test_matrix_bridge_ingress.py @@ -1,11 +1,13 @@ """ -Tests for services/matrix-bridge-dagi/app/ingress.py +Tests for services/matrix-bridge-dagi/app/ingress.py (M1.4 — egress + audit) Strategy: - - mock MatrixClient.sync_poll and extract_room_messages - - mock httpx gateway client - - verify gateway is invoked once per unique message (dedupe works) - - verify stop_event terminates loop + - mock MatrixClient sync_poll / send_text + - mock httpx client for router invoke and audit write + - verify: gateway invoked, send_text called with correct args + - verify: dedupe prevents double-invoke + - verify: audit events fire (received, replied, error) + - verify: empty reply skips send_text (no spam) """ import asyncio @@ -20,7 +22,7 @@ _BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" if str(_BRIDGE) not in sys.path: sys.path.insert(0, str(_BRIDGE)) -from app.ingress import MatrixIngressLoop, _invoke_gateway # noqa: E402 +from app.ingress import MatrixIngressLoop, _invoke_router, _write_audit # noqa: E402 from app.room_mapping import parse_room_map # noqa: E402 @@ -31,27 +33,22 @@ def run(coro): ALLOWED = frozenset({"sofiia"}) ROOM_ID = "!QwHczWXgefDHBEVkTH:daarion.space" ROOM_MAP_STR = f"sofiia:{ROOM_ID}" -GW_URL = "http://127.0.0.1:9300" -HS_URL = "http://localhost:8008" +ROUTER_URL = "http://dagi-router-node1:8000" +HS_URL = "http://dagi-synapse-node1:8008" +CONSOLE_URL = "http://dagi-sofiia-console-node1:8002" +INTERNAL_TOKEN = "test_internal_token_xyz" TOKEN = "syt_fake_token" BOT_USER = "@dagi_bridge:daarion.space" +USER = "@user:daarion.space" -MOCK_EVENT_1 = { +MSG_EVENT = { "type": "m.room.message", "event_id": "$event1:server", - "sender": "@user:server", - "content": {"msgtype": "m.text", "body": "Hello Sofiia!"}, + "sender": USER, + "content": {"msgtype": "m.text", "body": "Привіт Sofiia!"}, "origin_server_ts": 1000, } -MOCK_EVENT_2 = { - "type": "m.room.message", - "event_id": "$event2:server", - "sender": "@user:server", - "content": {"msgtype": "m.text", "body": "Another message"}, - "origin_server_ts": 2000, -} - def _make_loop(**kwargs) -> MatrixIngressLoop: room_map = parse_room_map(ROOM_MAP_STR, ALLOWED) @@ -59,208 +56,349 @@ def _make_loop(**kwargs) -> MatrixIngressLoop: matrix_homeserver_url=HS_URL, matrix_access_token=TOKEN, matrix_user_id=BOT_USER, - gateway_url=GW_URL, + router_url=ROUTER_URL, node_id="NODA1", room_map=room_map, + sofiia_console_url=CONSOLE_URL, + sofiia_internal_token=INTERNAL_TOKEN, ) defaults.update(kwargs) return MatrixIngressLoop(**defaults) -def _fake_sync_resp(events: list) -> dict: +def _fake_sync(events: list) -> dict: return { "next_batch": "s_next", - "rooms": { - "join": { - ROOM_ID: { - "timeline": {"events": events} - } - } - } + "rooms": {"join": {ROOM_ID: {"timeline": {"events": events}}}}, } -# ── _invoke_gateway ───────────────────────────────────────────────────────── +def _ok_response(text: str = "Привіт! Я тут.") -> MagicMock: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"response": text, "model": "test", "tokens_used": 100} + resp.raise_for_status = MagicMock() + return resp -def test_invoke_gateway_builds_correct_request(): + +def _audit_response() -> MagicMock: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"ok": True} + resp.raise_for_status = MagicMock() + return resp + + +def _send_text_response() -> MagicMock: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"event_id": "$reply:server"} + resp.raise_for_status = MagicMock() + return resp + + +# ── _invoke_router ───────────────────────────────────────────────────────────── + +def test_invoke_router_correct_endpoint_and_field(): async def _inner(): captured = {} async def fake_post(url, *, json=None, timeout=None): captured["url"] = url - captured["json"] = json + captured["payload"] = json + return _ok_response("pong!") + + client = MagicMock() + client.post = fake_post + result = await _invoke_router(client, ROUTER_URL, "sofiia", "NODA1", "ping", "session-1") + + assert "/v1/agents/sofiia/infer" in captured["url"] + assert captured["payload"]["prompt"] == "ping" + assert captured["payload"]["session_id"] == "session-1" + assert result == "pong!" + run(_inner()) + + +def test_invoke_router_fallback_fields(): + """Should pick up text/content/message if response key missing.""" + async def _inner(): + async def fake_post(url, *, json=None, timeout=None): resp = MagicMock() resp.status_code = 200 - resp.json.return_value = {"ok": True} + resp.json.return_value = {"text": "hello from text field"} resp.raise_for_status = MagicMock() return resp client = MagicMock() client.post = fake_post - result = await _invoke_gateway( - client, GW_URL, "sofiia", "NODA1", - "Hello!", ROOM_ID, "$event1", "@user:server" - ) - - assert "/v1/invoke" in captured["url"] - payload = captured["json"] - assert payload["agent_id"] == "sofiia" - assert payload["node_id"] == "NODA1" - assert payload["message"] == "Hello!" - meta = payload["metadata"] - assert meta["transport"] == "matrix" - assert meta["matrix_room_id"] == ROOM_ID - assert meta["matrix_event_id"] == "$event1" - assert meta["matrix_sender"] == "@user:server" - + result = await _invoke_router(client, ROUTER_URL, "sofiia", "NODA1", "hi", "s1") + assert result == "hello from text field" run(_inner()) -# ── Ingress loop — normal flow ────────────────────────────────────────────── +# ── _write_audit ─────────────────────────────────────────────────────────────── -def test_ingress_loop_invokes_gateway_once_per_message(): +def test_write_audit_fires_to_console(): async def _inner(): - received_calls: List[Dict] = [] + captured = {} - async def fake_post(url, *, json=None, timeout=None): - received_calls.append({"url": url, "json": json}) - resp = MagicMock() - resp.status_code = 200 - resp.json.return_value = {"ok": True} - resp.raise_for_status = MagicMock() - return resp + async def fake_post(url, *, json=None, headers=None, timeout=None): + captured["url"] = url + captured["headers"] = headers + captured["json"] = json + return _audit_response() + + client = MagicMock() + client.post = fake_post + + await _write_audit( + client, CONSOLE_URL, INTERNAL_TOKEN, + event="matrix.message.received", + agent_id="sofiia", node_id="NODA1", + room_id=ROOM_ID, event_id="$e1", + status="ok", + ) + assert "/api/audit/internal" in captured["url"] + assert captured["headers"]["X-Internal-Service-Token"] == INTERNAL_TOKEN + assert captured["json"]["event"] == "matrix.message.received" + run(_inner()) + + +def test_write_audit_no_op_when_no_token(): + async def _inner(): + called = [False] + + async def fake_post(*args, **kwargs): + called[0] = True + return _audit_response() + + client = MagicMock() + client.post = fake_post + # Empty token — should not call + await _write_audit(client, CONSOLE_URL, "", "matrix.test", "sofiia", "NODA1", ROOM_ID, "$e1") + assert not called[0] + run(_inner()) + + +def test_write_audit_never_raises(): + async def _inner(): + async def fake_post(*args, **kwargs): + raise ConnectionError("audit server down") + + client = MagicMock() + client.post = fake_post + # Should not raise + await _write_audit(client, CONSOLE_URL, INTERNAL_TOKEN, "matrix.test", "sofiia", "NODA1", ROOM_ID, "$e1") + run(_inner()) + + +# ── Full loop: ingress + egress + audit ──────────────────────────────────────── + +def test_loop_full_cycle_invoke_and_reply(): + """One message → router invoked → send_text called with reply.""" + async def _inner(): + router_calls: List[Dict] = [] + send_calls: List[Dict] = [] + audit_calls: List[Dict] = [] stop = asyncio.Event() loop = _make_loop() - sync_responses = [ - _fake_sync_resp([MOCK_EVENT_1]), - _fake_sync_resp([]), # empty on second poll - ] call_count = [0] async def fake_sync_poll(**kwargs): - idx = call_count[0] call_count[0] += 1 - if idx >= len(sync_responses): + if call_count[0] > 1: stop.set() return {"next_batch": "end", "rooms": {}} - return sync_responses[idx] + return _fake_sync([MSG_EVENT]) + + def fake_extract(sync_resp, room_id): + events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) + return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] + + async def fake_http_post(url, *, json=None, headers=None, timeout=None): + if "/infer" in url: + router_calls.append({"url": url, "json": json}) + return _ok_response("Привіт! Я готова допомогти.") + elif "/audit/internal" in url: + audit_calls.append({"url": url, "json": json}) + return _audit_response() + return _audit_response() + + async def fake_send_text(room_id, text, txn_id): + send_calls.append({"room_id": room_id, "text": text, "txn_id": txn_id}) + return {"event_id": "$reply_event"} with patch("app.ingress.MatrixClient") as MockClient: - mock_mc_instance = AsyncMock() - mock_mc_instance.__aenter__ = AsyncMock(return_value=mock_mc_instance) - mock_mc_instance.__aexit__ = AsyncMock(return_value=False) - mock_mc_instance.sync_poll = fake_sync_poll - mock_mc_instance.join_room = AsyncMock() - mock_mc_instance.mark_seen = MagicMock() - mock_mc_instance.is_duplicate = MagicMock(return_value=False) + mock_mc = AsyncMock() + mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) + mock_mc.__aexit__ = AsyncMock(return_value=False) + mock_mc.sync_poll = fake_sync_poll + mock_mc.join_room = AsyncMock() + mock_mc.mark_seen = MagicMock() + mock_mc.extract_room_messages = fake_extract + mock_mc.send_text = fake_send_text - def fake_extract(sync_resp, room_id): - events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) - return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] - - mock_mc_instance.extract_room_messages = fake_extract - MockClient.return_value = mock_mc_instance + # Patch MatrixClient.make_txn_id as static method + MockClient.return_value = mock_mc + MockClient.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MockHTTP: mock_http = AsyncMock() mock_http.__aenter__ = AsyncMock(return_value=mock_http) mock_http.__aexit__ = AsyncMock(return_value=False) - mock_http.post = fake_post + mock_http.post = fake_http_post MockHTTP.return_value = mock_http await asyncio.wait_for(loop.run(stop), timeout=3.0) - # Gateway should have been invoked exactly once (for MOCK_EVENT_1) - assert len(received_calls) == 1 - assert received_calls[0]["json"]["message"] == "Hello Sofiia!" + # Router invoked once + assert len(router_calls) == 1 + assert "sofiia" in router_calls[0]["url"] + assert router_calls[0]["json"]["prompt"] == "Привіт Sofiia!" + + # Reply sent once + assert len(send_calls) == 1 + assert send_calls[0]["room_id"] == ROOM_ID + assert send_calls[0]["text"] == "Привіт! Я готова допомогти." + + # Audit events: at least received + replied + audit_events = [a["json"]["event"] for a in audit_calls] + assert "matrix.message.received" in audit_events + assert "matrix.agent.replied" in audit_events run(_inner()) -def test_ingress_loop_deduplicates_same_event(): - """Same event_id appearing twice should only invoke gateway once.""" +def test_loop_deduplication_no_double_invoke(): + """Same event_id in two syncs → router called exactly once.""" async def _inner(): - invoke_count = [0] - - async def fake_post(url, *, json=None, timeout=None): - invoke_count[0] += 1 - resp = MagicMock() - resp.status_code = 200 - resp.json.return_value = {"ok": True} - resp.raise_for_status = MagicMock() - return resp - - stop = asyncio.Event() - loop = _make_loop() - - # Same event in two consecutive syncs - sync_responses = [ - _fake_sync_resp([MOCK_EVENT_1]), - _fake_sync_resp([MOCK_EVENT_1]), # duplicate event_id - ] - call_count = [0] + router_calls = [0] seen = set() + stop = asyncio.Event() + loop = _make_loop() + + call_count = [0] + async def fake_sync_poll(**kwargs): - idx = call_count[0] call_count[0] += 1 - if idx >= len(sync_responses): + if call_count[0] > 2: stop.set() return {"next_batch": "end", "rooms": {}} - return sync_responses[idx] + return _fake_sync([MSG_EVENT]) + + def fake_extract(sync_resp, room_id): + events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) + return [e for e in events if e.get("type") == "m.room.message" + and e.get("sender") != BOT_USER + and e.get("event_id") not in seen] + + def fake_mark_seen(eid): + seen.add(eid) + + async def fake_http_post(url, *, json=None, headers=None, timeout=None): + if "/infer" in url: + router_calls[0] += 1 + return _ok_response("response") + return _audit_response() with patch("app.ingress.MatrixClient") as MockClient: - mock_mc_instance = AsyncMock() - mock_mc_instance.__aenter__ = AsyncMock(return_value=mock_mc_instance) - mock_mc_instance.__aexit__ = AsyncMock(return_value=False) - mock_mc_instance.sync_poll = fake_sync_poll - mock_mc_instance.join_room = AsyncMock() - - def fake_mark_seen(event_id): - seen.add(event_id) - - def fake_is_dup(event_id): - return event_id in seen - - mock_mc_instance.mark_seen = fake_mark_seen - mock_mc_instance.is_duplicate = fake_is_dup - - def fake_extract(sync_resp, room_id): - events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) - return [e for e in events if e.get("type") == "m.room.message" - and e.get("sender") != BOT_USER - and not fake_is_dup(e.get("event_id", ""))] - - mock_mc_instance.extract_room_messages = fake_extract - MockClient.return_value = mock_mc_instance + mock_mc = AsyncMock() + mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) + mock_mc.__aexit__ = AsyncMock(return_value=False) + mock_mc.sync_poll = fake_sync_poll + mock_mc.join_room = AsyncMock() + mock_mc.mark_seen = fake_mark_seen + mock_mc.extract_room_messages = fake_extract + mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) + MockClient.return_value = mock_mc + MockClient.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MockHTTP: mock_http = AsyncMock() mock_http.__aenter__ = AsyncMock(return_value=mock_http) mock_http.__aexit__ = AsyncMock(return_value=False) - mock_http.post = fake_post + mock_http.post = fake_http_post MockHTTP.return_value = mock_http await asyncio.wait_for(loop.run(stop), timeout=3.0) - # Dedupe: only 1 invoke despite 2 sync responses with same event - assert invoke_count[0] == 1 + assert router_calls[0] == 1 run(_inner()) -def test_ingress_loop_calls_metric_callbacks(): - """on_message_received and on_gateway_error callbacks should fire.""" +def test_loop_empty_reply_skips_send(): + """Empty reply from router → send_text NOT called.""" async def _inner(): - received_events = [] - error_events = [] + send_called = [False] + stop = asyncio.Event() + loop = _make_loop() + + call_count = [0] + + async def fake_sync_poll(**kwargs): + call_count[0] += 1 + if call_count[0] > 1: + stop.set() + return {"next_batch": "end", "rooms": {}} + return _fake_sync([MSG_EVENT]) + + def fake_extract(sync_resp, room_id): + events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) + return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] + + async def fake_http_post(url, *, json=None, headers=None, timeout=None): + if "/infer" in url: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"response": ""} # empty + resp.raise_for_status = MagicMock() + return resp + return _audit_response() + + async def fake_send_text(room_id, text, txn_id): + send_called[0] = True + return {"event_id": "$r"} + + with patch("app.ingress.MatrixClient") as MockClient: + mock_mc = AsyncMock() + mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) + mock_mc.__aexit__ = AsyncMock(return_value=False) + mock_mc.sync_poll = fake_sync_poll + mock_mc.join_room = AsyncMock() + mock_mc.mark_seen = MagicMock() + mock_mc.extract_room_messages = fake_extract + mock_mc.send_text = fake_send_text + MockClient.return_value = mock_mc + MockClient.make_txn_id = lambda r, e: f"txn_{e}" + + with patch("app.ingress.httpx.AsyncClient") as MockHTTP: + mock_http = AsyncMock() + mock_http.__aenter__ = AsyncMock(return_value=mock_http) + mock_http.__aexit__ = AsyncMock(return_value=False) + mock_http.post = fake_http_post + MockHTTP.return_value = mock_http + + await asyncio.wait_for(loop.run(stop), timeout=3.0) + + assert not send_called[0] + + run(_inner()) + + +def test_loop_metric_callbacks_fire(): + """on_message_received and on_message_replied should be called.""" + async def _inner(): + received = [] + replied = [] stop = asyncio.Event() loop = _make_loop( - on_message_received=lambda room, agent: received_events.append((room, agent)), - on_gateway_error=lambda etype: error_events.append(etype), + on_message_received=lambda r, a: received.append((r, a)), + on_message_replied=lambda r, a, s: replied.append((r, a, s)), ) call_count = [0] @@ -270,7 +408,16 @@ def test_ingress_loop_calls_metric_callbacks(): if call_count[0] > 1: stop.set() return {"next_batch": "end", "rooms": {}} - return _fake_sync_resp([MOCK_EVENT_1]) + return _fake_sync([MSG_EVENT]) + + def fake_extract(sync_resp, room_id): + events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) + return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] + + async def fake_http_post(url, *, json=None, headers=None, timeout=None): + if "/infer" in url: + return _ok_response("test reply") + return _audit_response() with patch("app.ingress.MatrixClient") as MockClient: mock_mc = AsyncMock() @@ -279,73 +426,23 @@ def test_ingress_loop_calls_metric_callbacks(): mock_mc.sync_poll = fake_sync_poll mock_mc.join_room = AsyncMock() mock_mc.mark_seen = MagicMock() - mock_mc.is_duplicate = MagicMock(return_value=False) - - def fake_extract(sync_resp, room_id): - events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) - return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] - mock_mc.extract_room_messages = fake_extract + mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"}) MockClient.return_value = mock_mc + MockClient.make_txn_id = lambda r, e: f"txn_{e}" with patch("app.ingress.httpx.AsyncClient") as MockHTTP: mock_http = AsyncMock() mock_http.__aenter__ = AsyncMock(return_value=mock_http) mock_http.__aexit__ = AsyncMock(return_value=False) - - async def fake_post(url, *, json=None, timeout=None): - resp = MagicMock() - resp.status_code = 200 - resp.json.return_value = {"ok": True} - resp.raise_for_status = MagicMock() - return resp - - mock_http.post = fake_post + mock_http.post = fake_http_post MockHTTP.return_value = mock_http await asyncio.wait_for(loop.run(stop), timeout=3.0) - assert len(received_events) == 1 - assert received_events[0] == (ROOM_ID, "sofiia") - - run(_inner()) - - -def test_ingress_loop_no_mappings_is_idle(): - """Loop with 0 mappings should start and stop cleanly without invoking gateway.""" - async def _inner(): - empty_map = parse_room_map("", ALLOWED) - loop = MatrixIngressLoop( - matrix_homeserver_url=HS_URL, - matrix_access_token=TOKEN, - matrix_user_id=BOT_USER, - gateway_url=GW_URL, - node_id="NODA1", - room_map=empty_map, - ) - stop = asyncio.Event() - - with patch("app.ingress.MatrixClient") as MockClient: - mock_mc = AsyncMock() - mock_mc.__aenter__ = AsyncMock(return_value=mock_mc) - mock_mc.__aexit__ = AsyncMock(return_value=False) - - async def fake_sync_poll(**kwargs): - stop.set() - return {"next_batch": "end", "rooms": {}} - - mock_mc.sync_poll = fake_sync_poll - MockClient.return_value = mock_mc - - with patch("app.ingress.httpx.AsyncClient") as MockHTTP: - mock_http = AsyncMock() - mock_http.__aenter__ = AsyncMock(return_value=mock_http) - mock_http.__aexit__ = AsyncMock(return_value=False) - MockHTTP.return_value = mock_http - - await asyncio.wait_for(loop.run(stop), timeout=3.0) - - # Should complete without error - assert True + assert len(received) == 1 + assert received[0] == (ROOM_ID, "sofiia") + assert len(replied) == 1 + assert replied[0][2] == "ok" run(_inner())