Files
microdao-daarion/gateway-bot/http_api_doc.py

466 lines
16 KiB
Python

"""
Document API Endpoints
Channel-agnostic HTTP API for document operations.
Endpoints:
- POST /api/doc/parse - Parse a document
- POST /api/doc/ingest - Ingest document to RAG
- POST /api/doc/ask - Ask question about document
- POST /api/doc/update - Update existing document text (versioned)
- POST /api/doc/publish - Publish physical file version via artifact registry
- GET /api/doc/versions/{doc_id} - List document versions
- GET /api/doc/artifacts/{artifact_id}/versions/{version_id}/download - Download via gateway proxy
"""
import logging
import os
import re
from typing import Optional, Dict, Any
from urllib.parse import quote

import httpx
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
from fastapi.responses import Response
from pydantic import BaseModel

from services.doc_service import (
    doc_service,
    parse_document,
    ingest_document,
    ask_about_document,
    update_document,
    list_document_versions,
    publish_document_artifact,
    get_doc_context,
    ParsedResult,
    IngestResult,
    UpdateResult,
    QAResult,
    DocContext,
)
# Module logger and the APIRouter on which every endpoint in this module is registered.
logger = logging.getLogger(__name__)
router = APIRouter()
# Base URL of the artifact registry service (env ARTIFACT_REGISTRY_URL), with
# trailing slashes removed so request paths can be appended directly.
ARTIFACT_REGISTRY_URL = os.environ.get(
    "ARTIFACT_REGISTRY_URL", "http://artifact-registry:9220"
).rstrip("/")
# Timeout in seconds for the httpx client used by the proxy download endpoint
# (env DOC_DOWNLOAD_TIMEOUT_SECONDS). Parsed at import time, so a non-numeric
# value fails module import.
DOC_DOWNLOAD_TIMEOUT_SECONDS = float(
    os.environ.get("DOC_DOWNLOAD_TIMEOUT_SECONDS", "60")
)
# ========================================
# Request Models
# ========================================
class ParseDocumentRequest(BaseModel):
    """Request to parse a document.

    JSON body of POST /api/doc/parse; every field is forwarded to parse_document().
    """
    # Caller session identifier, passed through to the doc service.
    session_id: str
    # URL of the source document to parse.
    doc_url: str
    file_name: str
    dao_id: str
    user_id: str
    # Expected values: qa_pairs, markdown, chunks (not validated by this model).
    output_mode: str = "qa_pairs" # qa_pairs, markdown, chunks
    # Optional extra metadata forwarded to parse_document().
    metadata: Optional[Dict[str, Any]] = None
class IngestDocumentRequest(BaseModel):
    """Request to ingest a document into RAG/Memory.

    JSON body of POST /api/doc/ingest. Either reference an already-parsed
    document via doc_id, or provide doc_url to parse and ingest. If doc_id is
    omitted, the endpoint looks up the session's saved document context.
    """
    # Session used to look up the saved document context when doc_id is missing.
    session_id: str
    doc_id: Optional[str] = None
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    # May be filled from the session's document context when doc_id is omitted.
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    # Agent identifier forwarded to ingest_document().
    agent_id: str = "daarwizz"
class AskDocumentRequest(BaseModel):
    """Request to ask a question about a document.

    JSON body of POST /api/doc/ask. If doc_id is omitted, the endpoint looks up
    the session's saved document context.
    """
    # Session used to look up the saved document context when doc_id is missing.
    session_id: str
    # Natural-language question forwarded to ask_about_document().
    question: str
    doc_id: Optional[str] = None
    # May be filled from the session's document context when doc_id is omitted.
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    # Agent identifier forwarded to ask_about_document().
    agent_id: str = "daarwizz"
class UpdateDocumentRequest(BaseModel):
    """Request to update existing document content.

    JSON body of POST /api/doc/update; every field is forwarded unchanged to
    update_document().
    """
    session_id: str
    # Target document; handling when omitted is up to the doc service — TODO confirm.
    doc_id: Optional[str] = None
    # Source document URL; per the update endpoint, text is re-parsed from it
    # when `text` is omitted.
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    # New document text (see doc_url for the fallback when omitted).
    text: Optional[str] = None
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    agent_id: str = "daarwizz"
    # Opaque reference forwarded to update_document(); semantics defined by the doc service.
    storage_ref: Optional[str] = None
    # Presumably also publishes a file artifact for the new version (the
    # response carries publish_error and artifact_* fields) — verify in doc service.
    publish_artifact: bool = False
    # Artifact publishing options, forwarded to update_document().
    artifact_id: Optional[str] = None
    target_format: Optional[str] = None
    artifact_label: Optional[str] = None
    # Optional extra metadata forwarded to update_document().
    metadata: Optional[Dict[str, Any]] = None
class PublishDocumentRequest(BaseModel):
    """Request to publish a document as a physical artifact version.

    JSON body of POST /api/doc/publish; every field is forwarded unchanged to
    publish_document_artifact().
    """
    session_id: str
    doc_id: Optional[str] = None
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    # Text to publish; behaviour when omitted is defined by the doc service — TODO confirm.
    text: Optional[str] = None
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    # Existing artifact to add a version to; presumably a new one is created when omitted — verify.
    artifact_id: Optional[str] = None
    # Output file format and label, forwarded to the artifact publisher.
    target_format: Optional[str] = None
    artifact_label: Optional[str] = None
    # Optional extra metadata forwarded to publish_document_artifact().
    metadata: Optional[Dict[str, Any]] = None
# ========================================
# Endpoints
# ========================================
@router.post("/api/doc/parse")
async def parse_document_endpoint(request: ParseDocumentRequest):
"""
Parse a document through DAGI Router.
Accepts JSON with doc_url or can accept file upload.
Returns parsed document data (qa_pairs, markdown, or chunks).
"""
try:
result = await parse_document(
session_id=request.session_id,
doc_url=request.doc_url,
file_name=request.file_name,
dao_id=request.dao_id,
user_id=request.user_id,
output_mode=request.output_mode,
metadata=request.metadata
)
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
# Convert QAItem to dict for JSON response
qa_pairs_dict = None
if result.qa_pairs:
qa_pairs_dict = [{"question": qa.question, "answer": qa.answer} for qa in result.qa_pairs]
return {
"ok": True,
"doc_id": result.doc_id,
"qa_pairs": qa_pairs_dict,
"markdown": result.markdown,
"chunks_meta": result.chunks_meta,
"raw": result.raw
}
except Exception as e:
logger.error(f"Parse document error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/doc/parse/upload")
async def parse_document_upload(
file: UploadFile = File(...),
session_id: str = Form(...),
dao_id: str = Form(...),
user_id: str = Form(...),
output_mode: str = Form("qa_pairs")
):
"""
Parse a document from file upload.
Accepts multipart/form-data with file and metadata.
"""
try:
# Check file type
if not file.filename or not file.filename.lower().endswith(".pdf"):
raise HTTPException(status_code=400, detail="Only PDF files are supported")
# For now, we need to upload file somewhere accessible
# TODO: Implement file storage (S3, local storage, etc.)
# For now, return error suggesting to use doc_url instead
raise HTTPException(
status_code=501,
detail="File upload not yet implemented. Please use /api/doc/parse with doc_url instead."
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Parse document upload error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/doc/ingest")
async def ingest_document_endpoint(request: IngestDocumentRequest):
"""
Ingest document chunks into RAG/Memory.
Can use doc_id from previous parse, or doc_url to parse and ingest.
"""
try:
# If doc_id not provided, try to get from context
doc_id = request.doc_id
if not doc_id:
doc_context = await get_doc_context(request.session_id)
if doc_context:
doc_id = doc_context.doc_id
if not request.dao_id:
request.dao_id = doc_context.dao_id
if not request.user_id:
request.user_id = doc_context.user_id
result = await ingest_document(
session_id=request.session_id,
doc_id=doc_id,
doc_url=request.doc_url,
file_name=request.file_name,
dao_id=request.dao_id,
user_id=request.user_id,
agent_id=request.agent_id,
)
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
return {
"ok": True,
"doc_id": result.doc_id,
"ingested_chunks": result.ingested_chunks,
"status": result.status
}
except Exception as e:
logger.error(f"Ingest document error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/doc/ask")
async def ask_about_document_endpoint(request: AskDocumentRequest):
"""
Ask a question about a document using RAG query.
Uses doc_id from session context if not provided.
"""
try:
# If doc_id not provided, try to get from context
doc_id = request.doc_id
if not doc_id:
doc_context = await get_doc_context(request.session_id)
if doc_context:
doc_id = doc_context.doc_id
if not request.dao_id:
request.dao_id = doc_context.dao_id
if not request.user_id:
request.user_id = doc_context.user_id
result = await ask_about_document(
session_id=request.session_id,
question=request.question,
doc_id=doc_id,
dao_id=request.dao_id,
user_id=request.user_id,
agent_id=request.agent_id,
)
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
return {
"ok": True,
"answer": result.answer,
"doc_id": result.doc_id,
"sources": result.sources
}
except Exception as e:
logger.error(f"Ask document error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/doc/update")
async def update_document_endpoint(request: UpdateDocumentRequest):
"""
Update a document and bump its version.
If text is omitted and doc_url exists, text is re-parsed from the source document.
"""
try:
result = await update_document(
session_id=request.session_id,
doc_id=request.doc_id,
doc_url=request.doc_url,
file_name=request.file_name,
text=request.text,
dao_id=request.dao_id,
user_id=request.user_id,
agent_id=request.agent_id,
storage_ref=request.storage_ref,
publish_artifact=request.publish_artifact,
artifact_id=request.artifact_id,
target_format=request.target_format,
artifact_label=request.artifact_label,
metadata=request.metadata,
)
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
response = {
"ok": True,
"doc_id": result.doc_id,
"version_no": result.version_no,
"version_id": result.version_id,
"updated_chunks": result.updated_chunks,
"status": result.status,
"publish_error": result.publish_error,
"artifact_id": result.artifact_id,
"artifact_version_id": result.artifact_version_id,
"artifact_storage_key": result.artifact_storage_key,
"artifact_mime": result.artifact_mime,
"artifact_download_url": result.artifact_download_url,
}
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Update document error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/doc/publish")
async def publish_document_endpoint(request: PublishDocumentRequest):
"""
Publish current document text as physical file artifact version.
"""
try:
result = await publish_document_artifact(
session_id=request.session_id,
doc_id=request.doc_id,
doc_url=request.doc_url,
file_name=request.file_name,
text=request.text,
dao_id=request.dao_id,
user_id=request.user_id,
artifact_id=request.artifact_id,
target_format=request.target_format,
artifact_label=request.artifact_label,
metadata=request.metadata,
)
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
return {
"ok": True,
"artifact_id": result.artifact_id,
"version_id": result.version_id,
"storage_key": result.storage_key,
"mime": result.mime,
"file_name": result.file_name,
"download_url": result.download_url,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Publish document error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/doc/versions/{doc_id}")
async def list_document_versions_endpoint(doc_id: str, agent_id: str = "daarwizz", limit: int = 20):
"""
List document versions for agent/doc pair.
"""
try:
data = await list_document_versions(agent_id=agent_id, doc_id=doc_id, limit=limit)
if not data.get("ok"):
raise HTTPException(status_code=400, detail=data.get("error", "Failed to load versions"))
return data
except HTTPException:
raise
except Exception as e:
logger.error(f"List document versions error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/doc/context/{session_id}")
async def get_document_context(session_id: str):
"""
Get document context for a session.
Returns the last parsed document ID and metadata for the session.
"""
try:
context = await get_doc_context(session_id)
if not context:
raise HTTPException(status_code=404, detail="No document context found")
return {
"ok": True,
"context": {
"doc_id": context.doc_id,
"dao_id": context.dao_id,
"user_id": context.user_id,
"doc_url": context.doc_url,
"file_name": context.file_name,
"saved_at": context.saved_at
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Get document context error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/doc/artifacts/{artifact_id}/versions/{version_id}/download")
async def download_artifact_version_via_gateway(
artifact_id: str,
version_id: str,
filename: Optional[str] = None,
inline: bool = False,
):
"""
Proxy download for artifact version to avoid exposing internal MinIO host to browser clients.
"""
aid = (artifact_id or "").strip()
vid = (version_id or "").strip()
if not aid or not vid:
raise HTTPException(status_code=400, detail="artifact_id and version_id are required")
try:
async with httpx.AsyncClient(timeout=DOC_DOWNLOAD_TIMEOUT_SECONDS) as client:
meta_resp = await client.get(
f"{ARTIFACT_REGISTRY_URL}/artifacts/{aid}/versions/{vid}/download"
)
if meta_resp.status_code >= 400:
detail = ""
try:
detail = meta_resp.json().get("detail") # type: ignore[assignment]
except Exception:
detail = meta_resp.text[:200]
raise HTTPException(status_code=meta_resp.status_code, detail=detail or "Version download info failed")
meta = meta_resp.json()
signed_url = (meta.get("url") or "").strip()
if not signed_url:
raise HTTPException(status_code=502, detail="artifact-registry returned empty download URL")
file_resp = await client.get(signed_url)
if file_resp.status_code >= 400:
raise HTTPException(status_code=502, detail=f"Artifact storage download failed: {file_resp.status_code}")
mime = (meta.get("mime") or file_resp.headers.get("content-type") or "application/octet-stream").strip()
storage_key = str(meta.get("storage_key") or "")
inferred_name = storage_key.rsplit("/", 1)[-1] if "/" in storage_key else storage_key
out_name = (filename or inferred_name or f"{aid}_{vid}.bin").strip()
out_name = re.sub(r"[^A-Za-z0-9._-]+", "_", out_name).strip("._") or f"{aid}_{vid}.bin"
disposition = "inline" if inline else "attachment"
headers = {
"Content-Disposition": f'{disposition}; filename="{out_name}"',
"Cache-Control": "private, max-age=60",
}
return Response(content=file_resp.content, media_type=mime, headers=headers)
except HTTPException:
raise
except Exception as e:
logger.error(f"Artifact version proxy download failed: aid={aid}, vid={vid}, err={e}", exc_info=True)
raise HTTPException(status_code=500, detail="Artifact proxy download failed")