""" Tests for services/matrix-bridge-dagi/app/control.py and M3.0 ingress control handling. Covers: - parse_control_config: valid, invalid user_ids, invalid room_ids - ControlConfig.is_enabled: empty sets - is_control_message / is_control_room / is_operator - parse_command: valid, too long, too many tokens, no subcommand - ControlCommand.from_tokens: verb, subcommand, positional, kwargs - check_authorization: ok, not_operator, not_control_room - Reply helpers: not_implemented, unknown_command, unauthorized, help - Ingress _try_control: authorized command audited, unauthorized audited, non-cmd ignored - on_control_command callback fires - CONTROL_UNAUTHORIZED_BEHAVIOR=reply_error sends ⛔ message """ import asyncio import sys from pathlib import Path from typing import Any, Dict, List from unittest.mock import AsyncMock, MagicMock, patch import pytest _BRIDGE = Path(__file__).parent.parent / "services" / "matrix-bridge-dagi" if str(_BRIDGE) not in sys.path: sys.path.insert(0, str(_BRIDGE)) from app.control import ( # noqa: E402 ControlConfig, ControlCommand, parse_control_config, parse_command, is_control_message, is_control_room, is_operator, check_authorization, not_implemented_reply, unknown_command_reply, unauthorized_reply, help_reply, VERB_RUNBOOK, VERB_HELP, KNOWN_VERBS, ) from app.ingress import MatrixIngressLoop # noqa: E402 from app.room_mapping import parse_room_map # noqa: E402 # ── Constants ──────────────────────────────────────────────────────────────── CTRL_ROOM = "!opsroom:daarion.space" CTRL_ROOM2 = "!opsroom2:daarion.space" OP1 = "@ivan:daarion.space" OP2 = "@sergiy:daarion.space" NON_OP = "@stranger:matrix.org" ALLOWED_AGENTS = frozenset({"sofiia"}) def run(coro): return asyncio.run(coro) def _make_event(body: str, event_id: str = "e1", sender: str = OP1) -> Dict[str, Any]: return { "event_id": event_id, "sender": sender, "type": "m.room.message", "content": {"msgtype": "m.text", "body": body}, } def _make_ctrl_config( allowlist: str = f"{OP1},{OP2}", rooms: str = CTRL_ROOM, ) -> ControlConfig: return parse_control_config(allowlist, rooms) def _make_ingress(ctrl_config=None) -> MatrixIngressLoop: room_map = parse_room_map("", ALLOWED_AGENTS) return MatrixIngressLoop( matrix_homeserver_url="https://matrix.test", matrix_access_token="tok", matrix_user_id="@bridge:test", router_url="http://router:8000", node_id="test", room_map=room_map, control_config=ctrl_config or _make_ctrl_config(), queue_max_events=10, worker_concurrency=1, ) # ── parse_control_config ───────────────────────────────────────────────────── def test_parse_valid_allowlist_and_rooms(): cfg = parse_control_config(f"{OP1},{OP2}", CTRL_ROOM) assert OP1 in cfg.operator_allowlist assert OP2 in cfg.operator_allowlist assert CTRL_ROOM in cfg.control_rooms def test_parse_multiple_control_rooms(): cfg = parse_control_config(OP1, f"{CTRL_ROOM},{CTRL_ROOM2}") assert len(cfg.control_rooms) == 2 def test_parse_empty_strings_return_empty_config(): cfg = parse_control_config("", "") assert cfg.operator_allowlist == frozenset() assert cfg.control_rooms == frozenset() assert cfg.is_enabled is False def test_parse_invalid_user_id_raises(): with pytest.raises(ValueError, match="Invalid operator user_id"): parse_control_config("not-a-user-id", CTRL_ROOM) def test_parse_invalid_room_id_raises(): with pytest.raises(ValueError, match="Invalid control room_id"): parse_control_config(OP1, "not-a-room-id") def test_parse_whitespace_stripped(): cfg = parse_control_config(f" {OP1} , {OP2} ", f" {CTRL_ROOM} ") assert OP1 in cfg.operator_allowlist assert CTRL_ROOM in cfg.control_rooms def test_is_enabled_false_when_no_operators(): cfg = parse_control_config("", CTRL_ROOM) assert cfg.is_enabled is False def test_is_enabled_false_when_no_rooms(): cfg = parse_control_config(OP1, "") assert cfg.is_enabled is False def test_is_enabled_true_when_both_set(): cfg = _make_ctrl_config() assert cfg.is_enabled is True # ── is_control_message ─────────────────────────────────────────────────────── def test_is_control_message_bang_prefix(): assert is_control_message("!runbook start test.md") is True def test_is_control_message_help(): assert is_control_message("!help") is True def test_is_control_message_no_bang(): assert is_control_message("/sofiia hello") is False assert is_control_message("hello") is False def test_is_control_message_empty(): assert is_control_message("") is False # ── is_operator / is_control_room ──────────────────────────────────────────── def test_is_operator_true(): cfg = _make_ctrl_config() assert is_operator(OP1, cfg) is True def test_is_operator_false(): cfg = _make_ctrl_config() assert is_operator(NON_OP, cfg) is False def test_is_control_room_true(): cfg = _make_ctrl_config() assert is_control_room(CTRL_ROOM, cfg) is True def test_is_control_room_false(): cfg = _make_ctrl_config() assert is_control_room("!other:server", cfg) is False # ── parse_command ───────────────────────────────────────────────────────────── def test_parse_runbook_start(): cmd = parse_command("!runbook start path/to/runbook.md node=NODA1") assert cmd is not None assert cmd.verb == VERB_RUNBOOK assert cmd.subcommand == "start" assert "path/to/runbook.md" in cmd.args assert cmd.kwargs.get("node") == "NODA1" assert cmd.is_known is True def test_parse_runbook_next(): cmd = parse_command("!runbook next run-abc123") assert cmd is not None assert cmd.subcommand == "next" assert "run-abc123" in cmd.args def test_parse_help(): cmd = parse_command("!help") assert cmd is not None assert cmd.verb == VERB_HELP assert cmd.subcommand == "" assert cmd.is_known is True def test_parse_unknown_verb(): cmd = parse_command("!frobnicate do-something") assert cmd is not None assert cmd.is_known is False def test_parse_too_long_returns_none(): long_msg = "!" + "x" * 600 assert parse_command(long_msg) is None def test_parse_non_command_returns_none(): assert parse_command("regular text") is None assert parse_command("/slash command") is None def test_parse_only_bang_returns_none(): assert parse_command("!") is None assert parse_command("! ") is None def test_parse_kwargs_extracted(): cmd = parse_command("!runbook complete run-1 step=3 status=ok notes=done") assert cmd is not None assert cmd.kwargs["step"] == "3" assert cmd.kwargs["status"] == "ok" assert cmd.kwargs["notes"] == "done" # ── check_authorization ─────────────────────────────────────────────────────── def test_check_authorization_ok(): cfg = _make_ctrl_config() auth, reason = check_authorization(OP1, CTRL_ROOM, cfg) assert auth is True assert reason == "ok" def test_check_authorization_not_operator(): cfg = _make_ctrl_config() auth, reason = check_authorization(NON_OP, CTRL_ROOM, cfg) assert auth is False assert reason == "not_operator" def test_check_authorization_not_control_room(): cfg = _make_ctrl_config() auth, reason = check_authorization(OP1, "!wrong:server", cfg) assert auth is False assert reason == "not_control_room" # ── Reply helpers ───────────────────────────────────────────────────────────── def test_not_implemented_reply_contains_verb(): cmd = parse_command("!runbook start test.md") reply = not_implemented_reply(cmd) assert "runbook" in reply assert "M3.1" in reply def test_unknown_command_reply_contains_known_verbs(): cmd = parse_command("!frobnicate") reply = unknown_command_reply(cmd) assert "runbook" in reply or "status" in reply or "help" in reply def test_unauthorized_reply_not_operator(): assert "operator allowlist" in unauthorized_reply("not_operator") def test_help_reply_lists_commands(): reply = help_reply() assert "!runbook" in reply assert "!status" in reply assert "!help" in reply # ── Ingress _try_control integration ───────────────────────────────────────── def test_authorized_command_audited(): """Authorised !runbook start → audit matrix.control.command written.""" ingress = _make_ingress() audit_events: List[str] = [] async def fake_audit(*args, **kwargs): audit_events.append(kwargs.get("event", "")) fake_client = MagicMock() fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock() event = _make_event("!runbook start test.md node=NODA1", event_id="ctrl1", sender=OP1) async def _run(): with patch("app.ingress._write_audit", side_effect=fake_audit): await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) run(_run()) assert "matrix.control.command" in audit_events fake_client.send_text.assert_called_once() # Reply should mention pending implementation sent_text = fake_client.send_text.call_args[0][1] assert "runbook" in sent_text.lower() def test_unauthorized_command_audited_silent_by_default(): """Non-operator !command → audit unauthorized, no reply (default ignore).""" ingress = _make_ingress() audit_events: List[str] = [] async def fake_audit(*args, **kwargs): audit_events.append(kwargs.get("event", "")) fake_client = MagicMock() fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock() event = _make_event("!runbook start test.md", event_id="ctrl2", sender=NON_OP) async def _run(): with patch("app.ingress._write_audit", side_effect=fake_audit): await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) run(_run()) assert "matrix.control.unauthorized" in audit_events fake_client.send_text.assert_not_called() # silent def test_unauthorized_reply_error_sends_message(): """CONTROL_UNAUTHORIZED_BEHAVIOR=reply_error → ⛔ message sent.""" ingress = _make_ingress() ingress._control_unauthorized_behavior = "reply_error" sent_texts: List[str] = [] fake_client = MagicMock() fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock( side_effect=lambda room, text, txn_id=None: sent_texts.append(text) ) event = _make_event("!runbook start test.md", event_id="ctrl3", sender=NON_OP) async def _run(): with patch("app.ingress._write_audit", new=AsyncMock()): await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) run(_run()) assert len(sent_texts) == 1 assert "Not authorised" in sent_texts[0] or "⛔" in sent_texts[0] def test_non_command_message_ignored(): """Regular text in control room (no '!') → silently ignored.""" ingress = _make_ingress() fake_client = MagicMock() fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock() event = _make_event("just chatting in ops room", event_id="ctrl4", sender=OP1) async def _run(): with patch("app.ingress._write_audit", new=AsyncMock()): await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) run(_run()) fake_client.mark_seen.assert_not_called() fake_client.send_text.assert_not_called() def test_on_control_command_callback_fires(): """on_control_command callback receives (sender, verb, subcommand).""" ingress = _make_ingress() ctrl_calls: List[tuple] = [] ingress._on_control_command = lambda s, v, sc: ctrl_calls.append((s, v, sc)) fake_client = MagicMock() fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock() event = _make_event("!runbook next run-abc123", event_id="ctrl5", sender=OP1) async def _run(): with patch("app.ingress._write_audit", new=AsyncMock()): await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) run(_run()) assert len(ctrl_calls) == 1 sender, verb, subcommand = ctrl_calls[0] assert sender == OP1 assert verb == "runbook" assert subcommand == "next" def test_help_command_returns_help_text(): """!help → help_reply() sent.""" ingress = _make_ingress() sent_texts: List[str] = [] fake_client = MagicMock() fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock( side_effect=lambda room, text, txn_id=None: sent_texts.append(text) ) event = _make_event("!help", event_id="ctrl6", sender=OP2) async def _run(): with patch("app.ingress._write_audit", new=AsyncMock()): await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) run(_run()) assert len(sent_texts) == 1 assert "!runbook" in sent_texts[0] def test_unknown_command_verb_audited_and_replied(): """Unknown verb → matrix.control.unknown_cmd audited + reply.""" ingress = _make_ingress() audit_events: List[str] = [] async def fake_audit(*args, **kwargs): audit_events.append(kwargs.get("event", "")) sent_texts: List[str] = [] fake_client = MagicMock() fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock( side_effect=lambda room, text, txn_id=None: sent_texts.append(text) ) event = _make_event("!frobnicate do-stuff", event_id="ctrl7", sender=OP1) async def _run(): with patch("app.ingress._write_audit", side_effect=fake_audit): await ingress._try_control(fake_client, AsyncMock(), event, CTRL_ROOM) run(_run()) assert "matrix.control.unknown_cmd" in audit_events assert len(sent_texts) == 1 assert "Unknown command" in sent_texts[0] or "unknown" in sent_texts[0].lower() def test_control_room_events_not_forwarded_to_agents(): """Messages from control room must NOT be enqueued as agent messages.""" ingress = _make_ingress() queue: asyncio.Queue = asyncio.Queue(maxsize=10) ingress._queue = queue def extract(sync_resp, room_id): if room_id == CTRL_ROOM: return [_make_event("!runbook start test.md", event_id="fwd1", sender=OP1)] return [] fake_client = MagicMock() fake_client.extract_room_messages.side_effect = extract fake_client.mark_seen = MagicMock() fake_client.send_text = AsyncMock() async def _run(): with patch("app.ingress._write_audit", new=AsyncMock()): await ingress._enqueue_from_sync(fake_client, queue, AsyncMock(), {}) run(_run()) assert queue.qsize() == 0 # control commands never enqueued as agent work