Files
microdao-daarion/services/clan-consent-adapter/app.py

226 lines
6.9 KiB
Python

import hmac
import json
import os
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel, Field
# Location of the SQLite database file; overridable via CLAN_CONSENT_DB_PATH.
DB_PATH = Path(os.getenv("CLAN_CONSENT_DB_PATH", "/data/clan_consent.sqlite"))
# Shared secret expected in the Authorization header, read from
# CLAN_ADAPTER_API_KEY. An empty value makes _auth skip the check entirely.
API_KEY = os.getenv("CLAN_ADAPTER_API_KEY", "").strip()
# Accepted visibility_level values; _check_visibility compares against this
# set after stripping and lower-casing the input.
VISIBILITY = {"public", "interclan", "incircle", "soulsafe", "sacred"}
# Application object that the route and event decorators below register on.
app = FastAPI(title="CLAN Consent Adapter", version="1.0.0")
def _utc_now() -> str:
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
def _connect() -> sqlite3.Connection:
    """Open a new connection to the database at ``DB_PATH``.

    The parent directory of the database file is created first if it does not
    exist. Rows are returned as ``sqlite3.Row`` so callers can read columns by
    name. The caller is responsible for closing the connection.
    """
    parent_dir = DB_PATH.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def _init_db() -> None:
    """Create the ``consent_events`` and ``testimony_drafts`` tables if absent.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so running this against
    a database that already has the tables leaves them untouched.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS consent_events (
            id TEXT PRIMARY KEY,
            circle TEXT,
            subject TEXT,
            method TEXT,
            signers_json TEXT,
            caveats TEXT,
            visibility_level TEXT,
            provenance_json TEXT,
            created_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS testimony_drafts (
            id TEXT PRIMARY KEY,
            title TEXT,
            circle TEXT,
            visibility_level TEXT,
            content_json TEXT,
            status TEXT,
            provenance_json TEXT,
            consent_event_id TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
        """,
    )
    db = _connect()
    try:
        cursor = db.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        db.commit()
    finally:
        # Always release the connection, even if a statement failed.
        db.close()
class ConsentCreate(BaseModel):
    """Request body accepted by ``POST /consent/events``."""

    # Circle the consent event belongs to (stored as-is).
    circle: str
    # What the consent is about; the field description lists the expected kinds.
    subject: str = Field(description="decision/testimony/bridge/rights")
    # How consent was obtained; defaults to "live_presence".
    method: str = "live_presence"
    # Signer identifiers; serialised to JSON before being stored.
    signers: list[str] = Field(default_factory=list)
    # Optional free-text caveats; the create handler stores "" when omitted.
    caveats: Optional[str] = None
    # Validated against VISIBILITY by the create handler, not by this model.
    visibility_level: str = "incircle"
    # Arbitrary provenance metadata; serialised to JSON before being stored.
    provenance: Dict[str, Any] = Field(default_factory=dict)
class TestimonyDraftCreate(BaseModel):
    """Request body accepted by ``POST /testimony/drafts``."""

    # Title of the testimony draft (stored as-is).
    title: str
    # Circle the draft belongs to (stored as-is).
    circle: str
    # Validated against VISIBILITY by the create handler, not by this model.
    visibility_level: str = "incircle"
    # Arbitrary draft content; serialised to JSON before being stored.
    content: Dict[str, Any] = Field(default_factory=dict)
    # The create handler keeps "draft", "needs_confirmation" or "confirmed"
    # and replaces any other value with "draft".
    status: str = "draft"
    # Arbitrary provenance metadata; serialised to JSON before being stored.
    provenance: Dict[str, Any] = Field(default_factory=dict)
    # Optional id of a related consent event; stored without any lookup.
    consent_event_id: Optional[str] = None
def _auth(authorization: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency that enforces ``Authorization: Bearer <API_KEY>``.

    When ``API_KEY`` is empty the check is skipped and every request is
    accepted.

    Args:
        authorization: Raw value of the ``Authorization`` request header, or
            ``None`` when the header is absent.

    Raises:
        HTTPException: 401 when a key is configured and the header is missing
            or does not match exactly.
    """
    if not API_KEY:
        return
    expected = f"Bearer {API_KEY}".encode("utf-8")
    # A missing header becomes b"", which can never equal the non-empty
    # expected value.
    supplied = (authorization or "").encode("utf-8")
    # Compare in constant time so response timing does not reveal how much of
    # the secret a guess got right. Bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")
def _check_visibility(level: str) -> str:
    """Normalise *level* and verify it is a member of ``VISIBILITY``.

    Args:
        level: Visibility level as supplied by the client.

    Returns:
        The level stripped of surrounding whitespace and lower-cased.

    Raises:
        HTTPException: 400 when the normalised value is not in ``VISIBILITY``;
            the detail message echoes the value as originally supplied.
    """
    normalized = level.strip().lower() if level else ""
    if normalized in VISIBILITY:
        return normalized
    raise HTTPException(status_code=400, detail=f"invalid visibility_level: {level}")
@app.on_event("startup")
def startup() -> None:
    """Ensure the database tables exist when the application starts."""
    # NOTE(review): recent FastAPI releases deprecate on_event in favour of
    # lifespan handlers — confirm against the FastAPI version pinned here.
    _init_db()
@app.get("/health")
def health() -> Dict[str, Any]:
    """Return a static payload signalling that the service is running."""
    payload: Dict[str, Any] = dict(status="ok", service="clan-consent-adapter")
    return payload
@app.post("/consent/events")
def create_consent_event(body: ConsentCreate, _: None = Depends(_auth)) -> Dict[str, Any]:
    """Store a new consent event and return its generated id.

    Args:
        body: Validated request payload.
        _: Placeholder for the authentication dependency; its value is unused.

    Returns:
        A dict with ``consent_event_id``, a fixed ``status`` of ``"confirmed"``
        and the normalised ``visibility_level``.

    Raises:
        HTTPException: 400 when ``body.visibility_level`` is not recognised.
    """
    insert_sql = """
        INSERT INTO consent_events(id, circle, subject, method, signers_json, caveats, visibility_level, provenance_json, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    event_id = str(uuid.uuid4())
    level = _check_visibility(body.visibility_level)
    created_at = _utc_now()
    db = _connect()
    try:
        signers_json = json.dumps(body.signers, ensure_ascii=False)
        provenance_json = json.dumps(body.provenance, ensure_ascii=False)
        # A missing caveat is stored as an empty string rather than NULL.
        caveats = body.caveats or ""
        params = (
            event_id,
            body.circle,
            body.subject,
            body.method,
            signers_json,
            caveats,
            level,
            provenance_json,
            created_at,
        )
        db.execute(insert_sql, params)
        db.commit()
    finally:
        db.close()
    return {"consent_event_id": event_id, "status": "confirmed", "visibility_level": level}
@app.get("/consent/events/{event_id}")
def get_consent_event(event_id: str, _: None = Depends(_auth)) -> Dict[str, Any]:
    """Fetch one consent event by id.

    Args:
        event_id: Primary key of the consent event.
        _: Placeholder for the authentication dependency; its value is unused.

    Returns:
        The stored event, with ``signers`` and ``provenance`` decoded from
        their JSON columns.

    Raises:
        HTTPException: 404 when no row has the given id.
    """
    db = _connect()
    try:
        record = db.execute(
            "SELECT * FROM consent_events WHERE id = ?", (event_id,)
        ).fetchone()
    finally:
        db.close()
    if record is None:
        raise HTTPException(status_code=404, detail="consent_event_not_found")

    # Empty or NULL JSON columns decode to an empty list / dict.
    signers = json.loads(record["signers_json"] or "[]")
    provenance = json.loads(record["provenance_json"] or "{}")

    event: Dict[str, Any] = {
        column: record[column] for column in ("id", "circle", "subject", "method")
    }
    event["signers"] = signers
    event["caveats"] = record["caveats"]
    event["visibility_level"] = record["visibility_level"]
    event["provenance"] = provenance
    event["created_at"] = record["created_at"]
    return event
@app.post("/testimony/drafts")
def create_testimony_draft(body: TestimonyDraftCreate, _: None = Depends(_auth)) -> Dict[str, Any]:
    """Store a new testimony draft and return its generated id.

    Args:
        body: Validated request payload.
        _: Placeholder for the authentication dependency; its value is unused.

    Returns:
        A dict with ``testimony_id``, the effective ``status`` and the
        normalised ``visibility_level``.

    Raises:
        HTTPException: 400 when ``body.visibility_level`` is not recognised.
    """
    insert_sql = """
        INSERT INTO testimony_drafts(id, title, circle, visibility_level, content_json, status, provenance_json, consent_event_id, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    draft_id = str(uuid.uuid4())
    level = _check_visibility(body.visibility_level)

    # Unknown status values are not rejected; they fall back to "draft".
    if body.status in {"draft", "needs_confirmation", "confirmed"}:
        effective_status = body.status
    else:
        effective_status = "draft"

    timestamp = _utc_now()
    db = _connect()
    try:
        content_json = json.dumps(body.content, ensure_ascii=False)
        provenance_json = json.dumps(body.provenance, ensure_ascii=False)
        params = (
            draft_id,
            body.title,
            body.circle,
            level,
            content_json,
            effective_status,
            provenance_json,
            body.consent_event_id,
            timestamp,  # created_at
            timestamp,  # updated_at starts equal to created_at
        )
        db.execute(insert_sql, params)
        db.commit()
    finally:
        db.close()
    return {"testimony_id": draft_id, "status": effective_status, "visibility_level": level}
@app.get("/testimony/drafts/{testimony_id}")
def get_testimony_draft(testimony_id: str, _: None = Depends(_auth)) -> Dict[str, Any]:
    """Fetch one testimony draft by id.

    Args:
        testimony_id: Primary key of the testimony draft.
        _: Placeholder for the authentication dependency; its value is unused.

    Returns:
        The stored draft, with ``content`` and ``provenance`` decoded from
        their JSON columns.

    Raises:
        HTTPException: 404 when no row has the given id.
    """
    db = _connect()
    try:
        record = db.execute(
            "SELECT * FROM testimony_drafts WHERE id = ?", (testimony_id,)
        ).fetchone()
    finally:
        db.close()
    if record is None:
        raise HTTPException(status_code=404, detail="testimony_not_found")

    # Empty or NULL JSON columns decode to an empty dict.
    content = json.loads(record["content_json"] or "{}")
    provenance = json.loads(record["provenance_json"] or "{}")

    draft: Dict[str, Any] = {
        column: record[column] for column in ("id", "title", "circle", "visibility_level")
    }
    draft["content"] = content
    draft["status"] = record["status"]
    draft["provenance"] = provenance
    for column in ("consent_event_id", "created_at", "updated_at"):
        draft[column] = record[column]
    return draft