Guard rails (mixed_routing.py):
- MAX_AGENTS_PER_MIXED_ROOM (default 5): fail-fast at parse time
- MAX_SLASH_LEN (default 32): reject garbage/injection slash tokens
- Unified rejection reasons: unknown_agent, slash_too_long, no_mapping
- REASON_REJECTED_* constants (separate from success REASON_*)
Ingress (ingress.py):
- per-(room, agent) concurrency semaphore (MIXED_CONCURRENCY_CAP, default 1)
- active_lock_count property, exposed via /health and Prometheus
- UNKNOWN_AGENT_BEHAVIOR: "ignore" (silent) | "reply_error" (inform user)
- on_routed(agent_id, reason) callback for routing metrics
- on_route_rejected(room_id, reason) callback for rejection metrics
- matrix.route.rejected audit event on every rejection
Config + main:
- max_agents_per_mixed_room, max_slash_len, unknown_agent_behavior, mixed_concurrency_cap
- matrix_bridge_routed_total{agent_id, reason} counter
- matrix_bridge_route_rejected_total{room_id, reason} counter
- matrix_bridge_active_room_agent_locks gauge
- /health: mixed_guard_rails section + total_agents_in_mixed_rooms
- docker-compose: all 4 new guard rail env vars
Runbook: section 9 — mixed room debug guide (6 acceptance tests, routing metrics, session isolation, lock hang, config guard)
Tests: 108 pass (94 → 108, +14 new tests for guard rails + callbacks + concurrency)
Made-with: Cursor
491 lines · 18 KiB · Python
"""
|
|
Tests for mixed-room routing in MatrixIngressLoop (M2.1).
|
|
|
|
Covers:
|
|
- Slash command routes to correct agent in mixed room
|
|
- @mention routes to correct agent in mixed room
|
|
- Default fallback routes to first agent
|
|
- Unknown /slash returns no invoke + audit error
|
|
- Reply is prefixed with agent name in mixed room
|
|
- Session isolation: different agents get different session_ids
|
|
- Multi-room: regular room and mixed room coexist correctly
|
|
- Rate-limited message in mixed room is dropped
|
|
- Direct (single-agent) room reply has no prefix
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
_BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi"
|
|
if str(_BRIDGE) not in sys.path:
|
|
sys.path.insert(0, str(_BRIDGE))
|
|
|
|
from app.ingress import MatrixIngressLoop, _QueueEntry # noqa: E402
|
|
from app.mixed_routing import parse_mixed_room_map # noqa: E402
|
|
from app.room_mapping import parse_room_map # noqa: E402
|
|
|
|
# ── Constants ────────────────────────────────────────────────────────────────
|
|
|
|
ROOM_MIXED = "!mixedRoom:daarion.space"
|
|
ROOM_DIRECT = "!directRoom:daarion.space"
|
|
ALLOWED = frozenset({"sofiia", "helion", "druid"})
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def run(coro):
    """Execute an async test body synchronously and hand back its result.

    ``asyncio.run`` creates and closes a fresh event loop on every call, so
    tests do not share loop state.
    """
    outcome = asyncio.run(coro)
    return outcome
|
|
|
|
|
|
def _make_event(body: str, event_id: str = "evt1", sender: str = "@user:test") -> Dict[str, Any]:
|
|
return {
|
|
"event_id": event_id,
|
|
"sender": sender,
|
|
"type": "m.room.message",
|
|
"content": {"msgtype": "m.text", "body": body},
|
|
}
|
|
|
|
|
|
def _make_ingress(
    mixed_raw: str = "",
    direct_raw: str = "",
    allowed: frozenset = ALLOWED,
) -> MatrixIngressLoop:
    """Build a MatrixIngressLoop pointed at fake homeserver/router URLs.

    Args:
        mixed_raw: Mixed-room mapping spec, e.g. ``"!room:srv=sofiia,helion"``.
            Empty means the loop gets no mixed-room config (``None``).
        direct_raw: Direct-room mapping spec, e.g. ``"druid:!room:srv"``.
            Empty yields whatever ``parse_room_map("")`` returns.
        allowed: Agent ids passed to both mapping parsers.

    Returns:
        A loop configured with ``queue_max_events=50`` and ``worker_concurrency=1``.
    """
    # Both former branches called parse_room_map with the spec or "", so one
    # call suffices; ``or ""`` keeps a falsy argument (e.g. None) mapped to "".
    room_map = parse_room_map(direct_raw or "", allowed)
    # NOTE(review): the middle "" argument to parse_mixed_room_map is an
    # additional spec left empty here — confirm its meaning in mixed_routing.py.
    mixed_cfg = parse_mixed_room_map(mixed_raw, "", allowed) if mixed_raw else None
    return MatrixIngressLoop(
        matrix_homeserver_url="https://matrix.test",
        matrix_access_token="tok_test",
        matrix_user_id="@bridge:test",
        router_url="http://router:8000",
        node_id="test_node",
        room_map=room_map,
        mixed_room_config=mixed_cfg,
        queue_max_events=50,
        worker_concurrency=1,
    )
|
|
|
|
|
|
def _fake_client(room_events: Dict[str, List[Dict[str, Any]]]) -> MagicMock:
|
|
"""Return a mock MatrixClient that yields pre-set events per room."""
|
|
c = MagicMock()
|
|
c.extract_room_messages.side_effect = lambda sync_resp, room_id: room_events.get(room_id, [])
|
|
c.mark_seen = MagicMock()
|
|
c.send_text = AsyncMock(return_value=None)
|
|
return c
|
|
|
|
|
|
# ── Tests ────────────────────────────────────────────────────────────────────
|
|
|
|
def test_slash_command_routes_to_helion():
    """/helion in a mixed room selects helion and strips the command token.

    Expects exactly one mixed queue entry, routed via ``slash_command``, whose
    body no longer starts with ``/helion``.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    incoming = [_make_event("/helion what is the weather?", event_id="e1")]
    client = _fake_client({ROOM_MIXED: incoming})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _sync_once():
        # Audit writes are replaced by an AsyncMock for the duration of the sync.
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 1
    routed: _QueueEntry = queue.get_nowait()
    assert routed.agent_id == "helion"
    assert routed.is_mixed is True
    assert routed.routing_reason == "slash_command"
    assert routed.event["content"]["body"] == "what is the weather?"
|
|
|
|
|
|
def test_at_mention_routes_to_sofiia():
    """@sofiia in a mixed room routes to sofiia via ``at_mention``."""
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    client = _fake_client({ROOM_MIXED: [_make_event("@sofiia check status", event_id="e2")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _run():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_run())

    # Exactly one entry: a duplicate enqueue now fails here, and an empty queue
    # gives an assertion failure instead of an asyncio.QueueEmpty traceback.
    assert queue.qsize() == 1
    entry: _QueueEntry = queue.get_nowait()
    assert entry.agent_id == "sofiia"
    assert entry.routing_reason == "at_mention"
|
|
|
|
|
|
def test_colon_mention_routes_to_sofiia():
    """'sofiia: ...' in a mixed room routes to sofiia via ``colon_mention``."""
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    client = _fake_client({ROOM_MIXED: [_make_event("sofiia: can you help?", event_id="e3")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _run():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_run())

    # Exactly one entry: a duplicate enqueue now fails here, and an empty queue
    # gives an assertion failure instead of an asyncio.QueueEmpty traceback.
    assert queue.qsize() == 1
    entry: _QueueEntry = queue.get_nowait()
    assert entry.agent_id == "sofiia"
    assert entry.routing_reason == "colon_mention"
|
|
|
|
|
|
def test_default_fallback_routes_to_first_agent():
    """Plain text with no routing token goes to the first listed agent (helion)."""
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=helion,sofiia")
    client = _fake_client({ROOM_MIXED: [_make_event("plain message", event_id="e4")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _run():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_run())

    # Exactly one entry: a duplicate enqueue now fails here, and an empty queue
    # gives an assertion failure instead of an asyncio.QueueEmpty traceback.
    assert queue.qsize() == 1
    entry: _QueueEntry = queue.get_nowait()
    assert entry.agent_id == "helion"
    assert entry.routing_reason == "default"
|
|
|
|
|
|
def test_unknown_slash_not_enqueued_and_audited():
    """/unknownbot in a mixed room is dropped and at least one audit event is written.

    Only the ``event`` keyword passed to ``_write_audit`` is recorded; at least
    one recorded name must contain "matrix".
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    client = _fake_client({ROOM_MIXED: [_make_event("/unknownbot hello", event_id="e5")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue
    recorded_events: List[str] = []

    async def _capture_audit(*_args, **kwargs):
        recorded_events.append(kwargs.get("event", ""))

    async def _sync_once():
        with patch("app.ingress._write_audit", side_effect=_capture_audit):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 0
    assert recorded_events, "expected at least one audit write"
    assert any("matrix" in name for name in recorded_events)
|
|
|
|
|
|
def test_reply_prefixed_with_agent_name_in_mixed_room():
    """A mixed-room reply is sent once and starts with 'Helion: '.

    The router is stubbed to return a fixed string, which must appear in the
    sent text after the agent-name prefix.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    outbox: List[str] = []

    # Parameter names mirror the real call signatures in case they are
    # passed by keyword.
    async def _record_send(room_id, text, txn_id=None):
        outbox.append(text)

    async def _stub_router(http_client, router_url, agent_id, node_id, prompt, session_id):
        return "The weather is sunny"

    queued = _QueueEntry(
        event=_make_event("/helion weather", event_id="e6"),
        room_id=ROOM_MIXED,
        agent_id="helion",
        enqueue_time=0.0,
        routing_reason="slash_command",
        is_mixed=True,
    )
    matrix = MagicMock()
    matrix.send_text = AsyncMock(side_effect=_record_send)

    async def _process():
        with patch("app.ingress._invoke_router", side_effect=_stub_router):
            with patch("app.ingress._write_audit", new=AsyncMock()):
                await ingress._process_entry(matrix, AsyncMock(), queued)

    run(_process())

    assert len(outbox) == 1
    reply = outbox[0]
    assert reply.startswith("Helion: ")
    assert "The weather is sunny" in reply
|
|
|
|
|
|
def test_session_isolation_per_agent():
    """Two agents in the same mixed room are invoked with distinct session ids.

    Each session id must also contain the name of the agent it belongs to.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    seen_sessions: List[str] = []

    async def fake_invoke(http_client, router_url, agent_id, node_id, prompt, session_id):
        seen_sessions.append(session_id)
        return f"reply from {agent_id}"

    def _mixed_entry(agent, event_id, reason):
        # Same room and body for both entries; only the agent/route differ.
        return _QueueEntry(
            event=_make_event("msg", event_id=event_id),
            room_id=ROOM_MIXED,
            agent_id=agent,
            enqueue_time=0.0,
            routing_reason=reason,
            is_mixed=True,
        )

    backlog = [
        _mixed_entry("sofiia", "s1", "default"),
        _mixed_entry("helion", "h1", "slash_command"),
    ]
    matrix = MagicMock()
    matrix.send_text = AsyncMock()

    async def _process_all():
        with patch("app.ingress._invoke_router", side_effect=fake_invoke):
            with patch("app.ingress._write_audit", new=AsyncMock()):
                for queued in backlog:
                    await ingress._process_entry(matrix, AsyncMock(), queued)

    run(_process_all())

    assert len(seen_sessions) == 2
    sofiia_session, helion_session = seen_sessions
    assert sofiia_session != helion_session, "Session IDs must differ per agent"
    assert "sofiia" in sofiia_session
    assert "helion" in helion_session
|
|
|
|
|
|
def test_direct_room_and_mixed_room_coexist():
    """One sync over a direct room and a mixed room enqueues one entry per room.

    The direct-room entry (druid) is not mixed; the mixed-room entry (helion)
    is.
    """
    ingress = _make_ingress(
        direct_raw=f"druid:{ROOM_DIRECT}",
        mixed_raw=f"{ROOM_MIXED}=sofiia,helion",
        allowed=frozenset({"sofiia", "helion", "druid"}),
    )
    per_room = {
        ROOM_DIRECT: [_make_event("direct msg", event_id="d1")],
        ROOM_MIXED: [_make_event("/helion mixed msg", event_id="m1")],
    }
    client = _fake_client(per_room)
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _sync_once():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 2
    drained = [queue.get_nowait(), queue.get_nowait()]
    assert {item.agent_id for item in drained} == {"druid", "helion"}
    # Order of the two entries is not asserted, so index by agent id.
    mixed_by_agent = {item.agent_id: item.is_mixed for item in drained}
    assert mixed_by_agent["druid"] is False
    assert mixed_by_agent["helion"] is True
|
|
|
|
|
|
def test_rate_limited_mixed_room_event_dropped():
    """With sender_rpm=1, only the first of three messages from one sender passes.

    The other two must be reported through the ``_on_rate_limited`` callback.
    """
    from app.rate_limit import InMemoryRateLimiter

    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    ingress._rate_limiter = InMemoryRateLimiter(room_rpm=100, sender_rpm=1)

    burst = [
        _make_event("hello", event_id=f"rl{idx}", sender="@spammer:test")
        for idx in range(3)
    ]
    client = _fake_client({ROOM_MIXED: burst})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    drop_kinds: List[str] = []

    def _record_drop(room, agent, kind):
        drop_kinds.append(kind)

    ingress._on_rate_limited = _record_drop

    async def _sync_once():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 1  # only the first message is admitted
    assert len(drop_kinds) == 2  # the remaining two are rate-limited
|
|
|
|
|
|
def test_on_route_rejected_callback_fires():
    """An unknown /slash in a mixed room fires on_route_rejected exactly once.

    The callback receives the mixed room id and a reason containing "unknown".
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    rejections: List[tuple] = []

    def _on_rejected(room, reason):
        rejections.append((room, reason))

    ingress._on_route_rejected = _on_rejected

    client = _fake_client({ROOM_MIXED: [_make_event("/unknownbot hello", event_id="r1")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _sync_once():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 0
    assert len(rejections) == 1
    [(room, reason)] = rejections
    assert room == ROOM_MIXED
    assert "unknown" in reason
|
|
|
|
|
|
def test_on_routed_callback_fires_with_reason():
    """A successful /helion route fires on_routed once with ("helion", "slash_command")."""
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    routed: List[tuple] = []

    def _on_routed(agent, reason):
        routed.append((agent, reason))

    ingress._on_routed = _on_routed

    client = _fake_client({ROOM_MIXED: [_make_event("/helion hello", event_id="rt1")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _sync_once():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert routed == [("helion", "slash_command")]
|
|
|
|
|
|
def test_unknown_agent_reply_error_sends_message():
    """With behavior "reply_error", addressing an agent outside the room replies once.

    druid is in ALLOWED but not in this mixed room's agent list. The message
    is not enqueued, and the single reply contains "Unknown agent" plus at
    least one of the room's agent names.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    ingress._unknown_agent_behavior = "reply_error"

    replies: List[str] = []

    def _capture_reply(room, text, txn_id=None):
        replies.append(text)

    mock_client = _fake_client({ROOM_MIXED: [_make_event("/druid hello", event_id="ue1")]})
    mock_client.send_text = AsyncMock(side_effect=_capture_reply)

    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _sync_once():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(mock_client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 0  # not enqueued
    assert len(replies) == 1
    error_text = replies[0]
    assert "Unknown agent" in error_text
    assert any(name in error_text for name in ("sofiia", "helion"))
|
|
|
|
|
|
def test_unknown_agent_ignore_sends_nothing():
    """With behavior "ignore", addressing an agent outside the room is silent.

    Nothing is enqueued and ``send_text`` is never called.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    ingress._unknown_agent_behavior = "ignore"

    replies: List[str] = []

    def _capture_reply(room, text, txn_id=None):
        replies.append(text)

    mock_client = _fake_client({ROOM_MIXED: [_make_event("/druid hello", event_id="ui1")]})
    mock_client.send_text = AsyncMock(side_effect=_capture_reply)

    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _sync_once():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(mock_client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 0
    assert replies == []  # silent
|
|
|
|
|
|
def test_concurrency_cap_active_lock_count():
    """active_lock_count is 0 while idle and 1 while the (room, agent) lock is held.

    The cap is forced to 1 before the lock for (ROOM_MIXED, "sofiia") is obtained.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    ingress._mixed_concurrency_cap = 1

    async def _check_counts():
        sem = ingress._get_concurrency_lock(ROOM_MIXED, "sofiia")
        # Merely obtaining the lock object must not count as holding it.
        assert ingress.active_lock_count == 0
        await sem.acquire()
        assert ingress.active_lock_count == 1
        sem.release()
        assert ingress.active_lock_count == 0

    run(_check_counts())
|
|
|
|
|
|
def test_slash_too_long_rejected_and_not_enqueued():
    """A slash token longer than max_slash_len is rejected with "slash_too_long".

    The message is not enqueued, and on_route_rejected fires exactly once.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")
    ingress._max_slash_len = 4  # shrink the limit so "/toolongtoken" exceeds it

    client = _fake_client({ROOM_MIXED: [_make_event("/toolongtoken hello", event_id="tl1")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    reasons: List[str] = []

    def _on_rejected(room, reason):
        reasons.append(reason)

    ingress._on_route_rejected = _on_rejected

    async def _sync_once():
        with patch("app.ingress._write_audit", new=AsyncMock()):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert queue.qsize() == 0
    assert reasons == ["slash_too_long"]
|
|
|
|
|
|
def test_route_rejected_audit_event_written():
    """A routing rejection writes a "matrix.route.rejected" audit event.

    Audit calls are captured by their ``event`` keyword argument.
    """
    ingress = _make_ingress(mixed_raw=f"{ROOM_MIXED}=sofiia,helion")

    written: List[str] = []

    async def _capture_audit(*_args, **kwargs):
        written.append(kwargs.get("event", ""))

    client = _fake_client({ROOM_MIXED: [_make_event("/unknownbot test", event_id="ra1")]})
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    ingress._queue = queue

    async def _sync_once():
        with patch("app.ingress._write_audit", side_effect=_capture_audit):
            await ingress._enqueue_from_sync(client, queue, AsyncMock(), {})

    run(_sync_once())

    assert "matrix.route.rejected" in written
|
|
|
|
|
|
def test_direct_room_reply_has_no_prefix():
    """A reply in a single-agent (direct) room is sent verbatim, with no prefix."""
    ingress = _make_ingress(direct_raw=f"druid:{ROOM_DIRECT}", allowed=frozenset({"druid"}))
    outbox: List[str] = []

    # Parameter names mirror the real call signatures in case they are
    # passed by keyword.
    async def _record_send(room_id, text, txn_id=None):
        outbox.append(text)

    async def _stub_router(http_client, router_url, agent_id, node_id, prompt, session_id):
        return "direct reply no prefix"

    queued = _QueueEntry(
        event=_make_event("hello", event_id="dr1"),
        room_id=ROOM_DIRECT,
        agent_id="druid",
        enqueue_time=0.0,
        routing_reason="direct",
        is_mixed=False,
    )
    matrix = MagicMock()
    matrix.send_text = AsyncMock(side_effect=_record_send)

    async def _process():
        with patch("app.ingress._invoke_router", side_effect=_stub_router):
            with patch("app.ingress._write_audit", new=AsyncMock()):
                await ingress._process_entry(matrix, AsyncMock(), queued)

    run(_process())

    assert outbox == ["direct reply no prefix"]
|