"""Quality-report generation for anonymization jobs.

Builds a cached per-job JSON report covering face metrics, licence-plate
metrics, and overall output fidelity (PSNR, processing time, models used).
Every OpenCV-dependent metric degrades gracefully to zeros/None when
``cv2`` is not installed.
"""

from __future__ import annotations

import json
import math
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

if TYPE_CHECKING:
    # Annotation-only imports: with lazy annotations (PEP 563, enabled above)
    # these names are never evaluated at runtime, so guarding them avoids a
    # hard runtime import dependency on the schemas module.
    from .schemas import AuroraJob, MediaType

try:
    import cv2  # type: ignore[import-untyped]
except Exception:  # pragma: no cover - OpenCV is an optional dependency
    cv2 = None


def _safe_iso(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp, tolerating a trailing ``Z``; None on failure."""
    if not value:
        return None
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except Exception:
        return None


def _processing_time_seconds(job: AuroraJob) -> Optional[int]:
    """Best-effort wall-clock processing time in whole seconds.

    Prefers explicit metadata keys, then falls back to the started/completed
    timestamps.  Returns None when neither source yields a usable value.
    """
    meta = job.metadata if isinstance(job.metadata, dict) else {}
    for key in ("actual_processing_seconds", "processing_time_sec"):
        if key in meta:
            try:
                return int(float(meta[key]))
            except Exception:
                pass  # malformed metadata value; try the next source
    started = _safe_iso(job.started_at)
    completed = _safe_iso(job.completed_at)
    if started and completed:
        # Clamp at zero in case of clock skew between the two timestamps.
        return max(0, int((completed - started).total_seconds()))
    return None


def _models_used(job: AuroraJob) -> List[str]:
    """Unique model names from the job's processing log, in first-seen order."""
    models: List[str] = []
    if job.result and job.result.processing_log:
        for step in job.result.processing_log:
            name = str(getattr(step, "model", "") or "").strip()
            if name and name not in models:
                models.append(name)
    return models


@lru_cache(maxsize=1)
def _face_cascade() -> Optional[Any]:
    """Load the Haar frontal-face cascade once per process; None if unavailable.

    Loading the cascade XML is comparatively expensive, and the original code
    re-read it on every frame; caching it is behaviorally equivalent.
    """
    if cv2 is None:
        return None
    cascade = cv2.CascadeClassifier(
        str(Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml")
    )
    return None if cascade.empty() else cascade


def _detect_faces_with_proxy_confidence(frame_bgr: Any) -> List[Dict[str, Any]]:
    """Detect faces in a BGR frame with a sharpness-based confidence proxy.

    Returns dicts of ``{"bbox": [x, y, w, h], "confidence": float}``.  The
    confidence is a heuristic "identifiability" proxy derived from the
    Laplacian variance (local sharpness) of each face ROI, clamped to
    [0.5, 0.99] — it is NOT a detector score.
    """
    cascade = _face_cascade()
    if cascade is None:
        return []
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    faces = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(20, 20))
    out: List[Dict[str, Any]] = []
    for (x, y, w, h) in faces:
        roi = gray[y : y + h, x : x + w]
        lap = float(cv2.Laplacian(roi, cv2.CV_64F).var()) if roi.size > 0 else 0.0
        # Heuristic "identifiability confidence" proxy from local sharpness.
        conf = max(0.5, min(0.99, 0.55 + (lap / 400.0)))
        out.append(
            {
                "bbox": [int(x), int(y), int(w), int(h)],
                "confidence": round(conf, 3),
            }
        )
    return out


def _psnr(img_a: Any, img_b: Any) -> Optional[float]:
    """Grayscale PSNR in dB between two images; capped at 99.0 for (near-)identical pairs.

    ``img_b`` is resized to ``img_a``'s dimensions when they differ.  Returns
    None when OpenCV or either image is unavailable.
    """
    if cv2 is None:
        return None
    if img_a is None or img_b is None:
        return None
    if img_a.shape[:2] != img_b.shape[:2]:
        img_b = cv2.resize(img_b, (img_a.shape[1], img_a.shape[0]), interpolation=cv2.INTER_AREA)
    a = cv2.cvtColor(img_a, cv2.COLOR_BGR2GRAY) if len(img_a.shape) == 3 else img_a
    b = cv2.cvtColor(img_b, cv2.COLOR_BGR2GRAY) if len(img_b.shape) == 3 else img_b
    mse = float(((a.astype("float32") - b.astype("float32")) ** 2).mean())
    if mse <= 1e-9:
        # Identical frames would otherwise yield log10(inf); cap instead.
        return 99.0
    return 20.0 * math.log10(255.0 / math.sqrt(mse))


def _sample_video_frames(path: Path, max_samples: int = 12) -> List[Any]:
    """Decode up to ``max_samples`` frames spread evenly across a video.

    Falls back to sequential reads when the container does not report a frame
    count.  The capture handle is always released, even if decoding raises.
    """
    if cv2 is None:
        return []
    cap = cv2.VideoCapture(str(path))
    if not cap.isOpened():
        return []
    frames: List[Any] = []
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        indices: List[int]
        if frame_count > 0:
            samples = min(max_samples, max(1, frame_count))
            if samples <= 1:
                indices = [0]
            else:
                # Evenly spaced, de-duplicated indices including first and last frame.
                indices = sorted({int(i * (frame_count - 1) / (samples - 1)) for i in range(samples)})
        else:
            # Frame count unknown (some containers): just read sequentially.
            indices = list(range(max_samples))
        for idx in indices:
            if frame_count > 0:
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ok, frame = cap.read()
            if ok and frame is not None:
                frames.append(frame)
    finally:
        cap.release()  # avoid leaking the decoder handle on error paths
    return frames


def _face_metrics(source_path: Path, result_path: Path, media_type: MediaType) -> Dict[str, Any]:
    """Compare face detections between source and result media.

    ``detected``/``source_detected`` count faces in the result/source;
    ``identifiable`` counts result faces whose proxy confidence is >= 0.85
    (i.e. faces that likely survived anonymization).  All zeros without OpenCV.
    """
    if cv2 is None:
        return {"detected": 0, "source_detected": 0, "avg_confidence": 0.0, "identifiable": 0}
    src_faces_total = 0
    out_faces_total = 0
    confs: List[float] = []
    identifiable = 0
    if media_type == "photo":
        src = cv2.imread(str(source_path), cv2.IMREAD_COLOR)
        out = cv2.imread(str(result_path), cv2.IMREAD_COLOR)
        src_faces = _detect_faces_with_proxy_confidence(src) if src is not None else []
        out_faces = _detect_faces_with_proxy_confidence(out) if out is not None else []
        src_faces_total = len(src_faces)
        out_faces_total = len(out_faces)
        confs = [float(x["confidence"]) for x in out_faces]
    elif media_type == "video":
        src_frames = _sample_video_frames(source_path, max_samples=10)
        out_frames = _sample_video_frames(result_path, max_samples=10)
        for fr in src_frames:
            src_faces_total += len(_detect_faces_with_proxy_confidence(fr))
        for fr in out_frames:
            faces = _detect_faces_with_proxy_confidence(fr)
            out_faces_total += len(faces)
            confs.extend(float(x["confidence"]) for x in faces)
    if confs:
        identifiable = sum(1 for c in confs if c >= 0.85)
        avg_conf = sum(confs) / len(confs)
    else:
        avg_conf = 0.0
    return {
        "detected": int(out_faces_total),
        "source_detected": int(src_faces_total),
        "avg_confidence": round(float(avg_conf), 3),
        "identifiable": int(identifiable),
    }


def _plate_metrics(job_output_dir: Path) -> Dict[str, Any]:
    """Summarize licence-plate detections from ``plate_detections.json``.

    Missing or malformed report files yield an all-zero summary rather than
    raising — plate detection is an optional pipeline stage.
    """
    report = job_output_dir / "plate_detections.json"
    if not report.exists():
        return {
            "detected": 0,
            "recognized": 0,
            "unrecognized": 0,
            "unrecognized_reason": None,
            "avg_confidence": 0.0,
        }
    try:
        payload = json.loads(report.read_text(encoding="utf-8"))
    except Exception:
        payload = {}
    detections = payload.get("detections")
    if not isinstance(detections, list):
        detections = []
    detected = len(detections)
    recognized = 0
    confs: List[float] = []
    for d in detections:
        if not isinstance(d, dict):
            continue
        text = str(d.get("text") or "").strip()
        if text:
            recognized += 1
        try:
            confs.append(float(d.get("confidence")))
        except Exception:
            pass  # absent or non-numeric confidence; skip
    unrecognized = max(0, detected - recognized)
    return {
        "detected": detected,
        "recognized": recognized,
        "unrecognized": unrecognized,
        "unrecognized_reason": "blur_or_ocr_unavailable" if unrecognized > 0 else None,
        "avg_confidence": round((sum(confs) / len(confs)) if confs else 0.0, 3),
    }


def _overall_metrics(source_path: Path, result_path: Path, media_type: MediaType, job: AuroraJob) -> Dict[str, Any]:
    """Compute overall fidelity metrics: mean PSNR, processing time, models used.

    PSNR is averaged over sampled frame pairs for video, or the single image
    pair for photos; it is None when OpenCV is unavailable or no pair decoded.
    """
    psnr_values: List[float] = []
    if cv2 is not None:
        if media_type == "photo":
            src = cv2.imread(str(source_path), cv2.IMREAD_COLOR)
            out = cv2.imread(str(result_path), cv2.IMREAD_COLOR)
            v = _psnr(src, out)
            if v is not None:
                psnr_values.append(v)
        elif media_type == "video":
            src_frames = _sample_video_frames(source_path, max_samples=8)
            out_frames = _sample_video_frames(result_path, max_samples=8)
            for a, b in zip(src_frames, out_frames):
                v = _psnr(a, b)
                if v is not None:
                    psnr_values.append(v)
    psnr_value = round(sum(psnr_values) / len(psnr_values), 2) if psnr_values else None
    return {
        "psnr": psnr_value,
        "processing_time_sec": _processing_time_seconds(job),
        "models": _models_used(job),
    }


def _resolve_result_media_path(job: AuroraJob, outputs_dir: Path) -> Optional[Path]:
    """Locate the first existing output media file for a completed job.

    Returns ``outputs_dir/<job_id>/<name>`` for the first output entry whose
    type looks like media and whose file exists on disk; None otherwise.
    """
    if not job.result:
        return None
    for out in job.result.output_files:
        out_type = str(getattr(out, "type", "") or "").strip().lower()
        if out_type in {"video", "photo", "image", "audio", "unknown"}:
            p = outputs_dir / job.job_id / str(getattr(out, "name", "") or "")
            if p.exists():
                return p
    return None


def build_quality_report(job: AuroraJob, outputs_dir: Path, *, refresh: bool = False) -> Dict[str, Any]:
    """Build (or load the cached) quality report for a job.

    Args:
        job: The job whose output is being evaluated.
        outputs_dir: Root directory containing per-job output folders.
        refresh: When True, ignore any cached ``quality_report.json``.

    Returns:
        The report dict (also best-effort persisted to the job's output dir).

    Raises:
        RuntimeError: If the source or result media file cannot be found.
    """
    job_dir = outputs_dir / job.job_id
    cache_path = job_dir / "quality_report.json"
    if not refresh and cache_path.exists():
        try:
            return json.loads(cache_path.read_text(encoding="utf-8"))
        except Exception:
            pass  # corrupt cache; fall through and rebuild
    source_path = Path(job.input_path)
    result_path = _resolve_result_media_path(job, outputs_dir)
    if not result_path or not source_path.exists():
        raise RuntimeError("Cannot build quality report: source/result file not found")
    media_type: MediaType = job.media_type
    faces = _face_metrics(source_path, result_path, media_type)
    plates = _plate_metrics(job_dir)
    overall = _overall_metrics(source_path, result_path, media_type, job)
    report = {
        "job_id": job.job_id,
        "media_type": media_type,
        # timezone-aware replacement for the deprecated datetime.utcnow()
        "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "faces": faces,
        "plates": plates,
        "overall": overall,
        "summary": {
            "faces_detected_ratio": f"{faces['detected']} / {faces['source_detected'] or faces['detected']}",
            "plates_recognized_ratio": f"{plates['recognized']} / {plates['detected']}",
        },
    }
    try:
        job_dir.mkdir(parents=True, exist_ok=True)
        cache_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
    except Exception:
        pass  # caching is best-effort; never fail the report over disk errors
    return report