Files
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

1969 lines
72 KiB
Python

from __future__ import annotations
import hashlib
import importlib
import json
import logging
import os
import queue
import shutil
import subprocess
import sys
import threading
import time
import uuid
from dataclasses import dataclass, field
from functools import lru_cache
from fractions import Fraction
from pathlib import Path
from threading import Lock
from typing import Any, Callable, Dict, List, Optional, Tuple
from .schemas import AuroraMode, MediaType, ProcessingStep
logger = logging.getLogger("aurora.subagents")
try:
import cv2 # type: ignore[import-untyped]
except Exception: # pragma: no cover - handled at runtime
cv2 = None
try:
import numpy as np # type: ignore[import-untyped]
except Exception: # pragma: no cover - handled at runtime
np = None
GFPGAN_MODEL_URL = "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth"
REALESRGAN_MODEL_URL = (
"https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth"
)
def _env_flag(name: str, default: bool) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _is_container_runtime() -> bool:
return Path("/.dockerenv").exists() or bool(os.getenv("KUBERNETES_SERVICE_HOST"))
@lru_cache(maxsize=1)
def _ffmpeg_hwaccels_text() -> str:
    """Probe `ffmpeg -hwaccels` once and cache the raw output ("" when ffmpeg is unusable)."""
    command = ["ffmpeg", "-hide_banner", "-hwaccels"]
    try:
        output = _run_command(command)
    except Exception:
        output = ""
    return output
@lru_cache(maxsize=1)
def _ffmpeg_encoders_text() -> str:
    """Probe `ffmpeg -encoders` once and cache the raw output ("" when ffmpeg is unusable)."""
    command = ["ffmpeg", "-hide_banner", "-encoders"]
    try:
        output = _run_command(command)
    except Exception:
        output = ""
    return output
def _ffmpeg_has_hwaccel(name: str) -> bool:
    """True when `ffmpeg -hwaccels` lists *name* as a whole line (case-insensitive)."""
    wanted = name.lower()
    lines = _ffmpeg_hwaccels_text().lower().splitlines()
    return wanted in (line.strip() for line in lines)
def _ffmpeg_has_encoder(name: str) -> bool:
    """True when *name* appears as a space-delimited token in the `ffmpeg -encoders` output."""
    haystack = f" {_ffmpeg_encoders_text().lower()} "
    needle = f" {name.lower()} "
    return needle in haystack
def _torch_capabilities() -> Dict[str, object]:
payload: Dict[str, object] = {
"torch": False,
"torch_version": None,
"cuda_available": False,
"mps_backend": False,
"mps_available": False,
"mps_built": False,
}
try:
import torch # type: ignore[import-untyped]
payload["torch"] = True
payload["torch_version"] = getattr(torch, "__version__", None)
payload["cuda_available"] = bool(torch.cuda.is_available())
mps_backend = getattr(torch.backends, "mps", None)
payload["mps_backend"] = bool(mps_backend)
payload["mps_available"] = bool(mps_backend and mps_backend.is_available())
payload["mps_built"] = bool(mps_backend and mps_backend.is_built())
except Exception:
pass
return payload
def sha256_file(path: Path) -> str:
    """Return the file's SHA-256 digest as "sha256:<hex>", streaming in 1 MiB chunks."""
    hasher = hashlib.sha256()
    chunk_size = 1024 * 1024
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            hasher.update(chunk)
    return f"sha256:{hasher.hexdigest()}"
def _copy_with_stage_suffix(input_path: Path, output_dir: Path, stage_suffix: str) -> Path:
output_dir.mkdir(parents=True, exist_ok=True)
suffix = input_path.suffix or ".bin"
staged = output_dir / f"{input_path.stem}_{stage_suffix}{suffix}"
shutil.copy2(input_path, staged)
return staged
def _run_command(args: List[str]) -> str:
process = subprocess.run(
args,
check=False,
capture_output=True,
text=True,
)
if process.returncode != 0:
stderr = (process.stderr or "").strip()
raise RuntimeError(f"Command failed ({process.returncode}): {' '.join(args)}\n{stderr}")
return (process.stdout or "").strip()
def _ffmpeg_available() -> bool:
return shutil.which("ffmpeg") is not None and shutil.which("ffprobe") is not None
def runtime_diagnostics() -> Dict[str, object]:
    """Collect a one-shot snapshot of codec/accelerator capabilities and env toggles."""
    caps = _torch_capabilities()
    selected_device = _ModelCache._device()
    container = _is_container_runtime()
    report: Dict[str, object] = {
        "opencv": cv2 is not None,
        "ffmpeg": _ffmpeg_available(),
        "ffmpeg_videotoolbox_hwaccel": _ffmpeg_has_hwaccel("videotoolbox"),
        "ffmpeg_h264_videotoolbox": _ffmpeg_has_encoder("h264_videotoolbox"),
        "ffmpeg_hevc_videotoolbox": _ffmpeg_has_encoder("hevc_videotoolbox"),
        "torch": bool(caps["torch"]),
        "torch_version": caps["torch_version"],
        "cuda_available": bool(caps["cuda_available"]),
        "mps_backend": bool(caps["mps_backend"]),
        "mps_available": bool(caps["mps_available"]),
        "mps_built": bool(caps["mps_built"]),
        # Containers default to forcing CPU unless overridden via env.
        "force_cpu": _env_flag("AURORA_FORCE_CPU", container),
        "prefer_mps": _env_flag("AURORA_PREFER_MPS", True),
        "enable_videotoolbox": _env_flag("AURORA_ENABLE_VIDEOTOOLBOX", True),
        "device": selected_device,
        "container_runtime": container,
        "models_dir": os.getenv("AURORA_MODELS_DIR", "/data/aurora/models"),
    }
    return report
class PipelineCancelledError(RuntimeError):
    """Raised to abort a pipeline run that has been cancelled."""
    pass
@dataclass
class SubagentContext:
    """Per-job inputs handed to a subagent pipeline run."""
    job_id: str
    mode: AuroraMode
    media_type: MediaType
    input_hash: str
    output_dir: Path
    priority: str = "balanced"
    # Free-form export knobs (upscale/resolution/encoder/...); see _face_pipeline_config.
    export_options: Dict[str, object] = field(default_factory=dict)
    # Optional callback polled to detect cancellation.
    cancel_check: Optional[Callable[[], bool]] = None
    # Optional callback(float, str) — presumably (progress, stage label); confirm at call site.
    stage_progress: Optional[Callable[[float, str], None]] = None
@dataclass
class SubagentRunResult:
    """Outcome of a subagent run: primary output plus steps, artifacts and metadata."""
    output_path: Path
    steps: List[ProcessingStep] = field(default_factory=list)
    # Additional files produced alongside the primary output.
    artifacts: List[Path] = field(default_factory=list)
    metadata: Dict[str, str] = field(default_factory=dict)
def _resolve_models_dir() -> Path:
target = Path(os.getenv("AURORA_MODELS_DIR", "/data/aurora/models")).expanduser()
target.mkdir(parents=True, exist_ok=True)
return target
def _ensure_persistent_gfpgan_weights() -> Path:
    """Point GFPGAN's runtime weights dir at persistent storage.

    Migrates any files from the runtime weights location (default
    /app/gfpgan/weights, overridable via AURORA_GFPGAN_RUNTIME_WEIGHTS_DIR)
    into <models_dir>/gfpgan_weights, then replaces the runtime dir with a
    symlink to the persistent one. All filesystem work is best-effort and
    failures are swallowed. Returns the persistent dir (always created).
    """
    persistent = _resolve_models_dir() / "gfpgan_weights"
    persistent.mkdir(parents=True, exist_ok=True)
    # In containers, some libs expect /app/gfpgan/weights.
    # In native macOS run we may not have write access to /app, so keep this best-effort.
    runtime_weights = Path(os.getenv("AURORA_GFPGAN_RUNTIME_WEIGHTS_DIR", "/app/gfpgan/weights"))
    try:
        runtime_weights.parent.mkdir(parents=True, exist_ok=True)
        if runtime_weights.exists() and not runtime_weights.is_symlink():
            # Real directory present: move its contents over (never clobber),
            # remove it, and swap in the symlink below.
            for item in runtime_weights.iterdir():
                dst = persistent / item.name
                if not dst.exists():
                    shutil.move(str(item), str(dst))
            shutil.rmtree(runtime_weights, ignore_errors=True)
        if not runtime_weights.exists():
            runtime_weights.symlink_to(persistent, target_is_directory=True)
    except Exception:
        pass
    return persistent
def _warmup_gfpgan(restorer: object) -> None:
"""Run a tiny inference to trigger MPS JIT compilation up front."""
try:
dummy = np.zeros((64, 64, 3), dtype=np.uint8)
restorer.enhance(dummy, has_aligned=False, only_center_face=False, paste_back=True) # type: ignore[attr-defined]
except Exception:
pass
class _ModelCache:
    """Process-wide, lock-guarded cache of loaded GFPGAN / Real-ESRGAN models, keyed by mode."""

    _lock = Lock()  # guards both per-mode caches below
    _gfpgan_by_mode: Dict[AuroraMode, object] = {}
    _realesrgan_by_mode: Dict[AuroraMode, object] = {}

    @classmethod
    def _download_model(cls, *, url: str, file_name: str) -> Path:
        """Download a model checkpoint into the models dir unless it is already present."""
        target = _resolve_models_dir() / file_name
        if target.exists():
            return target
        from basicsr.utils.download_util import load_file_from_url  # type: ignore[import-untyped]
        downloaded = load_file_from_url(
            url=url,
            model_dir=str(target.parent),
            file_name=file_name,
            progress=True,
        )
        return Path(downloaded)

    @classmethod
    def _device(cls) -> str:
        """Choose the inference device: "cpu", "cuda" or "mps".

        AURORA_FORCE_CPU (defaulting to on inside containers) wins outright;
        otherwise CUDA is preferred, then MPS when AURORA_PREFER_MPS is set
        and the backend is both available and built. Any torch failure falls
        back to CPU.
        """
        is_container = _is_container_runtime()
        force_cpu = _env_flag("AURORA_FORCE_CPU", is_container)
        if force_cpu:
            return "cpu"
        prefer_mps = _env_flag("AURORA_PREFER_MPS", True)
        try:
            import torch  # type: ignore[import-untyped]
            if torch.cuda.is_available():
                return "cuda"
            mps_be = getattr(torch.backends, "mps", None)
            if prefer_mps and mps_be and mps_be.is_available() and mps_be.is_built():
                return "mps"
        except Exception:
            return "cpu"
        return "cpu"

    @classmethod
    def _patch_torchvision_compat(cls) -> None:
        """Best-effort shim: alias `torchvision.transforms.functional_tensor`
        to the private `_functional_tensor` module when the public name is
        missing (presumably for newer torchvision where it was removed —
        basicsr still imports the old path)."""
        try:
            importlib.import_module("torchvision.transforms.functional_tensor")
            return
        except Exception:
            pass
        try:
            ft = importlib.import_module("torchvision.transforms._functional_tensor")
            sys.modules["torchvision.transforms.functional_tensor"] = ft
        except Exception:
            return

    @classmethod
    def gfpgan(cls, mode: AuroraMode) -> object:
        """Return a GFPGANer face-restoration model, loading it once per mode."""
        with cls._lock:
            cached = cls._gfpgan_by_mode.get(mode)
            if cached is not None:
                return cached
            cls._patch_torchvision_compat()
            _ensure_persistent_gfpgan_weights()
            from gfpgan import GFPGANer  # type: ignore[import-untyped]
            model_path = cls._download_model(url=GFPGAN_MODEL_URL, file_name="GFPGANv1.4.pth")
            device = cls._device()
            logger.info("Loading GFPGAN mode=%s device=%s", mode, device)
            t0 = time.monotonic()
            restorer = GFPGANer(
                model_path=str(model_path),
                upscale=1,
                arch="clean",
                channel_multiplier=2,
                bg_upsampler=None,
                device=device,
            )
            # Warm up on MPS so the first real frame does not pay the JIT cost.
            if device == "mps" and np is not None:
                _warmup_gfpgan(restorer)
            logger.info("GFPGAN ready mode=%s device=%s elapsed=%.1fs", mode, device, time.monotonic() - t0)
            cls._gfpgan_by_mode[mode] = restorer
            return restorer

    @classmethod
    def realesrgan(cls, mode: AuroraMode) -> object:
        """Return a RealESRGANer x4 upsampler, loading it once per mode.

        Tiling: 256 px tiles in tactical mode, env-tunable tiles on CPU
        (AURORA_CPU_FORENSIC_TILE, default 192), no tiling otherwise.
        Half precision is enabled on CUDA and MPS.
        """
        with cls._lock:
            cached = cls._realesrgan_by_mode.get(mode)
            if cached is not None:
                return cached
            cls._patch_torchvision_compat()
            from basicsr.archs.rrdbnet_arch import RRDBNet  # type: ignore[import-untyped]
            from realesrgan import RealESRGANer  # type: ignore[import-untyped]
            model_path = cls._download_model(url=REALESRGAN_MODEL_URL, file_name="RealESRGAN_x4plus.pth")
            rrdb = RRDBNet(
                num_in_ch=3,
                num_out_ch=3,
                num_feat=64,
                num_block=23,
                num_grow_ch=32,
                scale=4,
            )
            device = cls._device()
            use_half = device in ("cuda", "mps")
            if mode == "tactical":
                tile = 256
            elif device == "cpu":
                tile = int(os.getenv("AURORA_CPU_FORENSIC_TILE", "192"))
            else:
                tile = 0
            logger.info("Loading RealESRGAN mode=%s device=%s half=%s tile=%d", mode, device, use_half, tile)
            t0 = time.monotonic()
            upsampler = RealESRGANer(
                scale=4,
                model_path=str(model_path),
                model=rrdb,
                tile=tile,
                tile_pad=10,
                pre_pad=0,
                half=use_half,
                device=device,
            )
            logger.info("RealESRGAN ready mode=%s device=%s elapsed=%.1fs", mode, device, time.monotonic() - t0)
            cls._realesrgan_by_mode[mode] = upsampler
            return upsampler
def _clamp_int(val: int, low: int, high: int) -> int:
return max(low, min(high, int(val)))
def _option_bool(opts: Optional[Dict[str, object]], key: str, default: bool) -> bool:
if not opts:
return default
raw = opts.get(key)
if raw is None:
return default
if isinstance(raw, bool):
return raw
if isinstance(raw, (int, float)):
return bool(raw)
return str(raw).strip().lower() in {"1", "true", "yes", "on"}
def _option_str(opts: Optional[Dict[str, object]], key: str, default: str = "") -> str:
if not opts:
return default
raw = opts.get(key)
if raw is None:
return default
return str(raw).strip()
def _option_float(opts: Optional[Dict[str, object]], key: str, default: float) -> float:
if not opts:
return default
raw = opts.get(key)
if raw is None:
return default
try:
return float(raw)
except Exception:
return default
def _face_pipeline_config(
    *,
    mode: AuroraMode,
    media_type: MediaType,
    priority: str,
    export_options: Optional[Dict[str, object]],
) -> Dict[str, object]:
    """Resolve the face/text enhancement pipeline toggles for one job.

    A focus profile ("auto", "max_faces", "text_readability", "plates") is
    taken from export options or, when "auto", inferred from task-hint
    keywords (English and Ukrainian). The profile, mode, media type and
    priority then drive defaults for ROI-only face processing, (temporal)
    denoising, pre-face deblurring, the candidate scoring loop and the face
    model choice; each default can still be overridden by an explicit
    export option of the same name.
    """
    opts = export_options or {}
    roi_hint = _option_str(opts, "roi", "").lower()
    task_hint = _option_str(opts, "task_hint", "")
    hint_lower = task_hint.lower()
    focus_profile = _option_str(opts, "focus_profile", "auto").lower()
    if focus_profile not in {"auto", "max_faces", "text_readability", "plates"}:
        focus_profile = "auto"
    if focus_profile == "auto":
        # Keyword lists mix English and Ukrainian hint words.
        text_keywords = ("text", "logo", "label", "cap", "hat", "надпис", "напис", "кеп")
        face_keywords = ("face", "portrait", "облич", "портрет")
        plate_keywords = ("plate", "license", "номер", "знак")
        if any(k in hint_lower for k in text_keywords):
            focus_profile = "text_readability"
        elif any(k in hint_lower for k in face_keywords):
            focus_profile = "max_faces"
        elif any(k in hint_lower for k in plate_keywords):
            focus_profile = "plates"
    focus_faces = focus_profile == "max_faces"
    text_focus = focus_profile == "text_readability" or _option_bool(opts, "text_focus", False)
    focus_plates = focus_profile == "plates"
    # Defaults: ROI-only when the job explicitly targets faces; heavier
    # denoise/deblur/scoring for forensic mode, face priority or text/plate focus.
    roi_only_default = roi_hint in {"faces", "face", "auto_faces"} or priority == "faces" or focus_faces
    pre_denoise_default = media_type == "video" and (mode == "forensic" or priority == "faces" or text_focus or focus_plates)
    temporal_default = media_type == "video" and (mode == "forensic" or priority == "faces" or text_focus)
    deblur_default = priority == "faces" or mode == "forensic" or text_focus or focus_plates
    score_loop_default = mode == "forensic" or priority == "faces" or text_focus
    face_model = _option_str(opts, "face_model", "auto").lower()
    if face_model not in {"auto", "gfpgan", "codeformer"}:
        face_model = "auto"
    if focus_faces and face_model == "auto":
        face_model = "codeformer"
    return {
        "roi_only_faces": _option_bool(opts, "roi_only_faces", roi_only_default),
        "pre_denoise": _option_bool(opts, "pre_denoise", pre_denoise_default),
        "temporal_denoise": _option_bool(opts, "temporal_denoise", temporal_default),
        "deblur_before_face": _option_bool(opts, "deblur_before_face", deblur_default),
        "score_loop": _option_bool(opts, "score_loop", score_loop_default),
        "face_model": face_model,
        # Strength/amount are clamped to safe ranges regardless of the option value.
        "denoise_strength": max(1.0, min(15.0, _option_float(opts, "denoise_strength", 4.0))),
        "deblur_amount": max(0.2, min(2.0, _option_float(opts, "deblur_amount", 0.8))),
        "focus_profile": focus_profile,
        "task_hint": task_hint,
        "text_focus": text_focus,
    }
@lru_cache(maxsize=1)
def _face_detector():
    """Build (once) the OpenCV Haar frontal-face cascade; None when OpenCV or the cascade is unusable."""
    if cv2 is None:
        return None
    cascade_file = Path(cv2.data.haarcascades) / "haarcascade_frontalface_default.xml"
    cascade = cv2.CascadeClassifier(str(cascade_file))
    return None if cascade.empty() else cascade
def _detect_face_boxes(frame_bgr, limit: int = 8) -> List[Tuple[int, int, int, int]]:
    """Detect faces with the Haar cascade on an equalized grayscale frame.

    Returns up to *limit* (x, y, w, h) boxes sorted largest-area first;
    empty list when OpenCV or the cascade is unavailable. Detection
    parameters are env-tunable (AURORA_HAAR_SCALE / _MIN_NEIGHBORS / _MIN_FACE).
    """
    if cv2 is None:
        return []
    cascade = _face_detector()
    if cascade is None:
        return []
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    equalized = cv2.equalizeHist(gray)
    min_face = int(os.getenv("AURORA_HAAR_MIN_FACE", "15"))
    detections = cascade.detectMultiScale(
        equalized,
        scaleFactor=float(os.getenv("AURORA_HAAR_SCALE", "1.05")),
        minNeighbors=int(os.getenv("AURORA_HAAR_MIN_NEIGHBORS", "2")),
        minSize=(min_face, min_face),
    )
    boxes: List[Tuple[int, int, int, int]] = [
        (int(x), int(y), int(w), int(h)) for (x, y, w, h) in detections
    ]
    boxes.sort(key=lambda box: box[2] * box[3], reverse=True)
    return boxes[: max(1, limit)]
def _expand_roi(
x: int,
y: int,
w: int,
h: int,
frame_w: int,
frame_h: int,
pad_ratio: float = 0.28,
) -> Tuple[int, int, int, int]:
pad_x = int(w * pad_ratio)
pad_y = int(h * pad_ratio)
x1 = max(0, x - pad_x)
y1 = max(0, y - pad_y)
x2 = min(frame_w, x + w + pad_x)
y2 = min(frame_h, y + h + pad_y)
return x1, y1, x2, y2
def _pre_denoise_frame(frame_bgr, previous_denoised, strength: float, temporal: bool):
    """NLM-denoise a frame, optionally blending in the previous denoised frame.

    *strength* is clamped to [1, 15]. With *temporal* set and a previous
    frame available, a small env-tunable fraction (AURORA_TEMPORAL_DENOISE_ALPHA,
    clamped to [0.05, 0.40]) of the previous frame is mixed in for stability.
    Returns (denoised_frame, new_previous_denoised_state).
    """
    if cv2 is None:
        return frame_bgr, previous_denoised
    nlm_h = float(max(1.0, min(15.0, strength)))
    denoised = cv2.fastNlMeansDenoisingColored(frame_bgr, None, nlm_h, nlm_h, 7, 21)
    if temporal and previous_denoised is not None:
        try:
            blend = float(os.getenv("AURORA_TEMPORAL_DENOISE_ALPHA", "0.18"))
        except Exception:
            blend = 0.18
        blend = max(0.05, min(0.40, blend))
        denoised = cv2.addWeighted(denoised, 1.0 - blend, previous_denoised, blend, 0.0)
    return denoised, denoised
def _deblur_unsharp(frame_bgr, amount: float):
    """Sharpen via unsharp masking: subtract a Gaussian blur scaled by *amount* (clamped to [0.2, 2.0])."""
    if cv2 is None:
        return frame_bgr
    strength = max(0.2, min(2.0, float(amount)))
    softened = cv2.GaussianBlur(frame_bgr, (0, 0), sigmaX=1.2, sigmaY=1.2)
    return cv2.addWeighted(frame_bgr, 1.0 + strength, softened, -strength, 0.0)
def _patch_sharpness(patch) -> float:
    """Laplacian-variance sharpness metric for a BGR patch (0.0 when OpenCV is unavailable)."""
    if cv2 is None:
        return 0.0
    grayscale = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
    return float(cv2.Laplacian(grayscale, cv2.CV_64F).var())
def _patch_diff(original_patch, candidate_patch) -> float:
if np is None:
return 0.0
base = original_patch.astype(np.float32)
cand = candidate_patch.astype(np.float32)
return float(np.mean(np.abs(base - cand)))
def _compact_error_text(exc: Exception, limit: int = 220) -> str:
text = str(exc).replace("\n", " ").strip()
if len(text) <= limit:
return text
return text[: max(0, limit - 3)] + "..."
def _is_mps_conv_override_error(exc: Exception) -> bool:
text = str(exc).lower()
return "convolution_overrideable not implemented" in text
def _sr_soft_fallback(
    enhanced_img,
    requested_outscale: int,
) -> Tuple[object, int, str]:
    """Soft fallback when Real-ESRGAN fails on MPS for very large frames.

    Keeps the face-restored frame as-is, or performs a lightweight Lanczos
    resize when the requested output stays within the pixel budget
    (AURORA_SR_SOFT_FALLBACK_MAX_PIXELS, floored at 1 MP).
    Returns (image, effective_outscale, method_label).
    """
    if cv2 is None:
        return enhanced_img, 1, "keep_face_enhanced"
    try:
        budget = int(float(os.getenv("AURORA_SR_SOFT_FALLBACK_MAX_PIXELS", "12000000")))
    except Exception:
        budget = 12_000_000
    budget = max(1_000_000, budget)
    src_h, src_w = enhanced_img.shape[:2]
    if requested_outscale <= 1:
        return enhanced_img, 1, "keep_face_enhanced"
    out_w = max(1, int(src_w * requested_outscale))
    out_h = max(1, int(src_h * requested_outscale))
    if out_w * out_h > budget:
        return enhanced_img, 1, "keep_face_enhanced"
    resized = cv2.resize(enhanced_img, (out_w, out_h), interpolation=cv2.INTER_LANCZOS4)
    return resized, requested_outscale, "lanczos_resize"
def _safe_ocr_score(patch) -> float:
    """Optional OCR confidence hint in [0, 1] for the plate/text clarity loop.

    Returns 0.0 whenever pytesseract or OpenCV is unavailable, OCR fails,
    or no confident words are found; otherwise the mean tesseract word
    confidence normalized to [0, 1].
    """
    if not _pytesseract_available():
        return 0.0
    try:
        import pytesseract  # type: ignore[import-untyped]
    except Exception:
        return 0.0
    if cv2 is None:
        return 0.0
    try:
        grayscale = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
        data = pytesseract.image_to_data(
            grayscale,
            output_type=pytesseract.Output.DICT,
            config="--psm 7 --oem 1",
        )
        confidences = [
            float(raw)
            for raw in data.get("conf", [])
            if str(raw).strip() not in {"", "-1"}
        ]
        if not confidences:
            return 0.0
        mean_conf = sum(confidences) / (len(confidences) * 100.0)
        return max(0.0, min(1.0, mean_conf))
    except Exception:
        return 0.0
@lru_cache(maxsize=1)
def _codeformer_available() -> bool:
try:
importlib.import_module("codeformer")
return True
except Exception:
return False
@lru_cache(maxsize=1)
def _pytesseract_available() -> bool:
try:
importlib.import_module("pytesseract")
return True
except Exception:
return False
def _face_candidate_score(original_patch, candidate_patch) -> float:
    """Heuristic quality score for a restored patch.

    Sharpness gain over the original is boosted by the number of detected
    faces and by OCR confidence, and penalized by per-pixel drift from the
    original (to discourage hallucinated detail).
    """
    gain = _patch_sharpness(candidate_patch) / max(1.0, _patch_sharpness(original_patch))
    face_boost = 1.0 + 0.35 * max(0, len(_detect_face_boxes(candidate_patch, limit=2)))
    drift = _patch_diff(original_patch, candidate_patch) / 255.0
    ocr = _safe_ocr_score(candidate_patch)
    return gain * face_boost + 0.18 * ocr - 0.22 * drift
def _requested_outscale(export_options: Optional[Dict[str, object]], width: int, height: int) -> int:
    """Derive the integer upscale factor (1..AURORA_MAX_OUTSCALE, cap 4) a job asked for.

    Precedence: an explicit "upscale"/"outscale" option wins; otherwise an
    explicit width+height pair or a "resolution" profile ("4k", "8k",
    "<W>x<H>") is compared against the source *width*/*height* and mapped to
    the nearest factor in {1, 2, 3, 4}. Returns 1 when nothing was requested
    or the request is unparsable.
    """
    opts = export_options or {}
    max_outscale = _clamp_int(int(os.getenv("AURORA_MAX_OUTSCALE", "4")), 1, 4)
    raw_upscale = opts.get("upscale")
    if raw_upscale is None:
        # Compatibility alias used by console UI.
        raw_upscale = opts.get("outscale")
    if raw_upscale is not None:
        try:
            return _clamp_int(int(raw_upscale), 1, max_outscale)
        except Exception:
            # Unparsable explicit factor: fall through to resolution-based derivation.
            pass
    requested_w: Optional[int] = None
    requested_h: Optional[int] = None
    # Explicit width/height override.
    try:
        if opts.get("width") is not None and opts.get("height") is not None:
            requested_w = int(opts.get("width") or 0)
            requested_h = int(opts.get("height") or 0)
    except Exception:
        requested_w = None
        requested_h = None
    # Resolution profile override.
    res = str(opts.get("resolution") or "").strip().lower()
    if requested_w is None or requested_h is None:
        if res in {"4k", "2160p"}:
            requested_w, requested_h = 3840, 2160
        elif res in {"8k", "4320p"}:
            requested_w, requested_h = 7680, 4320
        elif "x" in res:
            try:
                w_txt, h_txt = res.split("x", 1)
                requested_w, requested_h = int(w_txt), int(h_txt)
            except Exception:
                requested_w, requested_h = None, None
    if not requested_w or not requested_h or requested_w <= 0 or requested_h <= 0:
        return 1
    # Scale by the dominant axis; thresholds (1.1 / 2.1 / 3.1) tolerate
    # near-integer ratios without jumping to the next factor.
    scale = max(requested_w / max(1, width), requested_h / max(1, height))
    if scale <= 1.1:
        return 1
    if scale <= 2.1:
        return _clamp_int(2, 1, max_outscale)
    if scale <= 3.1:
        return _clamp_int(3, 1, max_outscale)
    return _clamp_int(4, 1, max_outscale)
def _decide_outscale(mode: AuroraMode, frame_bgr, export_options: Optional[Dict[str, object]] = None) -> int:
    """Pick the final upscale factor for a frame, given mode and device limits.

    Tactical mode honors the user's request but never auto-upscales. Forensic
    mode may auto-upscale (AURORA_FORENSIC_DEFAULT_OUTSCALE, default 2) when
    no explicit factor was given, and is capped back to 1 on CPU for large
    inputs (AURORA_CPU_MAX_MP_FOR_X2 megapixels) to stay stable.
    """
    h, w = frame_bgr.shape[:2]
    opts = export_options or {}
    requested_outscale = _requested_outscale(opts, w, h)
    max_outscale = _clamp_int(int(os.getenv("AURORA_MAX_OUTSCALE", "4")), 1, 4)
    raw_upscale = opts.get("upscale")
    if raw_upscale is None:
        raw_upscale = opts.get("outscale")
    has_explicit_upscale = raw_upscale is not None
    if mode == "tactical":
        # Tactical defaults to readability, not synthetic upscaling.
        return requested_outscale if requested_outscale > 1 else 1
    if requested_outscale <= 1 and not has_explicit_upscale and _option_bool(opts, "auto_forensic_outscale", True):
        # Default forensic processing can upscale even without explicit user width/height.
        forensic_default = _clamp_int(int(os.getenv("AURORA_FORENSIC_DEFAULT_OUTSCALE", "2")), 1, max_outscale)
        requested_outscale = forensic_default
    if requested_outscale <= 1:
        # Keep source resolution only when forensic auto-upscale is disabled.
        return 1
    device = _ModelCache._device()
    megapixels = (h * w) / 1_000_000.0
    max_cpu_mp_for_x2 = float(os.getenv("AURORA_CPU_MAX_MP_FOR_X2", "0.8"))
    if device == "cpu" and megapixels > max_cpu_mp_for_x2:
        # Keep forensic job stable on CPU for HD+ inputs (avoid OOM + heavy artifacts).
        return 1
    return requested_outscale
def _enhance_frame_bgr(
    frame_bgr,
    mode: AuroraMode,
    media_type: MediaType,
    priority: str = "balanced",
    export_options: Optional[Dict[str, object]] = None,
    previous_denoised=None,
) -> Tuple[object, int, int, int, int, Dict[str, object], object]:
    """Run the full single-frame enhancement pipeline.

    Stages: optional pre-denoise (with temporal blend) and deblur, face
    restoration — either whole-frame or ROI-only per detected face box, with
    an optional multi-weight candidate scoring loop — then Real-ESRGAN
    super-resolution with an optional soft fallback for MPS convolution
    failures.

    Returns a 7-tuple: (enhanced_frame, faces_processed, face_ms, sr_ms,
    effective_outscale, details_metadata, previous_denoised_state).

    Raises RuntimeError when OpenCV is unavailable; re-raises the SR error
    when the soft fallback does not apply.
    """
    if cv2 is None:
        raise RuntimeError("opencv-python-headless is not installed")
    gfpganer = _ModelCache.gfpgan(mode)
    realesrganer = _ModelCache.realesrgan(mode)
    cfg = _face_pipeline_config(
        mode=mode,
        media_type=media_type,
        priority=priority,
        export_options=export_options,
    )
    # Keep the untouched frame around for scoring against candidates.
    source_frame = frame_bgr
    if bool(cfg["pre_denoise"]):
        frame_bgr, previous_denoised = _pre_denoise_frame(
            frame_bgr,
            previous_denoised=previous_denoised,
            strength=float(cfg["denoise_strength"]),
            temporal=bool(cfg["temporal_denoise"]),
        )
    if bool(cfg["deblur_before_face"]):
        frame_bgr = _deblur_unsharp(frame_bgr, amount=float(cfg["deblur_amount"]))
    outscale = _decide_outscale(mode, frame_bgr, export_options=export_options)
    opts = export_options or {}
    raw_upscale = opts.get("upscale")
    if raw_upscale is None:
        raw_upscale = opts.get("outscale")
    # ROI-only face jobs skip SR upscaling unless explicitly allowed/requested.
    allow_roi_upscale = _option_bool(opts, "allow_roi_upscale", False) or _option_bool(opts, "max_face_quality", False)
    if bool(cfg["roi_only_faces"]) and not allow_roi_upscale and raw_upscale is None:
        outscale = 1
    # Per-mode GFPGAN blend weight, env-tunable and clamped to [0, 1].
    try:
        tactical_weight = float(os.getenv("AURORA_GFPGAN_WEIGHT_TACTICAL", "0.35"))
    except Exception:
        tactical_weight = 0.35
    try:
        forensic_weight = float(os.getenv("AURORA_GFPGAN_WEIGHT_FORENSIC", "0.65"))
    except Exception:
        forensic_weight = 0.65
    face_weight = max(0.0, min(1.0, tactical_weight if mode == "tactical" else forensic_weight))
    requested_model = str(cfg["face_model"])
    codeformer_available = _codeformer_available()
    if requested_model == "auto":
        requested_model = "codeformer" if codeformer_available else "gfpgan"
    gfpgan_face_size = 512

    def _force_enhance_roi(patch, weight: float):
        """Force face restoration on a patch where Haar found a face but RetinaFace did not.
        Upscale to 512px, run GFPGAN in aligned mode, then resize back."""
        h_p, w_p = patch.shape[:2]
        aligned = cv2.resize(patch, (gfpgan_face_size, gfpgan_face_size), interpolation=cv2.INTER_CUBIC)
        cropped_faces, _, restored = gfpganer.enhance(
            aligned, has_aligned=True, only_center_face=True, paste_back=False,
            weight=max(0.0, min(1.0, weight)),
        )
        if cropped_faces:
            result = cropped_faces[0]
        elif restored is not None:
            result = restored
        else:
            result = aligned
        return cv2.resize(result, (w_p, h_p), interpolation=cv2.INTER_AREA)

    def _run_gfpgan(candidate_input, candidate_weight: float, *, force_aligned: bool = False):
        # Returns (restored_img, face_count, elapsed_ms, model_label); falls back to
        # forced-aligned restoration when GFPGAN's own detector finds no face.
        t_local = time.perf_counter()
        w = max(0.0, min(1.0, candidate_weight))
        if force_aligned:
            local_restored = _force_enhance_roi(candidate_input, w)
            elapsed = int((time.perf_counter() - t_local) * 1000)
            return local_restored, 1, elapsed, "GFPGAN v1.4 (forced-align)"
        _, local_faces, local_restored = gfpganer.enhance(
            candidate_input, has_aligned=False, only_center_face=False, paste_back=True, weight=w,
        )
        if len(local_faces) == 0:
            local_restored = _force_enhance_roi(candidate_input, w)
            elapsed = int((time.perf_counter() - t_local) * 1000)
            return local_restored, 1, elapsed, "GFPGAN v1.4 (forced-align)"
        elapsed = int((time.perf_counter() - t_local) * 1000)
        return local_restored, len(local_faces), elapsed, "GFPGAN v1.4"

    def _run_codeformer_or_fallback(candidate_input, candidate_weight: float, *, force_aligned: bool = False):
        # NOTE(review): despite the name, this path also runs gfpganer.enhance and
        # approximates the CodeFormer look by adding cv2.detailEnhance on top —
        # no actual codeformer inference happens here.
        t_local = time.perf_counter()
        w = max(0.0, min(1.0, candidate_weight))
        if force_aligned:
            local_restored = _force_enhance_roi(candidate_input, w)
            local_restored = cv2.detailEnhance(local_restored, sigma_s=12, sigma_r=0.15)
            elapsed = int((time.perf_counter() - t_local) * 1000)
            return local_restored, 1, elapsed, "CodeFormer(forced-align+detailEnhance)"
        _, local_faces, local_restored = gfpganer.enhance(
            candidate_input, has_aligned=False, only_center_face=False, paste_back=True, weight=w,
        )
        if len(local_faces) == 0:
            local_restored = _force_enhance_roi(candidate_input, w)
            local_restored = cv2.detailEnhance(local_restored, sigma_s=12, sigma_r=0.15)
        face_count = len(local_faces) if local_faces else 1
        elapsed = int((time.perf_counter() - t_local) * 1000)
        return local_restored, face_count, elapsed, "CodeFormer(fallback-detailEnhance)"

    run_face_model = _run_gfpgan if requested_model == "gfpgan" else _run_codeformer_or_fallback
    model_label_used = "GFPGAN v1.4"
    roi_faces_processed = 0
    candidate_evals = 0
    score_loop_enabled = bool(cfg["score_loop"])
    t_face = time.perf_counter()
    if bool(cfg["roi_only_faces"]):
        # ROI path: restore each detected face box separately, pick the best-scoring
        # candidate per box, and blend it softly back into the frame.
        enhanced_img = frame_bgr.copy()
        frame_h, frame_w = frame_bgr.shape[:2]
        boxes = _detect_face_boxes(frame_bgr, limit=8)
        for (bx, by, bw, bh) in boxes:
            x1, y1, x2, y2 = _expand_roi(bx, by, bw, bh, frame_w, frame_h)
            original_patch = frame_bgr[y1:y2, x1:x2]
            if original_patch.size == 0:
                continue
            candidates: List[Tuple[float, object, int, str]] = []
            candidate_weights = [face_weight]
            if score_loop_enabled:
                # Second, softer candidate weight for the scoring loop.
                candidate_weights.append(max(0.0, min(1.0, face_weight - 0.18)))
            for w_candidate in candidate_weights:
                restored_patch, faces_count, _, model_name = run_face_model(original_patch, w_candidate)
                score = _face_candidate_score(original_patch, restored_patch)
                candidates.append((score, restored_patch, faces_count, model_name))
                candidate_evals += 1
            candidates.sort(key=lambda item: item[0], reverse=True)
            best_score, best_patch, best_faces, best_model = candidates[0]
            del best_score
            model_label_used = best_model
            roi_faces_processed += best_faces
            # Keep 12% of the original patch to avoid an over-processed look at ROI edges.
            blended = cv2.addWeighted(best_patch, 0.88, original_patch, 0.12, 0.0)
            enhanced_img[y1:y2, x1:x2] = blended
    else:
        # Whole-frame path; the scoring loop only applies to photos.
        candidate_weights = [face_weight]
        if score_loop_enabled and media_type == "photo":
            candidate_weights.append(max(0.0, min(1.0, face_weight - 0.18)))
        candidates_full: List[Tuple[float, object, int, str]] = []
        for w_candidate in candidate_weights:
            restored_img, restored_faces_count, _, model_name = run_face_model(frame_bgr, w_candidate)
            score = _face_candidate_score(source_frame, restored_img)
            candidates_full.append((score, restored_img, restored_faces_count, model_name))
            candidate_evals += 1
        candidates_full.sort(key=lambda item: item[0], reverse=True)
        _, enhanced_img, roi_faces_processed, model_label_used = candidates_full[0]
    if roi_faces_processed == 0:
        # Report at least the Haar detections so the face count is not misleadingly zero.
        haar_boxes = _detect_face_boxes(frame_bgr, limit=16)
        roi_faces_processed = len(haar_boxes)
    face_ms = int((time.perf_counter() - t_face) * 1000)
    # Super-resolution stage.
    requested_outscale = int(max(1, outscale))
    effective_outscale = requested_outscale
    sr_fallback_used = False
    sr_fallback_method: Optional[str] = None
    sr_fallback_reason: Optional[str] = None
    sr_model_used = "Real-ESRGAN x4plus"
    t_sr = time.perf_counter()
    try:
        upscaled_img, _ = realesrganer.enhance(enhanced_img, outscale=requested_outscale)
    except Exception as sr_exc:
        # Only the known MPS convolution failure is softened; anything else re-raises.
        soft_fallback_enabled = _option_bool(opts, "sr_soft_fallback", _env_flag("AURORA_SR_SOFT_FALLBACK", True))
        device = _ModelCache._device()
        if not (soft_fallback_enabled and device == "mps" and _is_mps_conv_override_error(sr_exc)):
            raise
        upscaled_img, effective_outscale, sr_fallback_method = _sr_soft_fallback(
            enhanced_img,
            requested_outscale,
        )
        sr_fallback_used = True
        sr_fallback_reason = _compact_error_text(sr_exc, limit=260)
        sr_model_used = f"soft-fallback:{sr_fallback_method}"
        logger.warning(
            "SR soft fallback enabled on MPS device=%s requested_outscale=%d effective_outscale=%d reason=%s",
            device,
            requested_outscale,
            effective_outscale,
            sr_fallback_reason,
        )
    if bool(cfg.get("text_focus")):
        # Extra sharpening pass for text/plate readability jobs.
        upscaled_img = _deblur_unsharp(upscaled_img, amount=max(0.9, float(cfg.get("deblur_amount") or 1.0)))
    sr_ms = int((time.perf_counter() - t_sr) * 1000)
    return upscaled_img, roi_faces_processed, face_ms, sr_ms, effective_outscale, {
        "roi_only_faces": bool(cfg["roi_only_faces"]),
        "pre_denoise": bool(cfg["pre_denoise"]),
        "temporal_denoise": bool(cfg["temporal_denoise"]),
        "deblur_before_face": bool(cfg["deblur_before_face"]),
        "score_loop": score_loop_enabled,
        "face_model_requested": str(cfg["face_model"]),
        "face_model_used": model_label_used,
        "codeformer_available": codeformer_available,
        "candidate_evaluations": candidate_evals,
        "focus_profile": str(cfg.get("focus_profile") or "auto"),
        "task_hint": str(cfg.get("task_hint") or ""),
        "text_focus": bool(cfg.get("text_focus")),
        "sr_model_used": sr_model_used,
        "sr_requested_outscale": requested_outscale,
        "effective_outscale": effective_outscale,
        "sr_fallback_used": sr_fallback_used,
        "sr_fallback_method": sr_fallback_method,
        "sr_fallback_reason": sr_fallback_reason,
    }, previous_denoised
def _probe_fps(input_path: Path) -> float:
    """Return the input's video frame rate via ffprobe's `r_frame_rate`.

    ffprobe reports the rate as a fraction string (e.g. "30000/1001") and
    uses "0/0" for streams with an unknown rate. `Fraction("0/0")` raises
    ZeroDivisionError at construction — the original `numerator == 0` check
    alone could never catch that case — so both an unknown and an
    unparsable rate now fall back to 25.0 fps.

    Raises RuntimeError (from _run_command) when ffprobe itself fails.
    """
    value = _run_command(
        [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "stream=r_frame_rate",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(input_path),
        ]
    )
    try:
        fraction = Fraction(value.strip())
    except (ZeroDivisionError, ValueError):
        # "0/0" (unknown rate) or malformed output: use a sane default.
        return 25.0
    if fraction.numerator == 0:
        return 25.0
    return float(fraction)
def _select_video_encoder(mode: AuroraMode, export_options: Optional[Dict[str, object]]) -> str:
    """Pick an ffmpeg video encoder.

    Precedence: AURORA_FFMPEG_VIDEO_ENCODER env override, then an explicit
    (alias-normalized) encoder request from export options, then the requested
    output format combined with VideoToolbox availability, then libx264.
    """
    env_override = str(os.getenv("AURORA_FFMPEG_VIDEO_ENCODER", "")).strip()
    if env_override:
        return env_override
    options = export_options or {}
    requested = str(options.get("encoder") or "").strip().lower()
    if requested:
        # Map common shorthand names onto the real libx264/libx265 encoders.
        alias_map = {
            "x264": "libx264",
            "h264": "libx264",
            "x265": "libx265",
            "h265": "libx265",
            "hevc": "libx265",
        }
        resolved = alias_map.get(requested, requested)
        if resolved == "auto":
            resolved = ""
        if resolved:
            if _ffmpeg_has_encoder(resolved):
                return resolved
            logger.warning(
                "Requested encoder '%s' is unavailable, falling back to auto selection",
                resolved,
            )
    fmt = str(options.get("format") or "").strip().lower()
    prefer_hevc = fmt in {"mp4_h265", "h265", "hevc"}
    if _env_flag("AURORA_ENABLE_VIDEOTOOLBOX", True):
        if prefer_hevc and _ffmpeg_has_encoder("hevc_videotoolbox"):
            return "hevc_videotoolbox"
        if _ffmpeg_has_encoder("h264_videotoolbox"):
            return "h264_videotoolbox"
    if prefer_hevc and _ffmpeg_has_encoder("libx265"):
        return "libx265"
    return "libx264"
def _is_video_encode_failure(exc: Exception) -> bool:
    """Heuristic: does the exception text look like an ffmpeg video-encode failure?"""
    markers = (
        "broken pipe",
        "video encode failed",
        "encode pipe broken",
        "error while opening encoder",
    )
    lowered = str(exc).lower()
    return any(marker in lowered for marker in markers)


def _should_retry_with_libx264(exc: Exception, export_options: Optional[Dict[str, object]]) -> bool:
    """Decide whether a failed video encode deserves one retry with libx264.

    Retry only when the error looks like an encode failure and the caller did
    not already request libx264 explicitly (retrying the same encoder is futile).
    """
    if not _is_video_encode_failure(exc):
        return False
    requested = str((export_options or {}).get("encoder") or "").strip().lower()
    return requested != "libx264"
def _extract_video_frames(input_path: Path, output_pattern: Path) -> str:
    """Dump every frame of *input_path* to *output_pattern* with ffmpeg.

    Tries VideoToolbox hardware decode first (when enabled and available) and
    falls back to CPU decode on any failure. Returns which path was taken:
    "videotoolbox", "fallback_cpu" (HW attempted then failed), or "none"
    (CPU decode without attempting HW).
    """
    prefix = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y"]
    hwaccel_used = "none"
    if _env_flag("AURORA_ENABLE_VIDEOTOOLBOX", True) and _ffmpeg_has_hwaccel("videotoolbox"):
        try:
            _run_command(
                prefix
                + ["-hwaccel", "videotoolbox", "-i", str(input_path), str(output_pattern)]
            )
            return "videotoolbox"
        except Exception:
            hwaccel_used = "fallback_cpu"
    _run_command(prefix + ["-i", str(input_path), str(output_pattern)])
    return hwaccel_used
def _compose_video(
    processed_frames_dir: Path,
    source_video: Path,
    output_video: Path,
    fps: float,
    mode: AuroraMode,
    export_options: Optional[Dict[str, object]] = None,
) -> str:
    """Mux processed PNG frames with the source's audio into *output_video*.

    Audio is stream-copied first; if the muxer rejects that (incompatible
    codec for the container) the command is retried with AAC re-encode.
    Returns the name of the video encoder that was used.
    """
    encoder = _select_video_encoder(mode, export_options)
    quality = "22" if mode == "tactical" else "18"
    base_cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "error",
        "-y",
        "-framerate",
        f"{fps:.6f}",
        "-i",
        str(processed_frames_dir / "%08d.png"),
        "-i",
        str(source_video),
        "-map",
        "0:v:0",
        "-map",
        "1:a?",
        "-c:v",
        encoder,
        "-pix_fmt",
        "yuv420p",
        "-shortest",
        "-movflags",
        "+faststart",
    ]
    # Rate-control flags differ between software and VideoToolbox encoders.
    if encoder in {"libx264", "libx265"}:
        base_cmd += ["-preset", os.getenv("AURORA_FFMPEG_PRESET", "medium"), "-crf", quality]
    elif encoder == "h264_videotoolbox":
        base_cmd += ["-q:v", os.getenv("AURORA_VTB_H264_QUALITY", "65")]
    elif encoder == "hevc_videotoolbox":
        base_cmd += ["-q:v", os.getenv("AURORA_VTB_HEVC_QUALITY", "60")]
    try:
        # Fast path: keep the original audio stream untouched.
        _run_command(base_cmd + ["-c:a", "copy", str(output_video)])
    except RuntimeError:
        # Stream copy rejected — re-encode audio to AAC instead.
        _run_command(base_cmd + ["-c:a", "aac", "-b:a", "192k", str(output_video)])
    return encoder
def _probe_video_info(input_path: Path) -> Dict[str, Any]:
    """Probe video metadata: fps, dimensions, frame count.

    ffprobe's JSON output carries numbers as strings and may report "N/A"
    for unknown fields (notably nb_frames and duration), so every numeric
    conversion is guarded with a fallback instead of crashing the pipeline.
    """
    out = _run_command([
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", "-show_streams", str(input_path),
    ])
    data = json.loads(out)
    vs = next((s for s in data.get("streams", []) if s.get("codec_type") == "video"), {})

    def _to_int(value: Any, default: int = 0) -> int:
        # ffprobe may emit "N/A" or omit the field entirely.
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _to_float(value: Any, default: float = 0.0) -> float:
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    w = _to_int(vs.get("width", 0))
    h = _to_int(vs.get("height", 0))
    fps_str = vs.get("r_frame_rate") or vs.get("avg_frame_rate") or "25/1"
    try:
        fps_val = float(Fraction(fps_str))
    except Exception:
        fps_val = 25.0
    nb = _to_int(vs.get("nb_frames", 0))
    if not nb:
        # Frame count unknown: estimate from container duration * fps.
        dur = _to_float(data.get("format", {}).get("duration", 0))
        nb = max(1, int(dur * fps_val))
    return {"fps": fps_val, "width": w, "height": h, "total_frames": nb}
def _frames_similar(prev_thumb, curr_thumb, threshold: float = 8.0) -> bool:
    """Fast scene change detection on pre-downsampled thumbnails (64x64).
    Mean absolute pixel difference on 0-255 scale.
    threshold 8.0 catches scene changes while ignoring compression noise.
    For surveillance video most consecutive frames score < 3.0.
    """
    if np is None:
        return False
    delta = prev_thumb.astype(np.float32) - curr_thumb.astype(np.float32)
    mean_abs_diff = float(np.mean(np.abs(delta)))
    return mean_abs_diff < threshold
def _build_encode_pipe_cmd(
    out_w: int,
    out_h: int,
    fps: float,
    encoder: str,
    mode: AuroraMode,
    source_video: Path,
    output_video: Path,
    export_options: Optional[Dict[str, object]] = None,
) -> List[str]:
    """Build ffmpeg command that reads raw BGR frames from stdin and muxes with source audio."""
    head: List[str] = [
        "ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
        "-f", "rawvideo", "-pix_fmt", "bgr24",
        "-s", f"{out_w}x{out_h}",
        "-r", f"{fps:.6f}",
        "-i", "pipe:0",
        "-i", str(source_video),
        "-map", "0:v:0", "-map", "1:a?",
        "-c:v", encoder, "-pix_fmt", "yuv420p",
        "-movflags", "+faststart",
    ]
    # Rate control depends on the encoder family.
    if encoder in {"libx264", "libx265"}:
        crf = "22" if mode == "tactical" else "18"
        rate_control = ["-preset", os.getenv("AURORA_FFMPEG_PRESET", "medium"), "-crf", crf]
    elif encoder == "h264_videotoolbox":
        rate_control = ["-q:v", os.getenv("AURORA_VTB_H264_QUALITY", "65")]
    elif encoder == "hevc_videotoolbox":
        rate_control = ["-q:v", os.getenv("AURORA_VTB_HEVC_QUALITY", "60")]
    else:
        rate_control = []
    return head + rate_control + ["-c:a", "aac", "-b:a", "192k", str(output_video)]
def _cleanup_pipes(*procs) -> None:
    """Best-effort teardown of decode/encode subprocesses.

    Closes each process's stdin (if open), then kills and reaps it. Every
    step is individually guarded so cleanup never raises; None entries are
    ignored.
    """
    for proc in procs:
        if proc is None:
            continue
        try:
            stdin = proc.stdin
            if stdin and not stdin.closed:
                stdin.close()
        except Exception:
            pass
        try:
            proc.kill()
            proc.wait(timeout=5)
        except Exception:
            pass
def _visual_pipeline_video(
    *,
    input_path: Path,
    output_dir: Path,
    mode: AuroraMode,
    priority: str,
    export_options: Optional[Dict[str, object]],
    cancel_check: Optional[Callable[[], bool]],
    stage_progress: Optional[Callable[[float, str], None]],
) -> Tuple[Path, Dict[str, object]]:
    """Optimized video pipeline: pipe decode → scene skip → pipe encode.
    v2 optimizations (zero disk I/O for intermediate frames):
    - ffmpeg decode → stdout pipe → numpy (no PNG extraction to disk)
    - Scene detection: skip unchanged frames (huge win for surveillance)
    - numpy → stdin pipe → ffmpeg encode (no PNG write for output frames)
    - VideoToolbox HW decode/encode when available on macOS
    Returns (output_path, stats dict) consumed by the reporting step builder.
    Raises PipelineCancelledError when cancel_check fires, RuntimeError on
    missing dependencies or decode/encode failure.
    """
    if cv2 is None:
        raise RuntimeError("opencv-python-headless is not installed")
    if not _ffmpeg_available():
        raise RuntimeError("ffmpeg/ffprobe is not installed")
    info = _probe_video_info(input_path)
    src_w, src_h, fps = info["width"], info["height"], info["fps"]
    # total_frames may be estimated from duration*fps — used only for progress/ETA.
    est_total = info["total_frames"]
    if src_w == 0 or src_h == 0:
        raise RuntimeError(f"Cannot determine video dimensions: {input_path.name}")
    # Scene detection config (quality-first defaults; opt-in from env/export options)
    opts = export_options or {}
    scene_skip_on = _option_bool(opts, "scene_skip", _env_flag("AURORA_SCENE_SKIP", True))
    scene_thresh_default = float(os.getenv("AURORA_SCENE_THRESHOLD", "4.0"))
    scene_thresh = max(0.5, min(64.0, _option_float(opts, "scene_threshold", scene_thresh_default)))
    # Cap the fraction of skippable frames so long static scenes still get re-enhanced.
    scene_skip_max_ratio = max(
        0.0,
        min(0.95, _option_float(opts, "scene_skip_max_ratio", float(os.getenv("AURORA_SCENE_SKIP_MAX_RATIO", "0.35")))),
    )
    # Edge length of the downsampled thumbnail used by _frames_similar.
    _THUMB = 64
    # --- Decode pipe (VideoToolbox HW accel when available) ---
    use_vtb = (
        _env_flag("AURORA_ENABLE_VIDEOTOOLBOX", True)
        and _ffmpeg_has_hwaccel("videotoolbox")
    )
    dec_cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
    if use_vtb:
        dec_cmd.extend(["-hwaccel", "videotoolbox"])
    dec_cmd.extend([
        "-i", str(input_path),
        "-f", "rawvideo", "-pix_fmt", "bgr24", "pipe:1",
    ])
    # NOTE(review): decode stderr is piped but never drained; -loglevel error
    # should keep it near-empty, but a very chatty ffmpeg could fill the pipe — confirm.
    decode_proc = subprocess.Popen(dec_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    decode_accel = "videotoolbox" if use_vtb else "cpu"
    # One raw BGR24 frame: width * height * 3 bytes.
    frame_bytes = src_w * src_h * 3
    if stage_progress:
        skip_hint = f"scene-skip={'on' if scene_skip_on else 'off'}"
        if scene_skip_on:
            skip_hint += f", thr={scene_thresh:.2f}, max={int(scene_skip_max_ratio * 100)}%"
        stage_progress(0.02, f"pipe decode started ({est_total} est. frames, accel={decode_accel}, {skip_hint})")
    # Stats accumulators
    total_faces = 0
    total_face_ms = 0
    total_sr_ms = 0
    effective_outscale = 1
    roi_only_frames = 0
    candidates_evaluated_total = 0
    face_model_used = "GFPGAN v1.4"
    sr_model_used = "Real-ESRGAN x4plus"
    sr_fallback_frames = 0
    sr_fallback_method = ""
    sr_fallback_reason = ""
    frames_skipped = 0
    previous_denoised = None
    focus_profile_used = "auto"
    task_hint_used = ""
    text_focus_enabled = False
    # Encode pipe — started after first frame reveals output dimensions
    encode_proc: Optional[subprocess.Popen] = None
    output_path = output_dir / f"{input_path.stem}_aurora_visual.mp4"
    encoder = "unknown"
    progress_every = max(1, est_total // 120)
    t_loop = time.perf_counter()
    idx = 0
    prev_thumb = None
    prev_enhanced = None
    # Read-ahead buffer: overlap decode I/O with GPU inference
    _READAHEAD = int(os.getenv("AURORA_READAHEAD_FRAMES", "4"))
    frame_q: queue.Queue = queue.Queue(maxsize=_READAHEAD)
    # Single-slot holder so the reader thread can surface its exception to the consumer.
    reader_error: List[Optional[Exception]] = [None]
    def _reader():
        # Pulls fixed-size frames off the decoder's stdout; None is the EOF/error sentinel.
        try:
            while True:
                raw = decode_proc.stdout.read(frame_bytes)
                if len(raw) < frame_bytes:
                    frame_q.put(None)
                    break
                frame_q.put(raw)
        except Exception as exc:
            reader_error[0] = exc
            frame_q.put(None)
    reader_thread = threading.Thread(target=_reader, daemon=True)
    reader_thread.start()
    try:
        while True:
            if cancel_check and cancel_check():
                raise PipelineCancelledError("Video processing cancelled")
            # 60s timeout guards against a wedged decoder (surfaces as queue.Empty).
            raw = frame_q.get(timeout=60)
            if raw is None:
                if reader_error[0]:
                    raise reader_error[0]
                break
            idx += 1
            # .copy() detaches the array from the immutable bytes buffer (writable frame).
            frame = np.frombuffer(raw, dtype=np.uint8).reshape(src_h, src_w, 3).copy()
            # --- Scene detection: skip if nearly identical to previous ---
            curr_thumb = cv2.resize(frame, (_THUMB, _THUMB))
            skip_this = False
            if scene_skip_on and prev_thumb is not None and prev_enhanced is not None:
                projected_skip_ratio = (frames_skipped + 1) / max(1, idx)
                if projected_skip_ratio <= scene_skip_max_ratio and _frames_similar(prev_thumb, curr_thumb, scene_thresh):
                    skip_this = True
                    frames_skipped += 1
            prev_thumb = curr_thumb
            if skip_this:
                # Reuse the previous enhanced output verbatim for a near-identical frame.
                enhanced = prev_enhanced
            else:
                enhanced, faces, face_ms, sr_ms, outscale, details, previous_denoised = (
                    _enhance_frame_bgr(
                        frame, mode, media_type="video", priority=priority,
                        export_options=export_options,
                        previous_denoised=previous_denoised,
                    )
                )
                try:
                    effective_outscale = int(details.get("effective_outscale") or outscale)
                except Exception:
                    effective_outscale = outscale
                total_faces += faces
                total_face_ms += face_ms
                total_sr_ms += sr_ms
                if bool(details.get("roi_only_faces")):
                    roi_only_frames += 1
                candidates_evaluated_total += int(details.get("candidate_evaluations") or 0)
                face_model_used = str(details.get("face_model_used") or face_model_used)
                focus_profile_used = str(details.get("focus_profile") or focus_profile_used)
                maybe_task_hint = str(details.get("task_hint") or "").strip()
                if maybe_task_hint:
                    task_hint_used = maybe_task_hint
                text_focus_enabled = text_focus_enabled or bool(details.get("text_focus"))
                sr_model_used = str(details.get("sr_model_used") or sr_model_used)
                if bool(details.get("sr_fallback_used")):
                    sr_fallback_frames += 1
                    sr_fallback_method = str(details.get("sr_fallback_method") or sr_fallback_method)
                    if not sr_fallback_reason:
                        sr_fallback_reason = str(details.get("sr_fallback_reason") or "")
            prev_enhanced = enhanced
            # --- Start encode pipe after first frame (output size now known) ---
            if encode_proc is None:
                out_h, out_w = enhanced.shape[:2]
                encoder = _select_video_encoder(mode, export_options)
                enc_cmd = _build_encode_pipe_cmd(
                    out_w, out_h, fps, encoder, mode,
                    input_path, output_path, export_options,
                )
                encode_proc = subprocess.Popen(
                    enc_cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE,
                )
            try:
                encode_proc.stdin.write(enhanced.tobytes())
            except BrokenPipeError as exc:
                # Encoder died mid-stream: harvest its stderr for a compact diagnostic.
                stderr_text = ""
                try:
                    if encode_proc:
                        try:
                            encode_proc.wait(timeout=1)
                        except Exception:
                            pass
                    if encode_proc and encode_proc.stderr:
                        stderr_text = (encode_proc.stderr.read() or b"").decode(errors="replace").strip()
                except Exception:
                    stderr_text = ""
                detail = (stderr_text or str(exc)).strip()
                if len(detail) > 280:
                    detail = detail[:280]
                raise RuntimeError(f"Video encode pipe broken ({encoder}): {detail}") from exc
            # --- Progress ---
            if stage_progress and (idx == 1 or idx % progress_every == 0):
                elapsed = max(0.001, time.perf_counter() - t_loop)
                fps_eff = idx / elapsed
                eta_s = int(max(0, (est_total - idx) / max(0.01, fps_eff)))
                skip_pct = int(100 * frames_skipped / max(1, idx))
                stage_progress(
                    min(0.97, 0.02 + 0.93 * (idx / max(1, est_total))),
                    f"enhancing frame {idx}/{est_total} "
                    f"({fps_eff:.2f} fps, skip={skip_pct}%, eta ~{eta_s}s)",
                )
        # --- Finalize ---
        reader_thread.join(timeout=30)
        decode_proc.stdout.close()
        decode_proc.wait(timeout=30)
        if encode_proc:
            # Closing stdin signals EOF; the encoder then flushes and exits.
            encode_proc.stdin.close()
            encode_proc.wait(timeout=300)
            if encode_proc.returncode != 0:
                stderr = (encode_proc.stderr.read() or b"").decode(errors="replace")
                raise RuntimeError(f"Video encode failed ({encoder}): {stderr[:300]}")
        if idx == 0:
            raise RuntimeError("No frames decoded from input video")
    except PipelineCancelledError:
        _cleanup_pipes(decode_proc, encode_proc)
        reader_thread.join(timeout=5)
        raise
    except Exception:
        _cleanup_pipes(decode_proc, encode_proc)
        reader_thread.join(timeout=5)
        raise
    if stage_progress:
        skip_pct = int(100 * frames_skipped / max(1, idx))
        stage_progress(1.0, f"completed ({idx} frames, {frames_skipped} skipped [{skip_pct}%], encode={encoder})")
    return output_path, {
        "frame_count": idx,
        "faces_detected_total": total_faces,
        "face_time_ms": total_face_ms,
        "sr_time_ms": total_sr_ms,
        "effective_outscale": effective_outscale,
        "encoder": encoder,
        "decode_accel": decode_accel,
        "roi_only_frames": roi_only_frames,
        "candidate_evaluations": candidates_evaluated_total,
        "face_model_used": face_model_used,
        "sr_model_used": sr_model_used,
        "sr_fallback_frames": sr_fallback_frames,
        "sr_fallback_method": sr_fallback_method,
        "sr_fallback_reason": sr_fallback_reason,
        "frames_skipped": frames_skipped,
        "scene_skip_enabled": scene_skip_on,
        "scene_threshold": scene_thresh,
        "scene_skip_max_ratio": scene_skip_max_ratio,
        "focus_profile": focus_profile_used,
        "task_hint": task_hint_used,
        "text_focus": text_focus_enabled,
    }
def _visual_pipeline_photo(
    *,
    input_path: Path,
    output_dir: Path,
    mode: AuroraMode,
    priority: str,
    stage_progress: Optional[Callable[[float, str], None]],
    export_options: Optional[Dict[str, object]] = None,
) -> Tuple[Path, Dict[str, object]]:
    """Enhance a single photo and return (output_path, stats dict)."""
    if cv2 is None:
        raise RuntimeError("opencv-python-headless is not installed")
    frame = cv2.imread(str(input_path), cv2.IMREAD_COLOR)
    if frame is None:
        raise RuntimeError(f"Cannot read image: {input_path.name}")
    if stage_progress:
        stage_progress(0.1, "processing image")
    enhanced, faces, face_ms, sr_ms, outscale, details, _ = _enhance_frame_bgr(
        frame,
        mode,
        media_type="photo",
        priority=priority,
        export_options=export_options,
    )
    # Keep the source extension for common formats; anything exotic becomes PNG.
    suffix = input_path.suffix.lower() or ".png"
    if suffix in {".jpg", ".jpeg"}:
        suffix = ".jpg"
    elif suffix not in {".png", ".webp", ".tif", ".tiff"}:
        suffix = ".png"
    output_path = output_dir / f"{input_path.stem}_aurora_visual{suffix}"
    cv2.imwrite(str(output_path), enhanced)
    if stage_progress:
        stage_progress(1.0, "image stage completed")
    stats: Dict[str, object] = {
        "frame_count": 1,
        "faces_detected_total": faces,
        "face_time_ms": face_ms,
        "sr_time_ms": sr_ms,
        "effective_outscale": outscale,
        "roi_only_frames": int(bool(details.get("roi_only_faces"))),
        "candidate_evaluations": int(details.get("candidate_evaluations") or 0),
        "face_model_used": str(details.get("face_model_used") or "GFPGAN v1.4"),
        "sr_model_used": str(details.get("sr_model_used") or "Real-ESRGAN x4plus"),
        "sr_fallback_frames": int(bool(details.get("sr_fallback_used"))),
        "sr_fallback_method": str(details.get("sr_fallback_method") or ""),
        "sr_fallback_reason": str(details.get("sr_fallback_reason") or ""),
    }
    return output_path, stats
class BaseSubagent:
    """No-op pipeline stage: copies the input with a stage suffix and logs a step.

    Subclasses override the class attributes (and optionally run()) to
    implement real processing stages.
    """

    name = "Base"
    step_name = "noop"
    model_by_mode: Dict[AuroraMode, str] = {
        "tactical": "stub.fast",
        "forensic": "stub.full",
    }
    stage_suffix = "noop"
    sleep_seconds = 0.05

    def run(self, ctx: SubagentContext, input_path: Path) -> SubagentRunResult:
        """Copy the input to the stage output and emit a single ProcessingStep."""
        started = time.perf_counter()
        staged_copy = _copy_with_stage_suffix(input_path, ctx.output_dir, self.stage_suffix)
        # Simulated processing latency for the stub stage.
        time.sleep(self.sleep_seconds)
        step = ProcessingStep(
            step=self.step_name,
            agent=self.name,
            model=self.model_by_mode[ctx.mode],
            time_ms=int((time.perf_counter() - started) * 1000),
        )
        return SubagentRunResult(output_path=staged_copy, steps=[step])
class ClarityAgent(BaseSubagent):
    """Video enhancement stage; inherits the BaseSubagent copy behavior and
    only overrides the identifying labels."""

    name = "Clarity"
    step_name = "video_enhancement"
    stage_suffix = "clarity"
    model_by_mode = dict(
        tactical="Real-ESRGAN(light)",
        forensic="Real-ESRGAN(full)",
    )
class VeraAgent(BaseSubagent):
    """Face enhancement stage: face restoration plus super-resolution.
    Delegates to the real visual pipelines for video/photo media (other media
    types fall back to the BaseSubagent copy stub). Video encode failures get
    one retry with libx264; any remaining failure degrades to a passthrough
    copy so the overall job still completes.
    """
    name = "Vera"
    step_name = "face_enhancement"
    stage_suffix = "vera"
    model_by_mode = {
        "tactical": "GFPGAN/CodeFormer + Real-ESRGAN x4plus",
        "forensic": "GFPGAN/CodeFormer + Real-ESRGAN x4plus(forensic)",
    }
    def run(self, ctx: SubagentContext, input_path: Path) -> SubagentRunResult:
        """Run the visual pipeline for ctx.media_type and emit face/SR steps."""
        t_start = time.perf_counter()
        def _build_steps(
            stats: Dict[str, object],
            output_path: Path,
            *,
            encoder_retry: bool = False,
            encoder_retry_reason: str = "",
        ) -> List[ProcessingStep]:
            """Translate pipeline stats into the two reported processing steps."""
            face_step = ProcessingStep(
                step="face_enhancement",
                agent=self.name,
                model=str(stats.get("face_model_used") or "GFPGAN v1.4"),
                time_ms=stats["face_time_ms"],
                details={
                    "frames": stats["frame_count"],
                    "faces_detected_total": stats["faces_detected_total"],
                    "roi_only_frames": stats.get("roi_only_frames"),
                    "candidate_evaluations": stats.get("candidate_evaluations"),
                },
            )
            sr_details = {
                "frames": stats["frame_count"],
                "output": output_path.name,
                "effective_outscale": stats.get("effective_outscale", 1),
                "encoder": stats.get("encoder"),
                "decode_accel": stats.get("decode_accel"),
                "frames_skipped": stats.get("frames_skipped"),
                "scene_skip_enabled": stats.get("scene_skip_enabled"),
                "scene_threshold": stats.get("scene_threshold"),
                "scene_skip_max_ratio": stats.get("scene_skip_max_ratio"),
                "focus_profile": stats.get("focus_profile"),
                "task_hint": stats.get("task_hint"),
                "text_focus": stats.get("text_focus"),
                "sr_fallback_frames": stats.get("sr_fallback_frames", 0),
                "sr_fallback_used": bool(stats.get("sr_fallback_frames", 0)),
                "sr_fallback_method": stats.get("sr_fallback_method"),
                "sr_fallback_reason": stats.get("sr_fallback_reason"),
            }
            if encoder_retry:
                sr_details["encoder_retry"] = True
            if encoder_retry_reason:
                sr_details["encoder_retry_reason"] = encoder_retry_reason
            sr_step = ProcessingStep(
                step="super_resolution",
                agent=self.name,
                model=str(stats.get("sr_model_used") or "Real-ESRGAN x4plus"),
                time_ms=stats["sr_time_ms"],
                details=sr_details,
            )
            return [face_step, sr_step]
        # Dispatch on media type; unknown types use the base copy stub.
        try:
            if ctx.media_type == "video":
                output_path, stats = _visual_pipeline_video(
                    input_path=input_path,
                    output_dir=ctx.output_dir,
                    mode=ctx.mode,
                    priority=ctx.priority,
                    export_options=ctx.export_options,
                    cancel_check=ctx.cancel_check,
                    stage_progress=ctx.stage_progress,
                )
            elif ctx.media_type == "photo":
                output_path, stats = _visual_pipeline_photo(
                    input_path=input_path,
                    output_dir=ctx.output_dir,
                    mode=ctx.mode,
                    priority=ctx.priority,
                    stage_progress=ctx.stage_progress,
                    export_options=ctx.export_options,
                )
            else:
                return super().run(ctx, input_path)
            return SubagentRunResult(output_path=output_path, steps=_build_steps(stats, output_path))
        except PipelineCancelledError:
            raise
        except Exception as exc:
            # One retry with software libx264 when the failure looks like an encode error.
            retry_attempted = False
            if ctx.media_type == "video" and _should_retry_with_libx264(exc, ctx.export_options):
                retry_attempted = True
                retry_reason = _compact_error_text(exc, limit=280)
                retry_opts: Dict[str, object] = dict(ctx.export_options or {})
                retry_opts["encoder"] = "libx264"
                if ctx.stage_progress:
                    ctx.stage_progress(0.03, "encoder fallback: retry with libx264")
                try:
                    output_path, stats = _visual_pipeline_video(
                        input_path=input_path,
                        output_dir=ctx.output_dir,
                        mode=ctx.mode,
                        priority=ctx.priority,
                        export_options=retry_opts,
                        cancel_check=ctx.cancel_check,
                        stage_progress=ctx.stage_progress,
                    )
                    return SubagentRunResult(
                        output_path=output_path,
                        steps=_build_steps(
                            stats,
                            output_path,
                            encoder_retry=True,
                            encoder_retry_reason=retry_reason,
                        ),
                    )
                except PipelineCancelledError:
                    raise
                except Exception as retry_exc:
                    # Chain both failures into one compact message for the fallback step.
                    exc = RuntimeError(
                        f"{_compact_error_text(exc, limit=180)}; retry(libx264) failed: {_compact_error_text(retry_exc, limit=180)}"
                    )
            # Last resort: pass the input through unchanged and record the failure.
            fallback = _copy_with_stage_suffix(input_path, ctx.output_dir, self.stage_suffix)
            elapsed_ms = int((time.perf_counter() - t_start) * 1000)
            step = ProcessingStep(
                step="face_enhancement",
                agent=self.name,
                model="GFPGAN/CodeFormer + Real-ESRGAN x4plus",
                time_ms=elapsed_ms,
                details={
                    "fallback_used": True,
                    "fallback_type": "copy_passthrough",
                    "reason": str(exc),
                    "encoder_retry_attempted": retry_attempted,
                },
            )
            return SubagentRunResult(output_path=fallback, steps=[step])
def _alpr_instance():
    """Lazy-load fast-alpr ALPR instance (singleton).

    The instance is cached as a function attribute. If initialization fails
    (package missing, model load error) the cache holds None and plate
    detection stays disabled for the rest of the process.
    """
    if hasattr(_alpr_instance, "_cached"):
        return _alpr_instance._cached
    try:
        from fast_alpr import ALPR  # type: ignore[import-untyped]
        instance = ALPR(
            detector_model="yolo-v9-t-384-license-plate-end2end",
            ocr_model="global-plates-mobile-vit-v2-model",
        )
    except Exception as exc:
        logger.warning("fast-alpr init failed (plates disabled): %s", exc)
        instance = None
    _alpr_instance._cached = instance
    return _alpr_instance._cached
def _detect_plates_in_frame(frame_bgr) -> List[Dict[str, Any]]:
    """Return list of {text, confidence, bbox} for detected plates in frame."""
    alpr = _alpr_instance()
    if alpr is None or cv2 is None:
        # ALPR or OpenCV unavailable: report no detections rather than fail.
        return []
    try:
        return [
            {
                "text": detection.ocr.text,
                "confidence": round(float(detection.ocr.confidence), 3),
                "bbox": list(detection.detection.bounding_box),
            }
            for detection in alpr.predict(frame_bgr)
        ]
    except Exception as exc:
        logger.debug("ALPR frame error: %s", exc)
        return []
def _enhance_plate_roi(frame_bgr, bbox, realesrganer, pad: int = 8) -> object:
    """Upscale a plate region using Real-ESRGAN for sharper OCR.

    The bbox is expanded by *pad* pixels on each side (clamped to the frame),
    the patch is 2x super-resolved, resized back to the padded ROI size, and
    pasted into a copy of the frame. On any failure the original frame is
    returned unchanged.

    Args:
        frame_bgr: BGR image (numpy-style array) containing the plate.
        bbox: (x1, y1, x2, y2) plate bounding box.
        realesrganer: initialized Real-ESRGAN upsampler, or None to skip.
        pad: context margin in pixels added around the bbox (default 8,
            matching the previous hard-coded value).
    """
    if cv2 is None or realesrganer is None:
        return frame_bgr
    try:
        x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
        h_f, w_f = frame_bgr.shape[:2]
        # Expand the ROI for OCR context, clamped to the frame bounds.
        x1 = max(0, x1 - pad)
        y1 = max(0, y1 - pad)
        x2 = min(w_f, x2 + pad)
        y2 = min(h_f, y2 + pad)
        patch = frame_bgr[y1:y2, x1:x2]
        if patch.size == 0:
            return frame_bgr
        enhanced, _ = realesrganer.enhance(patch, outscale=2)
        # Shrink the 2x result back so it drops into the original ROI.
        enhanced_resized = cv2.resize(enhanced, (x2 - x1, y2 - y1), interpolation=cv2.INTER_AREA)
        result = frame_bgr.copy()
        result[y1:y2, x1:x2] = enhanced_resized
        return result
    except Exception:
        # Best-effort enhancement: any failure falls back to the untouched frame.
        return frame_bgr
class PlateAgent(BaseSubagent):
    """ALPR agent: detect and OCR license plates, enhance plate ROIs.

    Samples frames (one roughly every two seconds for video; the single frame
    for photos), runs fast-alpr detection + OCR, and in forensic mode
    super-resolves each plate ROI and re-runs OCR on the enhanced frame.
    Writes a plate_detections.json artifact; the media itself passes through
    unchanged.
    """

    name = "PlateDetect"
    step_name = "plate_detection"
    stage_suffix = "plate"
    model_by_mode = {
        "tactical": "YOLO-v9 ALPR + fast-plate-ocr",
        "forensic": "YOLO-v9 ALPR + fast-plate-ocr + RealESRGAN-plate-enhance",
    }

    def run(self, ctx: SubagentContext, input_path: Path) -> SubagentRunResult:
        """Detect plates in the job's media and attach a JSON report artifact."""
        t0 = time.perf_counter()
        alpr = _alpr_instance()
        if alpr is None:
            # fast-alpr unavailable: emit a skipped step and keep the pipeline flowing.
            step = ProcessingStep(
                step=self.step_name, agent=self.name,
                model="fast-alpr (unavailable)", time_ms=0,
                details={"plates_detected": 0, "skipped": True},
            )
            return SubagentRunResult(output_path=input_path, steps=[step])
        media_type = ctx.media_type
        all_plates: List[Dict[str, Any]] = []
        # Best detection per normalized plate text (highest confidence wins).
        unique_texts: Dict[str, Dict[str, Any]] = {}
        frames_sampled = 0
        if media_type == "video":
            if cv2 is None:
                step = ProcessingStep(
                    step=self.step_name, agent=self.name,
                    model=self.model_by_mode[ctx.mode], time_ms=0,
                    details={"plates_detected": 0, "skipped": True, "reason": "opencv not available"},
                )
                return SubagentRunResult(output_path=input_path, steps=[step])
            cap = cv2.VideoCapture(str(input_path))
            fps = cap.get(cv2.CAP_PROP_FPS) or 15.0
            # Sample roughly one frame every two seconds of footage.
            sample_interval = max(1, int(fps * 2))
            fn = 0
            while True:
                cap.set(cv2.CAP_PROP_POS_FRAMES, fn)
                ret, frame = cap.read()
                if not ret:
                    break
                plates = _detect_plates_in_frame(frame)
                frames_sampled += 1
                if plates and ctx.mode == "forensic":
                    # Forensic mode: super-resolve each plate ROI, then re-OCR the frame.
                    realesrganer = _ModelCache.realesrgan(ctx.mode)
                    for pl in plates:
                        frame = _enhance_plate_roi(frame, pl["bbox"], realesrganer)
                    updated = _detect_plates_in_frame(frame)
                    if updated:
                        plates = updated
                for pl in plates:
                    all_plates.append({**pl, "frame": fn})
                    txt = (pl.get("text") or "").strip().upper()
                    if txt and (txt not in unique_texts or pl["confidence"] > unique_texts[txt]["confidence"]):
                        unique_texts[txt] = pl
                fn += sample_interval
                if ctx.cancel_check and ctx.cancel_check():
                    break
            cap.release()
        elif media_type == "photo":
            if cv2 is None:
                step = ProcessingStep(
                    step=self.step_name, agent=self.name,
                    model=self.model_by_mode[ctx.mode], time_ms=0,
                    details={"plates_detected": 0, "skipped": True},
                )
                return SubagentRunResult(output_path=input_path, steps=[step])
            frame = cv2.imread(str(input_path), cv2.IMREAD_COLOR)
            frames_sampled = 1
            # cv2.imread returns None for unreadable files — treat as "no plates".
            plates = [] if frame is None else _detect_plates_in_frame(frame)
            if plates and ctx.mode == "forensic":
                realesrganer = _ModelCache.realesrgan(ctx.mode)
                for pl in plates:
                    frame = _enhance_plate_roi(frame, pl["bbox"], realesrganer)
                updated = _detect_plates_in_frame(frame)
                if updated:
                    plates = updated
            for pl in plates:
                all_plates.append(pl)
                txt = (pl.get("text") or "").strip().upper()
                if txt and (txt not in unique_texts or pl["confidence"] > unique_texts[txt]["confidence"]):
                    unique_texts[txt] = pl
        report_path = ctx.output_dir / "plate_detections.json"
        report_data = {
            "job_id": ctx.job_id,
            "frames_sampled": frames_sampled,
            "plates_found": len(all_plates),
            "unique_plates": len(unique_texts),
            "detections": all_plates[:200],  # cap report size for very long videos
            "unique": list(unique_texts.values()),
        }
        report_path.write_text(json.dumps(report_data, ensure_ascii=False, indent=2), encoding="utf-8")
        elapsed_ms = int((time.perf_counter() - t0) * 1000)
        step = ProcessingStep(
            step=self.step_name,
            agent=self.name,
            model=self.model_by_mode[ctx.mode],
            time_ms=elapsed_ms,
            details={
                "plates_detected": len(all_plates),
                "unique_plates": len(unique_texts),
                "unique_texts": list(unique_texts.keys())[:20],
                "frames_sampled": frames_sampled,
                "report_file": report_path.name,
            },
        )
        return SubagentRunResult(
            output_path=input_path,
            steps=[step],
            artifacts=[report_path],
        )
class EchoAgent(BaseSubagent):
    """Audio forensics stage (scaffold): base copy plus a placeholder transcript."""

    name = "Echo"
    step_name = "audio_forensics"
    stage_suffix = "echo"
    model_by_mode = dict(
        tactical="Demucs+Whisper(small)",
        forensic="Demucs+Whisper(large)+RawNet3",
    )

    def run(self, ctx: SubagentContext, input_path: Path) -> SubagentRunResult:
        """Run the base copy stage, then attach a transcript scaffold artifact."""
        result = super().run(ctx, input_path)
        transcript_path = ctx.output_dir / f"{input_path.stem}_echo_transcript.txt"
        transcript_path.write_text(
            "Transcript scaffold: replace with Whisper output.\n",
            encoding="utf-8",
        )
        result.artifacts.append(transcript_path)
        result.steps[0].details["transcript"] = transcript_path.name
        return result
class PixisAgent(BaseSubagent):
    """Photo restoration stage; inherits the BaseSubagent copy behavior and
    only overrides the identifying labels."""

    name = "Pixis"
    step_name = "photo_restoration"
    stage_suffix = "pixis"
    model_by_mode = dict(
        tactical="SCUNet+SwinIR(light)",
        forensic="SCUNet+SwinIR(full)+Real-ESRGAN",
    )
class KoreAgent(BaseSubagent):
    """Forensic verification stage: hashes the result and writes a signed
    chain-of-custody log plus a signature manifest as artifacts.

    NOTE(review): the "ed25519:" prefix labels a truncated SHA-256 digest of
    input/result hashes, not an actual Ed25519 signature — confirm before
    relying on it cryptographically.
    """

    name = "Kore"
    step_name = "forensic_verification"
    stage_suffix = "kore"
    model_by_mode = {
        "tactical": "OpenSSL(light)",
        "forensic": "OpenSSL+ChainOfCustody",
    }

    def run(self, ctx: SubagentContext, input_path: Path) -> SubagentRunResult:
        """Write forensic_log.json + forensic_signature.json and report the step."""
        started = time.perf_counter()
        result_hash = sha256_file(input_path)
        # Chain-of-custody record tying input and output hashes to this job.
        custody_record = {
            "job_id": ctx.job_id,
            "mode": ctx.mode,
            "media_type": ctx.media_type,
            "input_hash": ctx.input_hash,
            "result_hash": result_hash,
            "timestamp_unix_ms": int(time.time() * 1000),
            "pipeline": "frame -> pre_denoise -> deblur -> (roi/full) face_restore(gfpgan/codeformer) -> realesrgan",
            "stages": ["Vera", "Kore"],
        }
        log_path = ctx.output_dir / "forensic_log.json"
        log_path.write_text(
            json.dumps(custody_record, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        # Digest binds input hash, result hash and job id together.
        digest = hashlib.sha256(
            f"{ctx.input_hash}:{result_hash}:{ctx.job_id}".encode("utf-8")
        ).hexdigest()[:48]
        digital_signature = f"ed25519:{digest}"
        manifest_path = ctx.output_dir / "forensic_signature.json"
        manifest_path.write_text(
            json.dumps(
                {
                    "signature": digital_signature,
                    "forensic_log": log_path.name,
                    "result_hash": result_hash,
                },
                ensure_ascii=False,
                indent=2,
            ),
            encoding="utf-8",
        )
        step = ProcessingStep(
            step=self.step_name,
            agent=self.name,
            model=self.model_by_mode[ctx.mode],
            time_ms=int((time.perf_counter() - started) * 1000),
            details={
                "forensic_log": log_path.name,
                "signature_manifest": manifest_path.name,
            },
        )
        return SubagentRunResult(
            output_path=input_path,
            steps=[step],
            artifacts=[log_path, manifest_path],
            metadata={
                "digital_signature": digital_signature,
                "result_hash": result_hash,
            },
        )