Files
microdao-daarion/tests/conftest.py
Apple 5a886a56ca test(sofiia-console): cover idempotency and cursor pagination contracts
Add focused API contract tests for chat idempotency, cursor pagination, and node routing behavior using isolated local fixtures and mocked upstream inference.

Made-with: Cursor
2026-03-02 04:03:30 -08:00

66 lines
1.9 KiB
Python

from __future__ import annotations
import importlib
import sys
from pathlib import Path
import asyncio
import httpx
import pytest
# This conftest lives in <repo>/tests/, so the repository root is two levels up.
_ROOT = Path(__file__).resolve().parent.parent

# Directory of the sofiia-console service; the fixtures below import ``app.db``
# and ``app.main`` after this directory has been made importable.
_SOFIIA_PATH = _ROOT / "services" / "sofiia-console"

# Prepend the service directory once so that ``import app...`` can resolve to
# it; the membership check keeps repeated imports from adding duplicates.
if str(_SOFIIA_PATH) not in sys.path:
    sys.path.insert(0, str(_SOFIIA_PATH))
@pytest.fixture
def sofiia_module(tmp_path, monkeypatch):
    """Reload the sofiia-console app with an isolated environment and DB path.

    Setup performed for every test that requests this fixture:

    * ``SOFIIA_DATA_DIR`` points at ``<tmp_path>/sofiia-data``.
    * ``ENV`` is set to ``dev`` and ``ROUTER_URL`` to a fixed local URL.
    * ``SOFIIA_CONSOLE_API_KEY`` and ``NODE_ID`` are removed if present.
    * A fresh asyncio event loop is installed as the current loop.
    * ``app.db`` and then ``app.main`` are imported and reloaded, so their
      module-level code runs again under the patched environment.
    * ``app.main._rate_buckets`` and ``app.main._idempotency_cache`` are
      emptied.

    Yields:
        The freshly reloaded ``app.main`` module.

    Teardown:
        The event loop created during setup is unset and closed so that it
        does not leak from one test to the next. ``monkeypatch`` restores the
        environment variables on its own.
    """
    monkeypatch.setenv("SOFIIA_DATA_DIR", str(tmp_path / "sofiia-data"))
    monkeypatch.setenv("ENV", "dev")
    monkeypatch.delenv("SOFIIA_CONSOLE_API_KEY", raising=False)
    monkeypatch.setenv("ROUTER_URL", "http://router.local:8000")
    monkeypatch.delenv("NODE_ID", raising=False)

    # Python 3.9 + pytest-asyncio strict mode may not have a default loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Imported lazily: the modules must only be loaded after the
        # environment above has been patched.
        import app.db as db_mod  # type: ignore
        import app.main as main_mod  # type: ignore

        # Reload the DB module first, then the main module, preserving the
        # original order.
        importlib.reload(db_mod)
        importlib.reload(main_mod)

        # Start every test with empty in-memory state.
        main_mod._rate_buckets.clear()
        main_mod._idempotency_cache.clear()

        yield main_mod
    finally:
        # Release the loop (and the OS resources it holds) even when the
        # import/reload above or the test itself fails. Unset it first so no
        # closed loop is left installed as the current one.
        asyncio.set_event_loop(None)
        loop.close()
@pytest.fixture
def sofiia_client(sofiia_module):
    """Return a small synchronous HTTP client bound to the reloaded app.

    The client sends requests to ``sofiia_module.app`` through
    ``httpx.ASGITransport`` and exposes blocking ``request`` / ``get`` /
    ``post`` methods, each returning whatever ``httpx.AsyncClient.request``
    returns.
    """

    class _LocalClient:
        """Blocking wrapper around ``httpx.AsyncClient`` for a single ASGI app."""

        def __init__(self, app):
            # ASGI application object that every request is dispatched to.
            self.app = app

        def request(self, method: str, path: str, **kwargs):
            """Send one request and block until the response is available.

            Args:
                method: HTTP method name, e.g. ``"GET"``.
                path: Request path, resolved against ``http://testserver``.
                **kwargs: Forwarded unchanged to ``AsyncClient.request``.
            """

            async def _do():
                # A new transport and client are built for every call and
                # closed again when the ``async with`` block exits.
                transport = httpx.ASGITransport(app=self.app)
                async with httpx.AsyncClient(
                    transport=transport,
                    base_url="http://testserver",
                    follow_redirects=True,
                ) as client:
                    return await client.request(method, path, **kwargs)

            # asyncio.run drives the coroutine to completion, which is what
            # lets synchronous tests use the async client.
            return asyncio.run(_do())

        def get(self, path: str, **kwargs):
            """Shorthand for ``request("GET", path, **kwargs)``."""
            return self.request("GET", path, **kwargs)

        def post(self, path: str, **kwargs):
            """Shorthand for ``request("POST", path, **kwargs)``."""
            return self.request("POST", path, **kwargs)

    return _LocalClient(sofiia_module.app)