from __future__ import annotations import json import logging import shutil import threading from pathlib import Path from typing import Any, Dict, List, Optional from .schemas import AuroraJob, AuroraResult, AuroraMode, JobStatus, MediaType, ProcessingStep logger = logging.getLogger(__name__) def _model_dump(model: Any) -> Dict[str, Any]: if hasattr(model, "model_dump"): return model.model_dump() return model.dict() class JobStore: def __init__(self, data_dir: Path) -> None: self.data_dir = data_dir self.jobs_dir = data_dir / "jobs" self.uploads_dir = data_dir / "uploads" self.outputs_dir = data_dir / "outputs" self.jobs_dir.mkdir(parents=True, exist_ok=True) self.uploads_dir.mkdir(parents=True, exist_ok=True) self.outputs_dir.mkdir(parents=True, exist_ok=True) self._lock = threading.RLock() self._jobs: Dict[str, AuroraJob] = {} self._load_existing_jobs() def _job_path(self, job_id: str) -> Path: return self.jobs_dir / f"{job_id}.json" def _save_job(self, job: AuroraJob) -> None: self._job_path(job.job_id).write_text( json.dumps(_model_dump(job), ensure_ascii=False, indent=2), encoding="utf-8", ) def _load_existing_jobs(self) -> None: for path in sorted(self.jobs_dir.glob("*.json")): try: payload = json.loads(path.read_text(encoding="utf-8")) job = AuroraJob(**payload) self._jobs[job.job_id] = job except Exception as exc: logger.warning("Skipping unreadable job file %s: %s", path, exc) def create_job( self, *, job_id: str, file_name: str, input_path: Path, input_hash: str, mode: AuroraMode, media_type: MediaType, created_at: str, metadata: Optional[Dict[str, Any]] = None, ) -> AuroraJob: job = AuroraJob( job_id=job_id, file_name=file_name, mode=mode, media_type=media_type, input_path=str(input_path), input_hash=input_hash, created_at=created_at, metadata=metadata or {}, ) with self._lock: self._jobs[job_id] = job self._save_job(job) return job def get_job(self, job_id: str) -> Optional[AuroraJob]: with self._lock: return self._jobs.get(job_id) def list_jobs(self) -> List[AuroraJob]: with self._lock: return list(self._jobs.values()) def patch_job(self, job_id: str, **changes: Any) -> AuroraJob: with self._lock: current = self._jobs.get(job_id) if not current: raise KeyError(job_id) payload = _model_dump(current) payload.update(changes) payload["job_id"] = job_id updated = AuroraJob(**payload) self._jobs[job_id] = updated self._save_job(updated) return updated def append_processing_step(self, job_id: str, step: ProcessingStep) -> AuroraJob: job = self.get_job(job_id) if not job: raise KeyError(job_id) steps = list(job.processing_log) steps.append(step) return self.patch_job(job_id, processing_log=steps) def set_progress(self, job_id: str, *, progress: int, current_stage: str) -> AuroraJob: bounded = max(0, min(100, int(progress))) return self.patch_job(job_id, progress=bounded, current_stage=current_stage) def mark_processing(self, job_id: str, *, started_at: str) -> AuroraJob: return self.patch_job( job_id, status="processing", progress=1, current_stage="dispatching", started_at=started_at, error_message=None, ) def mark_completed(self, job_id: str, *, result: AuroraResult, completed_at: str) -> AuroraJob: return self.patch_job( job_id, status="completed", progress=100, current_stage="completed", result=result, completed_at=completed_at, error_message=None, ) def mark_failed(self, job_id: str, *, message: str, completed_at: str) -> AuroraJob: return self.patch_job( job_id, status="failed", current_stage="failed", error_message=message, completed_at=completed_at, ) def request_cancel(self, job_id: str) -> AuroraJob: job = self.get_job(job_id) if not job: raise KeyError(job_id) if job.status in ("completed", "failed", "cancelled"): return job if job.status == "queued": return self.patch_job( job_id, status="cancelled", current_stage="cancelled", cancel_requested=True, progress=0, ) return self.patch_job( job_id, cancel_requested=True, current_stage="cancelling", ) def delete_job(self, job_id: str, *, remove_artifacts: bool = True) -> bool: with self._lock: current = self._jobs.pop(job_id, None) if not current: return False self._job_path(job_id).unlink(missing_ok=True) if remove_artifacts: shutil.rmtree(self.uploads_dir / job_id, ignore_errors=True) shutil.rmtree(self.outputs_dir / job_id, ignore_errors=True) return True def mark_cancelled(self, job_id: str, *, completed_at: str, message: str = "Cancelled by user") -> AuroraJob: return self.patch_job( job_id, status="cancelled", current_stage="cancelled", cancel_requested=True, error_message=message, completed_at=completed_at, ) def count_by_status(self) -> Dict[JobStatus, int]: counts: Dict[JobStatus, int] = { "queued": 0, "processing": 0, "completed": 0, "failed": 0, "cancelled": 0, } with self._lock: for job in self._jobs.values(): counts[job.status] += 1 return counts def recover_interrupted_jobs( self, *, completed_at: str, message: str, strategy: str = "failed", ) -> int: """Recover queued/processing jobs after service restart. strategy: - "failed": mark as failed - "requeue": move back to queue for auto-retry on startup """ mode = (strategy or "failed").strip().lower() recovered = 0 with self._lock: for job_id, current in list(self._jobs.items()): if current.status not in ("queued", "processing"): continue payload = _model_dump(current) meta = payload.get("metadata") or {} if not isinstance(meta, dict): meta = {} meta["recovery_count"] = int(meta.get("recovery_count", 0)) + 1 meta["last_recovery_at"] = completed_at meta["last_recovery_reason"] = message payload["metadata"] = meta if mode == "requeue": payload.update( { "status": "queued", "current_stage": "queued (recovered after restart)", "error_message": None, "started_at": None, "completed_at": None, "cancel_requested": False, "progress": 0, } ) else: payload.update( { "status": "failed", "current_stage": "failed", "error_message": message, "completed_at": completed_at, "progress": max(1, int(payload.get("progress", 0))), } ) payload["job_id"] = job_id updated = AuroraJob(**payload) self._jobs[job_id] = updated self._save_job(updated) recovered += 1 return recovered