112 lines
3.5 KiB
Python
112 lines
3.5 KiB
Python
import sqlite3
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Literal, Optional
|
|
|
|
from .config import settings
|
|
|
|
|
|
Decision = Literal["created", "exists", "conflict"]
|
|
|
|
|
|
@dataclass
|
|
class IdempotencyResult:
|
|
decision: Decision
|
|
job_id: str
|
|
|
|
|
|
class IdempotencyStore:
|
|
def __init__(self, db_path: str, ttl_s: int) -> None:
|
|
self.db_path = db_path
|
|
self.ttl_s = max(60, int(ttl_s))
|
|
self._lock = threading.Lock()
|
|
self._init_db()
|
|
|
|
def _connect(self) -> sqlite3.Connection:
|
|
conn = sqlite3.connect(self.db_path, check_same_thread=False)
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA synchronous=NORMAL")
|
|
return conn
|
|
|
|
def _init_db(self) -> None:
|
|
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS idempotency_jobs (
|
|
idem_key TEXT NOT NULL,
|
|
gen_type TEXT NOT NULL,
|
|
req_hash TEXT NOT NULL,
|
|
job_id TEXT NOT NULL,
|
|
created_at INTEGER NOT NULL,
|
|
expires_at INTEGER NOT NULL,
|
|
PRIMARY KEY (idem_key, gen_type)
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_idem_expires_at ON idempotency_jobs(expires_at)"
|
|
)
|
|
conn.commit()
|
|
|
|
def reserve(
|
|
self,
|
|
*,
|
|
idem_key: str,
|
|
gen_type: str,
|
|
req_hash: str,
|
|
job_id: str,
|
|
) -> IdempotencyResult:
|
|
now = int(time.time())
|
|
expires = now + self.ttl_s
|
|
|
|
with self._lock, self._connect() as conn:
|
|
conn.execute("DELETE FROM idempotency_jobs WHERE expires_at < ?", (now,))
|
|
row = conn.execute(
|
|
"""
|
|
SELECT req_hash, job_id, expires_at
|
|
FROM idempotency_jobs
|
|
WHERE idem_key = ? AND gen_type = ?
|
|
""",
|
|
(idem_key, gen_type),
|
|
).fetchone()
|
|
|
|
if row:
|
|
existing_hash, existing_job_id, existing_exp = row
|
|
if existing_exp >= now:
|
|
if existing_hash == req_hash:
|
|
return IdempotencyResult(decision="exists", job_id=existing_job_id)
|
|
return IdempotencyResult(decision="conflict", job_id=existing_job_id)
|
|
|
|
conn.execute(
|
|
"DELETE FROM idempotency_jobs WHERE idem_key = ? AND gen_type = ?",
|
|
(idem_key, gen_type),
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO idempotency_jobs
|
|
(idem_key, gen_type, req_hash, job_id, created_at, expires_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(idem_key, gen_type, req_hash, job_id, now, expires),
|
|
)
|
|
conn.commit()
|
|
return IdempotencyResult(decision="created", job_id=job_id)
|
|
|
|
|
|
IDEMPOTENCY_STORE: Optional[IdempotencyStore] = None
|
|
|
|
|
|
def init_idempotency_store() -> None:
|
|
global IDEMPOTENCY_STORE
|
|
if IDEMPOTENCY_STORE is not None:
|
|
return
|
|
|
|
db_path = settings.IDEMPOTENCY_DB_PATH
|
|
if not db_path:
|
|
db_path = str(Path(settings.STORAGE_PATH) / "idempotency.sqlite3")
|
|
IDEMPOTENCY_STORE = IdempotencyStore(db_path=db_path, ttl_s=settings.IDEMPOTENCY_TTL_S)
|