feat(matrix-bridge-dagi): add egress, audit integration, fix router endpoint (PR-M1.4)

Closes the full Matrix ↔ DAGI loop:

Egress:
- invoke Router POST /v1/agents/{agent_id}/infer (field: prompt, response: response)
- send_text() reply to Matrix room with idempotent txn_id = make_txn_id(room_id, event_id)
- empty reply → skip send (no spam)
- reply truncated to 4000 chars if needed

Audit (via sofiia-console POST /api/audit/internal):
- matrix.message.received (on ingress)
- matrix.agent.replied (on successful reply)
- matrix.error (on router/send failure, with error_code)
- fire-and-forget: audit failures never crash the loop

Router URL fix:
- DAGI_GATEWAY_URL now points to dagi-router-node1:8000 (not gateway:9300)
- Session ID: stable per room — matrix:{room_localpart} (memory context)

9 tests: invoke endpoint, fallback fields, audit write, full cycle,
dedupe, empty reply skip, metric callbacks

Made-with: Cursor
This commit is contained in:
Apple
2026-03-03 08:06:49 -08:00
parent 8d564fbbe5
commit cad3663508
4 changed files with 540 additions and 307 deletions

View File

@@ -31,8 +31,9 @@ services:
# Create the room manually, then paste the room_id here # Create the room manually, then paste the room_id here
- SOFIIA_ROOM_ID=${SOFIIA_ROOM_ID:-} - SOFIIA_ROOM_ID=${SOFIIA_ROOM_ID:-}
# ── DAGI backend ───────────────────────────────────────────────────── # ── DAGI backend — Router for /v1/agents/{id}/infer ─────────────────
- DAGI_GATEWAY_URL=http://dagi-gateway-node1:9300 # Router internal port 8000 on dagi-network (ext port 9102 on host)
- DAGI_GATEWAY_URL=http://dagi-router-node1:8000
- DEFAULT_NODE_ID=NODA1 - DEFAULT_NODE_ID=NODA1
# ── Sofiia Console (audit write) ───────────────────────────────────── # ── Sofiia Console (audit write) ─────────────────────────────────────

View File

@@ -1,21 +1,25 @@
""" """
Matrix Ingress Loop — Phase M1.3 Matrix Ingress + Egress Loop — Phase M1.4
Polls Matrix /sync for new messages, invokes DAGI Gateway for mapped rooms. Polls Matrix /sync for new messages, invokes DAGI Router for mapped rooms,
Does NOT send replies back (that is PR-M1.4 egress). sends agent replies back to Matrix, writes audit events to sofiia-console.
Design: Pipeline:
- asyncio task, driven by run_ingress_loop() sync_poll() → extract_room_messages()
- sync_poll() → extract_room_messages() per mapped room → for each message:
- for each message: dedupe → invoke gateway → audit (fire-and-forget) 1. dedupe (mark_seen)
- next_batch token persisted in memory (restart resets to None — acceptable for M1) 2. audit: matrix.message.received
- graceful shutdown via asyncio.Event 3. invoke DAGI Router (/v1/agents/{agent_id}/infer)
4. send_text() reply to Matrix room
5. audit: matrix.agent.replied | matrix.error
Graceful shutdown via asyncio.Event.
""" """
import asyncio import asyncio
import logging import logging
import time import time
from typing import Any, Dict, Optional from typing import Any, Callable, Dict, List, Optional
import httpx import httpx
@@ -26,64 +30,113 @@ logger = logging.getLogger(__name__)
# ── Constants ────────────────────────────────────────────────────────────────── # ── Constants ──────────────────────────────────────────────────────────────────
# Max wait between sync retries on error (seconds)
_MAX_RETRY_BACKOFF = 60.0 _MAX_RETRY_BACKOFF = 60.0
_INIT_RETRY_BACKOFF = 2.0 _INIT_RETRY_BACKOFF = 2.0
_ROUTER_TIMEOUT_S = 45.0 # Router may call DeepSeek/Mistral
# Gateway invoke timeout _AUDIT_TIMEOUT_S = 5.0
_GATEWAY_TIMEOUT_S = 30.0 _REPLY_TEXT_MAX = 4000 # Matrix message cap (chars)
_ERROR_REPLY_TEXT = "⚠️ Тимчасова помилка. Спробуйте ще раз."
# ── Gateway invoke ───────────────────────────────────────────────────────────── # ── Router invoke ─────────────────────────────────────────────────────────────
async def _invoke_gateway( async def _invoke_router(
http_client: httpx.AsyncClient, http_client: httpx.AsyncClient,
gateway_url: str, router_url: str,
agent_id: str, agent_id: str,
node_id: str, node_id: str,
message_text: str, prompt: str,
matrix_room_id: str, session_id: str,
matrix_event_id: str, ) -> str:
matrix_sender: str,
) -> Dict[str, Any]:
""" """
POST to DAGI Gateway /v1/invoke (or /debug/agent_ping equivalent). POST /v1/agents/{agent_id}/infer — returns response text string.
Returns parsed JSON response or raises httpx.HTTPError. Field: response['response'] (confirmed from NODA1 test).
Raises httpx.HTTPError on failure.
Payload format matches existing Gateway invoke schema.
""" """
url = f"{gateway_url.rstrip('/')}/v1/invoke" url = f"{router_url.rstrip('/')}/v1/agents/{agent_id}/infer"
payload = { payload = {
"agent_id": agent_id, "prompt": prompt,
"node_id": node_id, "session_id": session_id,
"message": message_text, "user_id": "matrix_bridge",
"metadata": { "metadata": {
"transport": "matrix", "transport": "matrix",
"matrix_room_id": matrix_room_id,
"matrix_event_id": matrix_event_id,
"matrix_sender": matrix_sender,
"node_id": node_id, "node_id": node_id,
}, },
} }
resp = await http_client.post(url, json=payload, timeout=_GATEWAY_TIMEOUT_S) resp = await http_client.post(url, json=payload, timeout=_ROUTER_TIMEOUT_S)
resp.raise_for_status() resp.raise_for_status()
return resp.json() data = resp.json()
# Extract text — field confirmed as 'response'
text = (
data.get("response")
or data.get("text")
or data.get("content")
or data.get("message")
or ""
)
if not isinstance(text, str):
text = str(text)
return text.strip()
# ── Audit write ────────────────────────────────────────────────────────────────
async def _write_audit(
http_client: httpx.AsyncClient,
console_url: str,
internal_token: str,
event: str,
agent_id: str,
node_id: str,
room_id: str,
event_id: str,
status: str = "ok",
error_code: Optional[str] = None,
duration_ms: Optional[int] = None,
data: Optional[Dict[str, Any]] = None,
) -> None:
"""
Fire-and-forget audit write to sofiia-console internal endpoint.
Never raises — logs warning on failure.
"""
if not console_url or not internal_token:
return
try:
url = f"{console_url.rstrip('/')}/api/audit/internal"
await http_client.post(
url,
json={
"event": event,
"operator_id": "matrix_bridge",
"node_id": node_id,
"agent_id": agent_id,
"chat_id": room_id,
"status": status,
"error_code": error_code,
"duration_ms": duration_ms,
"data": {
"matrix_event_id": event_id,
"matrix_room_id": room_id,
**(data or {}),
},
},
headers={"X-Internal-Service-Token": internal_token},
timeout=_AUDIT_TIMEOUT_S,
)
except Exception as exc:
logger.warning("Audit write failed (non-blocking): %s", exc)
# ── Ingress loop ─────────────────────────────────────────────────────────────── # ── Ingress loop ───────────────────────────────────────────────────────────────
class MatrixIngressLoop: class MatrixIngressLoop:
""" """
Drives the Matrix sync-poll → gateway-invoke pipeline. Drives Matrix sync-poll → router-invoke → Matrix send_text pipeline.
Usage: Usage:
loop = MatrixIngressLoop(cfg, room_map) loop = MatrixIngressLoop(...)
stop_event = asyncio.Event() stop_event = asyncio.Event()
await loop.run(stop_event) await loop.run(stop_event)
Metrics callbacks (optional, injected to avoid hard dependency):
on_message_received(room_id, agent_id) — called after successful dedupe
on_gateway_error(error_type) — called on gateway invoke error
""" """
def __init__( def __init__(
@@ -91,115 +144,92 @@ class MatrixIngressLoop:
matrix_homeserver_url: str, matrix_homeserver_url: str,
matrix_access_token: str, matrix_access_token: str,
matrix_user_id: str, matrix_user_id: str,
gateway_url: str, router_url: str,
node_id: str, node_id: str,
room_map: RoomMappingConfig, room_map: RoomMappingConfig,
on_message_received=None, sofiia_console_url: str = "",
on_gateway_error=None, sofiia_internal_token: str = "",
on_message_received: Optional[Callable[[str, str], None]] = None,
on_message_replied: Optional[Callable[[str, str, str], None]] = None,
on_gateway_error: Optional[Callable[[str], None]] = None,
) -> None: ) -> None:
self._hs_url = matrix_homeserver_url self._hs_url = matrix_homeserver_url
self._token = matrix_access_token self._token = matrix_access_token
self._user_id = matrix_user_id self._user_id = matrix_user_id
self._gateway_url = gateway_url self._router_url = router_url
self._node_id = node_id self._node_id = node_id
self._room_map = room_map self._room_map = room_map
self._console_url = sofiia_console_url
self._internal_token = sofiia_internal_token
self._on_message_received = on_message_received self._on_message_received = on_message_received
self._on_message_replied = on_message_replied
self._on_gateway_error = on_gateway_error self._on_gateway_error = on_gateway_error
self._next_batch: Optional[str] = None self._next_batch: Optional[str] = None
self._running = False
@property @property
def next_batch(self) -> Optional[str]: def next_batch(self) -> Optional[str]:
return self._next_batch return self._next_batch
async def run(self, stop_event: asyncio.Event) -> None: async def run(self, stop_event: asyncio.Event) -> None:
""" """Main loop until stop_event is set."""
Main loop. Runs until stop_event is set.
Handles errors with exponential backoff.
"""
self._running = True
backoff = _INIT_RETRY_BACKOFF backoff = _INIT_RETRY_BACKOFF
logger.info( logger.info(
"Matrix ingress loop started | hs=%s node=%s mappings=%d", "Matrix ingress/egress loop started | hs=%s node=%s mappings=%d",
self._hs_url, self._node_id, self._room_map.total_mappings, self._hs_url, self._node_id, self._room_map.total_mappings,
) )
if self._room_map.total_mappings == 0: if self._room_map.total_mappings == 0:
logger.warning("No room mappings configured — ingress loop is idle") logger.warning("No room mappings — ingress loop is idle")
async with MatrixClient( async with MatrixClient(self._hs_url, self._token, self._user_id) as client:
self._hs_url, self._token, self._user_id
) as client:
# Join all mapped rooms at startup
for mapping in self._room_map.mappings: for mapping in self._room_map.mappings:
if mapping.agent_id in self._room_map.allowed_agents: if mapping.agent_id in self._room_map.allowed_agents:
try: try:
await client.join_room(mapping.room_id) await client.join_room(mapping.room_id)
logger.info("Joined room %s → agent %s", mapping.room_id, mapping.agent_id)
except Exception as exc: except Exception as exc:
logger.warning("Could not join room %s: %s", mapping.room_id, exc) logger.warning("Could not join room %s: %s", mapping.room_id, exc)
async with httpx.AsyncClient(timeout=_GATEWAY_TIMEOUT_S) as gw_client: async with httpx.AsyncClient() as http_client:
while not stop_event.is_set(): while not stop_event.is_set():
try: try:
sync_resp = await client.sync_poll(since=self._next_batch) sync_resp = await client.sync_poll(since=self._next_batch)
self._next_batch = sync_resp.get("next_batch") self._next_batch = sync_resp.get("next_batch")
backoff = _INIT_RETRY_BACKOFF # reset on success backoff = _INIT_RETRY_BACKOFF
await self._process_sync(client, http_client, sync_resp)
await self._process_sync(client, gw_client, sync_resp)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("Ingress loop cancelled")
break break
except Exception as exc: except Exception as exc:
logger.error( logger.error("Ingress loop error (retry in %.1fs): %s", backoff, exc)
"Ingress loop error (retry in %.1fs): %s",
backoff, exc,
)
if self._on_gateway_error: if self._on_gateway_error:
self._on_gateway_error("sync_error") self._on_gateway_error("sync_error")
try: try:
await asyncio.wait_for( await asyncio.wait_for(stop_event.wait(), timeout=backoff)
stop_event.wait(), timeout=backoff
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
backoff = min(backoff * 2, _MAX_RETRY_BACKOFF) backoff = min(backoff * 2, _MAX_RETRY_BACKOFF)
self._running = False logger.info("Matrix ingress/egress loop stopped")
logger.info("Matrix ingress loop stopped")
async def _process_sync( async def _process_sync(
self, self,
client: MatrixClient, client: MatrixClient,
gw_client: httpx.AsyncClient, http_client: httpx.AsyncClient,
sync_resp: Dict[str, Any], sync_resp: Dict[str, Any],
) -> None: ) -> None:
"""Process all mapped rooms in a sync response."""
for mapping in self._room_map.mappings: for mapping in self._room_map.mappings:
if mapping.agent_id not in self._room_map.allowed_agents: if mapping.agent_id not in self._room_map.allowed_agents:
continue continue
messages = client.extract_room_messages(sync_resp, mapping.room_id) messages = client.extract_room_messages(sync_resp, mapping.room_id)
for event in messages: for event in messages:
await self._handle_message(client, gw_client, event, mapping) await self._handle_message(client, http_client, event, mapping)
async def _handle_message( async def _handle_message(
self, self,
client: MatrixClient, client: MatrixClient,
gw_client: httpx.AsyncClient, http_client: httpx.AsyncClient,
event: Dict[str, Any], event: Dict[str, Any],
mapping, mapping,
) -> None: ) -> None:
"""
Process a single Matrix message event:
1. Mark as seen (dedupe)
2. Invoke DAGI gateway
3. Fire metrics callback
Note: Reply sending (egress) is PR-M1.4 — not done here.
"""
event_id = event.get("event_id", "") event_id = event.get("event_id", "")
sender = event.get("sender", "") sender = event.get("sender", "")
text = event.get("content", {}).get("body", "").strip() text = event.get("content", {}).get("body", "").strip()
@@ -207,61 +237,157 @@ class MatrixIngressLoop:
agent_id = mapping.agent_id agent_id = mapping.agent_id
if not text: if not text:
logger.debug("Skipping empty message from %s in %s", sender, room_id)
return return
# Mark event as seen before invoke (prevents duplicate on retry) # Dedupe — mark seen before any IO (prevents double-process on retry)
client.mark_seen(event_id) client.mark_seen(event_id)
logger.info( logger.info(
"Matrix message: room=%s sender=%s agent=%s event=%s text_len=%d", "Matrix message: room=%s sender=%s agent=%s event=%s len=%d",
room_id, sender, agent_id, event_id, len(text), room_id, sender, agent_id, event_id, len(text),
) )
if self._on_message_received: if self._on_message_received:
self._on_message_received(room_id, agent_id) self._on_message_received(room_id, agent_id)
# Audit: received
await _write_audit(
http_client, self._console_url, self._internal_token,
event="matrix.message.received",
agent_id=agent_id, node_id=self._node_id,
room_id=room_id, event_id=event_id,
status="ok",
data={"sender": sender, "text_len": len(text)},
)
# Session ID: stable per room (allows memory context across messages)
session_id = f"matrix:{room_id.replace('!', '').replace(':', '_')}"
t0 = time.monotonic() t0 = time.monotonic()
reply_text: Optional[str] = None
invoke_ok = False
try: try:
await _invoke_gateway( reply_text = await _invoke_router(
gw_client, http_client,
self._gateway_url, self._router_url,
agent_id=agent_id, agent_id=agent_id,
node_id=self._node_id, node_id=self._node_id,
message_text=text, prompt=text,
matrix_room_id=room_id, session_id=session_id,
matrix_event_id=event_id,
matrix_sender=sender,
) )
duration = time.monotonic() - t0 invoke_ok = True
duration_ms = int((time.monotonic() - t0) * 1000)
logger.info( logger.info(
"Gateway invoke ok: agent=%s event=%s duration=%.2fs", "Router invoke ok: agent=%s event=%s reply_len=%d duration=%dms",
agent_id, event_id, duration, agent_id, event_id, len(reply_text or ""), duration_ms,
) )
except httpx.HTTPStatusError as exc: except httpx.HTTPStatusError as exc:
duration = time.monotonic() - t0 duration_ms = int((time.monotonic() - t0) * 1000)
logger.error( logger.error(
"Gateway HTTP error %d for agent=%s event=%s duration=%.2fs", "Router HTTP %d for agent=%s event=%s duration=%dms",
exc.response.status_code, agent_id, event_id, duration, exc.response.status_code, agent_id, event_id, duration_ms,
) )
if self._on_gateway_error: if self._on_gateway_error:
self._on_gateway_error(f"http_{exc.response.status_code}") self._on_gateway_error(f"http_{exc.response.status_code}")
await _write_audit(
http_client, self._console_url, self._internal_token,
event="matrix.error",
agent_id=agent_id, node_id=self._node_id,
room_id=room_id, event_id=event_id,
status="error", error_code=f"router_http_{exc.response.status_code}",
duration_ms=duration_ms,
)
except (httpx.ConnectError, httpx.TimeoutException) as exc: except (httpx.ConnectError, httpx.TimeoutException) as exc:
duration = time.monotonic() - t0 duration_ms = int((time.monotonic() - t0) * 1000)
logger.error( logger.error(
"Gateway network error for agent=%s event=%s: %s duration=%.2fs", "Router network error agent=%s event=%s: %s duration=%dms",
agent_id, event_id, exc, duration, agent_id, event_id, exc, duration_ms,
) )
if self._on_gateway_error: if self._on_gateway_error:
self._on_gateway_error("network_error") self._on_gateway_error("network_error")
await _write_audit(
http_client, self._console_url, self._internal_token,
event="matrix.error",
agent_id=agent_id, node_id=self._node_id,
room_id=room_id, event_id=event_id,
status="error", error_code="router_network_error",
duration_ms=duration_ms,
)
except Exception as exc: except Exception as exc:
duration = time.monotonic() - t0 duration_ms = int((time.monotonic() - t0) * 1000)
logger.error( logger.error(
"Unexpected error invoking gateway for agent=%s event=%s: %s", "Unexpected router error agent=%s event=%s: %s",
agent_id, event_id, exc, agent_id, event_id, exc,
) )
if self._on_gateway_error: if self._on_gateway_error:
self._on_gateway_error("unexpected") self._on_gateway_error("unexpected")
await _write_audit(
http_client, self._console_url, self._internal_token,
event="matrix.error",
agent_id=agent_id, node_id=self._node_id,
room_id=room_id, event_id=event_id,
status="error", error_code="router_unexpected",
duration_ms=duration_ms,
)
# ── Egress: send reply back to Matrix ──────────────────────────────────
if not invoke_ok:
# No reply on error in M1 — just audit (avoids spam in room)
return
if not reply_text:
logger.warning("Empty reply from router for agent=%s event=%s", agent_id, event_id)
return
# Truncate if needed
send_text = reply_text[:_REPLY_TEXT_MAX]
txn_id = MatrixClient.make_txn_id(room_id, event_id)
send_t0 = time.monotonic()
try:
await client.send_text(room_id, send_text, txn_id)
send_duration_ms = int((time.monotonic() - send_t0) * 1000)
if self._on_message_replied:
self._on_message_replied(room_id, agent_id, "ok")
await _write_audit(
http_client, self._console_url, self._internal_token,
event="matrix.agent.replied",
agent_id=agent_id, node_id=self._node_id,
room_id=room_id, event_id=event_id,
status="ok",
duration_ms=send_duration_ms,
data={
"reply_len": len(send_text),
"truncated": len(reply_text) > _REPLY_TEXT_MAX,
"router_duration_ms": duration_ms,
},
)
logger.info(
"Reply sent: agent=%s event=%s reply_len=%d send_ms=%d",
agent_id, event_id, len(send_text), send_duration_ms,
)
except Exception as exc:
send_duration_ms = int((time.monotonic() - send_t0) * 1000)
logger.error(
"Failed to send Matrix reply agent=%s event=%s: %s",
agent_id, event_id, exc,
)
if self._on_message_replied:
self._on_message_replied(room_id, agent_id, "error")
if self._on_gateway_error:
self._on_gateway_error("matrix_send_error")
await _write_audit(
http_client, self._console_url, self._internal_token,
event="matrix.error",
agent_id=agent_id, node_id=self._node_id,
room_id=room_id, event_id=event_id,
status="error", error_code="matrix_send_failed",
duration_ms=send_duration_ms,
)

View File

@@ -141,14 +141,23 @@ async def lifespan(app_: Any):
if _PROM_OK: if _PROM_OK:
_gateway_errors.labels(error_type=error_type).inc() _gateway_errors.labels(error_type=error_type).inc()
def _on_replied(room_id: str, agent_id: str, status: str) -> None:
if _PROM_OK:
_messages_replied.labels(
room_id=room_id, agent_id=agent_id, status=status
).inc()
ingress = MatrixIngressLoop( ingress = MatrixIngressLoop(
matrix_homeserver_url=_cfg.matrix_homeserver_url, matrix_homeserver_url=_cfg.matrix_homeserver_url,
matrix_access_token=_cfg.matrix_access_token, matrix_access_token=_cfg.matrix_access_token,
matrix_user_id=_cfg.matrix_user_id, matrix_user_id=_cfg.matrix_user_id,
gateway_url=_cfg.dagi_gateway_url, router_url=_cfg.dagi_gateway_url,
node_id=_cfg.node_id, node_id=_cfg.node_id,
room_map=_room_map, room_map=_room_map,
sofiia_console_url=_cfg.sofiia_console_url,
sofiia_internal_token=_cfg.sofiia_internal_token,
on_message_received=_on_msg, on_message_received=_on_msg,
on_message_replied=_on_replied,
on_gateway_error=_on_gw_error, on_gateway_error=_on_gw_error,
) )
_ingress_task = asyncio.create_task( _ingress_task = asyncio.create_task(

View File

@@ -1,11 +1,13 @@
""" """
Tests for services/matrix-bridge-dagi/app/ingress.py Tests for services/matrix-bridge-dagi/app/ingress.py (M1.4 — egress + audit)
Strategy: Strategy:
- mock MatrixClient.sync_poll and extract_room_messages - mock MatrixClient sync_poll / send_text
- mock httpx gateway client - mock httpx client for router invoke and audit write
- verify gateway is invoked once per unique message (dedupe works) - verify: gateway invoked, send_text called with correct args
- verify stop_event terminates loop - verify: dedupe prevents double-invoke
- verify: audit events fire (received, replied, error)
- verify: empty reply skips send_text (no spam)
""" """
import asyncio import asyncio
@@ -20,7 +22,7 @@ _BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi"
if str(_BRIDGE) not in sys.path: if str(_BRIDGE) not in sys.path:
sys.path.insert(0, str(_BRIDGE)) sys.path.insert(0, str(_BRIDGE))
from app.ingress import MatrixIngressLoop, _invoke_gateway # noqa: E402 from app.ingress import MatrixIngressLoop, _invoke_router, _write_audit # noqa: E402
from app.room_mapping import parse_room_map # noqa: E402 from app.room_mapping import parse_room_map # noqa: E402
@@ -31,27 +33,22 @@ def run(coro):
ALLOWED = frozenset({"sofiia"}) ALLOWED = frozenset({"sofiia"})
ROOM_ID = "!QwHczWXgefDHBEVkTH:daarion.space" ROOM_ID = "!QwHczWXgefDHBEVkTH:daarion.space"
ROOM_MAP_STR = f"sofiia:{ROOM_ID}" ROOM_MAP_STR = f"sofiia:{ROOM_ID}"
GW_URL = "http://127.0.0.1:9300" ROUTER_URL = "http://dagi-router-node1:8000"
HS_URL = "http://localhost:8008" HS_URL = "http://dagi-synapse-node1:8008"
CONSOLE_URL = "http://dagi-sofiia-console-node1:8002"
INTERNAL_TOKEN = "test_internal_token_xyz"
TOKEN = "syt_fake_token" TOKEN = "syt_fake_token"
BOT_USER = "@dagi_bridge:daarion.space" BOT_USER = "@dagi_bridge:daarion.space"
USER = "@user:daarion.space"
MOCK_EVENT_1 = { MSG_EVENT = {
"type": "m.room.message", "type": "m.room.message",
"event_id": "$event1:server", "event_id": "$event1:server",
"sender": "@user:server", "sender": USER,
"content": {"msgtype": "m.text", "body": "Hello Sofiia!"}, "content": {"msgtype": "m.text", "body": "Привіт Sofiia!"},
"origin_server_ts": 1000, "origin_server_ts": 1000,
} }
MOCK_EVENT_2 = {
"type": "m.room.message",
"event_id": "$event2:server",
"sender": "@user:server",
"content": {"msgtype": "m.text", "body": "Another message"},
"origin_server_ts": 2000,
}
def _make_loop(**kwargs) -> MatrixIngressLoop: def _make_loop(**kwargs) -> MatrixIngressLoop:
room_map = parse_room_map(ROOM_MAP_STR, ALLOWED) room_map = parse_room_map(ROOM_MAP_STR, ALLOWED)
@@ -59,208 +56,349 @@ def _make_loop(**kwargs) -> MatrixIngressLoop:
matrix_homeserver_url=HS_URL, matrix_homeserver_url=HS_URL,
matrix_access_token=TOKEN, matrix_access_token=TOKEN,
matrix_user_id=BOT_USER, matrix_user_id=BOT_USER,
gateway_url=GW_URL, router_url=ROUTER_URL,
node_id="NODA1", node_id="NODA1",
room_map=room_map, room_map=room_map,
sofiia_console_url=CONSOLE_URL,
sofiia_internal_token=INTERNAL_TOKEN,
) )
defaults.update(kwargs) defaults.update(kwargs)
return MatrixIngressLoop(**defaults) return MatrixIngressLoop(**defaults)
def _fake_sync_resp(events: list) -> dict: def _fake_sync(events: list) -> dict:
return { return {
"next_batch": "s_next", "next_batch": "s_next",
"rooms": { "rooms": {"join": {ROOM_ID: {"timeline": {"events": events}}}},
"join": {
ROOM_ID: {
"timeline": {"events": events}
}
}
}
} }
# ── _invoke_gateway ───────────────────────────────────────────────────────── def _ok_response(text: str = "Привіт! Я тут.") -> MagicMock:
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"response": text, "model": "test", "tokens_used": 100}
resp.raise_for_status = MagicMock()
return resp
def test_invoke_gateway_builds_correct_request():
def _audit_response() -> MagicMock:
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"ok": True}
resp.raise_for_status = MagicMock()
return resp
def _send_text_response() -> MagicMock:
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"event_id": "$reply:server"}
resp.raise_for_status = MagicMock()
return resp
# ── _invoke_router ─────────────────────────────────────────────────────────────
def test_invoke_router_correct_endpoint_and_field():
async def _inner(): async def _inner():
captured = {} captured = {}
async def fake_post(url, *, json=None, timeout=None): async def fake_post(url, *, json=None, timeout=None):
captured["url"] = url captured["url"] = url
captured["json"] = json captured["payload"] = json
return _ok_response("pong!")
client = MagicMock()
client.post = fake_post
result = await _invoke_router(client, ROUTER_URL, "sofiia", "NODA1", "ping", "session-1")
assert "/v1/agents/sofiia/infer" in captured["url"]
assert captured["payload"]["prompt"] == "ping"
assert captured["payload"]["session_id"] == "session-1"
assert result == "pong!"
run(_inner())
def test_invoke_router_fallback_fields():
"""Should pick up text/content/message if response key missing."""
async def _inner():
async def fake_post(url, *, json=None, timeout=None):
resp = MagicMock() resp = MagicMock()
resp.status_code = 200 resp.status_code = 200
resp.json.return_value = {"ok": True} resp.json.return_value = {"text": "hello from text field"}
resp.raise_for_status = MagicMock() resp.raise_for_status = MagicMock()
return resp return resp
client = MagicMock() client = MagicMock()
client.post = fake_post client.post = fake_post
result = await _invoke_gateway( result = await _invoke_router(client, ROUTER_URL, "sofiia", "NODA1", "hi", "s1")
client, GW_URL, "sofiia", "NODA1", assert result == "hello from text field"
"Hello!", ROOM_ID, "$event1", "@user:server"
)
assert "/v1/invoke" in captured["url"]
payload = captured["json"]
assert payload["agent_id"] == "sofiia"
assert payload["node_id"] == "NODA1"
assert payload["message"] == "Hello!"
meta = payload["metadata"]
assert meta["transport"] == "matrix"
assert meta["matrix_room_id"] == ROOM_ID
assert meta["matrix_event_id"] == "$event1"
assert meta["matrix_sender"] == "@user:server"
run(_inner()) run(_inner())
# ── Ingress loop — normal flow ────────────────────────────────────────────── # ── _write_audit ───────────────────────────────────────────────────────────────
def test_ingress_loop_invokes_gateway_once_per_message(): def test_write_audit_fires_to_console():
async def _inner(): async def _inner():
received_calls: List[Dict] = [] captured = {}
async def fake_post(url, *, json=None, timeout=None): async def fake_post(url, *, json=None, headers=None, timeout=None):
received_calls.append({"url": url, "json": json}) captured["url"] = url
resp = MagicMock() captured["headers"] = headers
resp.status_code = 200 captured["json"] = json
resp.json.return_value = {"ok": True} return _audit_response()
resp.raise_for_status = MagicMock()
return resp client = MagicMock()
client.post = fake_post
await _write_audit(
client, CONSOLE_URL, INTERNAL_TOKEN,
event="matrix.message.received",
agent_id="sofiia", node_id="NODA1",
room_id=ROOM_ID, event_id="$e1",
status="ok",
)
assert "/api/audit/internal" in captured["url"]
assert captured["headers"]["X-Internal-Service-Token"] == INTERNAL_TOKEN
assert captured["json"]["event"] == "matrix.message.received"
run(_inner())
def test_write_audit_no_op_when_no_token():
async def _inner():
called = [False]
async def fake_post(*args, **kwargs):
called[0] = True
return _audit_response()
client = MagicMock()
client.post = fake_post
# Empty token — should not call
await _write_audit(client, CONSOLE_URL, "", "matrix.test", "sofiia", "NODA1", ROOM_ID, "$e1")
assert not called[0]
run(_inner())
def test_write_audit_never_raises():
async def _inner():
async def fake_post(*args, **kwargs):
raise ConnectionError("audit server down")
client = MagicMock()
client.post = fake_post
# Should not raise
await _write_audit(client, CONSOLE_URL, INTERNAL_TOKEN, "matrix.test", "sofiia", "NODA1", ROOM_ID, "$e1")
run(_inner())
# ── Full loop: ingress + egress + audit ────────────────────────────────────────
def test_loop_full_cycle_invoke_and_reply():
"""One message → router invoked → send_text called with reply."""
async def _inner():
router_calls: List[Dict] = []
send_calls: List[Dict] = []
audit_calls: List[Dict] = []
stop = asyncio.Event() stop = asyncio.Event()
loop = _make_loop() loop = _make_loop()
sync_responses = [
_fake_sync_resp([MOCK_EVENT_1]),
_fake_sync_resp([]), # empty on second poll
]
call_count = [0] call_count = [0]
async def fake_sync_poll(**kwargs): async def fake_sync_poll(**kwargs):
idx = call_count[0]
call_count[0] += 1 call_count[0] += 1
if idx >= len(sync_responses): if call_count[0] > 1:
stop.set() stop.set()
return {"next_batch": "end", "rooms": {}} return {"next_batch": "end", "rooms": {}}
return sync_responses[idx] return _fake_sync([MSG_EVENT])
with patch("app.ingress.MatrixClient") as MockClient:
mock_mc_instance = AsyncMock()
mock_mc_instance.__aenter__ = AsyncMock(return_value=mock_mc_instance)
mock_mc_instance.__aexit__ = AsyncMock(return_value=False)
mock_mc_instance.sync_poll = fake_sync_poll
mock_mc_instance.join_room = AsyncMock()
mock_mc_instance.mark_seen = MagicMock()
mock_mc_instance.is_duplicate = MagicMock(return_value=False)
def fake_extract(sync_resp, room_id): def fake_extract(sync_resp, room_id):
events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", [])
return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER] return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER]
mock_mc_instance.extract_room_messages = fake_extract async def fake_http_post(url, *, json=None, headers=None, timeout=None):
MockClient.return_value = mock_mc_instance if "/infer" in url:
router_calls.append({"url": url, "json": json})
return _ok_response("Привіт! Я готова допомогти.")
elif "/audit/internal" in url:
audit_calls.append({"url": url, "json": json})
return _audit_response()
return _audit_response()
async def fake_send_text(room_id, text, txn_id):
send_calls.append({"room_id": room_id, "text": text, "txn_id": txn_id})
return {"event_id": "$reply_event"}
with patch("app.ingress.MatrixClient") as MockClient:
mock_mc = AsyncMock()
mock_mc.__aenter__ = AsyncMock(return_value=mock_mc)
mock_mc.__aexit__ = AsyncMock(return_value=False)
mock_mc.sync_poll = fake_sync_poll
mock_mc.join_room = AsyncMock()
mock_mc.mark_seen = MagicMock()
mock_mc.extract_room_messages = fake_extract
mock_mc.send_text = fake_send_text
# Patch MatrixClient.make_txn_id as static method
MockClient.return_value = mock_mc
MockClient.make_txn_id = lambda r, e: f"txn_{e}"
with patch("app.ingress.httpx.AsyncClient") as MockHTTP: with patch("app.ingress.httpx.AsyncClient") as MockHTTP:
mock_http = AsyncMock() mock_http = AsyncMock()
mock_http.__aenter__ = AsyncMock(return_value=mock_http) mock_http.__aenter__ = AsyncMock(return_value=mock_http)
mock_http.__aexit__ = AsyncMock(return_value=False) mock_http.__aexit__ = AsyncMock(return_value=False)
mock_http.post = fake_post mock_http.post = fake_http_post
MockHTTP.return_value = mock_http MockHTTP.return_value = mock_http
await asyncio.wait_for(loop.run(stop), timeout=3.0) await asyncio.wait_for(loop.run(stop), timeout=3.0)
# Gateway should have been invoked exactly once (for MOCK_EVENT_1) # Router invoked once
assert len(received_calls) == 1 assert len(router_calls) == 1
assert received_calls[0]["json"]["message"] == "Hello Sofiia!" assert "sofiia" in router_calls[0]["url"]
assert router_calls[0]["json"]["prompt"] == "Привіт Sofiia!"
# Reply sent once
assert len(send_calls) == 1
assert send_calls[0]["room_id"] == ROOM_ID
assert send_calls[0]["text"] == "Привіт! Я готова допомогти."
# Audit events: at least received + replied
audit_events = [a["json"]["event"] for a in audit_calls]
assert "matrix.message.received" in audit_events
assert "matrix.agent.replied" in audit_events
run(_inner()) run(_inner())
def test_ingress_loop_deduplicates_same_event(): def test_loop_deduplication_no_double_invoke():
"""Same event_id appearing twice should only invoke gateway once.""" """Same event_id in two syncs → router called exactly once."""
async def _inner(): async def _inner():
invoke_count = [0] router_calls = [0]
seen = set()
async def fake_post(url, *, json=None, timeout=None):
invoke_count[0] += 1
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"ok": True}
resp.raise_for_status = MagicMock()
return resp
stop = asyncio.Event() stop = asyncio.Event()
loop = _make_loop() loop = _make_loop()
# Same event in two consecutive syncs
sync_responses = [
_fake_sync_resp([MOCK_EVENT_1]),
_fake_sync_resp([MOCK_EVENT_1]), # duplicate event_id
]
call_count = [0] call_count = [0]
seen = set()
async def fake_sync_poll(**kwargs): async def fake_sync_poll(**kwargs):
idx = call_count[0]
call_count[0] += 1 call_count[0] += 1
if idx >= len(sync_responses): if call_count[0] > 2:
stop.set() stop.set()
return {"next_batch": "end", "rooms": {}} return {"next_batch": "end", "rooms": {}}
return sync_responses[idx] return _fake_sync([MSG_EVENT])
with patch("app.ingress.MatrixClient") as MockClient:
mock_mc_instance = AsyncMock()
mock_mc_instance.__aenter__ = AsyncMock(return_value=mock_mc_instance)
mock_mc_instance.__aexit__ = AsyncMock(return_value=False)
mock_mc_instance.sync_poll = fake_sync_poll
mock_mc_instance.join_room = AsyncMock()
def fake_mark_seen(event_id):
seen.add(event_id)
def fake_is_dup(event_id):
return event_id in seen
mock_mc_instance.mark_seen = fake_mark_seen
mock_mc_instance.is_duplicate = fake_is_dup
def fake_extract(sync_resp, room_id): def fake_extract(sync_resp, room_id):
events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", []) events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", [])
return [e for e in events if e.get("type") == "m.room.message" return [e for e in events if e.get("type") == "m.room.message"
and e.get("sender") != BOT_USER and e.get("sender") != BOT_USER
and not fake_is_dup(e.get("event_id", ""))] and e.get("event_id") not in seen]
mock_mc_instance.extract_room_messages = fake_extract def fake_mark_seen(eid):
MockClient.return_value = mock_mc_instance seen.add(eid)
async def fake_http_post(url, *, json=None, headers=None, timeout=None):
if "/infer" in url:
router_calls[0] += 1
return _ok_response("response")
return _audit_response()
with patch("app.ingress.MatrixClient") as MockClient:
mock_mc = AsyncMock()
mock_mc.__aenter__ = AsyncMock(return_value=mock_mc)
mock_mc.__aexit__ = AsyncMock(return_value=False)
mock_mc.sync_poll = fake_sync_poll
mock_mc.join_room = AsyncMock()
mock_mc.mark_seen = fake_mark_seen
mock_mc.extract_room_messages = fake_extract
mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"})
MockClient.return_value = mock_mc
MockClient.make_txn_id = lambda r, e: f"txn_{e}"
with patch("app.ingress.httpx.AsyncClient") as MockHTTP: with patch("app.ingress.httpx.AsyncClient") as MockHTTP:
mock_http = AsyncMock() mock_http = AsyncMock()
mock_http.__aenter__ = AsyncMock(return_value=mock_http) mock_http.__aenter__ = AsyncMock(return_value=mock_http)
mock_http.__aexit__ = AsyncMock(return_value=False) mock_http.__aexit__ = AsyncMock(return_value=False)
mock_http.post = fake_post mock_http.post = fake_http_post
MockHTTP.return_value = mock_http MockHTTP.return_value = mock_http
await asyncio.wait_for(loop.run(stop), timeout=3.0) await asyncio.wait_for(loop.run(stop), timeout=3.0)
# Dedupe: only 1 invoke despite 2 sync responses with same event assert router_calls[0] == 1
assert invoke_count[0] == 1
run(_inner()) run(_inner())
def test_ingress_loop_calls_metric_callbacks(): def test_loop_empty_reply_skips_send():
"""on_message_received and on_gateway_error callbacks should fire.""" """Empty reply from router → send_text NOT called."""
async def _inner(): async def _inner():
received_events = [] send_called = [False]
error_events = [] stop = asyncio.Event()
loop = _make_loop()
call_count = [0]
async def fake_sync_poll(**kwargs):
call_count[0] += 1
if call_count[0] > 1:
stop.set()
return {"next_batch": "end", "rooms": {}}
return _fake_sync([MSG_EVENT])
def fake_extract(sync_resp, room_id):
events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", [])
return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER]
async def fake_http_post(url, *, json=None, headers=None, timeout=None):
if "/infer" in url:
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"response": ""} # empty
resp.raise_for_status = MagicMock()
return resp
return _audit_response()
async def fake_send_text(room_id, text, txn_id):
send_called[0] = True
return {"event_id": "$r"}
with patch("app.ingress.MatrixClient") as MockClient:
mock_mc = AsyncMock()
mock_mc.__aenter__ = AsyncMock(return_value=mock_mc)
mock_mc.__aexit__ = AsyncMock(return_value=False)
mock_mc.sync_poll = fake_sync_poll
mock_mc.join_room = AsyncMock()
mock_mc.mark_seen = MagicMock()
mock_mc.extract_room_messages = fake_extract
mock_mc.send_text = fake_send_text
MockClient.return_value = mock_mc
MockClient.make_txn_id = lambda r, e: f"txn_{e}"
with patch("app.ingress.httpx.AsyncClient") as MockHTTP:
mock_http = AsyncMock()
mock_http.__aenter__ = AsyncMock(return_value=mock_http)
mock_http.__aexit__ = AsyncMock(return_value=False)
mock_http.post = fake_http_post
MockHTTP.return_value = mock_http
await asyncio.wait_for(loop.run(stop), timeout=3.0)
assert not send_called[0]
run(_inner())
def test_loop_metric_callbacks_fire():
"""on_message_received and on_message_replied should be called."""
async def _inner():
received = []
replied = []
stop = asyncio.Event() stop = asyncio.Event()
loop = _make_loop( loop = _make_loop(
on_message_received=lambda room, agent: received_events.append((room, agent)), on_message_received=lambda r, a: received.append((r, a)),
on_gateway_error=lambda etype: error_events.append(etype), on_message_replied=lambda r, a, s: replied.append((r, a, s)),
) )
call_count = [0] call_count = [0]
@@ -270,7 +408,16 @@ def test_ingress_loop_calls_metric_callbacks():
if call_count[0] > 1: if call_count[0] > 1:
stop.set() stop.set()
return {"next_batch": "end", "rooms": {}} return {"next_batch": "end", "rooms": {}}
return _fake_sync_resp([MOCK_EVENT_1]) return _fake_sync([MSG_EVENT])
def fake_extract(sync_resp, room_id):
events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", [])
return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER]
async def fake_http_post(url, *, json=None, headers=None, timeout=None):
if "/infer" in url:
return _ok_response("test reply")
return _audit_response()
with patch("app.ingress.MatrixClient") as MockClient: with patch("app.ingress.MatrixClient") as MockClient:
mock_mc = AsyncMock() mock_mc = AsyncMock()
@@ -279,73 +426,23 @@ def test_ingress_loop_calls_metric_callbacks():
mock_mc.sync_poll = fake_sync_poll mock_mc.sync_poll = fake_sync_poll
mock_mc.join_room = AsyncMock() mock_mc.join_room = AsyncMock()
mock_mc.mark_seen = MagicMock() mock_mc.mark_seen = MagicMock()
mock_mc.is_duplicate = MagicMock(return_value=False)
def fake_extract(sync_resp, room_id):
events = sync_resp.get("rooms", {}).get("join", {}).get(room_id, {}).get("timeline", {}).get("events", [])
return [e for e in events if e.get("type") == "m.room.message" and e.get("sender") != BOT_USER]
mock_mc.extract_room_messages = fake_extract mock_mc.extract_room_messages = fake_extract
mock_mc.send_text = AsyncMock(return_value={"event_id": "$r"})
MockClient.return_value = mock_mc MockClient.return_value = mock_mc
MockClient.make_txn_id = lambda r, e: f"txn_{e}"
with patch("app.ingress.httpx.AsyncClient") as MockHTTP: with patch("app.ingress.httpx.AsyncClient") as MockHTTP:
mock_http = AsyncMock() mock_http = AsyncMock()
mock_http.__aenter__ = AsyncMock(return_value=mock_http) mock_http.__aenter__ = AsyncMock(return_value=mock_http)
mock_http.__aexit__ = AsyncMock(return_value=False) mock_http.__aexit__ = AsyncMock(return_value=False)
mock_http.post = fake_http_post
async def fake_post(url, *, json=None, timeout=None):
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"ok": True}
resp.raise_for_status = MagicMock()
return resp
mock_http.post = fake_post
MockHTTP.return_value = mock_http MockHTTP.return_value = mock_http
await asyncio.wait_for(loop.run(stop), timeout=3.0) await asyncio.wait_for(loop.run(stop), timeout=3.0)
assert len(received_events) == 1 assert len(received) == 1
assert received_events[0] == (ROOM_ID, "sofiia") assert received[0] == (ROOM_ID, "sofiia")
assert len(replied) == 1
run(_inner()) assert replied[0][2] == "ok"
def test_ingress_loop_no_mappings_is_idle():
"""Loop with 0 mappings should start and stop cleanly without invoking gateway."""
async def _inner():
empty_map = parse_room_map("", ALLOWED)
loop = MatrixIngressLoop(
matrix_homeserver_url=HS_URL,
matrix_access_token=TOKEN,
matrix_user_id=BOT_USER,
gateway_url=GW_URL,
node_id="NODA1",
room_map=empty_map,
)
stop = asyncio.Event()
with patch("app.ingress.MatrixClient") as MockClient:
mock_mc = AsyncMock()
mock_mc.__aenter__ = AsyncMock(return_value=mock_mc)
mock_mc.__aexit__ = AsyncMock(return_value=False)
async def fake_sync_poll(**kwargs):
stop.set()
return {"next_batch": "end", "rooms": {}}
mock_mc.sync_poll = fake_sync_poll
MockClient.return_value = mock_mc
with patch("app.ingress.httpx.AsyncClient") as MockHTTP:
mock_http = AsyncMock()
mock_http.__aenter__ = AsyncMock(return_value=mock_http)
mock_http.__aexit__ = AsyncMock(return_value=False)
MockHTTP.return_value = mock_http
await asyncio.wait_for(loop.run(stop), timeout=3.0)
# Should complete without error
assert True
run(_inner()) run(_inner())