diff --git a/.env.example b/.env.example
index b602add7..23ea95d6 100644
--- a/.env.example
+++ b/.env.example
@@ -59,6 +59,9 @@ DEEPSEEK_BASE_URL=https://api.deepseek.com
 # OpenAI API (optional)
 OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
 
+# Anthropic Claude API (Sofiia agent)
+ANTHROPIC_API_KEY=sk-ant-api03-xxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
 # Notion integration (optional)
 NOTION_API_KEY=
 NOTION_VERSION=2022-06-28
@@ -139,6 +142,14 @@ ENVIRONMENT=development
 # Enable debug mode (true/false)
 DEBUG=true
 
+# -----------------------------------------------------------------------------
+# Aurora / Kling Integration
+# -----------------------------------------------------------------------------
+KLING_ACCESS_KEY=
+KLING_SECRET_KEY=
+KLING_BASE_URL=https://api.klingai.com
+KLING_TIMEOUT=60
+
 # =============================================================================
 # SECRET GENERATION COMMANDS
 # =============================================================================
diff --git a/docker-compose.node2-sofiia.yml b/docker-compose.node2-sofiia.yml
index 360dfdc4..06e39736 100644
--- a/docker-compose.node2-sofiia.yml
+++ b/docker-compose.node2-sofiia.yml
@@ -94,6 +94,10 @@ services:
       - AURORA_FORCE_CPU=false
       - AURORA_PREFER_MPS=true
       - AURORA_ENABLE_VIDEOTOOLBOX=true
+      - KLING_ACCESS_KEY=${KLING_ACCESS_KEY:-}
+      - KLING_SECRET_KEY=${KLING_SECRET_KEY:-}
+      - KLING_BASE_URL=${KLING_BASE_URL:-https://api.klingai.com}
+      - KLING_TIMEOUT=${KLING_TIMEOUT:-60}
     volumes:
       - sofiia-data:/data
     networks:
diff --git a/services/aurora-service/README.md b/services/aurora-service/README.md
new file mode 100644
index 00000000..18d89ab0
--- /dev/null
+++ b/services/aurora-service/README.md
@@ -0,0 +1,74 @@
+# Aurora Service
+
+`aurora-service` is a FastAPI scaffold for AISTALK media forensics workflows.
+
+## API
+
+- `POST /api/aurora/upload` (`multipart/form-data`)
+  - fields: `file`, `mode` (`tactical|forensic`)
+  - returns `job_id`
+- `GET /api/aurora/status/{job_id}`
+- `GET /api/aurora/jobs`
+- `GET /api/aurora/result/{job_id}`
+- `POST /api/aurora/cancel/{job_id}`
+- `POST /api/aurora/delete/{job_id}`
+- `GET /api/aurora/files/{job_id}/{file_name}`
+
+## Notes
+
+- Visual media (`video`, `photo`) run deterministic sequential enhancement with conservative defaults:
+  - `frame -> pre-denoise -> deblur -> face restore (GFPGAN / CodeFormer-style fallback) -> Real-ESRGAN`
+  - For `priority=faces`, pipeline can switch to ROI-only face processing (background preserved).
+  - Score-driven candidate selection is enabled for forensic face workflows.
+- `audio` path remains scaffold (`Echo`) for now.
+- Forensic mode adds chain-of-custody artifacts and signature metadata.
+- Model weights are auto-downloaded to `AURORA_MODELS_DIR` on first run.
+
+## Local run
+
+```bash
+cd services/aurora-service
+pip install -r requirements.txt
+uvicorn app.main:app --host 0.0.0.0 --port 9401
+```
+
+## Native macOS run (Apple Silicon)
+
+```bash
+cd services/aurora-service
+./setup-native-macos.sh
+./start-native-macos.sh
+```
+
+This profile enables:
+- `AURORA_FORCE_CPU=false`
+- `AURORA_PREFER_MPS=true`
+- `AURORA_ENABLE_VIDEOTOOLBOX=true`
+
+### Runtime env vars
+
+- `AURORA_DATA_DIR` (default: `/data/aurora`)
+- `AURORA_MODELS_DIR` (default: `/data/aurora/models`)
+- `AURORA_FORCE_CPU` (default: `true`)
+- `AURORA_PREFER_MPS` (default: `true`)
+- `AURORA_ENABLE_VIDEOTOOLBOX` (default: `true`)
+- `AURORA_FFMPEG_VIDEO_ENCODER` (optional override, e.g. `h264_videotoolbox`)
+- `KLING_ACCESS_KEY` / `KLING_SECRET_KEY` (required for Kling endpoints)
+- `KLING_BASE_URL` (default: `https://api.klingai.com`)
+- `KLING_TIMEOUT` (default: `60`)
+
+## Autostart via launchd (macOS)
+
+```bash
+cd services/aurora-service
+./launchd/install-launchd.sh
+```
+
+Useful commands:
+
+```bash
+./launchd/status-launchd.sh
+./launchd/uninstall-launchd.sh
+```
+
+`install-launchd.sh` bootstraps the service immediately, so reboot is not required.
diff --git a/services/aurora-service/app/kling.py b/services/aurora-service/app/kling.py
new file mode 100644
index 00000000..2305bb02
--- /dev/null
+++ b/services/aurora-service/app/kling.py
@@ -0,0 +1,273 @@
+"""Kling AI API client for video generation and enhancement."""
+from __future__ import annotations
+
+import hashlib
+import hmac
+import json
+import logging
+import os
+import time
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import urllib.request
+import urllib.error
+
+logger = logging.getLogger(__name__)
+
+KLING_ACCESS_KEY = os.getenv("KLING_ACCESS_KEY", "").strip()
+KLING_SECRET_KEY = os.getenv("KLING_SECRET_KEY", "").strip()
+KLING_BASE_URL = os.getenv("KLING_BASE_URL", "https://api.klingai.com")
+KLING_TIMEOUT = int(os.getenv("KLING_TIMEOUT", "60"))
+
+
+def _kling_sign(access_key: str, secret_key: str) -> str:
+    """Generate Kling AI Bearer token via HMAC-SHA256 JWT-style signing."""
+    import base64
+    header = base64.urlsafe_b64encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()).rstrip(b"=").decode()
+    now = int(time.time())
+    payload = base64.urlsafe_b64encode(json.dumps({
+        "iss": access_key,
+        "exp": now + 1800,
+        "nbf": now - 5,
+    }).encode()).rstrip(b"=").decode()
+    msg = f"{header}.{payload}"
+    sig = hmac.new(secret_key.encode(), msg.encode(), hashlib.sha256).digest()
+    sig_b64 = base64.urlsafe_b64encode(sig).rstrip(b"=").decode()
+    return f"{msg}.{sig_b64}"
+
+
+def _kling_headers() -> Dict[str, str]:
+    if not KLING_ACCESS_KEY or not KLING_SECRET_KEY:
+        raise RuntimeError(
+            "Kling credentials are not configured. Set KLING_ACCESS_KEY and KLING_SECRET_KEY."
+        )
+    token = _kling_sign(KLING_ACCESS_KEY, KLING_SECRET_KEY)
+    return {
+        "Authorization": f"Bearer {token}",
+        "Content-Type": "application/json",
+    }
+
+
+def _kling_request(method: str, path: str, body: Optional[Dict] = None, timeout: int = KLING_TIMEOUT) -> Dict[str, Any]:
+    url = f"{KLING_BASE_URL}{path}"
+    data = json.dumps(body).encode() if body else None
+    req = urllib.request.Request(url, data=data, headers=_kling_headers(), method=method)
+    try:
+        with urllib.request.urlopen(req, timeout=timeout) as resp:
+            raw = resp.read().decode("utf-8")
+            return json.loads(raw)
+    except urllib.error.HTTPError as e:
+        err_body = e.read().decode("utf-8", errors="replace")
+        raise RuntimeError(f"Kling API {method} {path} → {e.code}: {err_body}") from e
+
+
+def _kling_request_with_fallback(
+    method: str,
+    paths: List[str],
+    body: Optional[Dict] = None,
+    timeout: int = KLING_TIMEOUT,
+) -> Dict[str, Any]:
+    """Try several endpoint variants to tolerate provider path drift/gateway prefixes."""
+    last_error: Optional[str] = None
+    tried: List[str] = []
+    for path in paths:
+        tried.append(path)
+        try:
+            return _kling_request(method, path, body=body, timeout=timeout)
+        except RuntimeError as e:
+            msg = str(e)
+            last_error = msg
+            # 404 likely means wrong endpoint path; try next candidate.
+            if "→ 404:" in msg:
+                continue
+            # Non-404 errors are usually actionable immediately.
+            raise
+    raise RuntimeError(
+        f"Kling API endpoint mismatch for {method}. Tried: {tried}. Last error: {last_error or 'unknown'}"
+    )
+
+
+# ── Video Enhancement (Video-to-Video) ──────────────────────────────────────
+
+def kling_video_enhance(
+    *,
+    video_url: Optional[str] = None,
+    video_id: Optional[str] = None,
+    prompt: str = "",
+    negative_prompt: str = "noise, blur, artifacts",
+    mode: str = "pro",
+    duration: str = "5",
+    cfg_scale: float = 0.5,
+    callback_url: Optional[str] = None,
+) -> Dict[str, Any]:
+    """Submit a video-to-video enhancement task to Kling AI.
+
+    Args:
+        video_url: Direct URL to input video.
+        video_id: Kling resource ID for previously uploaded video.
+        prompt: Text guidance for enhancement.
+        negative_prompt: Things to avoid.
+        mode: 'std' or 'pro'.
+        duration: '5' or '10' seconds.
+        cfg_scale: 0.0-1.0, how strongly to follow prompt.
+        callback_url: Webhook for completion notification.
+
+    Returns:
+        Task response dict with task_id.
+    """
+    if not video_url and not video_id:
+        raise ValueError("Either video_url or video_id must be provided")
+
+    payload: Dict[str, Any] = {
+        "model": "kling-v1-5",
+        "mode": mode,
+        "duration": duration,
+        "cfg_scale": cfg_scale,
+        "prompt": prompt,
+        "negative_prompt": negative_prompt,
+    }
+    if video_url:
+        payload["video_url"] = video_url
+    if video_id:
+        payload["video_id"] = video_id
+    if callback_url:
+        payload["callback_url"] = callback_url
+
+    return _kling_request_with_fallback(
+        "POST",
+        ["/v1/videos/video2video", "/kling/v1/videos/video2video"],
+        body=payload,
+    )
+
+
+def kling_video_generate(
+    *,
+    image_url: Optional[str] = None,
+    image_id: Optional[str] = None,
+    prompt: str,
+    negative_prompt: str = "noise, blur, artifacts, distortion",
+    model: str = "kling-v1-5",
+    mode: str = "pro",
+    duration: str = "5",
+    cfg_scale: float = 0.5,
+    aspect_ratio: str = "16:9",
+    callback_url: Optional[str] = None,
+) -> Dict[str, Any]:
+    """Generate video from image + prompt (image-to-video).
+
+    Args:
+        image_url: Source still frame URL.
+        image_id: Kling resource ID for previously uploaded image.
+        prompt: Animation guidance.
+        model: 'kling-v1', 'kling-v1-5', 'kling-v1-6'.
+        mode: 'std' or 'pro'.
+        duration: '5' or '10'.
+        aspect_ratio: '16:9', '9:16', '1:1'.
+    """
+    if not image_url and not image_id:
+        raise ValueError("Either image_url or image_id must be provided")
+
+    payload: Dict[str, Any] = {
+        "model": model,
+        "mode": mode,
+        "duration": duration,
+        "cfg_scale": cfg_scale,
+        "prompt": prompt,
+        "negative_prompt": negative_prompt,
+        "aspect_ratio": aspect_ratio,
+    }
+    if image_url:
+        payload["image"] = {"type": "url", "url": image_url}
+    if image_id:
+        payload["image"] = {"type": "id", "id": image_id}
+    if callback_url:
+        payload["callback_url"] = callback_url
+
+    return _kling_request_with_fallback(
+        "POST",
+        ["/v1/videos/image2video", "/kling/v1/videos/image2video"],
+        body=payload,
+    )
+
+
+def kling_task_status(task_id: str) -> Dict[str, Any]:
+    """Get status of any Kling task by ID."""
+    return _kling_request_with_fallback(
+        "GET",
+        [f"/v1/tasks/{task_id}", f"/kling/v1/tasks/{task_id}"],
+    )
+
+
+def kling_video_task_status(task_id: str, endpoint: str = "video2video") -> Dict[str, Any]:
+    """Get status of a video task."""
+    return _kling_request_with_fallback(
+        "GET",
+        [f"/v1/videos/{endpoint}/{task_id}", f"/kling/v1/videos/{endpoint}/{task_id}"],
+    )
+
+
+def kling_list_models() -> Dict[str, Any]:
+    """List available Kling models."""
+    return _kling_request_with_fallback(
+        "GET",
+        ["/v1/models", "/kling/v1/models"],
+    )
+
+
+def kling_upload_file(file_path: Path) -> Dict[str, Any]:
+    """Upload a local file to Kling storage and return resource_id."""
+    import base64
+    with open(file_path, "rb") as f:
+        data = f.read()
+    b64 = base64.b64encode(data).decode()
+    suffix = file_path.suffix.lstrip(".").lower()
+    mime_map = {
+        "mp4": "video/mp4", "mov": "video/quicktime", "avi": "video/x-msvideo",
+        "jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png",
+    }
+    mime = mime_map.get(suffix, "application/octet-stream")
+    payload = {
+        "file": b64,
+        "file_name": file_path.name,
+        "content_type": mime,
+    }
+    return _kling_request_with_fallback(
+        "POST",
+        ["/v1/files/upload", "/v1/files", "/kling/v1/files/upload", "/kling/v1/files"],
+        body=payload,
+        timeout=120,
+    )
+
+
+def kling_poll_until_done(
+    task_id: str,
+    endpoint: str = "video2video",
+    max_wait_sec: int = 600,
+    poll_interval: int = 5,
+) -> Dict[str, Any]:
+    """Poll Kling task until completed/failed or timeout."""
+    start = time.time()
+    while True:
+        status_resp = kling_video_task_status(task_id, endpoint)
+        task = status_resp.get("data", {})
+        state = task.get("task_status") or task.get("status") or "processing"
+
+        if state in ("succeed", "completed", "failed", "error"):
+            return status_resp
+
+        elapsed = time.time() - start
+        if elapsed >= max_wait_sec:
+            raise TimeoutError(f"Kling task {task_id} timed out after {max_wait_sec}s (last status: {state})")
+
+        logger.debug("Kling task %s status=%s elapsed=%.0fs", task_id, state, elapsed)
+        time.sleep(poll_interval)
+
+
+def kling_health_check() -> Dict[str, Any]:
+    """Quick connectivity check — returns status dict."""
+    try:
+        resp = _kling_request("GET", "/v1/models", timeout=10)
+        return {"ok": True, "models": resp}
+    except Exception as exc:
+        return {"ok": False, "error": str(exc)}
diff --git a/services/aurora-service/app/main.py b/services/aurora-service/app/main.py
new file mode 100644
index 00000000..ebcd257f
--- /dev/null
+++ b/services/aurora-service/app/main.py
@@ -0,0 +1,1327 @@
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import json
+import logging
+import os
+import re
+import shutil
+import subprocess
+import uuid
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from fastapi import Body, FastAPI, File, Form, HTTPException, Query, UploadFile
+from fastapi.middleware.cors import CORSMiddleware
+from 
fastapi.responses import FileResponse + +from .analysis import ( + analyze_photo, + analyze_video, + estimate_processing_seconds, + probe_video_metadata, +) +from .job_store import JobStore +from .langchain_scaffold import build_subagent_registry +from .orchestrator import AuroraOrchestrator, JobCancelledError +from .reporting import generate_forensic_report_pdf +from .schemas import AuroraMode, MediaType +from .subagents import runtime_diagnostics + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +DATA_DIR = Path(os.getenv("AURORA_DATA_DIR", "/data/aurora")) +PUBLIC_BASE_URL = os.getenv("AURORA_PUBLIC_BASE_URL", "http://localhost:9401").rstrip("/") +CORS_ORIGINS = os.getenv("AURORA_CORS_ORIGINS", "*") +RECOVERY_STRATEGY = os.getenv("AURORA_RECOVERY_STRATEGY", "requeue").strip().lower() + +VIDEO_EXTENSIONS = {".mp4", ".avi", ".mov", ".mkv", ".webm"} +AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".m4a", ".aac", ".ogg"} +PHOTO_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tiff", ".tif", ".webp"} +MAX_CONCURRENT_JOBS = max(1, int(os.getenv("AURORA_MAX_CONCURRENT_JOBS", "1"))) + +store = JobStore(DATA_DIR) +orchestrator = AuroraOrchestrator(store.outputs_dir, PUBLIC_BASE_URL) +RUN_SLOT = asyncio.Semaphore(MAX_CONCURRENT_JOBS) + +app = FastAPI( + title="Aurora Media Forensics Service", + description="AURORA tactical/forensic media pipeline scaffold for AISTALK", + version="0.1.0", +) + +if CORS_ORIGINS.strip() == "*": + allow_origins = ["*"] +else: + allow_origins = [x.strip() for x in CORS_ORIGINS.split(",") if x.strip()] + +app.add_middleware( + CORSMiddleware, + allow_origins=allow_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.on_event("startup") +async def recover_orphan_jobs() -> None: + recovered = store.recover_interrupted_jobs( + completed_at=utc_now_iso(), + message="Interrupted by aurora-service restart", + strategy=RECOVERY_STRATEGY, + ) + if recovered: + logger.warning( + "Recovered 
%d interrupted Aurora jobs with strategy=%s", + recovered, + RECOVERY_STRATEGY, + ) + queued = sorted( + [job for job in store.list_jobs() if job.status == "queued"], + key=lambda item: item.created_at, + ) + for job in queued: + asyncio.create_task(run_job(job.job_id)) + if queued: + logger.info("Rescheduled %d queued Aurora jobs on startup", len(queued)) + + cleaned = _cleanup_work_dirs() + if cleaned: + logger.info("Cleaned %d orphaned _work directories (%.1f MB freed)", cleaned["dirs"], cleaned["mb"]) + + +def _cleanup_work_dirs() -> Dict[str, Any]: + """Remove leftover _work_* directories from old PNG-based pipeline.""" + total_freed = 0 + dirs_removed = 0 + for job_dir in store.outputs_dir.iterdir(): + if not job_dir.is_dir(): + continue + for entry in list(job_dir.iterdir()): + if entry.is_dir() and entry.name.startswith("_work"): + size = sum(f.stat().st_size for f in entry.rglob("*") if f.is_file()) + shutil.rmtree(entry, ignore_errors=True) + total_freed += size + dirs_removed += 1 + return {"dirs": dirs_removed, "mb": total_freed / (1024 * 1024)} + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def safe_filename(file_name: str) -> str: + base = Path(file_name or "upload.bin").name + sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", base).strip("._") + return sanitized or f"upload_{uuid.uuid4().hex[:8]}.bin" + + +def compute_sha256(path: Path) -> str: + digest = hashlib.sha256() + with path.open("rb") as f: + while True: + chunk = f.read(1024 * 1024) + if not chunk: + break + digest.update(chunk) + return f"sha256:{digest.hexdigest()}" + + +def detect_media_type(file_name: str, content_type: str) -> MediaType: + ext = Path(file_name).suffix.lower() + if content_type.startswith("video/") or ext in VIDEO_EXTENSIONS: + return "video" + if content_type.startswith("audio/") or ext in AUDIO_EXTENSIONS: + return "audio" + if content_type.startswith("image/") or ext in PHOTO_EXTENSIONS: + return "photo" + return 
"unknown" + + +def _normalize_mode(raw_mode: Optional[str], fallback: AuroraMode = "tactical") -> AuroraMode: + value = (raw_mode or fallback).strip().lower() + if value not in ("tactical", "forensic"): + return fallback + return value # type: ignore[return-value] + + +def _normalize_priority(raw_priority: Optional[str], fallback: str = "balanced") -> str: + value = (raw_priority or fallback).strip().lower() + if value not in {"balanced", "faces", "plates", "details", "speech"}: + return fallback + return value + + +def _job_storage_info(job: Any) -> Dict[str, str]: + upload_dir = (store.uploads_dir / job.job_id).resolve() + output_dir = (store.outputs_dir / job.job_id).resolve() + job_record = (store.jobs_dir / f"{job.job_id}.json").resolve() + payload = { + "upload_dir": str(upload_dir), + "output_dir": str(output_dir), + "job_record": str(job_record), + } + input_path = Path(str(job.input_path)) + if input_path.exists(): + payload["input_path"] = str(input_path.resolve()) + return payload + + +def _queued_position(job_id: str) -> Optional[int]: + target = store.get_job(job_id) + if not target or target.status != "queued": + return None + queued: List[Any] = [] + for path in sorted(store.jobs_dir.glob("*.json")): + try: + payload = json.loads(path.read_text(encoding="utf-8")) + if payload.get("status") == "queued": + queued.append(payload) + except Exception: + continue + queued.sort(key=lambda item: str(item.get("created_at") or "")) + for idx, item in enumerate(queued, start=1): + if str(item.get("job_id") or "") == job_id: + return idx + return None + + +def _resolve_source_media_path(job: Any, *, second_pass: bool = False) -> Path: + input_path = Path(str(job.input_path)) + if not second_pass and input_path.exists() and input_path.is_file(): + return input_path + + result = getattr(job, "result", None) + if result and isinstance(getattr(result, "output_files", None), list): + for item in result.output_files: + file_type = str(getattr(item, "type", 
"")).lower() + file_name = str(getattr(item, "name", "")) + if file_type != str(job.media_type).lower(): + continue + candidate = (store.outputs_dir / job.job_id / file_name) + if candidate.exists() and candidate.is_file(): + return candidate + + if input_path.exists() and input_path.is_file(): + return input_path + raise HTTPException(status_code=409, detail=f"Source media not available for job {job.job_id}") + + +def _enqueue_job_from_path( + *, + source_path: Path, + file_name: str, + mode: AuroraMode, + media_type: MediaType, + priority: str, + export_options: Dict[str, Any], + metadata_patch: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + now = datetime.now(timezone.utc) + job_id = f"aurora_{now.strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" + + upload_dir = store.uploads_dir / job_id + upload_dir.mkdir(parents=True, exist_ok=True) + input_path = upload_dir / safe_filename(file_name) + shutil.copy2(source_path, input_path) + + input_hash = compute_sha256(input_path) + initial_metadata = _estimate_upload_metadata( + input_path=input_path, + media_type=media_type, + mode=mode, + ) + if export_options: + initial_metadata["export_options"] = export_options + initial_metadata["priority"] = priority + if metadata_patch: + initial_metadata.update(metadata_patch) + + store.create_job( + job_id=job_id, + file_name=input_path.name, + input_path=input_path, + input_hash=input_hash, + mode=mode, + media_type=media_type, + created_at=utc_now_iso(), + metadata=initial_metadata, + ) + asyncio.create_task(run_job(job_id)) + return { + "job_id": job_id, + "mode": mode, + "media_type": media_type, + "priority": priority, + "export_options": export_options, + "status_url": f"/api/aurora/status/{job_id}", + "result_url": f"/api/aurora/result/{job_id}", + "cancel_url": f"/api/aurora/cancel/{job_id}", + } + + +def model_dump(value: Any) -> Dict[str, Any]: + if hasattr(value, "model_dump"): + return value.model_dump() + return value.dict() + + +def 
_parse_iso_utc(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except Exception: + return None + + +def _estimate_upload_metadata(input_path: Path, media_type: MediaType, mode: AuroraMode) -> Dict[str, Any]: + metadata: Dict[str, Any] = {} + if media_type == "video": + video_meta = probe_video_metadata(input_path) + if video_meta: + metadata["video"] = video_meta + estimate_s = estimate_processing_seconds( + media_type="video", + mode=mode, + width=int(video_meta.get("width") or 0), + height=int(video_meta.get("height") or 0), + frame_count=int(video_meta.get("frame_count") or 0), + ) + if estimate_s: + metadata["estimated_processing_seconds"] = int(estimate_s) + elif media_type == "photo": + try: + import cv2 # type: ignore[import-untyped] + + frame = cv2.imread(str(input_path), cv2.IMREAD_COLOR) + if frame is not None: + h, w = frame.shape[:2] + metadata["image"] = {"width": int(w), "height": int(h)} + estimate_s = estimate_processing_seconds( + media_type="photo", + mode=mode, + width=int(w), + height=int(h), + frame_count=1, + ) + if estimate_s: + metadata["estimated_processing_seconds"] = int(estimate_s) + except Exception: + pass + elif media_type == "audio": + audio_meta = _probe_audio_metadata(input_path) + if audio_meta: + metadata["audio"] = audio_meta + duration_s = float(audio_meta.get("duration_seconds") or 0.0) + if duration_s > 0: + factor = 0.45 if mode == "tactical" else 1.25 + metadata["estimated_processing_seconds"] = int(max(8, min(10800, duration_s * factor))) + return metadata + + +def _probe_audio_metadata(input_path: Path) -> Dict[str, Any]: + try: + cmd = [ + "ffprobe", + "-v", + "error", + "-show_streams", + "-show_format", + "-print_format", + "json", + str(input_path), + ] + proc = subprocess.run(cmd, capture_output=True, text=True, check=False) + if proc.returncode != 0 or not proc.stdout: + return {} + payload = json.loads(proc.stdout) + 
streams = payload.get("streams") or [] + audio_stream = next((s for s in streams if str(s.get("codec_type", "")).lower() == "audio"), None) + fmt = payload.get("format") or {} + duration_raw = (audio_stream or {}).get("duration") or fmt.get("duration") + duration = float(duration_raw) if duration_raw not in (None, "", "N/A") else 0.0 + sample_rate_raw = (audio_stream or {}).get("sample_rate") + channels_raw = (audio_stream or {}).get("channels") + bitrate_raw = (audio_stream or {}).get("bit_rate") or fmt.get("bit_rate") + return { + "duration_seconds": round(duration, 3) if duration > 0 else None, + "sample_rate_hz": int(sample_rate_raw) if sample_rate_raw not in (None, "", "N/A") else None, + "channels": int(channels_raw) if channels_raw not in (None, "", "N/A") else None, + "bit_rate": int(bitrate_raw) if bitrate_raw not in (None, "", "N/A") else None, + "codec": (audio_stream or {}).get("codec_name"), + "container": fmt.get("format_name"), + } + except Exception: + return {} + + +def _analyze_audio(path: Path) -> Dict[str, Any]: + meta = _probe_audio_metadata(path) + duration = float(meta.get("duration_seconds") or 0.0) + bitrate = float(meta.get("bit_rate") or 0.0) + recommendations: List[str] = [] + if duration <= 0: + recommendations.append("Не вдалося надійно визначити тривалість аудіо.") + if bitrate and bitrate < 128000: + recommendations.append("Низький bitrate: рекомендується forensic-режим та денойз перед транскрипцією.") + else: + recommendations.append("Рекомендується tactical denoise + speech enhance для швидкого перегляду.") + recommendations.append("Для доказового контуру: forensic mode + chain-of-custody + підпис результатів.") + estimate_tactical = int(max(6, min(7200, (duration or 20.0) * 0.45))) + estimate_forensic = int(max(12, min(14400, (duration or 20.0) * 1.25))) + return { + "media_type": "audio", + "audio": meta, + "quality_analysis": { + "bitrate_tier": "low" if bitrate and bitrate < 128000 else "normal", + "duration_bucket": "short" if 
duration and duration < 60 else "long" if duration and duration > 600 else "medium", + }, + "recommendations": recommendations, + "suggested_priority": "speech", + "suggested_export": { + "format": "wav_pcm_s16le", + "sample_rate_hz": int(meta.get("sample_rate_hz") or 16000), + "channels": 1, + }, + "estimated_processing_seconds": { + "tactical": estimate_tactical, + "forensic": estimate_forensic, + }, + } + + +def _parse_export_options(raw_value: str) -> Dict[str, Any]: + if not raw_value: + return {} + try: + parsed = json.loads(raw_value) + except Exception as exc: + raise HTTPException(status_code=422, detail=f"Invalid export_options JSON: {exc}") from exc + if not isinstance(parsed, dict): + raise HTTPException(status_code=422, detail="export_options must be a JSON object") + return parsed + + +def _status_timing(job: Any) -> Dict[str, Optional[int]]: + started = _parse_iso_utc(job.started_at) + if not started: + return { + "elapsed_seconds": None, + "estimated_total_seconds": None, + "eta_seconds": None, + } + now = datetime.now(timezone.utc) + estimated_total: Optional[int] = None + eta: Optional[int] = None + + if job.status in ("completed", "failed", "cancelled") and job.completed_at: + completed = _parse_iso_utc(job.completed_at) + if completed: + elapsed = max(0, int((completed - started).total_seconds())) + estimated_total = elapsed + eta = 0 + else: + elapsed = max(0, int((now - started).total_seconds())) + else: + elapsed = max(0, int((now - started).total_seconds())) + + if job.status == "processing": + hinted_total = None + if isinstance(job.metadata, dict): + hinted_total = job.metadata.get("estimated_processing_seconds") + if isinstance(hinted_total, (int, float)) and hinted_total > 0: + estimated_total = int(hinted_total) + elif job.progress >= 5: + estimated_total = int(elapsed / max(0.05, job.progress / 100.0)) + + stage_eta = None + if isinstance(job.current_stage, str): + match = re.search(r"eta ~([0-9]+)s", job.current_stage) + if match: + 
try: + stage_eta = int(match.group(1)) + except Exception: + stage_eta = None + + if estimated_total and estimated_total > 0: + eta = max(0, int(estimated_total - elapsed)) + if stage_eta is not None: + # Early-stage per-frame ETA is noisy (model warmup / cache effects). + # Blend with metadata estimate first; trust stage ETA more after ~10%. + if eta is None: + eta = stage_eta + elif job.progress < 10: + eta = int((eta * 0.75) + (stage_eta * 0.25)) + elif job.progress < 30: + eta = int((eta * 0.50) + (stage_eta * 0.50)) + else: + eta = int((eta * 0.25) + (stage_eta * 0.75)) + estimated_total = max(estimated_total or 0, elapsed + max(0, eta)) + + live_fps: Optional[float] = None + eta_confidence: Optional[str] = None + if isinstance(job.current_stage, str): + fps_match = re.search(r"\(([0-9]+(?:\.[0-9]+)?)\s*fps", job.current_stage) + if fps_match: + try: + live_fps = round(float(fps_match.group(1)), 2) + except Exception: + pass + skip_match = re.search(r"skip=([0-9]+)%", job.current_stage) + skip_pct = int(skip_match.group(1)) if skip_match else 0 + if job.progress >= 30 and live_fps is not None: + eta_confidence = "high" if skip_pct < 50 else "medium" + elif job.progress >= 10: + eta_confidence = "medium" + elif job.progress >= 2: + eta_confidence = "low" + + return { + "elapsed_seconds": elapsed, + "estimated_total_seconds": estimated_total, + "eta_seconds": eta, + "live_fps": live_fps, + "eta_confidence": eta_confidence, + } + + +async def run_job(job_id: str) -> None: + async with RUN_SLOT: + job = store.get_job(job_id) + if not job: + return + if job.status == "cancelled": + return + if job.cancel_requested: + store.mark_cancelled(job_id, completed_at=utc_now_iso()) + return + + store.mark_processing(job_id, started_at=utc_now_iso()) + logger.info("aurora job started: %s (%s, %s)", job_id, job.media_type, job.mode) + + def on_progress(progress: int, stage: str, step: Any = None) -> None: + store.set_progress(job_id, progress=progress, current_stage=stage) + 
if step is not None: + store.append_processing_step(job_id, step) + + def is_cancelled() -> bool: + current = store.get_job(job_id) + return bool(current and current.cancel_requested) + + try: + current_job = store.get_job(job_id) + if not current_job: + return + result = await asyncio.to_thread( + orchestrator.run, + current_job, + on_progress, + is_cancelled, + ) + if is_cancelled(): + store.mark_cancelled(job_id, completed_at=utc_now_iso()) + return + completed_at = utc_now_iso() + store.mark_completed(job_id, result=result, completed_at=completed_at) + final_job = store.get_job(job_id) + if final_job and isinstance(final_job.metadata, dict): + meta = dict(final_job.metadata) + started = _parse_iso_utc(final_job.started_at) + completed = _parse_iso_utc(completed_at) + if started and completed: + meta["actual_processing_seconds"] = max(0, int((completed - started).total_seconds())) + if isinstance(result.metadata, dict): + meta["result_metadata"] = result.metadata + store.patch_job(job_id, metadata=meta) + logger.info("aurora job completed: %s", job_id) + except JobCancelledError: + store.mark_cancelled(job_id, completed_at=utc_now_iso()) + logger.info("aurora job cancelled: %s", job_id) + except Exception as exc: + store.mark_failed(job_id, message=str(exc), completed_at=utc_now_iso()) + logger.exception("aurora job failed: %s", job_id) + + +def _aurora_chat_reply( + *, + message: str, + job: Optional[Any], + analysis: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + normalized_message = (message or "").strip() + lower = normalized_message.lower() + actions: List[Dict[str, Any]] = [] + context: Dict[str, Any] = {} + lines: List[str] = [] + + if job: + timing = _status_timing(job) + storage = _job_storage_info(job) + context["job_id"] = job.job_id + context["status"] = job.status + context["stage"] = job.current_stage + context["timing"] = timing + context["storage"] = storage + lines.append(f"Job `{job.job_id}`: status `{job.status}`, stage 
`{job.current_stage}`.") + + if job.status == "queued": + position = _queued_position(job.job_id) + if position: + lines.append(f"Черга: позиція #{position}.") + actions.append({"type": "refresh_status", "label": "Оновити статус"}) + actions.append({"type": "cancel", "label": "Скасувати job"}) + elif job.status == "processing": + elapsed = timing.get("elapsed_seconds") + eta = timing.get("eta_seconds") + if isinstance(elapsed, int): + if isinstance(eta, int): + lines.append(f"Минуло {elapsed}s, орієнтовно залишилось ~{eta}s.") + else: + lines.append(f"Минуло {elapsed}s, ETA ще уточнюється.") + actions.append({"type": "refresh_status", "label": "Оновити статус"}) + actions.append({"type": "cancel", "label": "Скасувати job"}) + elif job.status == "completed": + lines.append(f"Результати збережені в `{storage.get('output_dir', 'n/a')}`.") + actions.append({"type": "open_result", "label": "Відкрити результат"}) + actions.append({"type": "reprocess", "label": "Повторити обробку", "second_pass": False}) + actions.append({"type": "reprocess", "label": "Second pass", "second_pass": True}) + elif job.status in ("failed", "cancelled"): + if job.error_message: + lines.append(f"Причина: {job.error_message}") + lines.append("Можна перезапустити обробку з тими самими або новими параметрами.") + actions.append({"type": "reprocess", "label": "Перезапустити job", "second_pass": False}) + actions.append({"type": "reprocess", "label": "Second pass", "second_pass": True}) + + if any(token in lower for token in ("де", "where", "storage", "збереж")): + lines.append( + "Шляхи: " + f"input `{storage.get('input_path', 'n/a')}`, " + f"output `{storage.get('output_dir', 'n/a')}`, " + f"job `{storage.get('job_record', 'n/a')}`." 
+ ) + + if analysis and isinstance(analysis, dict): + recs = analysis.get("recommendations") + if isinstance(recs, list) and recs: + top_recs = [str(x) for x in recs[:3]] + lines.append("Рекомендації pre-analysis: " + "; ".join(top_recs)) + suggested_priority = str(analysis.get("suggested_priority") or "").strip() + if suggested_priority: + actions.append( + { + "type": "reprocess", + "label": f"Reprocess ({suggested_priority})", + "priority": suggested_priority, + "second_pass": False, + } + ) + + if not lines: + lines.append("Готова допомогти з обробкою. Надішліть файл або оберіть job для контексту.") + lines.append("Я можу пояснити ETA, місце збереження та запустити reprocess.") + actions.append({"type": "refresh_health", "label": "Перевірити Aurora"}) + + if any(token in lower for token in ("повтор", "reprocess", "ще раз", "second pass", "другий прохід")): + actions.append({"type": "reprocess", "label": "Запустити reprocess", "second_pass": "second pass" in lower}) + if "скас" in lower or "cancel" in lower: + actions.append({"type": "cancel", "label": "Скасувати job"}) + if "статус" in lower or "status" in lower: + actions.append({"type": "refresh_status", "label": "Оновити статус"}) + + deduped: List[Dict[str, Any]] = [] + seen = set() + for action in actions: + key = json.dumps(action, sort_keys=True, ensure_ascii=True) + if key in seen: + continue + seen.add(key) + deduped.append(action) + + return { + "agent": "Aurora", + "reply": "\n".join(lines), + "context": context, + "actions": deduped[:6], + } + + +@app.get("/health") +async def health() -> Dict[str, Any]: + subagents = build_subagent_registry() + return { + "status": "healthy", + "service": "aurora-service", + "data_dir": str(DATA_DIR), + "jobs": store.count_by_status(), + "runtime": runtime_diagnostics(), + "scheduler": {"max_concurrent_jobs": MAX_CONCURRENT_JOBS}, + "langchain_scaffold": { + "enabled": True, + "subagents": list(subagents.keys()), + }, + } + + +@app.post("/api/aurora/analyze") 
+async def analyze_media(file: UploadFile = File(...)) -> Dict[str, Any]: + file_name = safe_filename(file.filename or "upload.bin") + media_type = detect_media_type(file_name, file.content_type or "") + if media_type not in ("video", "photo", "audio"): + raise HTTPException(status_code=415, detail="Analyze supports video/photo/audio only") + + analyze_dir = store.uploads_dir / "_analyze" + analyze_dir.mkdir(parents=True, exist_ok=True) + tmp_path = analyze_dir / f"{uuid.uuid4().hex[:12]}_{file_name}" + + content = await file.read() + if not content: + raise HTTPException(status_code=400, detail="Empty upload") + tmp_path.write_bytes(content) + + try: + if media_type == "video": + payload = analyze_video(tmp_path) + elif media_type == "audio": + payload = _analyze_audio(tmp_path) + else: + payload = analyze_photo(tmp_path) + payload["file_name"] = file_name + payload["media_type"] = media_type + return payload + except HTTPException: + raise + except Exception as exc: + raise HTTPException(status_code=500, detail=f"Analyze failed: {exc}") from exc + finally: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass + + +@app.post("/api/aurora/audio/analyze") +async def analyze_audio(file: UploadFile = File(...)) -> Dict[str, Any]: + file_name = safe_filename(file.filename or "upload_audio.bin") + media_type = detect_media_type(file_name, file.content_type or "") + if media_type != "audio": + raise HTTPException(status_code=415, detail="Audio analyze supports audio files only") + analyze_dir = store.uploads_dir / "_analyze_audio" + analyze_dir.mkdir(parents=True, exist_ok=True) + tmp_path = analyze_dir / f"{uuid.uuid4().hex[:12]}_{file_name}" + content = await file.read() + if not content: + raise HTTPException(status_code=400, detail="Empty upload") + tmp_path.write_bytes(content) + try: + payload = _analyze_audio(tmp_path) + payload["file_name"] = file_name + return payload + except HTTPException: + raise + except Exception as exc: + raise 
HTTPException(status_code=500, detail=f"Audio analyze failed: {exc}") from exc + finally: + tmp_path.unlink(missing_ok=True) + + +@app.post("/api/aurora/audio/process") +async def process_audio( + file: UploadFile = File(...), + mode: str = Form("tactical"), + priority: str = Form("speech"), + export_options: str = Form(""), +) -> Dict[str, Any]: + file_name = safe_filename(file.filename or "upload_audio.bin") + media_type = detect_media_type(file_name, file.content_type or "") + if media_type != "audio": + raise HTTPException(status_code=415, detail="Audio process supports audio files only") + content = await file.read() + if not content: + raise HTTPException(status_code=400, detail="Empty upload") + normalized_mode = _normalize_mode(mode) + normalized_priority = _normalize_priority(priority, fallback="balanced") + parsed_export_options = _parse_export_options(export_options) + tmp_dir = store.uploads_dir / "_incoming_audio" + tmp_dir.mkdir(parents=True, exist_ok=True) + source_path = tmp_dir / f"{uuid.uuid4().hex[:12]}_{file_name}" + source_path.write_bytes(content) + try: + result = _enqueue_job_from_path( + source_path=source_path, + file_name=file_name, + mode=normalized_mode, + media_type="audio", + priority=normalized_priority, + export_options=parsed_export_options, + metadata_patch={"audio_pipeline": "scaffold_v1"}, + ) + result["pipeline"] = "audio_scaffold_v1" + return result + finally: + source_path.unlink(missing_ok=True) + + +@app.post("/api/aurora/upload") +async def upload_media( + file: UploadFile = File(...), + mode: str = Form("tactical"), + priority: str = Form("balanced"), + export_options: str = Form(""), +) -> Dict[str, Any]: + raw_mode = (mode or "").strip().lower() + if raw_mode and raw_mode not in ("tactical", "forensic"): + raise HTTPException(status_code=422, detail="mode must be 'tactical' or 'forensic'") + normalized_mode = _normalize_mode(mode) + if normalized_mode not in ("tactical", "forensic"): + raise 
HTTPException(status_code=422, detail="mode must be 'tactical' or 'forensic'") + file_name = safe_filename(file.filename or "upload.bin") + media_type = detect_media_type(file_name, file.content_type or "") + if media_type == "unknown": + raise HTTPException(status_code=415, detail="Unsupported media type") + content = await file.read() + if not content: + raise HTTPException(status_code=400, detail="Empty upload") + tmp_dir = store.uploads_dir / "_incoming" + tmp_dir.mkdir(parents=True, exist_ok=True) + source_path = tmp_dir / f"{uuid.uuid4().hex[:12]}_{file_name}" + source_path.write_bytes(content) + + normalized_priority = _normalize_priority(priority, fallback="balanced") + parsed_export_options = _parse_export_options(export_options) + try: + return _enqueue_job_from_path( + source_path=source_path, + file_name=file_name, + mode=normalized_mode, + media_type=media_type, + priority=normalized_priority, + export_options=parsed_export_options, + ) + finally: + source_path.unlink(missing_ok=True) + + +@app.post("/api/aurora/reprocess/{job_id}") +async def reprocess_media( + job_id: str, + payload: Optional[Dict[str, Any]] = Body(default=None), +) -> Dict[str, Any]: + source_job = store.get_job(job_id) + if not source_job: + raise HTTPException(status_code=404, detail="job not found") + + body = payload if isinstance(payload, dict) else {} + second_pass = bool(body.get("second_pass", False)) + source_path = _resolve_source_media_path(source_job, second_pass=second_pass) + + source_meta = source_job.metadata if isinstance(source_job.metadata, dict) else {} + requested_mode = body.get("mode") + requested_priority = body.get("priority") + requested_export = body.get("export_options") + + normalized_mode = _normalize_mode( + str(requested_mode) if isinstance(requested_mode, str) else source_job.mode, + fallback=source_job.mode, + ) + normalized_priority = _normalize_priority( + str(requested_priority) if isinstance(requested_priority, str) else 
str(source_meta.get("priority") or "balanced"), + fallback="balanced", + ) + + export_options: Dict[str, Any] = {} + if isinstance(source_meta.get("export_options"), dict): + export_options.update(source_meta["export_options"]) + if isinstance(requested_export, dict): + export_options = requested_export + + result = _enqueue_job_from_path( + source_path=source_path, + file_name=source_job.file_name, + mode=normalized_mode, + media_type=source_job.media_type, + priority=normalized_priority, + export_options=export_options, + metadata_patch={ + "reprocess_of": source_job.job_id, + "reprocess_second_pass": second_pass, + }, + ) + result["source_job_id"] = source_job.job_id + result["second_pass"] = second_pass + return result + + +@app.post("/api/aurora/chat") +async def aurora_chat(payload: Optional[Dict[str, Any]] = Body(default=None)) -> Dict[str, Any]: + body = payload if isinstance(payload, dict) else {} + message = str(body.get("message") or "").strip() + job_id = str(body.get("job_id") or "").strip() + analysis = body.get("analysis") + analysis_payload = analysis if isinstance(analysis, dict) else None + job = store.get_job(job_id) if job_id else None + response = _aurora_chat_reply( + message=message, + job=job, + analysis=analysis_payload, + ) + if job_id and not job: + response["context"] = { + **(response.get("context") or {}), + "job_id": job_id, + "warning": "job not found", + } + return response + + +@app.get("/api/aurora/status/{job_id}") +async def job_status(job_id: str) -> Dict[str, Any]: + job = store.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="job not found") + + timing = _status_timing(job) + response = { + "job_id": job.job_id, + "status": job.status, + "progress": job.progress, + "current_stage": job.current_stage, + "mode": job.mode, + "media_type": job.media_type, + "error_message": job.error_message, + "created_at": job.created_at, + "started_at": job.started_at, + "completed_at": job.completed_at, + 
"processing_log_count": len(job.processing_log), + "elapsed_seconds": timing["elapsed_seconds"], + "estimated_total_seconds": timing["estimated_total_seconds"], + "eta_seconds": timing["eta_seconds"], + "live_fps": timing.get("live_fps"), + "eta_confidence": timing.get("eta_confidence"), + "queue_position": _queued_position(job_id), + "metadata": job.metadata, + "storage": _job_storage_info(job), + } + if job.result: + response["output_files"] = [model_dump(item) for item in job.result.output_files] + return response + + +@app.get("/api/aurora/jobs") +async def list_jobs( + limit: int = Query(default=30, ge=1, le=200), + status: Optional[str] = Query(default=None), +) -> Dict[str, Any]: + requested_statuses: Optional[set[str]] = None + if status and status.strip(): + parts = {part.strip().lower() for part in status.split(",") if part.strip()} + valid = {"queued", "processing", "completed", "failed", "cancelled"} + requested_statuses = {part for part in parts if part in valid} or None + + jobs = store.list_jobs() + if requested_statuses: + jobs = [job for job in jobs if job.status in requested_statuses] + + jobs_sorted = sorted( + jobs, + key=lambda item: ( + _parse_iso_utc(item.created_at) or datetime.fromtimestamp(0, tz=timezone.utc), + item.job_id, + ), + reverse=True, + ) + + items: List[Dict[str, Any]] = [] + for job in jobs_sorted[:limit]: + timing = _status_timing(job) + items.append( + { + "job_id": job.job_id, + "status": job.status, + "mode": job.mode, + "media_type": job.media_type, + "file_name": job.file_name, + "progress": job.progress, + "current_stage": job.current_stage, + "error_message": job.error_message, + "created_at": job.created_at, + "started_at": job.started_at, + "completed_at": job.completed_at, + "elapsed_seconds": timing["elapsed_seconds"], + "eta_seconds": timing["eta_seconds"], + "live_fps": timing.get("live_fps"), + "metadata": job.metadata if isinstance(job.metadata, dict) else {}, + "queue_position": _queued_position(job.job_id), + 
"has_result": bool(job.result), + } + ) + return { + "jobs": items, + "count": len(items), + "total": len(jobs_sorted), + } + + +@app.get("/api/aurora/result/{job_id}") +async def job_result(job_id: str) -> Dict[str, Any]: + job = store.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="job not found") + if job.status != "completed" or not job.result: + raise HTTPException( + status_code=409, + detail=f"job not completed (status={job.status})", + ) + payload = model_dump(job.result) + payload["storage"] = _job_storage_info(job) + if job.mode == "forensic": + payload["forensic_report_url"] = f"/api/aurora/report/{job_id}.pdf" + return payload + + +@app.get("/api/aurora/report/{job_id}.pdf") +async def job_forensic_pdf(job_id: str) -> FileResponse: + job = store.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="job not found") + if job.status != "completed" or not job.result: + raise HTTPException(status_code=409, detail=f"job not completed (status={job.status})") + if job.mode != "forensic": + raise HTTPException(status_code=409, detail="forensic report is available only in forensic mode") + + report_path = store.outputs_dir / job_id / "forensic_report.pdf" + try: + generate_forensic_report_pdf(job, report_path) + except Exception as exc: + raise HTTPException(status_code=500, detail=f"Cannot generate forensic report: {exc}") from exc + return FileResponse( + path=report_path, + filename=f"{job_id}_forensic_report.pdf", + media_type="application/pdf", + ) + + +@app.post("/api/aurora/cancel/{job_id}") +async def cancel_job(job_id: str) -> Dict[str, Any]: + job = store.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="job not found") + updated = store.request_cancel(job_id) + return { + "job_id": updated.job_id, + "status": updated.status, + "cancel_requested": updated.cancel_requested, + } + + +@app.post("/api/aurora/delete/{job_id}") +async def delete_job( + job_id: str, + purge_files: 
bool = Query(default=True), +) -> Dict[str, Any]: + job = store.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="job not found") + if job.status in ("queued", "processing"): + raise HTTPException( + status_code=409, + detail="job is not terminal; cancel it first", + ) + deleted = store.delete_job(job_id, remove_artifacts=purge_files) + if not deleted: + raise HTTPException(status_code=404, detail="job not found") + return { + "job_id": job_id, + "deleted": True, + "purge_files": bool(purge_files), + } + + +@app.get("/api/aurora/storage") +async def storage_info() -> Dict[str, Any]: + """Disk usage breakdown and per-job sizes.""" + jobs = store.list_jobs() + per_job: List[Dict[str, Any]] = [] + total_output = 0 + total_upload = 0 + total_work = 0 + for job in jobs: + out_dir = store.outputs_dir / job.job_id + up_dir = store.uploads_dir / job.job_id + out_size = sum(f.stat().st_size for f in out_dir.rglob("*") if f.is_file()) if out_dir.exists() else 0 + up_size = sum(f.stat().st_size for f in up_dir.rglob("*") if f.is_file()) if up_dir.exists() else 0 + work_size = 0 + if out_dir.exists(): + for d in out_dir.iterdir(): + if d.is_dir() and d.name.startswith("_work"): + work_size += sum(f.stat().st_size for f in d.rglob("*") if f.is_file()) + total_output += out_size + total_upload += up_size + total_work += work_size + per_job.append({ + "job_id": job.job_id, + "status": job.status, + "output_mb": round(out_size / (1024 * 1024), 1), + "upload_mb": round(up_size / (1024 * 1024), 1), + "work_mb": round(work_size / (1024 * 1024), 1), + }) + models_dir = DATA_DIR / "models" + models_size = sum(f.stat().st_size for f in models_dir.rglob("*") if f.is_file()) if models_dir.exists() else 0 + return { + "data_dir": str(DATA_DIR), + "total_mb": round((total_output + total_upload + total_work + models_size) / (1024 * 1024), 1), + "outputs_mb": round(total_output / (1024 * 1024), 1), + "uploads_mb": round(total_upload / (1024 * 1024), 1), + 
"orphan_work_mb": round(total_work / (1024 * 1024), 1), + "models_mb": round(models_size / (1024 * 1024), 1), + "jobs": sorted(per_job, key=lambda x: x["output_mb"], reverse=True), + } + + +@app.post("/api/aurora/cleanup") +async def cleanup_storage( + max_age_hours: int = Query(default=0, ge=0, description="Delete completed/failed/cancelled jobs older than N hours. 0 = only orphan _work dirs."), +) -> Dict[str, Any]: + """Clean up orphaned _work directories and optionally old terminal jobs.""" + result = _cleanup_work_dirs() + deleted_jobs: List[str] = [] + if max_age_hours > 0: + cutoff = datetime.now(tz=timezone.utc).timestamp() - max_age_hours * 3600 + for job in store.list_jobs(): + if job.status not in ("completed", "failed", "cancelled"): + continue + ts = _parse_iso_utc(job.completed_at or job.created_at) + if ts and ts.timestamp() < cutoff: + store.delete_job(job.job_id, remove_artifacts=True) + deleted_jobs.append(job.job_id) + return { + "work_dirs_removed": result["dirs"], + "work_mb_freed": round(result["mb"], 1), + "jobs_deleted": deleted_jobs, + "jobs_deleted_count": len(deleted_jobs), + } + + +@app.get("/api/aurora/files/{job_id}/{file_name}") +async def download_output_file(job_id: str, file_name: str) -> FileResponse: + base = (store.outputs_dir / job_id).resolve() + target = (base / file_name).resolve() + if not str(target).startswith(str(base)): + raise HTTPException(status_code=403, detail="invalid file path") + if not target.exists() or not target.is_file(): + raise HTTPException(status_code=404, detail="file not found") + return FileResponse(path=target, filename=target.name) + + +# ── Kling AI endpoints ──────────────────────────────────────────────────────── + +@app.get("/api/aurora/kling/health") +async def kling_health() -> Dict[str, Any]: + """Check Kling AI connectivity.""" + from .kling import kling_health_check + return kling_health_check() + + +@app.post("/api/aurora/kling/enhance") +async def kling_enhance_video( + job_id: str = 
Form(..., description="Aurora job_id whose result to enhance with Kling"), + prompt: str = Form("enhance video quality, improve sharpness and clarity", description="Enhancement guidance"), + negative_prompt: str = Form("noise, blur, artifacts, distortion", description="What to avoid"), + mode: str = Form("pro", description="'std' or 'pro'"), + duration: str = Form("5", description="'5' or '10' seconds"), + cfg_scale: float = Form(0.5, description="Prompt adherence 0.0-1.0"), +) -> Dict[str, Any]: + """Submit Aurora job result to Kling AI for video-to-video enhancement.""" + from .kling import kling_video_enhance, kling_upload_file + + job = store.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail=f"Job {job_id} not found") + if job.status != "completed": + raise HTTPException(status_code=409, detail=f"Job must be completed, current status: {job.status}") + + result_path = store.outputs_dir / job_id / "aurora_result.mp4" + if not result_path.exists(): + for ext in [".mov", ".avi", ".mkv"]: + alt = result_path.with_suffix(ext) + if alt.exists(): + result_path = alt + break + if not result_path.exists(): + raise HTTPException(status_code=404, detail="Result file not found for this job") + + try: + upload_resp = kling_upload_file(result_path) + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Kling upload error: {str(exc)[:400]}") from exc + file_id = (upload_resp.get("data") or {}).get("resource_id") or (upload_resp.get("data") or {}).get("file_id") + + if not file_id: + raise HTTPException(status_code=502, detail=f"Kling upload failed: {upload_resp}") + + try: + task_resp = kling_video_enhance( + video_id=file_id, + prompt=prompt, + negative_prompt=negative_prompt, + mode=mode, + duration=duration, + cfg_scale=cfg_scale, + ) + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Kling task submit error: {str(exc)[:400]}") from exc + task_id = (task_resp.get("data") or {}).get("task_id") or 
task_resp.get("task_id") + + kling_meta_dir = store.outputs_dir / job_id + kling_meta_path = kling_meta_dir / "kling_task.json" + kling_meta_path.write_text(json.dumps({ + "aurora_job_id": job_id, + "kling_task_id": task_id, + "kling_file_id": file_id, + "prompt": prompt, + "mode": mode, + "duration": duration, + "submitted_at": datetime.now(timezone.utc).isoformat(), + "status": "submitted", + }, ensure_ascii=False, indent=2), encoding="utf-8") + + return { + "aurora_job_id": job_id, + "kling_task_id": task_id, + "kling_file_id": file_id, + "status": "submitted", + "status_url": f"/api/aurora/kling/status/{job_id}", + } + + +@app.get("/api/aurora/kling/status/{job_id}") +async def kling_task_status_for_job(job_id: str) -> Dict[str, Any]: + """Get Kling AI enhancement status for an Aurora job.""" + from .kling import kling_video_task_status + + kling_meta_path = store.outputs_dir / job_id / "kling_task.json" + if not kling_meta_path.exists(): + raise HTTPException(status_code=404, detail=f"No Kling task for job {job_id}") + + meta = json.loads(kling_meta_path.read_text(encoding="utf-8")) + task_id = meta.get("kling_task_id") + if not task_id: + raise HTTPException(status_code=404, detail="Kling task_id missing in metadata") + + try: + status_resp = kling_video_task_status(task_id, endpoint="video2video") + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Kling status error: {str(exc)[:400]}") from exc + task_data = status_resp.get("data") or status_resp + state = task_data.get("task_status") or task_data.get("status") or "unknown" + + meta["status"] = state + meta["last_checked"] = datetime.now(timezone.utc).isoformat() + + result_url = None + works = task_data.get("task_result", {}).get("videos") or [] + if works: + result_url = works[0].get("url") + if result_url: + meta["kling_result_url"] = result_url + meta["completed_at"] = datetime.now(timezone.utc).isoformat() + + kling_meta_path.write_text(json.dumps(meta, ensure_ascii=False, 
indent=2), encoding="utf-8") + + return { + "aurora_job_id": job_id, + "kling_task_id": task_id, + "status": state, + "kling_result_url": result_url, + "meta": meta, + } + + +@app.post("/api/aurora/kling/image2video") +async def kling_image_to_video( + file: UploadFile = File(..., description="Source image (frame)"), + prompt: str = Form("smooth motion, cinematic video, high quality"), + negative_prompt: str = Form("blur, artifacts, distortion"), + model: str = Form("kling-v1-5"), + mode: str = Form("pro"), + duration: str = Form("5"), + aspect_ratio: str = Form("16:9"), +) -> Dict[str, Any]: + """Generate video from a still image using Kling AI.""" + from .kling import kling_upload_file, kling_video_generate + + file_name = file.filename or "frame.jpg" + content = await file.read() + if not content: + raise HTTPException(status_code=400, detail="Empty upload") + + tmp_dir = store.uploads_dir / "_kling_i2v" + tmp_dir.mkdir(parents=True, exist_ok=True) + tmp_path = tmp_dir / f"{uuid.uuid4().hex[:12]}_{file_name}" + tmp_path.write_bytes(content) + + try: + try: + upload_resp = kling_upload_file(tmp_path) + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Kling upload error: {str(exc)[:400]}") from exc + file_id = (upload_resp.get("data") or {}).get("resource_id") or (upload_resp.get("data") or {}).get("file_id") + if not file_id: + raise HTTPException(status_code=502, detail=f"Kling upload failed: {upload_resp}") + + try: + task_resp = kling_video_generate( + image_id=file_id, + prompt=prompt, + negative_prompt=negative_prompt, + model=model, + mode=mode, + duration=duration, + aspect_ratio=aspect_ratio, + ) + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Kling task submit error: {str(exc)[:400]}") from exc + task_id = (task_resp.get("data") or {}).get("task_id") or task_resp.get("task_id") + return { + "kling_task_id": task_id, + "kling_file_id": file_id, + "status": "submitted", + "status_url": 
f"/api/aurora/kling/task/{task_id}?endpoint=image2video", + } + finally: + tmp_path.unlink(missing_ok=True) + + +@app.get("/api/aurora/kling/task/{task_id}") +async def kling_get_task(task_id: str, endpoint: str = Query("video2video")) -> Dict[str, Any]: + """Get status of any Kling task by ID.""" + from .kling import kling_video_task_status + try: + return kling_video_task_status(task_id, endpoint=endpoint) + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Kling task status error: {str(exc)[:400]}") from exc + + +@app.get("/api/aurora/plates/{job_id}") +async def get_plate_detections(job_id: str) -> Dict[str, Any]: + """Return ALPR plate detection results for a completed job.""" + job = store.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail=f"Job {job_id} not found") + + report_path = store.outputs_dir / job_id / "plate_detections.json" + if not report_path.exists(): + return { + "job_id": job_id, + "plates_found": 0, + "unique_plates": 0, + "unique": [], + "detections": [], + "note": "No plate detection report found (job may predate ALPR support)", + } + data = json.loads(report_path.read_text(encoding="utf-8")) + return data diff --git a/services/aurora-service/launchd/install-launchd.sh b/services/aurora-service/launchd/install-launchd.sh new file mode 100755 index 00000000..d5ffe856 --- /dev/null +++ b/services/aurora-service/launchd/install-launchd.sh @@ -0,0 +1,113 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "$0")/.." 
&& pwd)" +LABEL="${AURORA_LAUNCHD_LABEL:-com.daarion.aurora}" +DOMAIN="gui/$(id -u)" +LAUNCH_AGENTS_DIR="${HOME}/Library/LaunchAgents" +PLIST_PATH="${LAUNCH_AGENTS_DIR}/${LABEL}.plist" +START_SCRIPT="${ROOT_DIR}/start-native-macos.sh" + +PORT_VALUE="${PORT:-9401}" +DATA_DIR_VALUE="${AURORA_DATA_DIR:-${HOME}/.sofiia/aurora-data}" +MODELS_DIR_VALUE="${AURORA_MODELS_DIR:-${DATA_DIR_VALUE}/models}" +PUBLIC_BASE_URL_VALUE="${AURORA_PUBLIC_BASE_URL:-http://127.0.0.1:${PORT_VALUE}}" +CORS_ORIGINS_VALUE="${AURORA_CORS_ORIGINS:-*}" +FORCE_CPU_VALUE="${AURORA_FORCE_CPU:-false}" +PREFER_MPS_VALUE="${AURORA_PREFER_MPS:-true}" +ENABLE_VTB_VALUE="${AURORA_ENABLE_VIDEOTOOLBOX:-true}" +KLING_ACCESS_KEY_VALUE="${KLING_ACCESS_KEY:-}" +KLING_SECRET_KEY_VALUE="${KLING_SECRET_KEY:-}" +KLING_BASE_URL_VALUE="${KLING_BASE_URL:-https://api.klingai.com}" +KLING_TIMEOUT_VALUE="${KLING_TIMEOUT:-60}" + +LOG_DIR="${DATA_DIR_VALUE}/logs" +LOG_OUT="${LOG_DIR}/launchd.out.log" +LOG_ERR="${LOG_DIR}/launchd.err.log" +PATH_VALUE="${PATH:-/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin}" + +if [ ! -x "${START_SCRIPT}" ]; then + echo "[aurora-launchd] missing start script: ${START_SCRIPT}" + exit 1 +fi + +if [ ! -x "${ROOT_DIR}/.venv-macos/bin/python" ]; then + echo "[aurora-launchd] .venv-macos is missing. Run ./setup-native-macos.sh first." 
+ exit 1 +fi + +mkdir -p "${LAUNCH_AGENTS_DIR}" "${LOG_DIR}" "${DATA_DIR_VALUE}" "${MODELS_DIR_VALUE}" + +cat > "${PLIST_PATH}" < + + + + Label + ${LABEL} + + ProgramArguments + + ${START_SCRIPT} + + + WorkingDirectory + ${ROOT_DIR} + + RunAtLoad + + + KeepAlive + + + StandardOutPath + ${LOG_OUT} + StandardErrorPath + ${LOG_ERR} + + EnvironmentVariables + + PATH + ${PATH_VALUE} + PYTHONUNBUFFERED + 1 + PORT + ${PORT_VALUE} + AURORA_DATA_DIR + ${DATA_DIR_VALUE} + AURORA_MODELS_DIR + ${MODELS_DIR_VALUE} + AURORA_PUBLIC_BASE_URL + ${PUBLIC_BASE_URL_VALUE} + AURORA_CORS_ORIGINS + ${CORS_ORIGINS_VALUE} + AURORA_FORCE_CPU + ${FORCE_CPU_VALUE} + AURORA_PREFER_MPS + ${PREFER_MPS_VALUE} + AURORA_ENABLE_VIDEOTOOLBOX + ${ENABLE_VTB_VALUE} + KLING_ACCESS_KEY + ${KLING_ACCESS_KEY_VALUE} + KLING_SECRET_KEY + ${KLING_SECRET_KEY_VALUE} + KLING_BASE_URL + ${KLING_BASE_URL_VALUE} + KLING_TIMEOUT + ${KLING_TIMEOUT_VALUE} + + + +PLIST + +chmod 644 "${PLIST_PATH}" + +launchctl bootout "${DOMAIN}/${LABEL}" >/dev/null 2>&1 || true +launchctl bootstrap "${DOMAIN}" "${PLIST_PATH}" +launchctl enable "${DOMAIN}/${LABEL}" >/dev/null 2>&1 || true +launchctl kickstart -k "${DOMAIN}/${LABEL}" + +echo "[aurora-launchd] installed: ${PLIST_PATH}" +echo "[aurora-launchd] active label: ${DOMAIN}/${LABEL}" +echo "[aurora-launchd] logs: ${LOG_OUT} | ${LOG_ERR}" +echo "[aurora-launchd] check: launchctl print ${DOMAIN}/${LABEL}"