from __future__ import annotations import json import math import statistics import subprocess from pathlib import Path from typing import Any, Dict, List, Optional, Tuple try: import cv2 # type: ignore[import-untyped] except Exception: # pragma: no cover cv2 = None def _safe_float(value: Any, default: float = 0.0) -> float: try: return float(value) except Exception: return default def _safe_int(value: Any, default: int = 0) -> int: try: return int(float(value)) except Exception: return default def _iso_clamp(v: int, lo: int, hi: int) -> int: return max(lo, min(hi, v)) def _detect_faces(gray_img) -> List[Dict[str, Any]]: if cv2 is None: return [] cascade_path = str(Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml") detector = cv2.CascadeClassifier(cascade_path) if detector.empty(): return [] faces = detector.detectMultiScale( gray_img, scaleFactor=1.1, minNeighbors=4, minSize=(20, 20), ) out: List[Dict[str, Any]] = [] for (x, y, w, h) in faces: out.append( { "bbox": [int(x), int(y), int(w), int(h)], "confidence": 0.75, } ) return out def _detect_plates(gray_img) -> List[Dict[str, Any]]: if cv2 is None: return [] cascade_path = str(Path(cv2.data.haarcascades) / "haarcascade_russian_plate_number.xml") if not Path(cascade_path).exists(): return [] detector = cv2.CascadeClassifier(cascade_path) if detector.empty(): return [] plates = detector.detectMultiScale( gray_img, scaleFactor=1.1, minNeighbors=3, minSize=(28, 10), ) out: List[Dict[str, Any]] = [] for (x, y, w, h) in plates: out.append( { "bbox": [int(x), int(y), int(w), int(h)], "confidence": 0.65, "text": None, } ) return out def _noise_label(noise_sigma: float) -> str: if noise_sigma >= 28: return "high" if noise_sigma >= 14: return "medium" return "low" def _brightness_label(brightness: float) -> str: if brightness < 75: return "low" if brightness > 180: return "high" return "medium" def _blur_label(laplacian_var: float) -> str: if laplacian_var < 45: return "high" if laplacian_var < 120: return "medium" return "low" def _analyze_quality(gray_img) -> Dict[str, Any]: if cv2 is None: return { "noise_level": "unknown", "brightness": "unknown", "blur_level": "unknown", "brightness_value": None, "noise_sigma": None, "laplacian_var": None, } brightness = float(gray_img.mean()) noise_sigma = float(gray_img.std()) lap_var = float(cv2.Laplacian(gray_img, cv2.CV_64F).var()) return { "noise_level": _noise_label(noise_sigma), "brightness": _brightness_label(brightness), "blur_level": _blur_label(lap_var), "brightness_value": round(brightness, 2), "noise_sigma": round(noise_sigma, 2), "laplacian_var": round(lap_var, 2), } def _aggregate_quality(samples: List[Dict[str, Any]]) -> Dict[str, Any]: if not samples: return { "noise_level": "unknown", "brightness": "unknown", "blur_level": "unknown", "brightness_value": None, "noise_sigma": None, "laplacian_var": None, } brightness_values = [float(s["brightness_value"]) for s in samples if s.get("brightness_value") is not None] noise_values = [float(s["noise_sigma"]) for s in samples if s.get("noise_sigma") is not None] lap_values = [float(s["laplacian_var"]) for s in samples if s.get("laplacian_var") is not None] brightness = statistics.mean(brightness_values) if brightness_values else 0.0 noise_sigma = statistics.mean(noise_values) if noise_values else 0.0 lap_var = statistics.mean(lap_values) if lap_values else 0.0 return { "noise_level": _noise_label(noise_sigma), "brightness": _brightness_label(brightness), "blur_level": _blur_label(lap_var), "brightness_value": round(brightness, 2), "noise_sigma": round(noise_sigma, 2), "laplacian_var": round(lap_var, 2), } def probe_video_metadata(path: Path) -> Dict[str, Any]: cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height,nb_frames,r_frame_rate,duration", "-show_entries", "format=duration", "-of", "json", str(path), ] try: p = subprocess.run(cmd, check=False, capture_output=True, text=True) if p.returncode != 0 or not p.stdout: return {} payload = json.loads(p.stdout) except Exception: return {} stream = (payload.get("streams") or [{}])[0] if isinstance(payload, dict) else {} fmt = payload.get("format") or {} width = _safe_int(stream.get("width")) height = _safe_int(stream.get("height")) nb_frames = _safe_int(stream.get("nb_frames")) fps_raw = str(stream.get("r_frame_rate") or "0/1") duration = _safe_float(stream.get("duration")) or _safe_float(fmt.get("duration")) fps = 0.0 if "/" in fps_raw: num_s, den_s = fps_raw.split("/", 1) num = _safe_float(num_s) den = _safe_float(den_s, 1.0) if den > 0: fps = num / den elif fps_raw: fps = _safe_float(fps_raw) if nb_frames <= 0 and duration > 0 and fps > 0: nb_frames = int(duration * fps) return { "width": width, "height": height, "fps": round(fps, 3) if fps > 0 else None, "frame_count": nb_frames if nb_frames > 0 else None, "duration_seconds": round(duration, 3) if duration > 0 else None, } def estimate_processing_seconds( *, media_type: str, mode: str, width: int = 0, height: int = 0, frame_count: int = 0, ) -> Optional[int]: if media_type == "video": if frame_count <= 0: return None megapixels = max(0.15, (max(1, width) * max(1, height)) / 1_000_000.0) per_frame = 0.8 * megapixels if mode == "tactical" else 1.35 * megapixels per_frame = max(0.08, min(9.0, per_frame)) overhead = 6 if mode == "tactical" else 12 return int(math.ceil(frame_count * per_frame + overhead)) if media_type == "photo": megapixels = max(0.15, (max(1, width) * max(1, height)) / 1_000_000.0) base = 3.0 if mode == "tactical" else 6.0 return int(math.ceil(base + megapixels * (3.0 if mode == "tactical" else 5.0))) return None def _recommendations( *, faces_count: int, plates_count: int, quality: Dict[str, Any], media_type: str, ) -> Tuple[List[str], str]: recs: List[str] = [] noise_level = quality.get("noise_level") brightness = quality.get("brightness") blur_level = quality.get("blur_level") if noise_level == "high": recs.append("Enable denoise (FastDVDnet/SCUNet) before enhancement.") if brightness == "low": recs.append("Apply low-light normalization before super-resolution.") if blur_level in {"medium", "high"}: recs.append("Enable sharpening after upscaling to recover edges.") if faces_count > 0: recs.append("Run face restoration (GFPGAN) as priority stage.") if plates_count > 0: recs.append("Run license-plate ROI enhancement with focused sharpening.") if not recs: recs.append("Balanced enhancement pipeline is sufficient for this media.") if faces_count > 0 and faces_count >= plates_count: priority = "faces" elif plates_count > 0: priority = "plates" elif media_type == "photo": priority = "details" else: priority = "balanced" return recs, priority def _suggested_export(media_type: str, quality: Dict[str, Any], width: int, height: int) -> Dict[str, Any]: if media_type == "video": if width >= 3840 or height >= 2160: resolution = "original" elif width >= 1920 or height >= 1080: resolution = "4k" else: resolution = "1080p" codec = "mp4_h264" if quality.get("noise_level") != "high" else "mp4_h265" return { "resolution": resolution, "format": codec, "roi": "auto_faces", } return { "resolution": "original", "format": "png", "roi": "full_frame", } def analyze_photo(path: Path) -> Dict[str, Any]: if cv2 is None: raise RuntimeError("opencv-python-headless is not installed") frame = cv2.imread(str(path), cv2.IMREAD_COLOR) if frame is None: raise RuntimeError("Cannot decode uploaded image") h, w = frame.shape[:2] gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = _detect_faces(gray) plates = _detect_plates(gray) quality = _analyze_quality(gray) recs, priority = _recommendations( faces_count=len(faces), plates_count=len(plates), quality=quality, media_type="photo", ) return { "media_type": "photo", "frame_sampled": 1, "resolution": {"width": w, "height": h}, "faces": faces, "license_plates": plates, "quality_analysis": quality, "recommendations": recs, "suggested_priority": priority, "suggested_export": _suggested_export("photo", quality, w, h), "estimated_processing_seconds": estimate_processing_seconds( media_type="photo", mode="tactical", width=w, height=h, frame_count=1, ), } def _sample_video_frames(path: Path, max_samples: int = 24) -> Tuple[List[Tuple[int, Any]], Dict[str, Any]]: if cv2 is None: raise RuntimeError("opencv-python-headless is not installed") cap = cv2.VideoCapture(str(path)) if not cap.isOpened(): raise RuntimeError("Cannot open uploaded video") frame_count = _safe_int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = _safe_float(cap.get(cv2.CAP_PROP_FPS)) width = _safe_int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = _safe_int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) indices: List[int] = [] if frame_count > 0: sample_count = min(max_samples, frame_count) if sample_count <= 1: indices = [0] else: indices = sorted({int(i * (frame_count - 1) / (sample_count - 1)) for i in range(sample_count)}) else: indices = list(range(max_samples)) sampled: List[Tuple[int, Any]] = [] for idx in indices: if frame_count > 0: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ok, frame = cap.read() if not ok or frame is None: continue sampled.append((idx, frame)) cap.release() duration = (frame_count / fps) if (frame_count > 0 and fps > 0) else None meta = { "frame_count": frame_count if frame_count > 0 else None, "fps": round(fps, 3) if fps > 0 else None, "width": width, "height": height, "duration_seconds": round(duration, 3) if duration else None, } return sampled, meta def analyze_video(path: Path) -> Dict[str, Any]: sampled, meta = _sample_video_frames(path, max_samples=24) if not sampled: raise RuntimeError("Cannot sample frames from uploaded video") all_faces: List[Dict[str, Any]] = [] all_plates: List[Dict[str, Any]] = [] quality_samples: List[Dict[str, Any]] = [] for frame_idx, frame in sampled: gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # type: ignore[union-attr] faces = _detect_faces(gray) plates = _detect_plates(gray) for f in faces: f["frame_index"] = frame_idx all_faces.append(f) for p in plates: p["frame_index"] = frame_idx all_plates.append(p) quality_samples.append(_analyze_quality(gray)) quality = _aggregate_quality(quality_samples) recs, priority = _recommendations( faces_count=len(all_faces), plates_count=len(all_plates), quality=quality, media_type="video", ) width = _safe_int(meta.get("width")) height = _safe_int(meta.get("height")) frame_count = _safe_int(meta.get("frame_count")) return { "media_type": "video", "frame_sampled": len(sampled), "video_metadata": meta, "faces": all_faces[:120], "license_plates": all_plates[:120], "quality_analysis": quality, "recommendations": recs, "suggested_priority": priority, "suggested_export": _suggested_export("video", quality, width, height), "estimated_processing_seconds": estimate_processing_seconds( media_type="video", mode="tactical", width=width, height=height, frame_count=frame_count, ), }