Files

112 lines
3.5 KiB
Python

import sqlite3
import threading
import time
from contextlib import closing
from dataclasses import dataclass
from pathlib import Path
from typing import Literal, Optional

from .config import settings
# Possible outcomes of trying to reserve an idempotency key.
Decision = Literal[
    "created",
    "exists",
    "conflict",
]


@dataclass
class IdempotencyResult:
    """Outcome of a reservation attempt together with the job id it refers to."""

    # One of the ``Decision`` literals.
    decision: Decision
    # Identifier of the job associated with the idempotency key.
    job_id: str
class IdempotencyStore:
    """SQLite-backed registry mapping an idempotency key to the job it created.

    Rows are keyed by ``(idem_key, gen_type)`` and carry the request hash, the
    job id, and creation/expiry timestamps in Unix seconds. A fresh connection
    is opened for every operation and closed when the operation finishes.
    ``reserve`` calls made through one instance are additionally serialized by
    an in-process lock.
    """

    def __init__(self, db_path: str, ttl_s: int) -> None:
        """Create the store and make sure its schema exists.

        Args:
            db_path: Filesystem path of the SQLite database file. Missing
                parent directories are created.
            ttl_s: Lifetime of a reservation in seconds; values below 60 are
                raised to 60.
        """
        self.db_path = db_path
        self.ttl_s = max(60, int(ttl_s))
        self._lock = threading.Lock()
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection with WAL journaling and NORMAL synchronous mode.

        The caller owns the returned connection and is responsible for
        closing it.
        """
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        try:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
        except Exception:
            # Do not leak the handle when configuration fails.
            conn.close()
            raise
        return conn

    def _init_db(self) -> None:
        """Create the database directory, table and expiry index if missing."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        # ``with conn`` only commits/rolls back; ``closing`` is what actually
        # releases the connection.
        with closing(self._connect()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS idempotency_jobs (
                idem_key TEXT NOT NULL,
                gen_type TEXT NOT NULL,
                req_hash TEXT NOT NULL,
                job_id TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                expires_at INTEGER NOT NULL,
                PRIMARY KEY (idem_key, gen_type)
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_idem_expires_at ON idempotency_jobs(expires_at)"
            )

    def reserve(
        self,
        *,
        idem_key: str,
        gen_type: str,
        req_hash: str,
        job_id: str,
    ) -> IdempotencyResult:
        """Reserve ``(idem_key, gen_type)`` for ``job_id`` unless already taken.

        Every call first deletes all rows whose ``expires_at`` is in the past,
        then looks up the requested key.

        Args:
            idem_key: Client-supplied idempotency key.
            gen_type: Second component of the primary key.
            req_hash: Hash of the request payload, compared against the stored
                hash to tell a replay from a conflicting reuse of the key.
            job_id: Job id to record when the key is free.

        Returns:
            ``created`` with ``job_id`` when a new row was inserted;
            ``exists`` with the stored job id when an unexpired row has the
            same ``req_hash``; ``conflict`` with the stored job id when an
            unexpired row has a different ``req_hash``.
        """
        now = int(time.time())
        expires = now + self.ttl_s
        # ``conn`` as a context manager commits on normal exit (including the
        # early returns below) and rolls back on error; ``closing`` then
        # closes the connection so handles do not accumulate per call.
        with self._lock, closing(self._connect()) as conn, conn:
            conn.execute("DELETE FROM idempotency_jobs WHERE expires_at < ?", (now,))
            # Expired rows were purged just above inside this transaction, so
            # any row found here is still live.
            row = conn.execute(
                """
                SELECT req_hash, job_id
                FROM idempotency_jobs
                WHERE idem_key = ? AND gen_type = ?
                """,
                (idem_key, gen_type),
            ).fetchone()
            if row:
                existing_hash, existing_job_id = row
                if existing_hash == req_hash:
                    return IdempotencyResult(decision="exists", job_id=existing_job_id)
                return IdempotencyResult(decision="conflict", job_id=existing_job_id)
            conn.execute(
                """
                INSERT INTO idempotency_jobs
                (idem_key, gen_type, req_hash, job_id, created_at, expires_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (idem_key, gen_type, req_hash, job_id, now, expires),
            )
        return IdempotencyResult(decision="created", job_id=job_id)
# Module-level store instance; stays None until init_idempotency_store() runs.
IDEMPOTENCY_STORE: Optional[IdempotencyStore] = None


def init_idempotency_store() -> None:
    """Build the module-level ``IDEMPOTENCY_STORE`` on the first call.

    Later calls are no-ops. The database path is taken from
    ``settings.IDEMPOTENCY_DB_PATH``; when that value is falsy, the file
    ``idempotency.sqlite3`` under ``settings.STORAGE_PATH`` is used instead.
    The TTL comes from ``settings.IDEMPOTENCY_TTL_S``.
    """
    global IDEMPOTENCY_STORE
    if IDEMPOTENCY_STORE is None:
        location = settings.IDEMPOTENCY_DB_PATH or str(
            Path(settings.STORAGE_PATH) / "idempotency.sqlite3"
        )
        IDEMPOTENCY_STORE = IdempotencyStore(
            db_path=location,
            ttl_s=settings.IDEMPOTENCY_TTL_S,
        )