#!/usr/bin/env python3
"""
Idempotent DDL migration for Postgres audit backend.

Creates the `tool_audit_events` table and its indexes if they don't already
exist.

Usage:
    python3 ops/scripts/migrate_audit_postgres.py
    DATABASE_URL=postgresql://user:pass@host/db python3 ops/scripts/migrate_audit_postgres.py --dry-run

Environment variables:
    DATABASE_URL  — PostgreSQL DSN (required unless --dsn is given).
    POSTGRES_DSN  — fallback DSN, used only when DATABASE_URL is unset/empty.

Exit codes:
    0 — success / already up-to-date
    1 — error
"""

from __future__ import annotations

import argparse
import os
import re
import sys
import textwrap

# ─── DDL ─────────────────────────────────────────────────────────────────────

# Every statement uses IF NOT EXISTS, so re-running the script is harmless.
DDL = textwrap.dedent("""\
    -- Audit events table (idempotent)
    CREATE TABLE IF NOT EXISTS tool_audit_events (
        id            BIGSERIAL PRIMARY KEY,
        ts            TIMESTAMPTZ NOT NULL,
        req_id        TEXT NOT NULL,
        workspace_id  TEXT NOT NULL,
        user_id       TEXT NOT NULL,
        agent_id      TEXT NOT NULL,
        tool          TEXT NOT NULL,
        action        TEXT NOT NULL,
        status        TEXT NOT NULL,
        duration_ms   INT NOT NULL DEFAULT 0,
        in_size       INT NOT NULL DEFAULT 0,
        out_size      INT NOT NULL DEFAULT 0,
        input_hash    TEXT NOT NULL DEFAULT '',
        graph_run_id  TEXT,
        graph_node    TEXT,
        job_id        TEXT
    );

    -- Indexes (idempotent)
    CREATE INDEX IF NOT EXISTS idx_tool_audit_ts
        ON tool_audit_events (ts);
    CREATE INDEX IF NOT EXISTS idx_tool_audit_ws_ts
        ON tool_audit_events (workspace_id, ts);
    CREATE INDEX IF NOT EXISTS idx_tool_audit_tool_ts
        ON tool_audit_events (tool, ts);
    CREATE INDEX IF NOT EXISTS idx_tool_audit_agent_ts
        ON tool_audit_events (agent_id, ts);
""")

# ─── Helpers ─────────────────────────────────────────────────────────────────

# URL-style DSN: scheme://user:password@host/...
_URL_PASSWORD_RE = re.compile(r"(://[^:@/\s]*):[^@\s]*@")
# Keyword/value DSN or URL query parameter: password=secret / password='se cret'
_KV_PASSWORD_RE = re.compile(
    r"(password\s*=\s*)('(?:[^'\\]|\\.)*'|[^\s&]+)", re.IGNORECASE
)


def _redact_dsn(dsn: str) -> str:
    """Return *dsn* with any embedded password replaced by ``***``."""
    redacted = _URL_PASSWORD_RE.sub(r"\1:***@", dsn)
    return _KV_PASSWORD_RE.sub(r"\1***", redacted)


def _import_psycopg2():
    """Import psycopg2, installing ``psycopg2-binary`` via pip if it is missing.

    Raises whatever the pip invocation or the second import raises when the
    driver cannot be made available.
    """
    try:
        import psycopg2  # type: ignore
    except ImportError:
        import subprocess

        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "--quiet", "psycopg2-binary"]
        )
        import psycopg2  # type: ignore  # noqa: F811
    return psycopg2


# ─── Runner ───────────────────────────────────────────────────────────────────


def run(dsn: str, dry_run: bool) -> int:
    """Execute migration against Postgres.

    Args:
        dsn: PostgreSQL connection string (URL or keyword/value form).
        dry_run: When true, print the DDL and return without importing the
            driver or opening a connection.

    Returns:
        0 on success, 1 on error.
    """
    # Never echo the raw DSN: it usually carries the password.
    print(f"[migrate] Connecting to: {_redact_dsn(dsn)[:40]}…")

    if dry_run:
        # Checked before the driver import so a dry run has no dependencies
        # and never triggers a pip install.
        print("[migrate] DRY-RUN — printing DDL only, not executing:\n")
        print(DDL)
        return 0

    try:
        psycopg2 = _import_psycopg2()
    except Exception as pip_err:
        print(
            f"[ERROR] psycopg2 not available and could not be installed: {pip_err}",
            file=sys.stderr,
        )
        return 1

    try:
        conn = psycopg2.connect(dsn)
        try:
            # Run all statements in one transaction so a failure leaves no
            # partially-applied schema behind.
            conn.autocommit = False
            with conn.cursor() as cur:
                cur.execute(DDL)
            conn.commit()
        finally:
            # Closing without commit discards the open transaction, so this
            # is also the rollback path.
            conn.close()
    except Exception as exc:
        print(f"[migrate] ❌ Migration failed: {exc}", file=sys.stderr)
        return 1

    print("[migrate] ✅ Migration applied successfully.")
    return 0


def main() -> None:
    """Parse CLI arguments, resolve the DSN, and exit with ``run``'s status."""
    parser = argparse.ArgumentParser(
        description="Idempotent Postgres audit DDL migration"
    )
    parser.add_argument(
        "--dry-run", action="store_true", help="Print DDL without executing"
    )
    parser.add_argument(
        "--dsn",
        default=os.getenv("DATABASE_URL") or os.getenv("POSTGRES_DSN", ""),
        help="PostgreSQL DSN (default: $DATABASE_URL)",
    )
    args = parser.parse_args()

    if not args.dsn:
        print(
            "[migrate] ERROR: DATABASE_URL not set. Provide --dsn or set DATABASE_URL.",
            file=sys.stderr,
        )
        sys.exit(1)

    sys.exit(run(args.dsn, args.dry_run))


if __name__ == "__main__":
    main()