class RedisIdempotencyStore:
    """Idempotency replay store backed by a Redis-style client.

    Every entry is serialized as a JSON object and written under
    ``<prefix><key>`` with an expiry of ``ttl_seconds`` (passed to the
    client's ``set`` as ``ex``).  The client only needs to provide ``get``,
    ``set``, ``delete`` and either ``scan_iter`` or ``keys``.
    """

    # Upper bound on the number of keys sent in a single ``delete`` call
    # from ``reset()``.
    _RESET_BATCH = 500

    def __init__(self, redis_client: Any, ttl_seconds: int = 900, prefix: str = "sofiia:idem:") -> None:
        """Bind the store to *redis_client*.

        Args:
            redis_client: Object exposing ``get``/``set``/``delete`` and
                ``scan_iter`` or ``keys``.
            ttl_seconds: Requested entry lifetime; values below 60 are
                raised to 60.
            prefix: Namespace prepended to every key; an empty value falls
                back to ``"sofiia:idem:"``.
        """
        self._redis = redis_client
        self._ttl_seconds = max(60, int(ttl_seconds))
        self._prefix = str(prefix or "sofiia:idem:")

    def _k(self, key: str) -> str:
        """Return the namespaced key used in the backing store for *key*."""
        return f"{self._prefix}{key}"

    def get(self, key: str) -> "Optional[ReplayEntry]":
        """Load the replay entry stored for *key*.

        Returns ``None`` when the key is absent, when the stored value is
        not a JSON object, or when its fields cannot be coerced to the
        expected types.  Missing fields default to ``""``, ``{}`` and
        ``0.0``.
        """
        raw = self._redis.get(self._k(key))
        if raw is None:
            return None
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8", errors="ignore")
        try:
            payload = json.loads(str(raw))
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; an undecodable value is
            # treated the same as a missing one.
            return None
        if not isinstance(payload, dict):
            return None
        try:
            response_body = dict(payload.get("response_body") or {})
            created_at = float(payload.get("created_at") or 0.0)
        except (TypeError, ValueError, OverflowError):
            # Valid JSON with the wrong field shapes (e.g. a string body or
            # a non-numeric timestamp) is a corrupt entry, not a crash.
            return None
        return ReplayEntry(
            message_id=str(payload.get("message_id") or ""),
            response_body=response_body,
            created_at=created_at,
            node_id=str(payload.get("node_id") or ""),
        )

    def set(self, key: str, entry: "ReplayEntry") -> None:
        """Serialize *entry* to JSON and store it under *key* with the TTL."""
        payload = {
            "message_id": entry.message_id,
            "response_body": entry.response_body,
            "created_at": float(entry.created_at),
            "node_id": entry.node_id,
        }
        self._redis.set(self._k(key), json.dumps(payload, ensure_ascii=True), ex=self._ttl_seconds)

    # Debug/test helpers
    def delete(self, key: str) -> None:
        """Remove the entry stored for *key*, if any."""
        self._redis.delete(self._k(key))

    def reset(self) -> None:
        """Remove every key that starts with this store's prefix."""
        pattern = f"{self._prefix}*"
        # Prefer incremental iteration when the client offers it; fall back
        # to ``keys()`` so minimal clients keep working.
        scan_iter = getattr(self._redis, "scan_iter", None)
        if callable(scan_iter):
            names = list(scan_iter(match=pattern))
        else:
            names = list(self._redis.keys(pattern) or [])
        # Delete in bounded batches rather than one arbitrarily large call.
        for start in range(0, len(names), self._RESET_BATCH):
            self._redis.delete(*names[start:start + self._RESET_BATCH])
Optional[IdempotencyStore] = None +def _make_redis_client(redis_url: str) -> Any: + import redis # type: ignore + + return redis.Redis.from_url(redis_url, decode_responses=False) + + def get_idempotency_store() -> IdempotencyStore: global _STORE if _STORE is None: @@ -83,7 +139,25 @@ def get_idempotency_store() -> IdempotencyStore: os.getenv("CHAT_IDEMPOTENCY_TTL_SEC", "900"), ) ) - max_size = int(os.getenv("SOFIIA_IDEMPOTENCY_MAX", "5000")) - _STORE = InMemoryIdempotencyStore(ttl_seconds=ttl, max_size=max_size) + backend = os.getenv("SOFIIA_IDEMPOTENCY_BACKEND", "inmemory").strip().lower() or "inmemory" + if backend == "redis": + redis_url = os.getenv("SOFIIA_REDIS_URL", "redis://localhost:6379/0").strip() + prefix = os.getenv("SOFIIA_REDIS_PREFIX", "sofiia:idem:").strip() or "sofiia:idem:" + try: + client = _make_redis_client(redis_url) + _STORE = RedisIdempotencyStore(client, ttl_seconds=ttl, prefix=prefix) + except Exception as exc: + max_size = int(os.getenv("SOFIIA_IDEMPOTENCY_MAX", "5000")) + _STORE = InMemoryIdempotencyStore(ttl_seconds=ttl, max_size=max_size) + log_event( + "idempotency.backend.fallback", + backend="redis", + status="degraded", + error_code="redis_unavailable", + error=str(exc)[:180], + ) + else: + max_size = int(os.getenv("SOFIIA_IDEMPOTENCY_MAX", "5000")) + _STORE = InMemoryIdempotencyStore(ttl_seconds=ttl, max_size=max_size) return _STORE diff --git a/services/sofiia-console/requirements.txt b/services/sofiia-console/requirements.txt index 71ba37d6..9c24b35c 100644 --- a/services/sofiia-console/requirements.txt +++ b/services/sofiia-console/requirements.txt @@ -5,6 +5,7 @@ python-multipart>=0.0.6 pyyaml>=6.0 python-dotenv>=1.0.0 prometheus-client>=0.20.0 +redis>=5.0.0 # Projects / Documents / Sessions persistence (Phase 1: SQLite) aiosqlite>=0.20.0 # Document text extraction (optional — used when available)