Files
microdao-daarion/tests/test_matrix_bridge_rate_limit.py
Apple a4e95482bc feat(matrix-bridge-dagi): add rate limiting (H1) and metrics (H3)
H1 — InMemoryRateLimiter (sliding window, no Redis):
  - Per-room: RATE_LIMIT_ROOM_RPM (default 20/min)
  - Per-sender: RATE_LIMIT_SENDER_RPM (default 10/min)
  - Room checked before sender — sender quota not charged on room block
  - Blocked messages: audit matrix.rate_limited + on_rate_limited callback
  - reset() for ops/test, stats() exposed in /health

H3 — Extended Prometheus metrics:
  - matrix_bridge_rate_limited_total{room_id,agent_id,limit_type}
  - matrix_bridge_send_duration_seconds histogram (invoke was already there)
  - matrix_bridge_invoke_duration_seconds buckets tuned for LLM latency
  - matrix_bridge_rate_limiter_active_rooms/senders gauges
  - on_invoke_latency + on_send_latency callbacks wired in ingress loop

16 new tests: rate limiter unit (13) + ingress integration (3)
Total: 65 passed

Made-with: Cursor
2026-03-05 00:54:14 -08:00

170 lines
4.9 KiB
Python

"""
Tests for services/matrix-bridge-dagi/app/rate_limit.py (H1)
Coverage:
- basic allow / room limit / sender limit
- independent room and sender counters
- sliding window prune (old events don't block)
- reset() clears buckets
- stats() reflects live state
- constructor validation
"""
import sys
import time
from pathlib import Path
# The bridge service lives outside the test tree; put its directory at the
# front of sys.path so the ``app`` package inside it can be imported.
_BRIDGE = Path(__file__).parent.parent.joinpath("services", "matrix-bridge-dagi")
if str(_BRIDGE) not in sys.path:
    sys.path.insert(0, str(_BRIDGE))
from app.rate_limit import InMemoryRateLimiter # noqa: E402
# Shared identifiers used by the tests in this module: two rooms and two
# senders, so a test can vary one dimension while holding the other fixed.
ROOM = "!room1:server"
ROOM2 = "!room2:server"
SENDER = "@alice:server"
SENDER2 = "@bob:server"
def test_allows_first_message():
    """A fresh limiter admits the first message and reports no limit type."""
    limiter = InMemoryRateLimiter(room_rpm=5, sender_rpm=5)
    permitted, reason = limiter.check(ROOM, SENDER)
    assert permitted is True
    assert reason is None
def test_room_limit_blocks_at_threshold():
    """After room_rpm messages in a room, the next one is refused as 'room'."""
    limiter = InMemoryRateLimiter(room_rpm=3, sender_rpm=100)
    # Use up the room's three slots; each of these must be admitted.
    for _attempt in range(3):
        permitted, _reason = limiter.check(ROOM, SENDER)
        assert permitted
    # A different sender in the same room is still over the room quota.
    permitted, reason = limiter.check(ROOM, SENDER2)
    assert permitted is False
    assert reason == "room"
def test_sender_limit_blocks_at_threshold():
    """After sender_rpm messages from one sender, the next is refused as 'sender'."""
    limiter = InMemoryRateLimiter(room_rpm=100, sender_rpm=2)
    # Spend the sender's two slots in two different rooms.
    for room_id in (ROOM, ROOM2):
        permitted, _reason = limiter.check(room_id, SENDER)
        assert permitted
    # Third message from the same sender, in yet another room.
    permitted, reason = limiter.check("!room3:server", SENDER)
    assert permitted is False
    assert reason == "sender"
def test_room_checked_before_sender():
    """If both quotas are exhausted, the block is reported as 'room'."""
    limiter = InMemoryRateLimiter(room_rpm=1, sender_rpm=1)
    # One message consumes the only room slot and the only sender slot.
    limiter.check(ROOM, SENDER)
    permitted, reason = limiter.check(ROOM, SENDER)
    assert not permitted and reason == "room"
def test_independent_rooms_dont_interfere():
    """Filling one room's quota leaves a second room usable."""
    limiter = InMemoryRateLimiter(room_rpm=2, sender_rpm=100)
    # Two messages fill ROOM (2/2).
    for _attempt in range(2):
        limiter.check(ROOM, SENDER)
    # ROOM2 has its own counter and must still admit the sender.
    permitted, _reason = limiter.check(ROOM2, SENDER)
    assert permitted is True
def test_independent_senders_dont_interfere():
    """Filling one sender's quota leaves a second sender usable."""
    limiter = InMemoryRateLimiter(room_rpm=100, sender_rpm=1)
    limiter.check(ROOM, SENDER)  # SENDER is now at 1/1
    # SENDER2 has its own counter and must still be admitted in the same room.
    permitted, _reason = limiter.check(ROOM, SENDER2)
    assert permitted is True
def test_window_prune_allows_after_expiry(monkeypatch):
    """Entries older than 60 seconds must stop counting toward the limit."""
    limiter = InMemoryRateLimiter(room_rpm=2, sender_rpm=100)
    # Exhaust the room quota ...
    for _attempt in range(2):
        limiter.check(ROOM, SENDER)
    # ... and confirm the next message is refused because of the room.
    permitted, reason = limiter.check(ROOM, SENDER2)
    assert not permitted
    assert reason == "room"
    # Pin the clock 61 seconds past "now".
    # NOTE(review): this only takes effect if the limiter looks up
    # ``time.monotonic`` on the module at call time -- confirm in rate_limit.py.
    frozen_now = time.monotonic() + 61.0
    monkeypatch.setattr(time, "monotonic", lambda: frozen_now)
    # The earlier entries are now outside the window.
    permitted, _reason = limiter.check(ROOM, SENDER2)
    assert permitted is True
def test_reset_room_clears_bucket():
    """reset(room_id=...) re-opens a room that had hit its limit."""
    limiter = InMemoryRateLimiter(room_rpm=1, sender_rpm=100)
    limiter.check(ROOM, SENDER)  # ROOM is now at 1/1
    permitted, reason = limiter.check(ROOM, SENDER2)
    assert not permitted
    assert reason == "room"
    limiter.reset(room_id=ROOM)
    # The same request that was just refused must now pass.
    permitted, _reason = limiter.check(ROOM, SENDER2)
    assert permitted is True
def test_reset_sender_clears_bucket():
    """reset(sender=...) re-admits a sender that had hit its limit."""
    limiter = InMemoryRateLimiter(room_rpm=100, sender_rpm=1)
    limiter.check(ROOM, SENDER)  # SENDER is now at 1/1
    permitted, reason = limiter.check(ROOM2, SENDER)
    assert not permitted
    assert reason == "sender"
    limiter.reset(sender=SENDER)
    # The same request that was just refused must now pass.
    permitted, _reason = limiter.check(ROOM2, SENDER)
    assert permitted is True
def test_stats_reflects_active_buckets():
    """stats() reports the active room/sender counts and the configured limits."""
    limiter = InMemoryRateLimiter(room_rpm=10, sender_rpm=10)
    # Touch two distinct rooms with two distinct senders.
    limiter.check(ROOM, SENDER)
    limiter.check(ROOM2, SENDER2)
    snapshot = limiter.stats()
    expected = {
        "active_rooms": 2,
        "active_senders": 2,
        "room_rpm_limit": 10,
        "sender_rpm_limit": 10,
    }
    for key, value in expected.items():
        assert snapshot[key] == value
def test_stats_stale_buckets_not_counted(monkeypatch):
    """stats() must not count buckets whose entries are all older than the window."""
    limiter = InMemoryRateLimiter(room_rpm=10, sender_rpm=10)
    limiter.check(ROOM, SENDER)
    # Pin the clock 61 seconds past "now" so the single entry is stale.
    frozen_now = time.monotonic() + 61.0
    monkeypatch.setattr(time, "monotonic", lambda: frozen_now)
    snapshot = limiter.stats()
    for key in ("active_rooms", "active_senders"):
        assert snapshot[key] == 0
def test_constructor_validates_limits():
    """A zero room limit or a negative sender limit raises ValueError."""
    import pytest

    invalid_limits = (
        {"room_rpm": 0, "sender_rpm": 5},
        {"room_rpm": 5, "sender_rpm": -1},
    )
    for kwargs in invalid_limits:
        with pytest.raises(ValueError):
            InMemoryRateLimiter(**kwargs)
def test_sender_bucket_not_charged_when_room_blocked():
    """Messages refused by the room limit must not consume sender quota."""
    limiter = InMemoryRateLimiter(room_rpm=1, sender_rpm=2)
    limiter.check(ROOM, SENDER)  # room at 1/1, sender at 1/2
    # Two further attempts in the full room; both are expected to be room-blocked.
    for _attempt in range(2):
        limiter.check(ROOM, SENDER)
    # If those attempts had been charged to the sender, it would now be over
    # its limit of 2. In a fresh room it must still have one slot left.
    permitted, _reason = limiter.check(ROOM2, SENDER)
    assert permitted is True