"""
Document API Endpoints

Channel-agnostic HTTP API for document operations.

Endpoints:
- POST /api/doc/parse - Parse a document
- POST /api/doc/parse/upload - Parse an uploaded file (currently answers 501)
- POST /api/doc/ingest - Ingest document to RAG
- POST /api/doc/ask - Ask question about document
- POST /api/doc/update - Update existing document text (versioned)
- POST /api/doc/publish - Publish physical file version via artifact registry
- GET /api/doc/versions/{doc_id} - List document versions
- GET /api/doc/context/{session_id} - Get the document context saved for a session
- GET /api/doc/artifacts/{artifact_id}/versions/{version_id}/download - Download via gateway proxy
"""
import logging
import os
import re
from typing import Optional, Dict, Any

from fastapi import APIRouter, HTTPException, UploadFile, File, Form
from fastapi.responses import Response
from pydantic import BaseModel
import httpx

from services.doc_service import (
    doc_service,
    parse_document,
    ingest_document,
    ask_about_document,
    update_document,
    list_document_versions,
    publish_document_artifact,
    get_doc_context,
    ParsedResult,
    IngestResult,
    UpdateResult,
    QAResult,
    DocContext
)

logger = logging.getLogger(__name__)

router = APIRouter()

# Base URL of the artifact registry (trailing slash removed so paths can be appended).
ARTIFACT_REGISTRY_URL = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9220").rstrip("/")
# Timeout (seconds) applied to the httpx client used by the download proxy.
DOC_DOWNLOAD_TIMEOUT_SECONDS = float(os.getenv("DOC_DOWNLOAD_TIMEOUT_SECONDS", "60"))


# ========================================
# Request Models
# ========================================

class ParseDocumentRequest(BaseModel):
    """Request to parse a document"""
    session_id: str
    doc_url: str
    file_name: str
    dao_id: str
    user_id: str
    output_mode: str = "qa_pairs"  # qa_pairs, markdown, chunks
    metadata: Optional[Dict[str, Any]] = None


class IngestDocumentRequest(BaseModel):
    """Request to ingest a document"""
    session_id: str
    doc_id: Optional[str] = None
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    agent_id: str = "daarwizz"


class AskDocumentRequest(BaseModel):
    """Request to ask about a document"""
    session_id: str
    question: str
    doc_id: Optional[str] = None
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    agent_id: str = "daarwizz"


class UpdateDocumentRequest(BaseModel):
    """Request to update existing document content."""
    session_id: str
    doc_id: Optional[str] = None
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    text: Optional[str] = None
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    agent_id: str = "daarwizz"
    storage_ref: Optional[str] = None
    publish_artifact: bool = False
    artifact_id: Optional[str] = None
    target_format: Optional[str] = None
    artifact_label: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


class PublishDocumentRequest(BaseModel):
    """Request to publish document as physical artifact version."""
    session_id: str
    doc_id: Optional[str] = None
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    text: Optional[str] = None
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    artifact_id: Optional[str] = None
    target_format: Optional[str] = None
    artifact_label: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


# ========================================
# Endpoints
# ========================================

@router.post("/api/doc/parse")
async def parse_document_endpoint(request: ParseDocumentRequest):
    """
    Parse a document through DAGI Router.

    Accepts JSON with doc_url or can accept file upload.
    Returns parsed document data (qa_pairs, markdown, or chunks).

    Responds 400 when the parse result is not successful and 500 on any
    unexpected error.
    """
    try:
        result = await parse_document(
            session_id=request.session_id,
            doc_url=request.doc_url,
            file_name=request.file_name,
            dao_id=request.dao_id,
            user_id=request.user_id,
            output_mode=request.output_mode,
            metadata=request.metadata
        )

        if not result.success:
            raise HTTPException(status_code=400, detail=result.error)

        # Convert QAItem to dict for JSON response
        qa_pairs_dict = None
        if result.qa_pairs:
            qa_pairs_dict = [{"question": qa.question, "answer": qa.answer} for qa in result.qa_pairs]

        return {
            "ok": True,
            "doc_id": result.doc_id,
            "qa_pairs": qa_pairs_dict,
            "markdown": result.markdown,
            "chunks_meta": result.chunks_meta,
            "raw": result.raw
        }
    except HTTPException:
        # Pass the deliberate 400 through; without this guard the generic
        # handler below would rewrap it as a 500.
        raise
    except Exception as e:
        logger.error(f"Parse document error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/api/doc/parse/upload")
async def parse_document_upload(
    file: UploadFile = File(...),
    session_id: str = Form(...),
    dao_id: str = Form(...),
    user_id: str = Form(...),
    output_mode: str = Form("qa_pairs")
):
    """
    Parse a document from file upload.

    Accepts multipart/form-data with file and metadata.

    Responds 400 for a missing or non-".pdf" filename; otherwise always
    responds 501 because upload storage is not implemented yet.
    """
    try:
        # Check file type
        if not file.filename or not file.filename.lower().endswith(".pdf"):
            raise HTTPException(status_code=400, detail="Only PDF files are supported")

        # For now, we need to upload file somewhere accessible
        # TODO: Implement file storage (S3, local storage, etc.)
        # For now, return error suggesting to use doc_url instead
        raise HTTPException(
            status_code=501,
            detail="File upload not yet implemented. Please use /api/doc/parse with doc_url instead."
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Parse document upload error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/api/doc/ingest")
async def ingest_document_endpoint(request: IngestDocumentRequest):
    """
    Ingest document chunks into RAG/Memory.

    Can use doc_id from previous parse, or doc_url to parse and ingest.

    Responds 400 when the ingest result is not successful and 500 on any
    unexpected error.
    """
    try:
        # If doc_id not provided, try to get from context
        doc_id = request.doc_id
        if not doc_id:
            doc_context = await get_doc_context(request.session_id)
            if doc_context:
                doc_id = doc_context.doc_id
                # Fill in identifiers the caller omitted from the saved context
                if not request.dao_id:
                    request.dao_id = doc_context.dao_id
                if not request.user_id:
                    request.user_id = doc_context.user_id

        result = await ingest_document(
            session_id=request.session_id,
            doc_id=doc_id,
            doc_url=request.doc_url,
            file_name=request.file_name,
            dao_id=request.dao_id,
            user_id=request.user_id,
            agent_id=request.agent_id,
        )

        if not result.success:
            raise HTTPException(status_code=400, detail=result.error)

        return {
            "ok": True,
            "doc_id": result.doc_id,
            "ingested_chunks": result.ingested_chunks,
            "status": result.status
        }
    except HTTPException:
        # Pass the deliberate 400 through; without this guard the generic
        # handler below would rewrap it as a 500.
        raise
    except Exception as e:
        logger.error(f"Ingest document error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/api/doc/ask")
async def ask_about_document_endpoint(request: AskDocumentRequest):
    """
    Ask a question about a document using RAG query.

    Uses doc_id from session context if not provided.

    Responds 400 when the QA result is not successful and 500 on any
    unexpected error.
    """
    try:
        # If doc_id not provided, try to get from context
        doc_id = request.doc_id
        if not doc_id:
            doc_context = await get_doc_context(request.session_id)
            if doc_context:
                doc_id = doc_context.doc_id
                # Fill in identifiers the caller omitted from the saved context
                if not request.dao_id:
                    request.dao_id = doc_context.dao_id
                if not request.user_id:
                    request.user_id = doc_context.user_id

        result = await ask_about_document(
            session_id=request.session_id,
            question=request.question,
            doc_id=doc_id,
            dao_id=request.dao_id,
            user_id=request.user_id,
            agent_id=request.agent_id,
        )

        if not result.success:
            raise HTTPException(status_code=400, detail=result.error)

        return {
            "ok": True,
            "answer": result.answer,
            "doc_id": result.doc_id,
            "sources": result.sources
        }
    except HTTPException:
        # Pass the deliberate 400 through; without this guard the generic
        # handler below would rewrap it as a 500.
        raise
    except Exception as e:
        logger.error(f"Ask document error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/api/doc/update")
async def update_document_endpoint(request: UpdateDocumentRequest):
    """
    Update a document and bump its version.

    If text is omitted and doc_url exists, text is re-parsed from the source document.

    Responds 400 when the update result is not successful and 500 on any
    unexpected error.
    """
    try:
        result = await update_document(
            session_id=request.session_id,
            doc_id=request.doc_id,
            doc_url=request.doc_url,
            file_name=request.file_name,
            text=request.text,
            dao_id=request.dao_id,
            user_id=request.user_id,
            agent_id=request.agent_id,
            storage_ref=request.storage_ref,
            publish_artifact=request.publish_artifact,
            artifact_id=request.artifact_id,
            target_format=request.target_format,
            artifact_label=request.artifact_label,
            metadata=request.metadata,
        )
        if not result.success:
            raise HTTPException(status_code=400, detail=result.error)

        response = {
            "ok": True,
            "doc_id": result.doc_id,
            "version_no": result.version_no,
            "version_id": result.version_id,
            "updated_chunks": result.updated_chunks,
            "status": result.status,
            "publish_error": result.publish_error,
            "artifact_id": result.artifact_id,
            "artifact_version_id": result.artifact_version_id,
            "artifact_storage_key": result.artifact_storage_key,
            "artifact_mime": result.artifact_mime,
            "artifact_download_url": result.artifact_download_url,
        }
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Update document error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/api/doc/publish")
async def publish_document_endpoint(request: PublishDocumentRequest):
    """
    Publish current document text as physical file artifact version.

    Responds 400 when the publish result is not successful and 500 on any
    unexpected error.
    """
    try:
        result = await publish_document_artifact(
            session_id=request.session_id,
            doc_id=request.doc_id,
            doc_url=request.doc_url,
            file_name=request.file_name,
            text=request.text,
            dao_id=request.dao_id,
            user_id=request.user_id,
            artifact_id=request.artifact_id,
            target_format=request.target_format,
            artifact_label=request.artifact_label,
            metadata=request.metadata,
        )
        if not result.success:
            raise HTTPException(status_code=400, detail=result.error)
        return {
            "ok": True,
            "artifact_id": result.artifact_id,
            "version_id": result.version_id,
            "storage_key": result.storage_key,
            "mime": result.mime,
            "file_name": result.file_name,
            "download_url": result.download_url,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Publish document error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/api/doc/versions/{doc_id}")
async def list_document_versions_endpoint(doc_id: str, agent_id: str = "daarwizz", limit: int = 20):
    """
    List document versions for agent/doc pair.

    Responds 400 when the service payload has a falsy "ok" and 500 on any
    unexpected error; otherwise returns the service payload unchanged.
    """
    try:
        data = await list_document_versions(agent_id=agent_id, doc_id=doc_id, limit=limit)
        if not data.get("ok"):
            raise HTTPException(status_code=400, detail=data.get("error", "Failed to load versions"))
        return data
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"List document versions error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/api/doc/context/{session_id}")
async def get_document_context(session_id: str):
    """
    Get document context for a session.

    Returns the last parsed document ID and metadata for the session.

    Responds 404 when no context exists for the session and 500 on any
    unexpected error.
    """
    try:
        context = await get_doc_context(session_id)

        if not context:
            raise HTTPException(status_code=404, detail="No document context found")

        return {
            "ok": True,
            "context": {
                "doc_id": context.doc_id,
                "dao_id": context.dao_id,
                "user_id": context.user_id,
                "doc_url": context.doc_url,
                "file_name": context.file_name,
                "saved_at": context.saved_at
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Get document context error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/api/doc/artifacts/{artifact_id}/versions/{version_id}/download")
async def download_artifact_version_via_gateway(
    artifact_id: str,
    version_id: str,
    filename: Optional[str] = None,
    inline: bool = False,
):
    """
    Proxy download for artifact version to avoid exposing internal MinIO host to browser clients.

    Asks the artifact registry for the version's download info, fetches the
    file from the URL it returns, and relays the bytes to the caller.

    Responds 400 for blank ids, mirrors the registry's status code when its
    lookup fails, 502 when the registry gives no URL or the storage fetch
    fails, and 500 on any unexpected error.
    """
    aid = (artifact_id or "").strip()
    vid = (version_id or "").strip()
    if not aid or not vid:
        raise HTTPException(status_code=400, detail="artifact_id and version_id are required")

    try:
        async with httpx.AsyncClient(timeout=DOC_DOWNLOAD_TIMEOUT_SECONDS) as client:
            # Step 1: resolve download metadata (URL, mime, storage key) from the registry.
            meta_resp = await client.get(
                f"{ARTIFACT_REGISTRY_URL}/artifacts/{aid}/versions/{vid}/download"
            )
            if meta_resp.status_code >= 400:
                detail = ""
                try:
                    detail = meta_resp.json().get("detail")  # type: ignore[assignment]
                except Exception:
                    # Error body was not a JSON object; fall back to a text snippet.
                    detail = meta_resp.text[:200]
                raise HTTPException(status_code=meta_resp.status_code, detail=detail or "Version download info failed")

            meta = meta_resp.json()
            signed_url = (meta.get("url") or "").strip()
            if not signed_url:
                raise HTTPException(status_code=502, detail="artifact-registry returned empty download URL")

            # Step 2: fetch the file bytes from the URL the registry returned.
            file_resp = await client.get(signed_url)
            if file_resp.status_code >= 400:
                raise HTTPException(status_code=502, detail=f"Artifact storage download failed: {file_resp.status_code}")

            mime = (meta.get("mime") or file_resp.headers.get("content-type") or "application/octet-stream").strip()
            storage_key = str(meta.get("storage_key") or "")
            inferred_name = storage_key.rsplit("/", 1)[-1] if "/" in storage_key else storage_key
            out_name = (filename or inferred_name or f"{aid}_{vid}.bin").strip()
            # Restrict the name to a conservative character set before it is
            # embedded in the Content-Disposition header.
            out_name = re.sub(r"[^A-Za-z0-9._-]+", "_", out_name).strip("._") or f"{aid}_{vid}.bin"

            disposition = "inline" if inline else "attachment"
            headers = {
                "Content-Disposition": f'{disposition}; filename="{out_name}"',
                "Cache-Control": "private, max-age=60",
            }
            return Response(content=file_resp.content, media_type=mime, headers=headers)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Artifact version proxy download failed: aid={aid}, vid={vid}, err={e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Artifact proxy download failed")