Files
microdao-daarion/services/artifact-registry/app/main.py
2026-02-21 17:02:55 +01:00

817 lines
26 KiB
Python

"""
Artifact Registry v0
- Stores artifacts + versions + jobs in Postgres
- Stores payloads in MinIO
- Publishes render jobs to NATS
"""
import asyncio
import base64
import hashlib
import json
import logging
import os
import re
import uuid
from io import BytesIO
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import asyncpg
import httpx
from fastapi import FastAPI, HTTPException, Query
from minio import Minio
from minio.error import S3Error
from nats.aio.client import Client as NATS
from pydantic import BaseModel
# Logging is configured at import time at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Service version; passed to FastAPI and returned by the "/" endpoint.
APP_VERSION = "0.1.0"
# --- Postgres connection settings: environment variable first, literal fallback second ---
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "dagi-postgres")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
POSTGRES_USER = os.getenv("POSTGRES_USER", "daarion")
# NOTE(review): the fallback embeds a password in source control - confirm it is
# not a real credential, or require POSTGRES_PASSWORD to be set explicitly.
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "DaarionDB2026!")
POSTGRES_DB = os.getenv("POSTGRES_DB", "daarion_main")
# --- MinIO (object storage) settings ---
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
# NOTE(review): the fallback credentials look like stock defaults - confirm they
# are only ever used in local development.
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "artifacts")
# True only when the variable equals "true" (case-insensitive); anything else is False.
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
# --- NATS (message bus) settings ---
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")
# Shared clients; they stay None until the _ensure_* helpers assign them.
pool: Optional[asyncpg.Pool] = None
nats_client: Optional[NATS] = None
minio_client: Optional[Minio] = None
# The FastAPI application that all route decorators in this module attach to.
app = FastAPI(
    title="Artifact Registry",
    version=APP_VERSION,
    description="Registry for presentations/docs artifacts"
)
# Request body for POST /presentations/render.
class PresentationRenderRequest(BaseModel):
    brand_id: str
    project_id: Optional[str] = None
    # Used as the artifact title and as the title of the generated first slide.
    title: str
    # One entry per slide; the handler skips blank entries and entries equal to the title.
    slides: List[str]
    # The handler falls back to "default-v1" when this is empty or None.
    theme_id: Optional[str] = "default-v1"
    # Stored as the label of the input version; the handler falls back to "source".
    version_label: Optional[str] = "source"
    acl_ref: Optional[str] = None
# Response of POST /presentations/render: ids of the rows that were created.
class PresentationRenderResponse(BaseModel):
    artifact_id: str
    # Id of the stored slidespec version that the render job takes as input.
    input_version_id: str
    job_id: str
    # Relative path of the job status endpoint ("/jobs/{job_id}").
    status_url: str
# Request body for POST /artifacts.
class ArtifactCreateRequest(BaseModel):
    # The handler accepts only "presentation" or "doc"; anything else yields HTTP 400.
    type: str
    title: Optional[str] = None
    brand_id: Optional[str] = None
    project_id: Optional[str] = None
    acl_ref: Optional[str] = None
    created_by: Optional[str] = None
# Response of POST /artifacts: the generated "art_<hex>" identifier.
class ArtifactCreateResponse(BaseModel):
    artifact_id: str
# Request body for POST /artifacts/{artifact_id}/versions/from_url.
class ArtifactVersionFromUrlRequest(BaseModel):
    # Location the service downloads the payload from.
    url: str
    # Stored with the version and used as the object's content type in MinIO.
    mime: str
    label: Optional[str] = "source"
    meta_json: Optional[Dict[str, Any]] = None
# Request body for POST /artifacts/{artifact_id}/versions/from_base64.
class ArtifactVersionFromBase64Request(BaseModel):
    # Base64 payload; the handler also accepts a "data:...," prefix and strips it.
    content_base64: str
    mime: str
    # Sanitised by the handler before it becomes part of the storage key.
    filename: Optional[str] = "source.bin"
    label: Optional[str] = "source"
    meta_json: Optional[Dict[str, Any]] = None
# Response shared by the three version-creating endpoints.
class ArtifactVersionResponse(BaseModel):
    version_id: str
    # Object key of the payload inside the MinIO bucket.
    storage_key: str
    sha256: str
    size_bytes: int
# Request body for POST /artifacts/{artifact_id}/versions.
# The handler only records these values; it does not upload or verify any object.
class ArtifactVersionCreateRequest(BaseModel):
    storage_key: str
    sha256: str
    mime: str
    size_bytes: int
    label: Optional[str] = "source"
    meta_json: Optional[Dict[str, Any]] = None
# Request body for POST /artifacts/{artifact_id}/jobs.
class ArtifactJobRequest(BaseModel):
    # The handler accepts "render_pptx", "render_pdf" or "index_doc".
    job_type: str
    # May be omitted only for "index_doc", where the newest "source" version is used.
    input_version_id: Optional[str] = None
    # Forwarded in the NATS message and stored in the job's meta_json when truthy.
    force: Optional[bool] = False
# Response of POST /artifacts/{artifact_id}/jobs.
class ArtifactJobResponse(BaseModel):
    job_id: str
    # Relative path of the job status endpoint ("/jobs/{job_id}").
    status_url: str
# Request body for POST /jobs/{job_id}/complete: describes the produced output object.
class JobCompleteRequest(BaseModel):
    output_storage_key: str
    mime: str
    size_bytes: int
    sha256: str
    # Label of the output version; the handler falls back to "pptx" when empty.
    label: Optional[str] = "pptx"
# Request body for POST /jobs/{job_id}/done (completion without an output version).
class JobDoneRequest(BaseModel):
    # NOTE(review): accepted but not read by the job_done handler - confirm whether it should be stored.
    note: Optional[str] = None
    # Replaces the job's meta_json column.
    meta_json: Optional[Dict[str, Any]] = None
# Request body for POST /jobs/{job_id}/fail.
class JobFailRequest(BaseModel):
    # The handler stores at most the first 1000 characters.
    error_text: str
# Schema bootstrap executed by _ensure_db(). Every statement uses "if not exists",
# so running it again against an already-initialised database is harmless.
# Tables: artifacts -> artifact_versions (unique per artifact_id + sha256) -> artifact_jobs.
SQL_CREATE = """
create table if not exists artifacts (
id text primary key,
type text not null check (type in ('presentation','doc')),
title text,
brand_id text,
project_id text,
acl_ref text,
created_by text,
created_at timestamptz not null default now()
);
create table if not exists artifact_versions (
id text primary key,
artifact_id text not null references artifacts(id) on delete cascade,
label text,
sha256 text not null,
mime text not null,
size_bytes bigint not null,
storage_key text not null,
meta_json jsonb not null default '{}'::jsonb,
created_at timestamptz not null default now(),
unique (artifact_id, sha256)
);
create table if not exists artifact_jobs (
id text primary key,
artifact_id text not null references artifacts(id) on delete cascade,
input_version_id text not null references artifact_versions(id),
job_type text not null check (job_type in ('render_pptx','render_pdf','index_doc')),
status text not null check (status in ('queued','running','done','failed')),
output_version_id text references artifact_versions(id),
error_text text,
attempts int not null default 0,
locked_at timestamptz,
locked_by text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists idx_jobs_status on artifact_jobs(status, job_type);
create index if not exists idx_versions_artifact on artifact_versions(artifact_id, created_at desc);
"""
def _now() -> str:
return datetime.utcnow().isoformat() + "Z"
def _storage_key(artifact_id: str, version_id: str, filename: str) -> str:
return f"artifacts/{artifact_id}/versions/{version_id}/{filename}"
def _hash_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def _normalize_meta_json(meta: Any) -> Dict[str, Any]:
if meta is None:
return {}
if isinstance(meta, dict):
return meta
if isinstance(meta, str):
try:
parsed = json.loads(meta)
if isinstance(parsed, dict):
return parsed
except Exception:
return {}
return {}
def _format_to_mime(fmt: str) -> str:
fmt = fmt.lower()
if "/" in fmt:
return fmt
if fmt == "pptx":
return "application/vnd.openxmlformats-officedocument.presentationml.presentation"
if fmt == "pdf":
return "application/pdf"
if fmt == "source":
return "application/json"
if fmt == "docx":
return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
if fmt == "xlsx":
return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
if fmt == "txt":
return "text/plain; charset=utf-8"
if fmt == "md":
return "text/markdown; charset=utf-8"
if fmt == "json":
return "application/json"
if fmt == "csv":
return "text/csv; charset=utf-8"
return "application/octet-stream"
def _safe_filename(name: Optional[str], fallback: str = "source.bin") -> str:
raw = (name or fallback).strip() or fallback
cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", raw)
cleaned = cleaned.strip("._")
if not cleaned:
return fallback
return cleaned[:120]
async def _download_bytes(url: str) -> bytes:
    """Fetch ``url`` with an HTTP GET and return the response body.

    Args:
        url: Address to download; the request uses a 60 second timeout.

    Returns:
        The raw response body.

    Raises:
        HTTPException: 502 when the request cannot be performed (invalid URL,
            connection failure, timeout, ...) or the server answers with a
            status code of 400 or above.
    """
    # NOTE(review): `url` comes straight from the request body and is fetched
    # from inside the service network - consider an allow-list against SSRF.
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.get(url)
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # Without this, transport failures would escape as an unhandled 500.
        raise HTTPException(
            status_code=502,
            detail=f"Failed to download url: {type(exc).__name__}",
        ) from exc
    if resp.status_code >= 400:
        raise HTTPException(status_code=502, detail=f"Failed to download url: {resp.status_code}")
    return resp.content
async def _ensure_minio() -> None:
    """Create the module-level MinIO client and make sure the bucket exists.

    The client is built from the ``MINIO_*`` settings; ``MINIO_BUCKET`` is
    created when ``bucket_exists`` reports it missing.
    """
    global minio_client
    minio_client = Minio(
        MINIO_ENDPOINT,
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_KEY,
        secure=MINIO_SECURE,
    )
    bucket_present = minio_client.bucket_exists(MINIO_BUCKET)
    if bucket_present:
        return
    minio_client.make_bucket(MINIO_BUCKET)
async def _ensure_nats() -> None:
    """Create the module-level NATS client and connect it to ``NATS_URL``."""
    global nats_client
    nats_client = NATS()
    server_urls = [NATS_URL]
    await nats_client.connect(servers=server_urls)
async def _ensure_db() -> None:
    """Open the module-level asyncpg pool and apply the schema statements.

    Runs ``SQL_CREATE`` followed by the statement that adds the ``meta_json``
    column to ``artifact_jobs`` when it is missing.
    """
    global pool
    connection_settings = dict(
        host=POSTGRES_HOST,
        port=POSTGRES_PORT,
        user=POSTGRES_USER,
        password=POSTGRES_PASSWORD,
        database=POSTGRES_DB,
        min_size=1,
        max_size=5,
    )
    pool = await asyncpg.create_pool(**connection_settings)
    add_job_meta_column = (
        "alter table artifact_jobs add column if not exists meta_json jsonb default '{}'::jsonb"
    )
    async with pool.acquire() as conn:
        for statement in (SQL_CREATE, add_job_meta_column):
            await conn.execute(statement)
@app.on_event("startup")
async def startup() -> None:
    """Initialise Postgres, MinIO and NATS (in that order) when the app starts."""
    for bootstrap in (_ensure_db, _ensure_minio, _ensure_nats):
        await bootstrap()
    logger.info("Artifact Registry started")
@app.on_event("shutdown")
async def shutdown() -> None:
    """Drain the NATS client and close the DB pool, skipping whichever is unset."""
    broker, db_pool = nats_client, pool
    if broker:
        await broker.drain()
    if db_pool:
        await db_pool.close()
# Service identity endpoint: returns the service name and APP_VERSION.
@app.get("/")
async def root() -> Dict[str, Any]:
    return dict(service="artifact-registry", version=APP_VERSION)
# Liveness endpoint: always answers with a fixed payload and touches no backend.
@app.get("/health")
async def health() -> Dict[str, Any]:
    return dict(status="healthy")
# Create a presentation artifact from a list of slide titles and queue its PPTX render.
#
# Builds a "slidespec.v1" JSON document, uploads it to MinIO, records the
# artifact, its input version and a queued render_pptx job in one DB
# transaction, then publishes the job on NATS when a client is connected.
#
# Raises HTTPException: 400 for an empty slide list, 500 when MinIO or the DB
# pool is not initialised, 502 when MinIO rejects the upload.
@app.post("/presentations/render", response_model=PresentationRenderResponse)
async def presentations_render(req: PresentationRenderRequest) -> PresentationRenderResponse:
    if not req.slides:
        raise HTTPException(status_code=400, detail="Slides list is empty")
    # Check both backends before any side effect, so a missing DB pool cannot
    # leave an orphaned object behind in MinIO.
    if not minio_client:
        raise HTTPException(status_code=500, detail="MinIO not available")
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")

    artifact_id = f"art_{uuid.uuid4().hex}"
    version_id = f"ver_{uuid.uuid4().hex}"
    job_id = f"job_{uuid.uuid4().hex}"
    theme_id = req.theme_id or "default-v1"

    slidespec = {
        "schema": "slidespec.v1",
        "artifact_id": artifact_id,
        "title": req.title,
        "brand_id": req.brand_id,
        "theme_id": theme_id,
        # The first slide is always the title slide.
        "slides": [{"type": "title", "title": req.title}],
        "meta": {"lang": "uk"},
    }
    # Blank entries and entries repeating the title add nothing, so skip them.
    for item in req.slides:
        if item.strip() and item.strip() != req.title:
            slidespec["slides"].append({"type": "section", "title": item})

    payload = json.dumps(slidespec, ensure_ascii=False).encode("utf-8")
    sha256 = _hash_bytes(payload)
    storage_key = _storage_key(artifact_id, version_id, "slidespec.json")

    try:
        minio_client.put_object(
            MINIO_BUCKET,
            storage_key,
            # put_object reads from a stream object; raw bytes have no read(),
            # so wrap them exactly like the other upload endpoints do.
            data=BytesIO(payload),
            length=len(payload),
            content_type="application/json",
        )
    except S3Error as e:
        raise HTTPException(status_code=502, detail=f"MinIO error: {e}")

    async with pool.acquire() as conn:
        # One transaction: artifact, version and job are created together or not at all.
        async with conn.transaction():
            await conn.execute(
                """
                insert into artifacts (id, type, title, brand_id, project_id, acl_ref, created_by)
                values ($1, 'presentation', $2, $3, $4, $5, 'gateway')
                """,
                artifact_id,
                req.title,
                req.brand_id,
                req.project_id,
                req.acl_ref,
            )
            await conn.execute(
                """
                insert into artifact_versions
                (id, artifact_id, label, sha256, mime, size_bytes, storage_key, meta_json)
                values ($1, $2, $3, $4, $5, $6, $7, $8)
                """,
                version_id,
                artifact_id,
                req.version_label or "source",
                sha256,
                "application/json",
                len(payload),
                storage_key,
                json.dumps({"theme_id": theme_id}),
            )
            await conn.execute(
                """
                insert into artifact_jobs (id, artifact_id, input_version_id, job_type, status)
                values ($1, $2, $3, 'render_pptx', 'queued')
                """,
                job_id,
                artifact_id,
                version_id,
            )

    # Publishing is skipped when no NATS client is connected; the job row stays queued.
    if nats_client:
        msg = {
            "job_id": job_id,
            "artifact_id": artifact_id,
            "input_version_id": version_id,
            "storage_key": storage_key,
            "theme_id": theme_id,
            "brand_id": req.brand_id,
            "acl_ref": req.acl_ref,
            "project_id": req.project_id,
        }
        await nats_client.publish("artifact.job.render_pptx.requested", json.dumps(msg).encode("utf-8"))

    return PresentationRenderResponse(
        artifact_id=artifact_id,
        input_version_id=version_id,
        job_id=job_id,
        status_url=f"/jobs/{job_id}",
    )
# Register a new artifact row and return its generated "art_<hex>" id.
# Raises HTTPException 400 for an unsupported type and 500 when the DB pool is unset.
@app.post("/artifacts", response_model=ArtifactCreateResponse)
async def create_artifact(req: ArtifactCreateRequest) -> ArtifactCreateResponse:
    allowed_types = ("presentation", "doc")
    if req.type not in allowed_types:
        raise HTTPException(status_code=400, detail="Invalid artifact type")
    new_id = "art_" + uuid.uuid4().hex
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    insert_sql = """
        insert into artifacts (id, type, title, brand_id, project_id, acl_ref, created_by)
        values ($1, $2, $3, $4, $5, $6, $7)
        """
    row_values = (
        new_id,
        req.type,
        req.title,
        req.brand_id,
        req.project_id,
        req.acl_ref,
        req.created_by,
    )
    async with pool.acquire() as conn:
        await conn.execute(insert_sql, *row_values)
    return ArtifactCreateResponse(artifact_id=new_id)
# Download a payload from a URL, store it in MinIO and record it as a new version.
#
# Raises HTTPException: 500 when MinIO or the DB pool is not initialised,
# 404 when the artifact does not exist, 502 when the download or the MinIO
# upload fails.
@app.post("/artifacts/{artifact_id}/versions/from_url", response_model=ArtifactVersionResponse)
async def add_version_from_url(artifact_id: str, payload: ArtifactVersionFromUrlRequest) -> ArtifactVersionResponse:
    if not minio_client:
        raise HTTPException(status_code=500, detail="MinIO not available")
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")

    # Verify the artifact first: otherwise an unknown id is only caught by the
    # foreign key on insert, after the payload was already downloaded and
    # uploaded, leaving an orphaned object and an unhandled error.
    async with pool.acquire() as conn:
        artifact_known = await conn.fetchval("select 1 from artifacts where id=$1", artifact_id)
    if not artifact_known:
        raise HTTPException(status_code=404, detail="Artifact not found")

    version_id = f"ver_{uuid.uuid4().hex}"
    content = await _download_bytes(payload.url)
    sha256 = _hash_bytes(content)
    storage_key = _storage_key(artifact_id, version_id, "source.bin")
    try:
        minio_client.put_object(
            MINIO_BUCKET,
            storage_key,
            data=BytesIO(content),
            length=len(content),
            content_type=payload.mime,
        )
    except S3Error as e:
        raise HTTPException(status_code=502, detail=f"MinIO error: {e}")

    meta_json = _normalize_meta_json(payload.meta_json)
    # NOTE(review): artifact_versions is unique on (artifact_id, sha256), so
    # re-submitting identical content fails on this insert - confirm the
    # desired response for duplicates.
    async with pool.acquire() as conn:
        await conn.execute(
            """
            insert into artifact_versions
            (id, artifact_id, label, sha256, mime, size_bytes, storage_key, meta_json)
            values ($1, $2, $3, $4, $5, $6, $7, $8)
            """,
            version_id,
            artifact_id,
            payload.label or "source",
            sha256,
            payload.mime,
            len(content),
            storage_key,
            json.dumps(meta_json),
        )
    return ArtifactVersionResponse(
        version_id=version_id,
        storage_key=storage_key,
        sha256=sha256,
        size_bytes=len(content),
    )
# Decode a base64 payload, store it in MinIO and record it as a new version.
#
# Accepts plain base64 or a "data:<...>,<base64>" value. The sanitised file
# name becomes part of the storage key and is written to meta_json["file_name"]
# unless the caller already supplied that key.
#
# Raises HTTPException: 500 when MinIO or the DB pool is not initialised,
# 400 for a missing, invalid or empty payload, 404 when the artifact does not
# exist, 502 when the MinIO upload fails.
@app.post("/artifacts/{artifact_id}/versions/from_base64", response_model=ArtifactVersionResponse)
async def add_version_from_base64(artifact_id: str, payload: ArtifactVersionFromBase64Request) -> ArtifactVersionResponse:
    if not minio_client:
        raise HTTPException(status_code=500, detail="MinIO not available")
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")

    raw = (payload.content_base64 or "").strip()
    if not raw:
        raise HTTPException(status_code=400, detail="content_base64 is required")
    # Strip a data-URI header and keep only the part after the first comma.
    if raw.startswith("data:") and "," in raw:
        raw = raw.split(",", 1)[1]
    try:
        content = base64.b64decode(raw, validate=True)
    except ValueError as exc:
        # binascii.Error (malformed base64) is a ValueError subclass, as is the
        # error raised for non-ASCII input, so this covers every decode failure.
        raise HTTPException(status_code=400, detail="Invalid base64 payload") from exc
    if not content:
        raise HTTPException(status_code=400, detail="Decoded payload is empty")

    # Verify the artifact before uploading: otherwise an unknown id is only
    # caught by the foreign key on insert, after the object was already stored.
    async with pool.acquire() as conn:
        artifact_known = await conn.fetchval("select 1 from artifacts where id=$1", artifact_id)
    if not artifact_known:
        raise HTTPException(status_code=404, detail="Artifact not found")

    version_id = f"ver_{uuid.uuid4().hex}"
    filename = _safe_filename(payload.filename, fallback="source.bin")
    sha256 = _hash_bytes(content)
    storage_key = _storage_key(artifact_id, version_id, filename)
    try:
        minio_client.put_object(
            MINIO_BUCKET,
            storage_key,
            data=BytesIO(content),
            length=len(content),
            content_type=payload.mime,
        )
    except S3Error as e:
        raise HTTPException(status_code=502, detail=f"MinIO error: {e}")

    meta_json = _normalize_meta_json(payload.meta_json)
    if "file_name" not in meta_json:
        meta_json["file_name"] = filename
    async with pool.acquire() as conn:
        await conn.execute(
            """
            insert into artifact_versions
            (id, artifact_id, label, sha256, mime, size_bytes, storage_key, meta_json)
            values ($1, $2, $3, $4, $5, $6, $7, $8)
            """,
            version_id,
            artifact_id,
            payload.label or "source",
            sha256,
            payload.mime,
            len(content),
            storage_key,
            json.dumps(meta_json),
        )
    return ArtifactVersionResponse(
        version_id=version_id,
        storage_key=storage_key,
        sha256=sha256,
        size_bytes=len(content),
    )
# Record a version whose storage key, hash and size are supplied by the caller.
# Nothing is uploaded here; the request values are inserted and echoed back.
# Raises HTTPException 500 when the DB pool is unset.
@app.post("/artifacts/{artifact_id}/versions", response_model=ArtifactVersionResponse)
async def add_version(artifact_id: str, payload: ArtifactVersionCreateRequest) -> ArtifactVersionResponse:
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    new_version_id = "ver_" + uuid.uuid4().hex
    encoded_meta = json.dumps(_normalize_meta_json(payload.meta_json))
    insert_sql = """
        insert into artifact_versions
        (id, artifact_id, label, sha256, mime, size_bytes, storage_key, meta_json)
        values ($1, $2, $3, $4, $5, $6, $7, $8)
        """
    row_values = (
        new_version_id,
        artifact_id,
        payload.label or "source",
        payload.sha256,
        payload.mime,
        payload.size_bytes,
        payload.storage_key,
        encoded_meta,
    )
    async with pool.acquire() as conn:
        await conn.execute(insert_sql, *row_values)
    return ArtifactVersionResponse(
        version_id=new_version_id,
        storage_key=payload.storage_key,
        sha256=payload.sha256,
        size_bytes=payload.size_bytes,
    )
# Queue a processing job for an artifact and announce it on NATS.
#
# For "index_doc" the input version may be omitted; the newest version
# labelled "source" is then used. Other job types need input_version_id.
# Raises HTTPException: 400 for an unknown job type or a missing/unresolvable
# input version, 404 for an unknown artifact, 500 when the DB pool is unset.
@app.post("/artifacts/{artifact_id}/jobs", response_model=ArtifactJobResponse)
async def create_job(artifact_id: str, payload: ArtifactJobRequest) -> ArtifactJobResponse:
    requested_type = payload.job_type
    if requested_type not in {"render_pptx", "render_pdf", "index_doc"}:
        raise HTTPException(status_code=400, detail="Invalid job type")
    job_id = "job_" + uuid.uuid4().hex
    input_version_id = payload.input_version_id
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")

    async with pool.acquire() as conn:
        artifact = await conn.fetchrow("select * from artifacts where id=$1", artifact_id)
        if not artifact:
            raise HTTPException(status_code=404, detail="Artifact not found")

        if not input_version_id:
            # Only index_doc may fall back to the latest source version.
            if requested_type != "index_doc":
                raise HTTPException(status_code=400, detail="input_version_id is required")
            latest_source = await conn.fetchrow(
                """
                select id from artifact_versions
                where artifact_id=$1 and label='source'
                order by created_at desc limit 1
                """,
                artifact_id,
            )
            if not latest_source:
                raise HTTPException(status_code=400, detail="Source version not found")
            input_version_id = latest_source["id"]

        # meta_json records the force flag only when it was actually requested.
        job_meta = {"force": bool(payload.force)} if payload.force else {}
        await conn.execute(
            """
            insert into artifact_jobs (id, artifact_id, input_version_id, job_type, status, meta_json)
            values ($1, $2, $3, $4, 'queued', $5)
            """,
            job_id,
            artifact_id,
            input_version_id,
            requested_type,
            json.dumps(job_meta),
        )

    if nats_client:
        message = {
            "job_id": job_id,
            "artifact_id": artifact_id,
            "input_version_id": input_version_id,
            "acl_ref": artifact.get("acl_ref"),
            "brand_id": artifact.get("brand_id"),
            "project_id": artifact.get("project_id"),
            "force": bool(payload.force),
        }
        encoded = json.dumps(message).encode("utf-8")
        await nats_client.publish(f"artifact.job.{requested_type}.requested", encoded)

    return ArtifactJobResponse(job_id=job_id, status_url=f"/jobs/{job_id}")
# Return the artifact_jobs row for job_id as a dict, with meta_json normalised to a dict.
# Raises HTTPException 404 for an unknown job and 500 when the DB pool is unset.
@app.get("/jobs/{job_id}")
async def job_status(job_id: str) -> Dict[str, Any]:
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    async with pool.acquire() as conn:
        record = await conn.fetchrow("select * from artifact_jobs where id=$1", job_id)
    if not record:
        raise HTTPException(status_code=404, detail="Job not found")
    job = dict(record)
    return {**job, "meta_json": _normalize_meta_json(job.get("meta_json"))}
# Mark a job as done and register the object it produced as a new version.
# The version insert and the job update share one transaction.
# Raises HTTPException 404 for an unknown job and 500 when the DB pool is unset.
@app.post("/jobs/{job_id}/complete")
async def job_complete(job_id: str, payload: JobCompleteRequest) -> Dict[str, Any]:
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    new_version_id = "ver_" + uuid.uuid4().hex
    insert_version_sql = """
        insert into artifact_versions
        (id, artifact_id, label, sha256, mime, size_bytes, storage_key)
        values ($1, $2, $3, $4, $5, $6, $7)
        """
    finish_job_sql = """
        update artifact_jobs
        set status='done', output_version_id=$1, updated_at=now()
        where id=$2
        """
    async with pool.acquire() as conn, conn.transaction():
        job = await conn.fetchrow("select * from artifact_jobs where id=$1", job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")
        await conn.execute(
            insert_version_sql,
            new_version_id,
            job["artifact_id"],
            payload.label or "pptx",
            payload.sha256,
            payload.mime,
            payload.size_bytes,
            payload.output_storage_key,
        )
        await conn.execute(finish_job_sql, new_version_id, job_id)
    return {"status": "done", "output_version_id": new_version_id}
# Mark a job as done without an output version and replace its meta_json.
#
# Raises HTTPException: 500 when the DB pool is unset, 404 when no job has the
# given id (the update used to succeed silently in that case).
@app.post("/jobs/{job_id}/done")
async def job_done(job_id: str, payload: JobDoneRequest) -> Dict[str, Any]:
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    meta_json = _normalize_meta_json(payload.meta_json)
    # NOTE(review): payload.note is accepted by the model but not persisted here.
    async with pool.acquire() as conn:
        # "returning id" yields a value only when a row was actually updated.
        updated_id = await conn.fetchval(
            """
            update artifact_jobs
            set status='done', updated_at=now(), meta_json=$2
            where id=$1
            returning id
            """,
            job_id,
            json.dumps(meta_json),
        )
    if updated_id is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"status": "done"}
# Mark a job as failed and store the first 1000 characters of the error text.
#
# Raises HTTPException: 500 when the DB pool is unset, 404 when no job has the
# given id (the update used to succeed silently in that case).
@app.post("/jobs/{job_id}/fail")
async def job_fail(job_id: str, payload: JobFailRequest) -> Dict[str, Any]:
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    async with pool.acquire() as conn:
        # "returning id" yields a value only when a row was actually updated.
        updated_id = await conn.fetchval(
            """
            update artifact_jobs
            set status='failed', error_text=$1, updated_at=now()
            where id=$2
            returning id
            """,
            payload.error_text[:1000],
            job_id,
        )
    if updated_id is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"status": "failed"}
# Return the artifacts row for artifact_id as a plain dict.
# Raises HTTPException 404 for an unknown artifact and 500 when the DB pool is unset.
@app.get("/artifacts/{artifact_id}")
async def get_artifact(artifact_id: str) -> Dict[str, Any]:
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    async with pool.acquire() as conn:
        record = await conn.fetchrow("select * from artifacts where id=$1", artifact_id)
    if record:
        return dict(record)
    raise HTTPException(status_code=404, detail="Artifact not found")
# List every version of an artifact, newest first, under the "items" key.
# Each row's meta_json is normalised to a dict. An artifact without versions
# yields an empty list. Raises HTTPException 500 when the DB pool is unset.
@app.get("/artifacts/{artifact_id}/versions")
async def get_versions(artifact_id: str) -> Dict[str, Any]:
    if not pool:
        raise HTTPException(status_code=500, detail="DB not available")
    query = "select * from artifact_versions where artifact_id=$1 order by created_at desc"
    async with pool.acquire() as conn:
        rows = await conn.fetch(query, artifact_id)
    items = [
        {**version, "meta_json": _normalize_meta_json(version.get("meta_json"))}
        for version in map(dict, rows)
    ]
    return {"items": items}
# Return a presigned MinIO URL (valid for 1800 seconds) for the newest version
# of an artifact whose stored mime equals the one derived from `format`.
# Raises HTTPException: 500 when the DB pool or MinIO client is unset, 404 when
# no version matches, 502 when MinIO cannot produce the URL.
@app.get("/artifacts/{artifact_id}/download")
async def download_artifact(artifact_id: str, format: str = Query("pptx")) -> Dict[str, Any]:
    if not (pool and minio_client):
        raise HTTPException(status_code=500, detail="Service not available")
    wanted_mime = _format_to_mime(format)
    query = """
        select * from artifact_versions
        where artifact_id=$1 and mime=$2
        order by created_at desc limit 1
        """
    async with pool.acquire() as conn:
        latest = await conn.fetchrow(query, artifact_id, wanted_mime)
    if not latest:
        raise HTTPException(status_code=404, detail="Version not found")
    object_key = latest["storage_key"]
    try:
        signed_url = minio_client.presigned_get_object(
            MINIO_BUCKET,
            object_key,
            expires=timedelta(seconds=1800),
        )
    except S3Error as e:
        raise HTTPException(status_code=502, detail=f"MinIO error: {e}")
    return {"url": signed_url, "storage_key": object_key, "mime": latest["mime"]}
# Return a presigned MinIO URL (valid for 1800 seconds) for one specific version.
# The version must belong to the artifact named in the path.
# Raises HTTPException: 500 when the DB pool or MinIO client is unset, 404 when
# the version is not found, 502 when MinIO cannot produce the URL.
@app.get("/artifacts/{artifact_id}/versions/{version_id}/download")
async def download_artifact_version(artifact_id: str, version_id: str) -> Dict[str, Any]:
    if not (pool and minio_client):
        raise HTTPException(status_code=500, detail="Service not available")
    query = """
        select * from artifact_versions
        where artifact_id=$1 and id=$2
        limit 1
        """
    async with pool.acquire() as conn:
        version = await conn.fetchrow(query, artifact_id, version_id)
    if not version:
        raise HTTPException(status_code=404, detail="Version not found")
    object_key = version["storage_key"]
    try:
        signed_url = minio_client.presigned_get_object(
            MINIO_BUCKET,
            object_key,
            expires=timedelta(seconds=1800),
        )
    except S3Error as e:
        raise HTTPException(status_code=502, detail=f"MinIO error: {e}")
    return {
        "url": signed_url,
        "storage_key": object_key,
        "mime": version["mime"],
        "version_id": version["id"],
    }