"""Tests for node-worker idempotency store.""" import asyncio import sys import os import time sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "services", "node-worker")) from models import JobResponse, JobError from idempotency import IdempotencyStore def test_put_and_get(): store = IdempotencyStore() resp = JobResponse(job_id="j1", status="ok", node_id="n1", provider="ollama", model="qwen3:14b") store.put("key1", resp) cached = store.get("key1") assert cached is not None assert cached.status == "ok" assert cached.cached is True assert cached.job_id == "j1" def test_miss(): store = IdempotencyStore() assert store.get("nonexistent") is None def test_ttl_expiry(): store = IdempotencyStore() resp = JobResponse(job_id="j2", status="ok", node_id="n1") store.put("key2", resp) store._cache["key2"] = (resp, time.time() - 1) assert store.get("key2") is None def test_timeout_shorter_ttl(): store = IdempotencyStore() resp = JobResponse(job_id="j3", status="timeout", error=JobError(code="TO")) store.put("key3", resp) _, expires = store._cache["key3"] assert expires - time.time() < 35 # timeout TTL ≈ 30s def test_inflight_dedup(): store = IdempotencyStore() async def run(): fut1 = await store.acquire_inflight("key4") assert fut1 is None # first caller gets None (becomes processor) fut2 = await store.acquire_inflight("key4") assert fut2 is not None # second caller gets future to wait on resp = JobResponse(job_id="j4", status="ok") store.complete_inflight("key4", resp) result = await asyncio.wait_for(fut2, timeout=1.0) assert result.status == "ok" asyncio.run(run()) def test_evict_on_max_size(): store = IdempotencyStore(max_size=3) for i in range(5): store.put(f"k{i}", JobResponse(job_id=f"j{i}", status="ok")) assert len(store._cache) <= 3