""" index-doc-worker - Downloads source document from MinIO - Parses to parsed_json - Chunks and indexes into RAG - Registers parsed + chunks versions in artifact-registry """ import asyncio import csv import hashlib import io import json import logging import os import tempfile from pathlib import Path from typing import Any, Dict, List, Optional import httpx import pdfplumber from docx import Document import openpyxl from minio import Minio from minio.error import S3Error from nats.aio.client import Client as NATS logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) NATS_URL = os.getenv("NATS_URL", "nats://nats:4222") REGISTRY_URL = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9220").rstrip("/") RAG_URL = os.getenv("RAG_SERVICE_URL", "http://rag-service:9500").rstrip("/") MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000") MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin") MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin") MINIO_BUCKET = os.getenv("MINIO_BUCKET", "artifacts") MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true" MAX_FILE_BYTES = int(os.getenv("INDEX_DOC_MAX_BYTES", str(50 * 1024 * 1024))) CHUNK_SIZE = int(os.getenv("INDEX_DOC_CHUNK_SIZE", "1000")) CHUNK_OVERLAP = int(os.getenv("INDEX_DOC_CHUNK_OVERLAP", "150")) PARSER_VERSION = "v1-pdfplumber-docx" CHUNKER_VERSION = "v1-basic" EMBEDDER_VERSION = os.getenv("EMBEDDER_VERSION", "rag-default") minio_client = Minio( MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=MINIO_SECURE, ) def _sha256(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def _fingerprint(source_sha: str) -> str: raw = f"{PARSER_VERSION}|{CHUNKER_VERSION}|{EMBEDDER_VERSION}|{source_sha}" return hashlib.sha256(raw.encode("utf-8")).hexdigest() def _ensure_meta_dict(meta: Any) -> Dict[str, Any]: if isinstance(meta, dict): return meta if isinstance(meta, str): try: return json.loads(meta) except Exception: return {} return {} def _chunk_text(text: str) -> List[Dict]: if not text: return [] chunks = [] start = 0 length = len(text) while start < length: end = min(start + CHUNK_SIZE, length) chunks.append({ "text": text[start:end], "offset_start": start, "offset_end": end, }) if end == length: break start = end - CHUNK_OVERLAP if start < 0: start = 0 return chunks def _parsed_from_text(text: str, source_name: Optional[str] = None) -> Dict: blocks = [{"type": "paragraph", "text": text}] if source_name: blocks[0]["source_filename"] = source_name return {"pages": [{"page_num": 1, "blocks": blocks}]} def _parse_pdf(data: bytes) -> Dict: pages = [] with pdfplumber.open(io.BytesIO(data)) as pdf: for idx, page in enumerate(pdf.pages, start=1): text = page.extract_text() or "" pages.append({"page_num": idx, "blocks": [{"type": "paragraph", "text": text}]}) return {"pages": pages} def _parse_docx(data: bytes) -> Dict: with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp: tmp.write(data) tmp_path = Path(tmp.name) try: doc = Document(str(tmp_path)) text = "\n".join([p.text for p in doc.paragraphs]) return _parsed_from_text(text) finally: tmp_path.unlink(missing_ok=True) def _parse_csv(data: bytes) -> Dict: text = data.decode("utf-8", errors="ignore") reader = csv.reader(io.StringIO(text)) rows = ["\t".join(row) for row in reader] return _parsed_from_text("\n".join(rows)) def _parse_xlsx(data: bytes) -> Dict: with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp: tmp.write(data) tmp_path = Path(tmp.name) try: wb = openpyxl.load_workbook(str(tmp_path), data_only=True) parts = [] for sheet in wb.sheetnames: parts.append(f"--- Sheet: {sheet} ---") ws = wb[sheet] for row in ws.iter_rows(values_only=True): parts.append("\t".join(["" if v is None else str(v) for v in row])) return _parsed_from_text("\n".join(parts)) finally: tmp_path.unlink(missing_ok=True) def _parse_text(data: bytes) -> Dict: text = data.decode("utf-8", errors="ignore") return _parsed_from_text(text) def _parse_zip(data: bytes) -> Dict: import zipfile pages = [] with zipfile.ZipFile(io.BytesIO(data)) as zf: for member in zf.infolist(): if member.is_dir(): continue name = member.filename try: content = zf.read(member) except Exception: continue lower = name.lower() if lower.endswith(".pdf"): parsed = _parse_pdf(content) elif lower.endswith(".docx"): parsed = _parse_docx(content) elif lower.endswith(".csv"): parsed = _parse_csv(content) elif lower.endswith(".xlsx"): parsed = _parse_xlsx(content) elif lower.endswith(".md") or lower.endswith(".txt"): parsed = _parse_text(content) else: continue for page in parsed.get("pages", []): for block in page.get("blocks", []): block["source_filename"] = name pages.extend(parsed.get("pages", [])) return {"pages": pages} def _parse_by_mime(data: bytes, mime: str, filename: Optional[str]) -> Dict: lower = (filename or "").lower() if mime == "application/pdf" or lower.endswith(".pdf"): return _parse_pdf(data) if mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document" or lower.endswith(".docx"): return _parse_docx(data) if mime in {"text/markdown", "text/plain"} or lower.endswith(".md") or lower.endswith(".txt"): return _parse_text(data) if mime == "text/csv" or lower.endswith(".csv"): return _parse_csv(data) if mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" or lower.endswith(".xlsx"): return _parse_xlsx(data) if mime == "application/zip" or lower.endswith(".zip"): return _parse_zip(data) return _parse_text(data) def _chunks_jsonl(chunks: List[Dict], meta: Dict) -> str: lines = [] for idx, chunk in enumerate(chunks, start=1): item = { "chunk_id": f"chunk_{idx}", "content": chunk["text"], "meta": meta, "offset_start": chunk.get("offset_start"), "offset_end": chunk.get("offset_end"), } lines.append(json.dumps(item, ensure_ascii=False)) return "\n".join(lines) def _parsed_to_blocks_text(parsed: Dict) -> str: parts = [] for page in parsed.get("pages", []): for block in page.get("blocks", []): text = block.get("text", "") if text: parts.append(text) return "\n\n".join(parts) def _mask_error(text: str) -> str: return (text or "")[:400] async def _get_versions(artifact_id: str) -> List[Dict]: async with httpx.AsyncClient(timeout=20.0) as client: resp = await client.get(f"{REGISTRY_URL}/artifacts/{artifact_id}/versions") resp.raise_for_status() return resp.json().get("items", []) async def _check_rag_health() -> None: for attempt in range(1, 8): try: async with httpx.AsyncClient(timeout=5.0) as client: resp = await client.get(f"{RAG_URL}/health") if resp.status_code == 200: logger.info("RAG healthcheck OK") return logger.warning("RAG healthcheck failed: %s", resp.status_code) except Exception as e: logger.warning("RAG healthcheck error (attempt %s): %s", attempt, e) await asyncio.sleep(3) logger.warning("RAG healthcheck did not pass; continuing anyway") async def _add_version(artifact_id: str, payload: Dict) -> Dict: async with httpx.AsyncClient(timeout=20.0) as client: resp = await client.post(f"{REGISTRY_URL}/artifacts/{artifact_id}/versions", json=payload) resp.raise_for_status() return resp.json() async def _job_done(job_id: str, note: str, meta_json: Optional[Dict] = None) -> None: async with httpx.AsyncClient(timeout=10.0) as client: await client.post( f"{REGISTRY_URL}/jobs/{job_id}/done", json={"note": note, "meta_json": meta_json or {}}, ) async def _job_fail(job_id: str, error_text: str) -> None: async with httpx.AsyncClient(timeout=10.0) as client: await client.post(f"{REGISTRY_URL}/jobs/{job_id}/fail", json={"error_text": _mask_error(error_text)}) async def _rag_upsert(chunks: List[Dict]) -> Dict: async with httpx.AsyncClient(timeout=120.0) as client: resp = await client.post(f"{RAG_URL}/index/upsert", json={"chunks": chunks}) resp.raise_for_status() return resp.json() async def _rag_delete_fingerprint(fingerprint: str) -> None: async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post( f"{RAG_URL}/index/delete_by_fingerprint", json={"fingerprint": fingerprint}, ) resp.raise_for_status() async def _handle_job(data: Dict) -> None: job_id = data.get("job_id") artifact_id = data.get("artifact_id") input_version_id = data.get("input_version_id") acl_ref = data.get("acl_ref") brand_id = data.get("brand_id") project_id = data.get("project_id") force = bool(data.get("force")) if not job_id or not artifact_id or not input_version_id: logger.error("Invalid job payload: %s", data) return try: versions = await _get_versions(artifact_id) source = next((v for v in versions if v.get("id") == input_version_id), None) if not source: await _job_fail(job_id, "Source version not found") return storage_key = source.get("storage_key") mime = source.get("mime", "application/octet-stream") if not storage_key: await _job_fail(job_id, "Missing storage_key") return obj = minio_client.get_object(MINIO_BUCKET, storage_key) data_bytes = b"".join(list(obj.stream(32 * 1024))) if len(data_bytes) > MAX_FILE_BYTES: await _job_fail(job_id, "File too large") return source_sha = source.get("sha256") or _sha256(data_bytes) index_fingerprint = _fingerprint(source_sha) # Idempotency for v in versions: meta = _ensure_meta_dict(v.get("meta_json")) if meta.get("index_fingerprint") == index_fingerprint: await _job_done(job_id, "already indexed") return source_meta = _ensure_meta_dict(source.get("meta_json")) parsed = _parse_by_mime(data_bytes, mime, source_meta.get("file_name")) parsed["metadata"] = { "acl_ref": acl_ref, "brand_id": brand_id, "project_id": project_id, "source_version_id": input_version_id, } full_text = _parsed_to_blocks_text(parsed) chunks = _chunk_text(full_text) # Store parsed.json parsed_bytes = json.dumps(parsed, ensure_ascii=False).encode("utf-8") parsed_key = f"artifacts/{artifact_id}/versions/{input_version_id}/parsed.json" minio_client.put_object( MINIO_BUCKET, parsed_key, data=io.BytesIO(parsed_bytes), length=len(parsed_bytes), content_type="application/json", ) # Store chunks.jsonl chunks_meta = { "artifact_id": artifact_id, "source_version_id": input_version_id, "acl_ref": acl_ref, "brand_id": brand_id, "project_id": project_id, } chunks_text = _chunks_jsonl(chunks, chunks_meta) chunks_bytes = chunks_text.encode("utf-8") chunks_key = f"artifacts/{artifact_id}/versions/{input_version_id}/chunks.jsonl" minio_client.put_object( MINIO_BUCKET, chunks_key, data=io.BytesIO(chunks_bytes), length=len(chunks_bytes), content_type="application/x-ndjson", ) parsed_version = await _add_version( artifact_id, { "storage_key": parsed_key, "sha256": _sha256(parsed_bytes), "mime": "application/json", "size_bytes": len(parsed_bytes), "label": "parsed", "meta_json": { "index_fingerprint": index_fingerprint, "parser_version": PARSER_VERSION, "chunker_version": CHUNKER_VERSION, "source_version_id": input_version_id, "pages": len(parsed.get("pages", [])), }, }, ) chunks_version = await _add_version( artifact_id, { "storage_key": chunks_key, "sha256": _sha256(chunks_bytes), "mime": "application/x-ndjson", "size_bytes": len(chunks_bytes), "label": "chunks", "meta_json": { "index_fingerprint": index_fingerprint, "chunks_count": len(chunks), "source_version_id": input_version_id, }, }, ) # Index into RAG if force: await _rag_delete_fingerprint(index_fingerprint) chunks_payload = [] for idx, chunk in enumerate(chunks, start=1): chunks_payload.append({ "content": chunk["text"], "meta": { "acl_ref": acl_ref, "brand_id": brand_id, "project_id": project_id, "artifact_id": artifact_id, "source_version_id": input_version_id, "chunk_id": f"chunk_{idx}", "fingerprint": index_fingerprint, "index_fingerprint": index_fingerprint, "offset_start": chunk.get("offset_start"), "offset_end": chunk.get("offset_end"), }, }) await _rag_upsert(chunks_payload) await _job_done( job_id, f"indexed chunks={len(chunks)}", meta_json={ "chunks_count": len(chunks), "parser_version": PARSER_VERSION, "chunker_version": CHUNKER_VERSION, "embedder_version": EMBEDDER_VERSION, "source_version_id": input_version_id, "parsed_version_id": parsed_version.get("version_id"), "chunks_version_id": chunks_version.get("version_id"), "fingerprint": index_fingerprint, }, ) except Exception as e: logger.exception("index_doc failed") await _job_fail(job_id, str(e)) async def main() -> None: await _check_rag_health() nc = NATS() await nc.connect(servers=[NATS_URL]) sub = await nc.subscribe("artifact.job.index_doc.requested") while True: try: msg = await sub.next_msg(timeout=60) except Exception: continue try: payload = msg.data.decode("utf-8") data = json.loads(payload) except Exception: logger.exception("Invalid message payload") continue await _handle_job(data) if __name__ == "__main__": asyncio.run(main())