Files
microdao-daarion/services/router/incident_artifacts.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

107 lines
3.2 KiB
Python

"""
incident_artifacts.py — File-based artifact storage for incidents.
Layout: ops/incidents/<incident_id>/<filename>
Security:
- Path traversal guard (realpath must stay within base_dir)
- Max 2MB per artifact
- Only allowed formats: json, md, txt
- Atomic writes (temp + rename)
"""
from __future__ import annotations
import base64
import hashlib
import logging
import os
import tempfile
from pathlib import Path
from typing import Dict, Optional
logger = logging.getLogger(__name__)
MAX_ARTIFACT_BYTES = 2 * 1024 * 1024 # 2MB
ALLOWED_FORMATS = {"json", "md", "txt"}
_ARTIFACTS_BASE = os.getenv(
"INCIDENT_ARTIFACTS_DIR",
str(Path(os.getenv("REPO_ROOT", ".")) / "ops" / "incidents"),
)
def _base_dir() -> Path:
return Path(os.getenv("INCIDENT_ARTIFACTS_DIR", _ARTIFACTS_BASE))
def _safe_filename(name: str) -> str:
"""Strip path separators and dangerous chars."""
safe = "".join(c for c in name if c.isalnum() or c in (".", "_", "-"))
return safe or "artifact"
def write_artifact(
incident_id: str,
filename: str,
content_bytes: bytes,
*,
base_dir: Optional[str] = None,
) -> Dict:
"""
Write an artifact file atomically.
Returns: {"path": str, "sha256": str, "size_bytes": int}
Raises: ValueError on validation failure, OSError on write failure.
"""
if not incident_id or "/" in incident_id or ".." in incident_id:
raise ValueError(f"Invalid incident_id: {incident_id}")
if len(content_bytes) > MAX_ARTIFACT_BYTES:
raise ValueError(f"Artifact too large: {len(content_bytes)} bytes (max {MAX_ARTIFACT_BYTES})")
safe_name = _safe_filename(filename)
ext = safe_name.rsplit(".", 1)[-1].lower() if "." in safe_name else ""
if ext not in ALLOWED_FORMATS:
raise ValueError(f"Format '{ext}' not allowed. Allowed: {ALLOWED_FORMATS}")
bd = Path(base_dir) if base_dir else _base_dir()
inc_dir = bd / incident_id
inc_dir.mkdir(parents=True, exist_ok=True)
target = inc_dir / safe_name
real_base = bd.resolve()
real_target = target.resolve()
if not str(real_target).startswith(str(real_base)):
raise ValueError("Path traversal detected")
sha = hashlib.sha256(content_bytes).hexdigest()
# Atomic write: temp file → rename
fd, tmp_path = tempfile.mkstemp(dir=str(inc_dir), suffix=f".{ext}.tmp")
try:
os.write(fd, content_bytes)
os.close(fd)
os.replace(tmp_path, str(target))
except Exception:
os.close(fd) if not os.get_inheritable(fd) else None
if os.path.exists(tmp_path):
os.unlink(tmp_path)
raise
rel_path = str(target.relative_to(bd.parent.parent)) if bd.parent.parent.exists() else str(target)
logger.info("Artifact written: %s (%d bytes, sha256=%s…)", rel_path, len(content_bytes), sha[:12])
return {
"path": rel_path,
"sha256": sha,
"size_bytes": len(content_bytes),
}
def decode_content(content_base64: str) -> bytes:
"""Decode base64-encoded content. Raises ValueError on failure."""
try:
return base64.b64decode(content_base64)
except Exception as exc:
raise ValueError(f"Invalid base64 content: {exc}")