From d40b1e87c6d18e6d75efbece093c8444aad946dc Mon Sep 17 00:00:00 2001 From: Apple Date: Thu, 5 Mar 2026 01:41:20 -0800 Subject: [PATCH] feat(matrix-bridge-dagi): harden mixed rooms with safe defaults and ops visibility (M2.2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Guard rails (mixed_routing.py): - MAX_AGENTS_PER_MIXED_ROOM (default 5): fail-fast at parse time - MAX_SLASH_LEN (default 32): reject garbage/injection slash tokens - Unified rejection reasons: unknown_agent, slash_too_long, no_mapping - REASON_REJECTED_* constants (separate from success REASON_*) Ingress (ingress.py): - per-room-agent concurrency semaphore (MIXED_CONCURRENCY_CAP, default 1) - active_lock_count property for /health + prometheus - UNKNOWN_AGENT_BEHAVIOR: "ignore" (silent) | "reply_error" (inform user) - on_routed(agent_id, reason) callback for routing metrics - on_route_rejected(room_id, reason) callback for rejection metrics - matrix.route.rejected audit event on every rejection Config + main: - max_agents_per_mixed_room, max_slash_len, unknown_agent_behavior, mixed_concurrency_cap - matrix_bridge_routed_total{agent_id, reason} counter - matrix_bridge_route_rejected_total{room_id, reason} counter - matrix_bridge_active_room_agent_locks gauge - /health: mixed_guard_rails section + total_agents_in_mixed_rooms - docker-compose: all 4 new guard rail env vars Runbook: section 9 — mixed room debug guide (6 acceptance tests, routing metrics, session isolation, lock hang, config guard) Tests: 108 pass (94 → 108, +14 new tests for guard rails + callbacks + concurrency) Made-with: Cursor --- docker-compose.matrix-bridge-node1.yml | 10 ++ docs/runbook/matrix-bridge-dagi-ops.md | 164 +++++++++++++++++- services/matrix-bridge-dagi/app/config.py | 12 +- services/matrix-bridge-dagi/app/ingress.py | 95 +++++++++- services/matrix-bridge-dagi/app/main.py | 42 +++++ .../matrix-bridge-dagi/app/mixed_routing.py | 50 +++++- tests/test_matrix_bridge_mixed_ingress.py | 152 ++++++++++++++++ tests/test_matrix_bridge_mixed_routing.py | 72 +++++++- 8 files changed, 576 insertions(+), 21 deletions(-) diff --git a/docker-compose.matrix-bridge-node1.yml b/docker-compose.matrix-bridge-node1.yml index 9239f9f4..1267fe35 100644 --- a/docker-compose.matrix-bridge-node1.yml +++ b/docker-compose.matrix-bridge-node1.yml @@ -60,6 +60,16 @@ services: # "!roomX:server=helion;!roomY:server=druid" - BRIDGE_MIXED_DEFAULTS=${BRIDGE_MIXED_DEFAULTS:-} + # ── M2.2: Mixed room guard rails ──────────────────────────────────── + # Fail-fast if any room defines more agents than this + - MAX_AGENTS_PER_MIXED_ROOM=${MAX_AGENTS_PER_MIXED_ROOM:-5} + # Reject slash commands longer than this (anti-garbage / injection guard) + - MAX_SLASH_LEN=${MAX_SLASH_LEN:-32} + # What to do when unknown /slash is used: "ignore" (silent) | "reply_error" (inform user) + - UNKNOWN_AGENT_BEHAVIOR=${UNKNOWN_AGENT_BEHAVIOR:-ignore} + # Max concurrent Router invocations per (room, agent) pair; 0 = unlimited + - MIXED_CONCURRENCY_CAP=${MIXED_CONCURRENCY_CAP:-1} + healthcheck: test: - "CMD" diff --git a/docs/runbook/matrix-bridge-dagi-ops.md b/docs/runbook/matrix-bridge-dagi-ops.md index 334cefd8..3fc4e391 100644 --- a/docs/runbook/matrix-bridge-dagi-ops.md +++ b/docs/runbook/matrix-bridge-dagi-ops.md @@ -2,7 +2,7 @@ **Сервіс:** `matrix-bridge-dagi` | **Нода:** NODA1 | **Порт:** 7030 (localhost) **Stack:** Matrix (Synapse) → bridge → Router `/v1/agents/{id}/infer` → Matrix reply -**Фаза:** M1 (1 room = Sofiia), H1/H2/H3 hardening активний +**Фаза:** M2.2 (N rooms + mixed room routing), H1/H2/H3 hardening активний --- @@ -364,11 +364,167 @@ echo | openssl s_client -connect matrix.daarion.space:443 -servername matrix.daa --- +--- + +## 9. Mixed Room Debug Guide (M2.2) + +### 9.1 Перевірка конфігурації mixed rooms + +```bash +# Поточний mapping (regular + mixed) +curl -sS http://127.0.0.1:7030/bridge/mappings | python3 -m json.tool + +# Guard rail параметри (з /health) +curl -sS http://127.0.0.1:7030/health | python3 -m json.tool | python3 -c " +import sys, json; h=json.load(sys.stdin) +print('mixed_rooms:', h.get('mixed_rooms_count',0)) +print('total_agents:', h.get('total_agents_in_mixed_rooms',0)) +print('guard_rails:', json.dumps(h.get('mixed_guard_rails',{}), indent=2)) +" +``` + +### 9.2 Smoke test для mixed room (6 acceptance test cases) + +Відправляємо з `test_user` у mixed room `!roomX:daarion.space`: + +```bash +# Змінна для зручності +ROOM_ID="!roomX:daarion.space" +TOKEN="@test_user_token" # або через Element UI + +# 1. Slash → Sofiia +curl -sX POST "https://matrix.daarion.space/_matrix/client/v3/rooms/${ROOM_ID}/send/m.room.message/txn1" \ + -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ + -d '{"msgtype":"m.text","body":"/sofiia ping"}' | jq '.event_id' + +# 2. Slash → Helion +curl -sX POST "...txn2" -d '{"msgtype":"m.text","body":"/helion ping"}' | jq '.event_id' + +# 3. @mention → Sofiia +curl -sX POST "...txn3" -d '{"msgtype":"m.text","body":"@sofiia status"}' | jq '.event_id' + +# 4. colon-mention → Sofiia +curl -sX POST "...txn4" -d '{"msgtype":"m.text","body":"sofiia: status"}' | jq '.event_id' + +# 5. Plain text → default agent +curl -sX POST "...txn5" -d '{"msgtype":"m.text","body":"ping"}' | jq '.event_id' + +# 6. Unknown slash → audit matrix.route.rejected +curl -sX POST "...txn6" -d '{"msgtype":"m.text","body":"/unknown test"}' | jq '.event_id' +``` + +**Очікувана поведінка у кімнаті:** +| Команда | Reply prefix | Router agent | +|---------|-------------|--------------| +| `/sofiia ping` | `Sofiia: ...` | sofiia | +| `/helion ping` | `Helion: ...` | helion | +| `@sofiia status` | `Sofiia: ...` | sofiia | +| `sofiia: status` | `Sofiia: ...` | sofiia | +| `ping` (plain) | `: ...` | перший у списку | +| `/unknown test` | `⚠️ Unknown agent...` або тиша (залежно від `UNKNOWN_AGENT_BEHAVIOR`) | — | + +### 9.3 Перевірка routing метрик + +```bash +# Successful routing breakdown by reason +curl -sS http://127.0.0.1:7030/metrics | grep 'matrix_bridge_routed_total' +# Очікування: +# matrix_bridge_routed_total{agent_id="sofiia",reason="slash_command"} N +# matrix_bridge_routed_total{agent_id="helion",reason="slash_command"} N +# matrix_bridge_routed_total{agent_id="sofiia",reason="at_mention"} N +# matrix_bridge_routed_total{agent_id="sofiia",reason="default"} N + +# Rejections +curl -sS http://127.0.0.1:7030/metrics | grep 'matrix_bridge_route_rejected_total' +# Очікується > 0 лише якщо були /unknown або занадто довгі токени + +# Active concurrency locks +curl -sS http://127.0.0.1:7030/metrics | grep 'active_room_agent_locks' +# Зазвичай 0 (між повідомленнями) +``` + +### 9.4 Debug: "Wrong agent responds" + +**Симптом:** У mixed room `/helion ...` → відповідає sofiia, або відповідає не той агент. + +**Діагностика:** + +```bash +# 1. Перевірити audit events в sofiia-console +# (через psql або API) +curl -sS http://127.0.0.1:8002/api/audit \ + | python3 -m json.tool | grep -A5 '"event":"matrix.message.received"' \ + | grep '"routing_reason"' +# routing_reason має бути "slash_command", "at_mention", "colon_mention" або "default" + +# 2. Перевірити логи bridge +docker logs matrix-bridge-dagi-node1 --tail 100 2>&1 | grep -E 'route|Route|routing' +# Очікування: "Slash route: /helion → helion" або "Default route: → sofiia" + +# 3. Перевірити BRIDGE_MIXED_ROOM_MAP в .env +grep BRIDGE_MIXED_ROOM_MAP /opt/microdao-daarion/.env +# Формат: "!roomX:server=sofiia,helion" +# Перший у списку = default agent +``` + +**Виправлення:** +- Якщо порядок агентів неправильний — змінити `BRIDGE_MIXED_ROOM_MAP` або встановити `BRIDGE_MIXED_DEFAULTS` +- Перезапустити bridge: `docker restart matrix-bridge-dagi-node1` + +### 9.5 Debug: "Session context змішується між агентами" + +**Симптом:** Helion "пам'ятає" контекст розмови sofiia. + +**Перевірка:** Session key у логах (`session_id` у invoke payload) + +```bash +docker logs matrix-bridge-dagi-node1 --tail 50 2>&1 | grep session_id +# Очікування для mixed room: +# session_id = "matrix:roomX_daarion_space:sofiia" ← ізольований per-agent +# session_id = "matrix:roomX_daarion_space:helion" ← окремий контекст +``` + +Якщо обидва агенти мають однаковий `session_id` — це баг рефакторингу, відкати на M2.0. + +### 9.6 Debug: Concurrency lock "застрявання" + +**Симптом:** Запит зависає, не відповідає, active_lock_count > 0 протягом >60s. + +```bash +# Перевірити active locks +curl -sS http://127.0.0.1:7030/health | python3 -m json.tool | python3 -c \ + "import sys,json; h=json.load(sys.stdin); print(h.get('mixed_guard_rails',{}).get('active_room_agent_locks',0))" + +# Якщо > 0 протягом довгого часу — Router застряг +curl -sS http://127.0.0.1:9102/health | jq '.status' + +# Якщо Router недоступний — перезапуск bridge звільнить locks (graceful shutdown + cancel) +docker restart matrix-bridge-dagi-node1 +``` + +### 9.7 Guard rail: перевірка MAX_AGENTS_PER_MIXED_ROOM + +Якщо у `.env` є рядок з 6+ агентами і `MAX_AGENTS_PER_MIXED_ROOM=5`: + +```bash +docker logs matrix-bridge-dagi-node1 --tail 20 2>&1 | grep 'Config error\|MAX_AGENTS' +# Очікується: "❌ Config error: BRIDGE_MIXED_ROOM_MAP parse errors: Room ... has 6 agents > MAX..." +# Bridge не стартує → /health поверне {"ok":false,"error":"..."} +``` + +**Виправлення:** Зменшити кількість агентів або збільшити `MAX_AGENTS_PER_MIXED_ROOM`. + +--- + ## 8. Що прикріпити до інциденту ```bash echo "=== /health ===" && curl -sS http://127.0.0.1:7030/health | python3 -m json.tool -echo "=== /metrics traffic ===" && curl -sS http://127.0.0.1:7030/metrics | grep -E 'matrix_bridge_(messages|rate_limited|queue_dropped|gateway_errors)' -echo "=== /metrics latency ===" && curl -sS http://127.0.0.1:7030/metrics | grep -E '(invoke|send|queue_wait)_duration_seconds_(count|sum)' -echo "=== logs ===" && docker logs matrix-bridge-dagi-node1 --tail 50 2>&1 | grep -E 'ERROR|WARN|rate_limited|queue_full|Reply sent|invoke ok' +echo "=== /bridge/mappings ===" && curl -sS http://127.0.0.1:7030/bridge/mappings | python3 -m json.tool +echo "=== /metrics traffic ===" && curl -sS http://127.0.0.1:7030/metrics \ + | grep -E 'matrix_bridge_(messages|rate_limited|queue_dropped|gateway_errors|routed|route_rejected)' +echo "=== /metrics latency ===" && curl -sS http://127.0.0.1:7030/metrics \ + | grep -E '(invoke|send|queue_wait)_duration_seconds_(count|sum)' +echo "=== logs ===" && docker logs matrix-bridge-dagi-node1 --tail 50 2>&1 \ + | grep -E 'ERROR|WARN|rate_limited|queue_full|Reply sent|invoke ok|route|rejected' ``` diff --git a/services/matrix-bridge-dagi/app/config.py b/services/matrix-bridge-dagi/app/config.py index c5267f15..5d2eb883 100644 --- a/services/matrix-bridge-dagi/app/config.py +++ b/services/matrix-bridge-dagi/app/config.py @@ -1,5 +1,5 @@ """ -matrix-bridge-dagi — configuration and validation (M2.1: mixed rooms) +matrix-bridge-dagi — configuration and validation (M2.1 + M2.2: mixed rooms + guard rails) """ import os from dataclasses import dataclass, field @@ -40,6 +40,12 @@ class BridgeConfig: # "!roomX:server=helion" — explicit default per mixed room (optional) bridge_mixed_defaults: str + # M2.2: Mixed room guard rails + max_agents_per_mixed_room: int # fail-fast if room defines more agents than this + max_slash_len: int # reject slash token longer than this (anti-garbage) + unknown_agent_behavior: str # "ignore" | "reply_error" + mixed_concurrency_cap: int # max parallel invokes per (room, agent); 0 = unlimited + # Service identity node_id: str build_sha: str @@ -78,6 +84,10 @@ def load_config() -> BridgeConfig: queue_drain_timeout_s=max(1.0, float(_optional("QUEUE_DRAIN_TIMEOUT_S", "5"))), bridge_mixed_room_map=_optional("BRIDGE_MIXED_ROOM_MAP", ""), bridge_mixed_defaults=_optional("BRIDGE_MIXED_DEFAULTS", ""), + max_agents_per_mixed_room=max(1, int(_optional("MAX_AGENTS_PER_MIXED_ROOM", "5"))), + max_slash_len=max(4, int(_optional("MAX_SLASH_LEN", "32"))), + unknown_agent_behavior=_optional("UNKNOWN_AGENT_BEHAVIOR", "ignore"), + mixed_concurrency_cap=max(0, int(_optional("MIXED_CONCURRENCY_CAP", "1"))), node_id=_optional("NODE_ID", "NODA1"), build_sha=_optional("BUILD_SHA", "dev"), build_time=_optional("BUILD_TIME", "local"), diff --git a/services/matrix-bridge-dagi/app/ingress.py b/services/matrix-bridge-dagi/app/ingress.py index e0d46419..13eccee3 100644 --- a/services/matrix-bridge-dagi/app/ingress.py +++ b/services/matrix-bridge-dagi/app/ingress.py @@ -1,5 +1,5 @@ """ -Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 + M2.1 (mixed rooms) +Matrix Ingress + Egress Loop — Phase M1.4 + H1 + H2 + H3 + M2.1 + M2.2 (mixed rooms hardening) Architecture (H2): Reader task → asyncio.Queue(maxsize) → N Worker tasks @@ -34,7 +34,10 @@ from typing import Any, Callable, Dict, List, Optional import httpx from .matrix_client import MatrixClient -from .mixed_routing import MixedRoomConfig, route_message, reply_prefix +from .mixed_routing import ( + MixedRoomConfig, route_message, reply_prefix, + REASON_REJECTED_UNKNOWN_AGENT, REASON_REJECTED_SLASH_TOO_LONG, REASON_REJECTED_NO_MAPPING, +) from .rate_limit import InMemoryRateLimiter from .room_mapping import RoomMappingConfig, RoomMapping @@ -152,6 +155,8 @@ class MatrixIngressLoop: on_invoke_latency(agent_id, duration_seconds) on_send_latency(agent_id, duration_seconds) on_queue_wait(agent_id, wait_seconds) + on_routed(agent_id, reason) M2.2: successful routing + on_route_rejected(room_id, reason) M2.2: routing rejection """ def __init__( @@ -169,6 +174,10 @@ class MatrixIngressLoop: worker_concurrency: int = 2, queue_drain_timeout_s: float = 5.0, mixed_room_config: Optional[MixedRoomConfig] = None, + # M2.2: guard rails + unknown_agent_behavior: str = "ignore", # "ignore" | "reply_error" + max_slash_len: int = 32, + mixed_concurrency_cap: int = 1, # 0 = unlimited # Callbacks on_message_received: Optional[Callable[[str, str], None]] = None, on_message_replied: Optional[Callable[[str, str, str], None]] = None, @@ -179,6 +188,8 @@ class MatrixIngressLoop: on_invoke_latency: Optional[Callable[[str, float], None]] = None, on_send_latency: Optional[Callable[[str, float], None]] = None, on_queue_wait: Optional[Callable[[str, float], None]] = None, + on_routed: Optional[Callable[[str, str], None]] = None, + on_route_rejected: Optional[Callable[[str, str], None]] = None, ) -> None: self._hs_url = matrix_homeserver_url self._token = matrix_access_token @@ -203,6 +214,13 @@ class MatrixIngressLoop: self._on_send_latency = on_send_latency self._on_queue_wait = on_queue_wait self._mixed_room_config = mixed_room_config + self._unknown_agent_behavior = unknown_agent_behavior + self._max_slash_len = max_slash_len + self._mixed_concurrency_cap = mixed_concurrency_cap + self._on_routed = on_routed + self._on_route_rejected = on_route_rejected + # Lazily populated semaphores keyed by "{room_id}:{agent_id}" + self._concurrency_locks: Dict[str, asyncio.Semaphore] = {} self._next_batch: Optional[str] = None self._queue: Optional[asyncio.Queue] = None # exposed for /health @@ -218,6 +236,19 @@ class MatrixIngressLoop: def worker_count(self) -> int: return self._worker_count + @property + def active_lock_count(self) -> int: + """Number of room-agent pairs currently holding a concurrency lock.""" + return sum(1 for lock in self._concurrency_locks.values() if lock.locked()) + + def _get_concurrency_lock(self, room_id: str, agent_id: str) -> asyncio.Semaphore: + """Lazily create and return the semaphore for a (room, agent) pair.""" + key = f"{room_id}:{agent_id}" + if key not in self._concurrency_locks: + cap = self._mixed_concurrency_cap if self._mixed_concurrency_cap > 0 else 2 ** 31 + self._concurrency_locks[key] = asyncio.Semaphore(cap) + return self._concurrency_locks[key] + # ── Public run ───────────────────────────────────────────────────────────── async def run(self, stop_event: asyncio.Event) -> None: @@ -428,23 +459,46 @@ class MatrixIngressLoop: # Route message to determine target agent agent_id, routing_reason, effective_text = route_message( text, room_id, self._mixed_room_config, self._room_map.allowed_agents, + max_slash_len=self._max_slash_len, ) if agent_id is None: + # M2.2: routing rejected — audit + metric + optional error reply logger.warning( - "Mixed room %s: unresolvable routing reason=%s event=%s — skipping", + "Mixed room %s: routing rejected reason=%s event=%s", room_id, routing_reason, event_id, ) + if self._on_route_rejected: + self._on_route_rejected(room_id, routing_reason) await _write_audit( http_client, self._console_url, self._internal_token, - event="matrix.error", + event="matrix.route.rejected", agent_id="unknown", node_id=self._node_id, room_id=room_id, event_id=event_id, - status="error", error_code="no_agent_for_message", - data={"routing_reason": routing_reason, "sender": sender}, + status="error", error_code=routing_reason, + data={"routing_reason": routing_reason, "sender": sender, "text_len": len(text)}, ) + # M2.2: optional user-facing error reply in room + if self._unknown_agent_behavior == "reply_error" and routing_reason == REASON_REJECTED_UNKNOWN_AGENT: + available = self._mixed_room_config.agents_for_room(room_id) + # Extract agent name from text (first slash token, if any) + slash_token = text.strip().split()[0].lstrip("/") if text.strip().startswith("/") else "" + label = f"`/{slash_token}`" if slash_token else "this command" + error_msg = ( + f"⚠️ Unknown agent {label}. " + f"Available in this room: {', '.join(available)}" + ) + txn_id = MatrixClient.make_txn_id(room_id, event_id + "_reject") + try: + await client.send_text(room_id, error_msg, txn_id) + except Exception as exc: + logger.warning("Could not send route-error reply: %s", exc) return + # M2.2: successful route — fire metric callback + if self._on_routed: + self._on_routed(agent_id, routing_reason) + # H1: Rate limit (uses final agent_id for metric tagging) if self._rate_limiter is not None: allowed, limit_type = self._rate_limiter.check(room_id=room_id, sender=sender) @@ -578,6 +632,35 @@ class MatrixIngressLoop: else: session_id = f"matrix:{room_key}" + # M2.2: per-room-agent concurrency cap (only for mixed rooms; single-agent rooms unaffected) + _lock = self._get_concurrency_lock(room_id, agent_id) if is_mixed and self._mixed_concurrency_cap > 0 else None + if _lock is not None: + await _lock.acquire() + try: + await self._invoke_and_send( + client, http_client, entry, session_id, wait_s, is_mixed, routing_reason, + ) + finally: + if _lock is not None: + _lock.release() + + async def _invoke_and_send( + self, + client: MatrixClient, + http_client: httpx.AsyncClient, + entry: _QueueEntry, + session_id: str, + wait_s: float, + is_mixed: bool, + routing_reason: str, + ) -> None: + """Inner: invoke Router + send reply (separated for concurrency lock wrapping).""" + event = entry.event + event_id = event.get("event_id", "") + text = event.get("content", {}).get("body", "").strip() + room_id = entry.room_id + agent_id = entry.agent_id + # H3: Invoke with latency t0 = time.monotonic() reply_text: Optional[str] = None diff --git a/services/matrix-bridge-dagi/app/main.py b/services/matrix-bridge-dagi/app/main.py index ac318bb0..d7f746f2 100644 --- a/services/matrix-bridge-dagi/app/main.py +++ b/services/matrix-bridge-dagi/app/main.py @@ -105,6 +105,21 @@ if _PROM_OK: ["agent_id"], buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 30.0], ) + # M2.2: Mixed room routing metrics + _routed_total = Counter( + "matrix_bridge_routed_total", + "Successful message routing by reason", + ["agent_id", "reason"], + ) + _route_rejected_total = Counter( + "matrix_bridge_route_rejected_total", + "Messages rejected during routing (unknown agent, bad slash, etc.)", + ["room_id", "reason"], + ) + _active_room_agent_locks = Gauge( + "matrix_bridge_active_room_agent_locks", + "Number of room-agent pairs currently holding a concurrency lock", + ) # ── Startup state ───────────────────────────────────────────────────────────── _START_TIME = time.monotonic() @@ -151,6 +166,7 @@ async def lifespan(app_: Any): _cfg.bridge_mixed_room_map, _cfg.bridge_mixed_defaults, _cfg.bridge_allowed_agents, + max_agents_per_room=_cfg.max_agents_per_mixed_room, ) logger.info( "✅ Mixed room config: %d rooms, agents=%s", @@ -249,6 +265,15 @@ async def lifespan(app_: Any): if _PROM_OK: _queue_wait.labels(agent_id=agent_id).observe(wait_s) + # M2.2 callbacks + def _on_routed(agent_id: str, reason: str) -> None: + if _PROM_OK: + _routed_total.labels(agent_id=agent_id, reason=reason).inc() + + def _on_route_rejected(room_id: str, reason: str) -> None: + if _PROM_OK: + _route_rejected_total.labels(room_id=room_id, reason=reason).inc() + ingress = MatrixIngressLoop( matrix_homeserver_url=_cfg.matrix_homeserver_url, matrix_access_token=_cfg.matrix_access_token, @@ -263,6 +288,9 @@ async def lifespan(app_: Any): worker_concurrency=_cfg.worker_concurrency, queue_drain_timeout_s=_cfg.queue_drain_timeout_s, mixed_room_config=_mixed_room_config, + unknown_agent_behavior=_cfg.unknown_agent_behavior, + max_slash_len=_cfg.max_slash_len, + mixed_concurrency_cap=_cfg.mixed_concurrency_cap, on_message_received=_on_msg, on_message_replied=_on_replied, on_gateway_error=_on_gw_error, @@ -272,6 +300,8 @@ async def lifespan(app_: Any): on_invoke_latency=_on_invoke_latency, on_send_latency=_on_send_latency, on_queue_wait=_on_queue_wait, + on_routed=_on_routed, + on_route_rejected=_on_route_rejected, ) logger.info( "✅ Backpressure queue: max=%d workers=%d drain_timeout=%.1fs", @@ -321,6 +351,8 @@ app.add_middleware( @app.get("/health") async def health() -> Dict[str, Any]: uptime = int(time.monotonic() - _START_TIME) + if _PROM_OK and _ingress_loop is not None: + _active_room_agent_locks.set(_ingress_loop.active_lock_count) if _config_error or _cfg is None: return { "ok": False, @@ -351,6 +383,9 @@ async def health() -> Dict[str, Any]: "gateway_reachable": _gateway_reachable, "mappings_count": _room_map.total_mappings if _room_map else 0, "mixed_rooms_count": _mixed_room_config.total_rooms if _mixed_room_config else 0, + "total_agents_in_mixed_rooms": sum( + len(r.agents) for r in _mixed_room_config.rooms.values() + ) if _mixed_room_config else 0, "config_ok": True, "rate_limiter": _rate_limiter.stats() if _rate_limiter else None, "queue": { @@ -358,6 +393,13 @@ async def health() -> Dict[str, Any]: "max": _cfg.queue_max_events, "workers": _cfg.worker_concurrency, }, + "mixed_guard_rails": { + "max_agents_per_room": _cfg.max_agents_per_mixed_room, + "max_slash_len": _cfg.max_slash_len, + "unknown_agent_behavior": _cfg.unknown_agent_behavior, + "concurrency_cap": _cfg.mixed_concurrency_cap, + "active_room_agent_locks": _ingress_loop.active_lock_count if _ingress_loop else 0, + }, } diff --git a/services/matrix-bridge-dagi/app/mixed_routing.py b/services/matrix-bridge-dagi/app/mixed_routing.py index 5c84a0c3..040a6a1c 100644 --- a/services/matrix-bridge-dagi/app/mixed_routing.py +++ b/services/matrix-bridge-dagi/app/mixed_routing.py @@ -1,5 +1,5 @@ """ -Mixed-Room Routing — Phase M2.1 +Mixed-Room Routing — Phase M2.1 + M2.2 (guard rails + rejection audit) Supports 1 room → N agents with deterministic message routing. @@ -33,12 +33,21 @@ _SLASH_RE = re.compile(r"^/([A-Za-z0-9_\-]+)\s*(.*)", re.DOTALL) _MENTION_AT_RE = re.compile(r"^@([A-Za-z0-9_\-]+)\s*(.*)", re.DOTALL) _MENTION_COLON_RE = re.compile(r"^([A-Za-z0-9_\-]+):\s+(.*)", re.DOTALL) -# Routing reason labels +# Routing reason labels (successful routes) REASON_SLASH = "slash_command" REASON_AT_MENTION = "at_mention" REASON_COLON_MENTION = "colon_mention" REASON_DEFAULT = "default" +# Rejection reason labels (route_message returns agent_id=None + one of these) +REASON_REJECTED_UNKNOWN_AGENT = "unknown_agent" +REASON_REJECTED_SLASH_TOO_LONG = "slash_too_long" +REASON_REJECTED_NO_MAPPING = "no_mapping" + +# Hard guards +_DEFAULT_MAX_AGENTS_PER_ROOM = 5 +_DEFAULT_MAX_SLASH_LEN = 32 + # ── Data structures ──────────────────────────────────────────────────────────── @@ -94,6 +103,7 @@ def parse_mixed_room_map( raw_map: str, raw_defaults: str, allowed_agents: FrozenSet[str], + max_agents_per_room: int = _DEFAULT_MAX_AGENTS_PER_ROOM, ) -> MixedRoomConfig: """ Parse BRIDGE_MIXED_ROOM_MAP and BRIDGE_MIXED_DEFAULTS into MixedRoomConfig. @@ -104,6 +114,7 @@ def parse_mixed_room_map( Raises ValueError on: - Malformed room_id - Empty agent list + - Too many agents per room (> max_agents_per_room) - Agent not in allowed_agents - Duplicate room_id in map """ @@ -154,6 +165,13 @@ def parse_mixed_room_map( errors.append(f"Empty agent list for room {room_id!r}") continue + # M2.2 guard: fail-fast if too many agents per room + if len(agents) > max_agents_per_room: + errors.append( + f"Room {room_id!r} has {len(agents)} agents > MAX_AGENTS_PER_MIXED_ROOM={max_agents_per_room}" + ) + continue + invalid = [a for a in agents if a not in allowed_agents] if invalid: errors.append( @@ -189,6 +207,7 @@ def route_message( room_id: str, config: MixedRoomConfig, allowed_agents: FrozenSet[str], + max_slash_len: int = _DEFAULT_MAX_SLASH_LEN, ) -> Tuple[Optional[str], str, str]: """ Determine which agent should handle this message. @@ -196,8 +215,8 @@ def route_message( Returns: (agent_id, routing_reason, effective_text) - agent_id: matched agent or None if unresolvable - routing_reason: one of REASON_* constants + agent_id: matched agent or None if unresolvable / rejected + routing_reason: one of REASON_* or REASON_REJECTED_* constants effective_text: text with routing prefix stripped (for cleaner invoke) Priority: @@ -205,10 +224,14 @@ def route_message( 2. @agentname ... (at-mention) 3. agentname: ... (colon-mention) 4. default agent for room (fallback) + + Guard rails (M2.2): + - Slash command token longer than max_slash_len → REASON_REJECTED_SLASH_TOO_LONG + - Unknown agent in slash → REASON_REJECTED_UNKNOWN_AGENT (no fallthrough) """ room = config.rooms.get(room_id) if room is None: - return None, "no_mapping", text + return None, REASON_REJECTED_NO_MAPPING, text stripped = text.strip() @@ -217,16 +240,25 @@ def route_message( if m: candidate = m.group(1).lower() body = m.group(2).strip() or stripped # keep original if body empty + + # M2.2: reject suspiciously long slash tokens (garbage / injection attempts) + if len(candidate) > max_slash_len: + logger.warning( + "Slash token too long (%d > %d) in room %s — rejected", + len(candidate), max_slash_len, room_id, + ) + return None, REASON_REJECTED_SLASH_TOO_LONG, text + agent = _resolve_agent(candidate, room, allowed_agents) if agent: logger.debug("Slash route: /%s → %s", candidate, agent) return agent, REASON_SLASH, body - # Unknown agent → return None + log; do not fall through to default + # Unknown agent → hard reject, do NOT fall through to default logger.warning( - "Slash command /%s in room %s: agent not recognised or not allowed", - candidate, room_id, + "Slash command /%s in room %s: agent not recognised or not allowed (available: %s)", + candidate, room_id, room.agents, ) - return None, f"unknown_slash_{candidate}", text + return None, REASON_REJECTED_UNKNOWN_AGENT, text # 2. @mention: @sofiia hello m = _MENTION_AT_RE.match(stripped) diff --git a/tests/test_matrix_bridge_mixed_ingress.py b/tests/test_matrix_bridge_mixed_ingress.py index b558a762..045be6c6 100644 --- a/tests/test_matrix_bridge_mixed_ingress.py +++ b/tests/test_matrix_bridge_mixed_ingress.py @@ -308,6 +308,158 @@ def test_rate_limited_mixed_room_event_dropped(): assert len(dropped) == 2 # two dropped by rate limiter +def test_on_route_rejected_callback_fires(): + """on_route_rejected fires when /unknown slash is used in mixed room.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + rejected_calls: List[tuple] = [] + ingress._on_route_rejected = lambda room, reason: rejected_calls.append((room, reason)) + + client = _fake_client({ROOM_MIXED: [_make_event("/unknownbot hello", event_id="r1")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 0 + assert len(rejected_calls) == 1 + room, reason = rejected_calls[0] + assert room == ROOM_MIXED + assert "unknown" in reason + + +def test_on_routed_callback_fires_with_reason(): + """on_routed fires with correct agent_id and routing_reason on successful route.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + routed_calls: List[tuple] = [] + ingress._on_routed = lambda agent, reason: routed_calls.append((agent, reason)) + + client = _fake_client({ROOM_MIXED: [_make_event("/helion hello", event_id="rt1")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert len(routed_calls) == 1 + agent, reason = routed_calls[0] + assert agent == "helion" + assert reason == "slash_command" + + +def test_unknown_agent_reply_error_sends_message(): + """UNKNOWN_AGENT_BEHAVIOR=reply_error → error message sent to room.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + ingress._unknown_agent_behavior = "reply_error" + + sent_texts: List[str] = [] + mock_client = _fake_client({ROOM_MIXED: [_make_event("/druid hello", event_id="ue1")]}) + mock_client.send_text = AsyncMock(side_effect=lambda room, text, txn_id=None: sent_texts.append(text)) + + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(mock_client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 0 # not enqueued + assert len(sent_texts) == 1 + assert "Unknown agent" in sent_texts[0] + assert "sofiia" in sent_texts[0] or "helion" in sent_texts[0] + + +def test_unknown_agent_ignore_sends_nothing(): + """UNKNOWN_AGENT_BEHAVIOR=ignore (default) → no reply sent.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + ingress._unknown_agent_behavior = "ignore" + + sent_texts: List[str] = [] + mock_client = _fake_client({ROOM_MIXED: [_make_event("/druid hello", event_id="ui1")]}) + mock_client.send_text = AsyncMock(side_effect=lambda room, text, txn_id=None: sent_texts.append(text)) + + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(mock_client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 0 + assert len(sent_texts) == 0 # silent + + +def test_concurrency_cap_active_lock_count(): + """active_lock_count returns 1 while semaphore is held.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + ingress._mixed_concurrency_cap = 1 + + async def _run(): + lock = ingress._get_concurrency_lock(ROOM_MIXED, "sofiia") + assert ingress.active_lock_count == 0 + await lock.acquire() + assert ingress.active_lock_count == 1 + lock.release() + assert ingress.active_lock_count == 0 + + run(_run()) + + +def test_slash_too_long_rejected_and_not_enqueued(): + """Slash token longer than max_slash_len → rejected, not enqueued.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + ingress._max_slash_len = 4 # very short for test + + client = _fake_client({ROOM_MIXED: [_make_event("/toolongtoken hello", event_id="tl1")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + rejected_calls: List[str] = [] + ingress._on_route_rejected = lambda room, reason: rejected_calls.append(reason) + + async def _run(): + with patch("app.ingress._write_audit", new=AsyncMock()): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert queue.qsize() == 0 + assert len(rejected_calls) == 1 + assert rejected_calls[0] == "slash_too_long" + + +def test_route_rejected_audit_event_written(): + """On routing rejection, matrix.route.rejected audit event must be written.""" + ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion") + + audit_events: List[str] = [] + + async def fake_audit(*args, **kwargs): + audit_events.append(kwargs.get("event", "")) + + client = _fake_client({ROOM_MIXED: [_make_event("/unknownbot test", event_id="ra1")]}) + queue: asyncio.Queue = asyncio.Queue(maxsize=50) + ingress._queue = queue + + async def _run(): + with patch("app.ingress._write_audit", side_effect=fake_audit): + await ingress._enqueue_from_sync(client, queue, AsyncMock(), {}) + + run(_run()) + + assert "matrix.route.rejected" in audit_events + + def test_direct_room_reply_has_no_prefix(): """Reply in single-agent (direct) room must NOT have a prefix.""" ingress = _make_ingress(direct_raw=f"druid:{ROOM_DIRECT}", allowed=frozenset({"druid"})) diff --git a/tests/test_matrix_bridge_mixed_routing.py b/tests/test_matrix_bridge_mixed_routing.py index 9bf3888f..dcab6a42 100644 --- a/tests/test_matrix_bridge_mixed_routing.py +++ b/tests/test_matrix_bridge_mixed_routing.py @@ -26,6 +26,9 @@ from app.mixed_routing import ( # noqa: E402 REASON_AT_MENTION, REASON_COLON_MENTION, REASON_DEFAULT, + REASON_REJECTED_UNKNOWN_AGENT, + REASON_REJECTED_SLASH_TOO_LONG, + REASON_REJECTED_NO_MAPPING, ) ROOM_X = "!roomX:daarion.space" @@ -146,7 +149,7 @@ def test_slash_unknown_agent_returns_none(): cfg = _make_cfg() agent, reason, _ = route_message("/druid hello", ROOM_X, cfg, frozenset({"sofiia", "helion"})) assert agent is None - assert "unknown_slash_druid" in reason + assert reason == REASON_REJECTED_UNKNOWN_AGENT # ── Routing — @mention ──────────────────────────────────────────────────────── @@ -225,3 +228,70 @@ def test_reply_prefix_single_room_empty(): def test_reply_prefix_capitalises_first_letter(): assert reply_prefix("druid", is_mixed=True) == "Druid: " assert reply_prefix("NUTRA", is_mixed=True) == "Nutra: " # capitalize() normalises case + + +# ── M2.2: Guard rails ───────────────────────────────────────────────────────── + +def test_max_agents_per_room_raises(): + """More agents than max → ValueError at parse time.""" + raw = f"{ROOM_X}=sofiia,helion,druid,nutra,alateya,yaromir" # 6 agents + allowed_6 = frozenset({"sofiia", "helion", "druid", "nutra", "alateya", "yaromir"}) + with pytest.raises(ValueError, match="MAX_AGENTS_PER_MIXED_ROOM"): + parse_mixed_room_map(raw, "", allowed_6, max_agents_per_room=5) + + +def test_max_agents_per_room_exactly_at_limit_ok(): + """Exactly at limit should succeed.""" + raw = f"{ROOM_X}=sofiia,helion,druid,nutra,alateya" # 5 = default limit + allowed_5 = frozenset({"sofiia", "helion", "druid", "nutra", "alateya"}) + cfg = parse_mixed_room_map(raw, "", allowed_5, max_agents_per_room=5) + assert len(cfg.agents_for_room(ROOM_X)) == 5 + + +def test_slash_too_long_returns_rejected_reason(): + """Slash command token longer than max_slash_len → rejection, no fallthrough.""" + cfg = _make_cfg() + long_token = "a" * 33 # > default 32 + agent, reason, _ = route_message( + f"/{long_token} hello", ROOM_X, cfg, frozenset({"sofiia", "helion"}), + max_slash_len=32, + ) + assert agent is None + assert reason == REASON_REJECTED_SLASH_TOO_LONG + + +def test_slash_exactly_at_max_len_ok(): + """Slash token exactly at max_slash_len should NOT be rejected.""" + allowed = frozenset({"sofiia", "helion"}) + raw = f"{ROOM_X}=sofiia,helion" + # Create a 10-char agent name (within limit) — use a mock allowed set + cfg = parse_mixed_room_map(raw, "", allowed, max_agents_per_room=5) + agent, reason, _ = route_message("/sofiia hi", ROOM_X, cfg, allowed, max_slash_len=32) + assert agent == "sofiia" + assert reason == REASON_SLASH + + +def test_unknown_slash_returns_rejected_unknown_agent(): + """Slash with valid-length token but unknown agent → REASON_REJECTED_UNKNOWN_AGENT.""" + cfg = _make_cfg() + agent, reason, _ = route_message( + "/druid hello", ROOM_X, cfg, frozenset({"sofiia", "helion"}), + max_slash_len=32, + ) + assert agent is None + assert reason == REASON_REJECTED_UNKNOWN_AGENT + + +def test_no_mapping_returns_rejected_no_mapping(): + """Room not in config → REASON_REJECTED_NO_MAPPING.""" + cfg = _make_cfg(room_id=ROOM_X) + agent, reason, _ = route_message("hello", ROOM_Y, cfg, ALLOWED, max_slash_len=32) + assert agent is None + assert reason == REASON_REJECTED_NO_MAPPING + + +def test_rejection_reasons_are_distinct_constants(): + """All rejection reason strings must differ from success reasons.""" + success = {REASON_SLASH, REASON_AT_MENTION, REASON_COLON_MENTION, REASON_DEFAULT} + rejected = {REASON_REJECTED_UNKNOWN_AGENT, REASON_REJECTED_SLASH_TOO_LONG, REASON_REJECTED_NO_MAPPING} + assert not success.intersection(rejected), "Rejection reasons must not overlap with success reasons"