from __future__ import annotations import json import math from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from .schemas import AuroraJob, MediaType try: import cv2 # type: ignore[import-untyped] except Exception: # pragma: no cover cv2 = None def _safe_iso(value: Optional[str]) -> Optional[datetime]: if not value: return None try: return datetime.fromisoformat(value.replace("Z", "+00:00")) except Exception: return None def _processing_time_seconds(job: AuroraJob) -> Optional[int]: meta = job.metadata if isinstance(job.metadata, dict) else {} for key in ("actual_processing_seconds", "processing_time_sec"): if key in meta: try: return int(float(meta[key])) except Exception: pass started = _safe_iso(job.started_at) completed = _safe_iso(job.completed_at) if started and completed: delta = int((completed - started).total_seconds()) return max(0, delta) return None def _models_used(job: AuroraJob) -> List[str]: models: List[str] = [] if job.result and job.result.processing_log: for step in job.result.processing_log: name = str(getattr(step, "model", "") or "").strip() if name and name not in models: models.append(name) return models def _processing_steps(job: AuroraJob) -> List[Any]: if job.result and job.result.processing_log: return list(job.result.processing_log) if job.processing_log: return list(job.processing_log) return [] def _result_media_hash(job: AuroraJob) -> Optional[str]: if not job.result: return None media_type = str(job.media_type).strip().lower() for out in job.result.output_files: out_type = str(getattr(out, "type", "") or "").strip().lower() if out_type in {media_type, "video", "photo", "image", "audio", "unknown"}: value = str(getattr(out, "hash", "") or "").strip() if value: return value return None def _fallback_flags(job: AuroraJob) -> Dict[str, Any]: hard_fallback_used = False soft_sr_fallback_used = False fallback_steps: List[str] = [] warnings: List[str] = [] for step in _processing_steps(job): step_name = str(getattr(step, "step", "") or "").strip() or "unknown" details = getattr(step, "details", {}) or {} if not isinstance(details, dict): continue if bool(details.get("fallback_used")): hard_fallback_used = True fallback_steps.append(step_name) reason = str(details.get("reason") or "").strip() if reason: warnings.append(f"{step_name}: hard fallback used ({reason})") else: warnings.append(f"{step_name}: hard fallback used") sr_fallback_frames = 0 try: sr_fallback_frames = int(details.get("sr_fallback_frames") or 0) except Exception: sr_fallback_frames = 0 if bool(details.get("sr_fallback_used")): sr_fallback_frames = max(sr_fallback_frames, 1) if sr_fallback_frames > 0: soft_sr_fallback_used = True fallback_steps.append(step_name) method = str(details.get("sr_fallback_method") or "").strip() reason = str(details.get("sr_fallback_reason") or "").strip() msg = f"{step_name}: SR soft fallback on {sr_fallback_frames} frame(s)" if method: msg += f" via {method}" if reason: msg += f" ({reason})" warnings.append(msg) fallback_steps_unique = list(dict.fromkeys(fallback_steps)) warnings_unique = list(dict.fromkeys(warnings)) return { "fallback_used": bool(hard_fallback_used or soft_sr_fallback_used), "hard_fallback_used": hard_fallback_used, "soft_sr_fallback_used": soft_sr_fallback_used, "fallback_steps": fallback_steps_unique, "warnings": warnings_unique, } def _detect_faces_with_proxy_confidence(frame_bgr: Any) -> List[Dict[str, Any]]: if cv2 is None: return [] gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) cascade = cv2.CascadeClassifier(str(Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml")) if cascade.empty(): return [] faces = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(20, 20)) out: List[Dict[str, Any]] = [] for (x, y, w, h) in faces: roi = gray[y : y + h, x : x + w] lap = float(cv2.Laplacian(roi, cv2.CV_64F).var()) if roi.size > 0 else 0.0 # Heuristic "identifiability confidence" proxy from local sharpness. conf = max(0.5, min(0.99, 0.55 + (lap / 400.0))) out.append( { "bbox": [int(x), int(y), int(w), int(h)], "confidence": round(conf, 3), } ) return out def _psnr(img_a: Any, img_b: Any) -> Optional[float]: if cv2 is None: return None if img_a is None or img_b is None: return None if img_a.shape[:2] != img_b.shape[:2]: img_b = cv2.resize(img_b, (img_a.shape[1], img_a.shape[0]), interpolation=cv2.INTER_AREA) a = cv2.cvtColor(img_a, cv2.COLOR_BGR2GRAY) if len(img_a.shape) == 3 else img_a b = cv2.cvtColor(img_b, cv2.COLOR_BGR2GRAY) if len(img_b.shape) == 3 else img_b mse = float(((a.astype("float32") - b.astype("float32")) ** 2).mean()) if mse <= 1e-9: return 99.0 return 20.0 * math.log10(255.0 / math.sqrt(mse)) def _sample_video_frames(path: Path, max_samples: int = 12) -> List[Any]: if cv2 is None: return [] cap = cv2.VideoCapture(str(path)) if not cap.isOpened(): return [] frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) indices: List[int] if frame_count > 0: samples = min(max_samples, max(1, frame_count)) if samples <= 1: indices = [0] else: indices = sorted({int(i * (frame_count - 1) / (samples - 1)) for i in range(samples)}) else: indices = list(range(max_samples)) frames: List[Any] = [] for idx in indices: if frame_count > 0: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ok, frame = cap.read() if ok and frame is not None: frames.append(frame) cap.release() return frames def _face_metrics(source_path: Path, result_path: Path, media_type: MediaType) -> Dict[str, Any]: if cv2 is None: return {"detected": 0, "source_detected": 0, "avg_confidence": 0.0, "identifiable": 0} src_faces_total = 0 out_faces_total = 0 confs: List[float] = [] identifiable = 0 if media_type == "photo": src = cv2.imread(str(source_path), cv2.IMREAD_COLOR) out = cv2.imread(str(result_path), cv2.IMREAD_COLOR) src_faces = _detect_faces_with_proxy_confidence(src) if src is not None else [] out_faces = _detect_faces_with_proxy_confidence(out) if out is not None else [] src_faces_total = len(src_faces) out_faces_total = len(out_faces) confs = [float(x["confidence"]) for x in out_faces] elif media_type == "video": src_frames = _sample_video_frames(source_path, max_samples=10) out_frames = _sample_video_frames(result_path, max_samples=10) for fr in src_frames: src_faces_total += len(_detect_faces_with_proxy_confidence(fr)) for fr in out_frames: faces = _detect_faces_with_proxy_confidence(fr) out_faces_total += len(faces) confs.extend(float(x["confidence"]) for x in faces) if confs: identifiable = sum(1 for c in confs if c >= 0.85) avg_conf = sum(confs) / len(confs) else: avg_conf = 0.0 return { "detected": int(out_faces_total), "source_detected": int(src_faces_total), "avg_confidence": round(float(avg_conf), 3), "identifiable": int(identifiable), } def _plate_metrics(job_output_dir: Path) -> Dict[str, Any]: report = job_output_dir / "plate_detections.json" if not report.exists(): return { "detected": 0, "recognized": 0, "unrecognized": 0, "unrecognized_reason": None, "avg_confidence": 0.0, } try: payload = json.loads(report.read_text(encoding="utf-8")) except Exception: payload = {} detections = payload.get("detections") if isinstance(payload.get("detections"), list) else [] detected = len(detections) recognized = 0 confs: List[float] = [] for d in detections: if not isinstance(d, dict): continue text = str(d.get("text") or "").strip() if text: recognized += 1 try: confs.append(float(d.get("confidence"))) except Exception: pass unrecognized = max(0, detected - recognized) return { "detected": detected, "recognized": recognized, "unrecognized": unrecognized, "unrecognized_reason": "blur_or_ocr_unavailable" if unrecognized > 0 else None, "avg_confidence": round((sum(confs) / len(confs)) if confs else 0.0, 3), } def _overall_metrics(source_path: Path, result_path: Path, media_type: MediaType, job: AuroraJob) -> Dict[str, Any]: psnr_values: List[float] = [] if cv2 is not None: if media_type == "photo": src = cv2.imread(str(source_path), cv2.IMREAD_COLOR) out = cv2.imread(str(result_path), cv2.IMREAD_COLOR) v = _psnr(src, out) if v is not None: psnr_values.append(v) elif media_type == "video": src_frames = _sample_video_frames(source_path, max_samples=8) out_frames = _sample_video_frames(result_path, max_samples=8) for a, b in zip(src_frames, out_frames): v = _psnr(a, b) if v is not None: psnr_values.append(v) psnr_value = round(sum(psnr_values) / len(psnr_values), 2) if psnr_values else None return { "psnr": psnr_value, "processing_time_sec": _processing_time_seconds(job), "models": _models_used(job), } def _resolve_result_media_path(job: AuroraJob, outputs_dir: Path) -> Optional[Path]: if not job.result: return None for out in job.result.output_files: out_type = str(getattr(out, "type", "") or "").strip().lower() if out_type in {"video", "photo", "image", "audio", "unknown"}: p = outputs_dir / job.job_id / str(getattr(out, "name", "") or "") if p.exists(): return p return None def build_quality_report(job: AuroraJob, outputs_dir: Path, *, refresh: bool = False) -> Dict[str, Any]: job_dir = outputs_dir / job.job_id cache_path = job_dir / "quality_report.json" if not refresh and cache_path.exists(): try: return json.loads(cache_path.read_text(encoding="utf-8")) except Exception: pass source_path = Path(job.input_path) result_path = _resolve_result_media_path(job, outputs_dir) if not result_path or not source_path.exists(): raise RuntimeError("Cannot build quality report: source/result file not found") media_type: MediaType = job.media_type processing_flags = _fallback_flags(job) faces = _face_metrics(source_path, result_path, media_type) plates = _plate_metrics(job_dir) overall = _overall_metrics(source_path, result_path, media_type, job) result_hash = _result_media_hash(job) identical_to_input = bool(result_hash and result_hash == str(job.input_hash)) warnings = list(processing_flags.get("warnings") or []) if identical_to_input: warnings.append("output hash matches input hash; enhancement may be skipped.") warnings = list(dict.fromkeys(warnings)) processing_status = "ok" if bool(processing_flags.get("fallback_used")) or identical_to_input: processing_status = "degraded" overall["processing_status"] = processing_status overall["fallback_used"] = bool(processing_flags.get("fallback_used")) overall["hard_fallback_used"] = bool(processing_flags.get("hard_fallback_used")) overall["soft_sr_fallback_used"] = bool(processing_flags.get("soft_sr_fallback_used")) overall["identical_to_input"] = identical_to_input if result_hash: overall["result_hash"] = result_hash if warnings: overall["warnings"] = warnings report = { "job_id": job.job_id, "media_type": media_type, "generated_at": datetime.utcnow().isoformat() + "Z", "faces": faces, "plates": plates, "overall": overall, "processing_flags": { **processing_flags, "identical_to_input": identical_to_input, "warnings": warnings, }, "summary": { "processing_status": processing_status, "faces_detected_ratio": f"{faces['detected']} / {faces['source_detected'] or faces['detected']}", "plates_recognized_ratio": f"{plates['recognized']} / {plates['detected']}", }, } try: job_dir.mkdir(parents=True, exist_ok=True) cache_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") except Exception: pass return report