Files
microdao-daarion/services/index-doc-worker/app/main.py
Apple 0c8bef82f4 feat: Add Alateya, Clan, Eonarch agents + fix gateway-router connection
## Agents Added
- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes
- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes
- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation
- Created PROJECT-MASTER-INDEX.md — a single entry point into the project documentation
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
2026-01-28 06:40:34 -08:00

468 lines
16 KiB
Python

"""
index-doc-worker
- Downloads source document from MinIO
- Parses to parsed_json
- Chunks and indexes into RAG
- Registers parsed + chunks versions in artifact-registry
"""
import asyncio
import csv
import hashlib
import io
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional
import httpx
import pdfplumber
from docx import Document
import openpyxl
from minio import Minio
from minio.error import S3Error
from nats.aio.client import Client as NATS
# Root logger at INFO; module-level logger for this worker.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Service endpoints (env-overridable); trailing slashes stripped so f-string URL joins stay clean.
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")
REGISTRY_URL = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9220").rstrip("/")
RAG_URL = os.getenv("RAG_SERVICE_URL", "http://rag-service:9500").rstrip("/")
# MinIO object-store connection settings (defaults match the docker-compose dev stack).
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "artifacts")
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
# Processing limits: max source size (default 50 MiB) and the chunking window.
MAX_FILE_BYTES = int(os.getenv("INDEX_DOC_MAX_BYTES", str(50 * 1024 * 1024)))
CHUNK_SIZE = int(os.getenv("INDEX_DOC_CHUNK_SIZE", "1000"))
CHUNK_OVERLAP = int(os.getenv("INDEX_DOC_CHUNK_OVERLAP", "150"))
# Version tags folded into the idempotency fingerprint (see _fingerprint).
PARSER_VERSION = "v1-pdfplumber-docx"
CHUNKER_VERSION = "v1-basic"
EMBEDDER_VERSION = os.getenv("EMBEDDER_VERSION", "rag-default")
# Shared MinIO client used by the job handler.
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_SECURE,
)
def _sha256(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def _fingerprint(source_sha: str) -> str:
    """Stable index fingerprint: parser/chunker/embedder versions + source hash.

    Any change to a pipeline version or the source bytes yields a new value,
    which drives the idempotency check in _handle_job.
    """
    components = (PARSER_VERSION, CHUNKER_VERSION, EMBEDDER_VERSION, source_sha)
    return hashlib.sha256("|".join(components).encode("utf-8")).hexdigest()
def _ensure_meta_dict(meta: Any) -> Dict[str, Any]:
if isinstance(meta, dict):
return meta
if isinstance(meta, str):
try:
return json.loads(meta)
except Exception:
return {}
return {}
def _chunk_text(text: str) -> List[Dict]:
    """Split *text* into CHUNK_SIZE-character chunks with CHUNK_OVERLAP overlap.

    Returns a list of {"text", "offset_start", "offset_end"} dicts; empty
    input yields an empty list.

    Fix: the original advanced with ``start = end - CHUNK_OVERLAP``, which
    does not make progress when CHUNK_OVERLAP >= CHUNK_SIZE (env-configurable)
    and would loop forever.  A progress guard now falls back to disjoint
    chunks in that case; behavior is unchanged for sane configurations.
    """
    if not text:
        return []
    chunks: List[Dict] = []
    start = 0
    length = len(text)
    while start < length:
        end = min(start + CHUNK_SIZE, length)
        chunks.append({
            "text": text[start:end],
            "offset_start": start,
            "offset_end": end,
        })
        if end == length:
            break
        next_start = end - CHUNK_OVERLAP
        if next_start <= start:
            # Misconfigured overlap (>= chunk size): advance without overlap
            # rather than re-emitting the same window indefinitely.
            next_start = end
        start = next_start
    return chunks
def _parsed_from_text(text: str, source_name: Optional[str] = None) -> Dict:
blocks = [{"type": "paragraph", "text": text}]
if source_name:
blocks[0]["source_filename"] = source_name
return {"pages": [{"page_num": 1, "blocks": blocks}]}
def _parse_pdf(data: bytes) -> Dict:
    """Extract text from each PDF page into the parsed-document structure.

    Pages with no extractable text become empty paragraph blocks.
    """
    result_pages = []
    with pdfplumber.open(io.BytesIO(data)) as pdf:
        for page_num, pdf_page in enumerate(pdf.pages, start=1):
            page_text = pdf_page.extract_text() or ""
            result_pages.append({
                "page_num": page_num,
                "blocks": [{"type": "paragraph", "text": page_text}],
            })
    return {"pages": result_pages}
def _parse_docx(data: bytes) -> Dict:
    """Parse a .docx payload via a temp file (python-docx wants a path/stream).

    Paragraph texts are joined with newlines into a single text page; the
    temp file is always removed.
    """
    with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as tmp:
        tmp.write(data)
        tmp_path = Path(tmp.name)
    try:
        document = Document(str(tmp_path))
        joined = "\n".join(paragraph.text for paragraph in document.paragraphs)
        return _parsed_from_text(joined)
    finally:
        tmp_path.unlink(missing_ok=True)
def _parse_csv(data: bytes) -> Dict:
text = data.decode("utf-8", errors="ignore")
reader = csv.reader(io.StringIO(text))
rows = ["\t".join(row) for row in reader]
return _parsed_from_text("\n".join(rows))
def _parse_xlsx(data: bytes) -> Dict:
    """Parse an .xlsx workbook into one text page.

    Each sheet contributes a "--- Sheet: name ---" header followed by its rows
    as tab-separated values (None cells rendered as empty strings).  Uses a
    temp file for openpyxl; data_only=True resolves formulas to cached values.
    """
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
        tmp.write(data)
        tmp_path = Path(tmp.name)
    try:
        workbook = openpyxl.load_workbook(str(tmp_path), data_only=True)
        lines = []
        for sheet_name in workbook.sheetnames:
            lines.append(f"--- Sheet: {sheet_name} ---")
            worksheet = workbook[sheet_name]
            for values in worksheet.iter_rows(values_only=True):
                lines.append("\t".join("" if cell is None else str(cell) for cell in values))
        return _parsed_from_text("\n".join(lines))
    finally:
        tmp_path.unlink(missing_ok=True)
def _parse_text(data: bytes) -> Dict:
text = data.decode("utf-8", errors="ignore")
return _parsed_from_text(text)
def _parse_zip(data: bytes) -> Dict:
    """Parse every supported member of a ZIP archive into one combined document.

    Directories, unreadable members and unsupported extensions are skipped.
    Every emitted block is tagged with its member's filename in
    ``source_filename`` so chunks remain traceable to their origin.
    """
    import zipfile
    collected = []
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue
            member_name = info.filename
            try:
                payload = archive.read(info)
            except Exception:
                # Corrupt or encrypted member: best-effort skip.
                continue
            lowered = member_name.lower()
            if lowered.endswith(".pdf"):
                member_doc = _parse_pdf(payload)
            elif lowered.endswith(".docx"):
                member_doc = _parse_docx(payload)
            elif lowered.endswith(".csv"):
                member_doc = _parse_csv(payload)
            elif lowered.endswith(".xlsx"):
                member_doc = _parse_xlsx(payload)
            elif lowered.endswith((".md", ".txt")):
                member_doc = _parse_text(payload)
            else:
                continue
            member_pages = member_doc.get("pages", [])
            for page in member_pages:
                for block in page.get("blocks", []):
                    block["source_filename"] = member_name
            collected.extend(member_pages)
    return {"pages": collected}
def _parse_by_mime(data: bytes, mime: str, filename: Optional[str]) -> Dict:
    """Route *data* to the matching parser by MIME type or filename extension.

    Extension checks back up the MIME check (uploads often carry generic MIME
    types); anything unrecognized falls back to lossy plain-text decoding.
    """
    name = (filename or "").lower()
    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    if mime == "application/pdf" or name.endswith(".pdf"):
        return _parse_pdf(data)
    if mime == docx_mime or name.endswith(".docx"):
        return _parse_docx(data)
    if mime in {"text/markdown", "text/plain"} or name.endswith((".md", ".txt")):
        return _parse_text(data)
    if mime == "text/csv" or name.endswith(".csv"):
        return _parse_csv(data)
    if mime == xlsx_mime or name.endswith(".xlsx"):
        return _parse_xlsx(data)
    if mime == "application/zip" or name.endswith(".zip"):
        return _parse_zip(data)
    return _parse_text(data)
def _chunks_jsonl(chunks: List[Dict], meta: Dict) -> str:
lines = []
for idx, chunk in enumerate(chunks, start=1):
item = {
"chunk_id": f"chunk_{idx}",
"content": chunk["text"],
"meta": meta,
"offset_start": chunk.get("offset_start"),
"offset_end": chunk.get("offset_end"),
}
lines.append(json.dumps(item, ensure_ascii=False))
return "\n".join(lines)
def _parsed_to_blocks_text(parsed: Dict) -> str:
parts = []
for page in parsed.get("pages", []):
for block in page.get("blocks", []):
text = block.get("text", "")
if text:
parts.append(text)
return "\n\n".join(parts)
def _mask_error(text: str) -> str:
return (text or "")[:400]
async def _get_versions(artifact_id: str) -> List[Dict]:
    """Fetch all registered versions of an artifact from the artifact-registry.

    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    async with httpx.AsyncClient(timeout=20.0) as client:
        response = await client.get(f"{REGISTRY_URL}/artifacts/{artifact_id}/versions")
        response.raise_for_status()
        body = response.json()
    return body.get("items", [])
async def _check_rag_health() -> None:
    """Poll the RAG service /health endpoint, up to 7 attempts 3 s apart.

    Best-effort only: failures are logged and the worker starts regardless,
    since jobs will simply fail later if RAG stays down.
    """
    attempt = 0
    while attempt < 7:
        attempt += 1
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{RAG_URL}/health")
        except Exception as exc:
            logger.warning("RAG healthcheck error (attempt %s): %s", attempt, exc)
        else:
            if response.status_code == 200:
                logger.info("RAG healthcheck OK")
                return
            logger.warning("RAG healthcheck failed: %s", response.status_code)
        await asyncio.sleep(3)
    logger.warning("RAG healthcheck did not pass; continuing anyway")
async def _add_version(artifact_id: str, payload: Dict) -> Dict:
    """Register a new artifact version in the registry and return its response.

    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    url = f"{REGISTRY_URL}/artifacts/{artifact_id}/versions"
    async with httpx.AsyncClient(timeout=20.0) as client:
        response = await client.post(url, json=payload)
        response.raise_for_status()
        return response.json()
async def _job_done(job_id: str, note: str, meta_json: Optional[Dict] = None) -> None:
    """Mark a registry job as completed (fire-and-forget; status not checked)."""
    payload = {"note": note, "meta_json": meta_json or {}}
    async with httpx.AsyncClient(timeout=10.0) as client:
        await client.post(f"{REGISTRY_URL}/jobs/{job_id}/done", json=payload)
async def _job_fail(job_id: str, error_text: str) -> None:
    """Mark a registry job as failed, with the error message truncated to 400 chars."""
    body = {"error_text": _mask_error(error_text)}
    async with httpx.AsyncClient(timeout=10.0) as client:
        await client.post(f"{REGISTRY_URL}/jobs/{job_id}/fail", json=body)
async def _rag_upsert(chunks: List[Dict]) -> Dict:
    """Upsert chunk payloads into the RAG index.

    Uses a long (120 s) timeout because indexing embeds all chunks server-side.
    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(f"{RAG_URL}/index/upsert", json={"chunks": chunks})
        response.raise_for_status()
        return response.json()
async def _rag_delete_fingerprint(fingerprint: str) -> None:
    """Delete previously indexed chunks matching *fingerprint* (used on force re-index).

    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    payload = {"fingerprint": fingerprint}
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(f"{RAG_URL}/index/delete_by_fingerprint", json=payload)
        response.raise_for_status()
async def _handle_job(data: Dict) -> None:
    """Process a single index_doc job end-to-end.

    Pipeline: resolve the source version in the artifact-registry, download
    the raw bytes from MinIO, parse them into a page/block structure, chunk
    the flattened text, persist parsed.json + chunks.jsonl back to MinIO,
    register both as new artifact versions, upsert the chunks into the RAG
    index, and mark the job done.  Any exception fails the job instead.

    Expected keys in *data*: job_id, artifact_id, input_version_id (required);
    acl_ref, brand_id, project_id, force (optional).

    Fixes vs. the original: the MinIO response is now close()d and
    release_conn()ed (the SDK requires this or the connection pool leaks),
    and the download aborts as soon as MAX_FILE_BYTES is exceeded instead of
    buffering the whole oversized object first.
    """
    job_id = data.get("job_id")
    artifact_id = data.get("artifact_id")
    input_version_id = data.get("input_version_id")
    acl_ref = data.get("acl_ref")
    brand_id = data.get("brand_id")
    project_id = data.get("project_id")
    force = bool(data.get("force"))
    if not job_id or not artifact_id or not input_version_id:
        # Without a job_id there is no job to fail; log and drop the message.
        logger.error("Invalid job payload: %s", data)
        return
    try:
        versions = await _get_versions(artifact_id)
        source = next((v for v in versions if v.get("id") == input_version_id), None)
        if not source:
            await _job_fail(job_id, "Source version not found")
            return
        storage_key = source.get("storage_key")
        mime = source.get("mime", "application/octet-stream")
        if not storage_key:
            await _job_fail(job_id, "Missing storage_key")
            return
        # Stream the object from MinIO; stop early once the size limit is
        # exceeded, and always release the response back to the pool.
        obj = minio_client.get_object(MINIO_BUCKET, storage_key)
        try:
            buffer = bytearray()
            too_large = False
            for part in obj.stream(32 * 1024):
                buffer.extend(part)
                if len(buffer) > MAX_FILE_BYTES:
                    too_large = True
                    break
        finally:
            obj.close()
            obj.release_conn()
        if too_large:
            await _job_fail(job_id, "File too large")
            return
        data_bytes = bytes(buffer)
        source_sha = source.get("sha256") or _sha256(data_bytes)
        index_fingerprint = _fingerprint(source_sha)
        # Idempotency: skip all work if any existing version already carries
        # the same parser/chunker/embedder/source fingerprint.
        for v in versions:
            meta = _ensure_meta_dict(v.get("meta_json"))
            if meta.get("index_fingerprint") == index_fingerprint:
                await _job_done(job_id, "already indexed")
                return
        source_meta = _ensure_meta_dict(source.get("meta_json"))
        parsed = _parse_by_mime(data_bytes, mime, source_meta.get("file_name"))
        # Attach provenance/ACL metadata so downstream consumers can filter.
        parsed["metadata"] = {
            "acl_ref": acl_ref,
            "brand_id": brand_id,
            "project_id": project_id,
            "source_version_id": input_version_id,
        }
        full_text = _parsed_to_blocks_text(parsed)
        chunks = _chunk_text(full_text)
        # Store parsed.json alongside the source version in MinIO.
        parsed_bytes = json.dumps(parsed, ensure_ascii=False).encode("utf-8")
        parsed_key = f"artifacts/{artifact_id}/versions/{input_version_id}/parsed.json"
        minio_client.put_object(
            MINIO_BUCKET,
            parsed_key,
            data=io.BytesIO(parsed_bytes),
            length=len(parsed_bytes),
            content_type="application/json",
        )
        # Store chunks.jsonl; every record shares this metadata.
        chunks_meta = {
            "artifact_id": artifact_id,
            "source_version_id": input_version_id,
            "acl_ref": acl_ref,
            "brand_id": brand_id,
            "project_id": project_id,
        }
        chunks_text = _chunks_jsonl(chunks, chunks_meta)
        chunks_bytes = chunks_text.encode("utf-8")
        chunks_key = f"artifacts/{artifact_id}/versions/{input_version_id}/chunks.jsonl"
        minio_client.put_object(
            MINIO_BUCKET,
            chunks_key,
            data=io.BytesIO(chunks_bytes),
            length=len(chunks_bytes),
            content_type="application/x-ndjson",
        )
        # Register both derived outputs as new versions in the registry.
        parsed_version = await _add_version(
            artifact_id,
            {
                "storage_key": parsed_key,
                "sha256": _sha256(parsed_bytes),
                "mime": "application/json",
                "size_bytes": len(parsed_bytes),
                "label": "parsed",
                "meta_json": {
                    "index_fingerprint": index_fingerprint,
                    "parser_version": PARSER_VERSION,
                    "chunker_version": CHUNKER_VERSION,
                    "source_version_id": input_version_id,
                    "pages": len(parsed.get("pages", [])),
                },
            },
        )
        chunks_version = await _add_version(
            artifact_id,
            {
                "storage_key": chunks_key,
                "sha256": _sha256(chunks_bytes),
                "mime": "application/x-ndjson",
                "size_bytes": len(chunks_bytes),
                "label": "chunks",
                "meta_json": {
                    "index_fingerprint": index_fingerprint,
                    "chunks_count": len(chunks),
                    "source_version_id": input_version_id,
                },
            },
        )
        # Index into RAG; a forced re-index first purges old chunks with the
        # same fingerprint so the index holds no stale duplicates.
        if force:
            await _rag_delete_fingerprint(index_fingerprint)
        chunks_payload = []
        for idx, chunk in enumerate(chunks, start=1):
            chunks_payload.append({
                "content": chunk["text"],
                "meta": {
                    "acl_ref": acl_ref,
                    "brand_id": brand_id,
                    "project_id": project_id,
                    "artifact_id": artifact_id,
                    "source_version_id": input_version_id,
                    "chunk_id": f"chunk_{idx}",
                    "fingerprint": index_fingerprint,
                    "index_fingerprint": index_fingerprint,
                    "offset_start": chunk.get("offset_start"),
                    "offset_end": chunk.get("offset_end"),
                },
            })
        await _rag_upsert(chunks_payload)
        await _job_done(
            job_id,
            f"indexed chunks={len(chunks)}",
            meta_json={
                "chunks_count": len(chunks),
                "parser_version": PARSER_VERSION,
                "chunker_version": CHUNKER_VERSION,
                "embedder_version": EMBEDDER_VERSION,
                "source_version_id": input_version_id,
                "parsed_version_id": parsed_version.get("version_id"),
                "chunks_version_id": chunks_version.get("version_id"),
                "fingerprint": index_fingerprint,
            },
        )
    except Exception as e:
        logger.exception("index_doc failed")
        await _job_fail(job_id, str(e))
async def main() -> None:
    """Worker entry point: wait for RAG, then consume index_doc jobs from NATS forever."""
    await _check_rag_health()
    nats_client = NATS()
    await nats_client.connect(servers=[NATS_URL])
    subscription = await nats_client.subscribe("artifact.job.index_doc.requested")
    while True:
        try:
            message = await subscription.next_msg(timeout=60)
        except Exception:
            # next_msg raises on timeout; just keep polling.
            continue
        try:
            decoded = json.loads(message.data.decode("utf-8"))
        except Exception:
            logger.exception("Invalid message payload")
            continue
        await _handle_job(decoded)
# Run the worker's asyncio event loop when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())