"""
incident_artifacts.py — File-based artifact storage for incidents.

Layout: ops/incidents/<incident_id>/<filename>

Security:
  - Path traversal guard (resolved path must stay within base_dir)
  - Max 2MB per artifact
  - Only allowed formats: json, md, txt
  - Atomic writes (temp + rename)
"""

from __future__ import annotations

import base64
import hashlib
import logging
import os
import tempfile
from pathlib import Path
from typing import Dict, Optional

logger = logging.getLogger(__name__)

MAX_ARTIFACT_BYTES = 2 * 1024 * 1024  # 2MB
ALLOWED_FORMATS = {"json", "md", "txt"}

# Default base directory, captured once at import time. INCIDENT_ARTIFACTS_DIR
# wins; otherwise fall back to <REPO_ROOT or ".">/ops/incidents.
_ARTIFACTS_BASE = os.getenv(
    "INCIDENT_ARTIFACTS_DIR",
    str(Path(os.getenv("REPO_ROOT", ".")) / "ops" / "incidents"),
)


def _base_dir() -> Path:
    """Return the artifact base directory.

    INCIDENT_ARTIFACTS_DIR is re-read on every call, so a value set after
    import still takes effect; otherwise the import-time default is used.
    """
    return Path(os.getenv("INCIDENT_ARTIFACTS_DIR", _ARTIFACTS_BASE))


def _safe_filename(name: str) -> str:
    """Strip path separators and dangerous chars.

    Keeps only alphanumeric characters plus ".", "_" and "-". Returns
    "artifact" when nothing survives the filter.
    """
    safe = "".join(c for c in name if c.isalnum() or c in (".", "_", "-"))
    return safe or "artifact"


def _is_within(path: Path, root: Path) -> bool:
    """Return True when *path* equals *root* or lies beneath it.

    The comparison is made on path components, so a sibling directory whose
    name merely starts with the root's name (``/a/base-evil`` vs ``/a/base``)
    is not treated as being inside the root.
    """
    try:
        path.relative_to(root)
    except ValueError:
        return False
    return True


def write_artifact(
    incident_id: str,
    filename: str,
    content_bytes: bytes,
    *,
    base_dir: Optional[str] = None,
) -> Dict:
    """
    Write an artifact file atomically.

    The file is stored at ``<base_dir>/<incident_id>/<sanitized filename>``.
    All validation happens before anything is created on disk.

    Args:
        incident_id: Directory name for the incident; must be non-empty and
            must not contain "/" or "..".
        filename: Requested file name; sanitized with ``_safe_filename``.
            Its extension must be one of ``ALLOWED_FORMATS``.
        content_bytes: Payload, at most ``MAX_ARTIFACT_BYTES`` long.
        base_dir: Optional override of the base directory; defaults to
            ``_base_dir()``.

    Returns: {"path": str, "sha256": str, "size_bytes": int}
        ``path`` is relative to the base directory's grandparent when that
        directory exists, otherwise it is the target path as built.

    Raises: ValueError on validation failure, OSError on write failure.
    """
    if not incident_id or "/" in incident_id or ".." in incident_id:
        raise ValueError(f"Invalid incident_id: {incident_id}")

    if len(content_bytes) > MAX_ARTIFACT_BYTES:
        raise ValueError(
            f"Artifact too large: {len(content_bytes)} bytes (max {MAX_ARTIFACT_BYTES})"
        )

    safe_name = _safe_filename(filename)
    ext = safe_name.rsplit(".", 1)[-1].lower() if "." in safe_name else ""
    if ext not in ALLOWED_FORMATS:
        raise ValueError(f"Format '{ext}' not allowed. Allowed: {ALLOWED_FORMATS}")

    bd = Path(base_dir) if base_dir else _base_dir()
    inc_dir = bd / incident_id
    target = inc_dir / safe_name

    # Resolve symlinks and compare by path components. A plain string prefix
    # test would accept a sibling such as "<base>-evil", e.g. when the
    # incident directory is a symlink pointing there.
    if not _is_within(target.resolve(), bd.resolve()):
        raise ValueError("Path traversal detected")

    inc_dir.mkdir(parents=True, exist_ok=True)

    sha = hashlib.sha256(content_bytes).hexdigest()

    # Atomic write: temp file in the same directory → rename over the target.
    fd, tmp_path = tempfile.mkstemp(dir=str(inc_dir), suffix=f".{ext}.tmp")
    try:
        # The file object owns the descriptor: it is closed exactly once and
        # write() does not return until the whole payload is handed over.
        with os.fdopen(fd, "wb") as handle:
            handle.write(content_bytes)
        os.replace(tmp_path, str(target))
    except Exception:
        # Never leave a stray temp file behind; the original error is what
        # the caller needs to see, so a failed cleanup is only logged.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        except OSError:
            logger.warning("Could not remove temp artifact file: %s", tmp_path)
        raise

    report_root = bd.parent.parent
    rel_path = str(target.relative_to(report_root)) if report_root.exists() else str(target)
    logger.info(
        "Artifact written: %s (%d bytes, sha256=%s…)",
        rel_path,
        len(content_bytes),
        sha[:12],
    )
    return {
        "path": rel_path,
        "sha256": sha,
        "size_bytes": len(content_bytes),
    }


def decode_content(content_base64: str) -> bytes:
    """Decode base64-encoded content. Raises ValueError on failure.

    Decoding is lenient: ``base64.b64decode`` is called without validation,
    so characters outside the base64 alphabet are ignored rather than
    rejected. The underlying error is chained as the cause.
    """
    try:
        return base64.b64decode(content_base64)
    except (ValueError, TypeError) as exc:
        # binascii.Error (bad padding etc.) is a ValueError subclass;
        # TypeError covers inputs that are not str/bytes-like.
        raise ValueError(f"Invalid base64 content: {exc}") from exc