## Agents Added

- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes

- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes

- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation

- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:

- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
468 lines
16 KiB
Python
468 lines
16 KiB
Python
"""
|
|
index-doc-worker
|
|
- Downloads source document from MinIO
|
|
- Parses to parsed_json
|
|
- Chunks and indexes into RAG
|
|
- Registers parsed + chunks versions in artifact-registry
|
|
"""
|
|
|
|
import asyncio
|
|
import csv
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import httpx
|
|
import pdfplumber
|
|
from docx import Document
|
|
import openpyxl
|
|
from minio import Minio
|
|
from minio.error import S3Error
|
|
from nats.aio.client import Client as NATS
|
|
|
|
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Service endpoints (all overridable via environment) ---
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")
REGISTRY_URL = os.getenv("ARTIFACT_REGISTRY_URL", "http://artifact-registry:9220").rstrip("/")
RAG_URL = os.getenv("RAG_SERVICE_URL", "http://rag-service:9500").rstrip("/")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "artifacts")
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"

# --- Processing limits ---
# Maximum accepted source size (default 50 MiB); larger downloads fail the job.
MAX_FILE_BYTES = int(os.getenv("INDEX_DOC_MAX_BYTES", str(50 * 1024 * 1024)))
# Character window and overlap used by _chunk_text.
CHUNK_SIZE = int(os.getenv("INDEX_DOC_CHUNK_SIZE", "1000"))
CHUNK_OVERLAP = int(os.getenv("INDEX_DOC_CHUNK_OVERLAP", "150"))

# Pipeline version tags; combined with the source sha256 they form the
# idempotency fingerprint (see _fingerprint). Bump a tag to force reindexing.
PARSER_VERSION = "v1-pdfplumber-docx"
CHUNKER_VERSION = "v1-basic"
EMBEDDER_VERSION = os.getenv("EMBEDDER_VERSION", "rag-default")

# Shared MinIO client: downloads sources and uploads derived artifacts.
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_SECURE,
)
|
|
|
|
|
|
def _sha256(data: bytes) -> str:
|
|
return hashlib.sha256(data).hexdigest()
|
|
|
|
|
|
def _fingerprint(source_sha: str) -> str:
    """Derive the stable index fingerprint from pipeline versions + source hash.

    The same source indexed by the same parser/chunker/embedder versions always
    yields the same fingerprint, which drives idempotency checks.
    """
    components = "|".join((PARSER_VERSION, CHUNKER_VERSION, EMBEDDER_VERSION, source_sha))
    return hashlib.sha256(components.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _ensure_meta_dict(meta: Any) -> Dict[str, Any]:
|
|
if isinstance(meta, dict):
|
|
return meta
|
|
if isinstance(meta, str):
|
|
try:
|
|
return json.loads(meta)
|
|
except Exception:
|
|
return {}
|
|
return {}
|
|
|
|
|
|
def _chunk_text(text: str) -> List[Dict]:
    """Split *text* into overlapping windows of CHUNK_SIZE characters.

    Consecutive chunks overlap by CHUNK_OVERLAP characters so sentence context
    is not lost at chunk boundaries.

    Returns a list of {"text", "offset_start", "offset_end"} dicts; empty
    input yields an empty list.
    """
    if not text:
        return []
    chunks: List[Dict] = []
    start = 0
    length = len(text)
    while start < length:
        end = min(start + CHUNK_SIZE, length)
        chunks.append({
            "text": text[start:end],
            "offset_start": start,
            "offset_end": end,
        })
        if end == length:
            break
        # Step back by the overlap, but always advance by at least one
        # character: with a misconfigured CHUNK_OVERLAP >= CHUNK_SIZE the
        # original `end - CHUNK_OVERLAP` never moved forward and looped
        # forever. max(..., start + 1) also makes the old `start < 0` clamp
        # unnecessary.
        start = max(end - CHUNK_OVERLAP, start + 1)
    return chunks
|
|
|
|
|
|
def _parsed_from_text(text: str, source_name: Optional[str] = None) -> Dict:
|
|
blocks = [{"type": "paragraph", "text": text}]
|
|
if source_name:
|
|
blocks[0]["source_filename"] = source_name
|
|
return {"pages": [{"page_num": 1, "blocks": blocks}]}
|
|
|
|
|
|
def _parse_pdf(data: bytes) -> Dict:
    """Extract per-page text from a PDF using pdfplumber.

    Pages with no extractable text become empty paragraph blocks.
    """
    with pdfplumber.open(io.BytesIO(data)) as pdf:
        pages = [
            {
                "page_num": number,
                "blocks": [{"type": "paragraph", "text": page.extract_text() or ""}],
            }
            for number, page in enumerate(pdf.pages, start=1)
        ]
    return {"pages": pages}
|
|
|
|
|
|
def _parse_docx(data: bytes) -> Dict:
    """Extract paragraph text from a .docx payload.

    python-docx needs a file path, so the bytes are staged in a temp file
    which is always removed afterwards.
    """
    with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as handle:
        handle.write(data)
    tmp_path = Path(handle.name)
    try:
        document = Document(str(tmp_path))
        joined = "\n".join(paragraph.text for paragraph in document.paragraphs)
        return _parsed_from_text(joined)
    finally:
        tmp_path.unlink(missing_ok=True)
|
|
|
|
|
|
def _parse_csv(data: bytes) -> Dict:
    """Render CSV content as tab-separated lines of plain text."""
    decoded = data.decode("utf-8", errors="ignore")
    lines = ["\t".join(row) for row in csv.reader(io.StringIO(decoded))]
    return _parsed_from_text("\n".join(lines))
|
|
|
|
|
|
def _parse_xlsx(data: bytes) -> Dict:
    """Flatten every sheet of an .xlsx workbook into tab-separated text.

    Each sheet is preceded by a "--- Sheet: <name> ---" marker line; empty
    cells become empty strings. openpyxl needs a file path, so the bytes are
    staged in a temp file which is always removed afterwards.
    """
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as handle:
        handle.write(data)
    tmp_path = Path(handle.name)
    try:
        workbook = openpyxl.load_workbook(str(tmp_path), data_only=True)
        lines: List[str] = []
        for sheet_name in workbook.sheetnames:
            lines.append(f"--- Sheet: {sheet_name} ---")
            worksheet = workbook[sheet_name]
            for row in worksheet.iter_rows(values_only=True):
                lines.append("\t".join("" if cell is None else str(cell) for cell in row))
        return _parsed_from_text("\n".join(lines))
    finally:
        tmp_path.unlink(missing_ok=True)
|
|
|
|
|
|
def _parse_text(data: bytes) -> Dict:
    """Decode bytes as UTF-8 (ignoring errors) and wrap as a parsed document."""
    return _parsed_from_text(data.decode("utf-8", errors="ignore"))
|
|
|
|
|
|
def _parse_zip(data: bytes) -> Dict:
    """Parse every supported member of a ZIP archive and merge their pages.

    Directory entries, unreadable members, and unsupported extensions are
    skipped. Every emitted block is tagged with its member's filename via
    ``source_filename``.
    """
    import zipfile

    # Suffix -> parser dispatch, checked in order (mirrors the supported set).
    handlers = (
        ((".pdf",), _parse_pdf),
        ((".docx",), _parse_docx),
        ((".csv",), _parse_csv),
        ((".xlsx",), _parse_xlsx),
        ((".md", ".txt"), _parse_text),
    )
    merged_pages: List[Dict] = []
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        for member in archive.infolist():
            if member.is_dir():
                continue
            name = member.filename
            try:
                content = archive.read(member)
            except Exception:
                # Best-effort: a corrupt member should not sink the archive.
                continue
            lowered = name.lower()
            parser = next(
                (fn for suffixes, fn in handlers if lowered.endswith(suffixes)),
                None,
            )
            if parser is None:
                continue
            parsed = parser(content)
            pages = parsed.get("pages", [])
            for page in pages:
                for block in page.get("blocks", []):
                    block["source_filename"] = name
            merged_pages.extend(pages)
    return {"pages": merged_pages}
|
|
|
|
|
|
def _parse_by_mime(data: bytes, mime: str, filename: Optional[str]) -> Dict:
    """Dispatch to a format-specific parser by MIME type or filename suffix.

    MIME wins when it matches; otherwise the (lower-cased) filename suffix is
    consulted. Unrecognized inputs fall back to plain-text parsing.
    """
    lowered = (filename or "").lower()
    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    if mime == "application/pdf" or lowered.endswith(".pdf"):
        return _parse_pdf(data)
    if mime == docx_mime or lowered.endswith(".docx"):
        return _parse_docx(data)
    if mime in {"text/markdown", "text/plain"} or lowered.endswith((".md", ".txt")):
        return _parse_text(data)
    if mime == "text/csv" or lowered.endswith(".csv"):
        return _parse_csv(data)
    if mime == xlsx_mime or lowered.endswith(".xlsx"):
        return _parse_xlsx(data)
    if mime == "application/zip" or lowered.endswith(".zip"):
        return _parse_zip(data)
    return _parse_text(data)
|
|
|
|
|
|
def _chunks_jsonl(chunks: List[Dict], meta: Dict) -> str:
|
|
lines = []
|
|
for idx, chunk in enumerate(chunks, start=1):
|
|
item = {
|
|
"chunk_id": f"chunk_{idx}",
|
|
"content": chunk["text"],
|
|
"meta": meta,
|
|
"offset_start": chunk.get("offset_start"),
|
|
"offset_end": chunk.get("offset_end"),
|
|
}
|
|
lines.append(json.dumps(item, ensure_ascii=False))
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _parsed_to_blocks_text(parsed: Dict) -> str:
|
|
parts = []
|
|
for page in parsed.get("pages", []):
|
|
for block in page.get("blocks", []):
|
|
text = block.get("text", "")
|
|
if text:
|
|
parts.append(text)
|
|
return "\n\n".join(parts)
|
|
|
|
|
|
def _mask_error(text: str) -> str:
|
|
return (text or "")[:400]
|
|
|
|
|
|
async def _get_versions(artifact_id: str) -> List[Dict]:
    """Fetch all registered versions of an artifact from the registry.

    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    url = f"{REGISTRY_URL}/artifacts/{artifact_id}/versions"
    async with httpx.AsyncClient(timeout=20.0) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json().get("items", [])
|
|
|
|
|
|
async def _check_rag_health() -> None:
    """Poll the RAG service /health endpoint, retrying up to 7 times.

    Sleeps 3 seconds between attempts. Only logs the outcome — the worker
    deliberately starts even when RAG never answers, so startup is not
    blocked by a slow dependency.
    """
    for attempt in range(1, 8):
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{RAG_URL}/health")
            if response.status_code == 200:
                logger.info("RAG healthcheck OK")
                return
            logger.warning("RAG healthcheck failed: %s", response.status_code)
        except Exception as exc:
            logger.warning("RAG healthcheck error (attempt %s): %s", attempt, exc)
        await asyncio.sleep(3)
    logger.warning("RAG healthcheck did not pass; continuing anyway")
|
|
|
|
|
|
async def _add_version(artifact_id: str, payload: Dict) -> Dict:
    """Register a new version for *artifact_id*; returns the registry response.

    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    url = f"{REGISTRY_URL}/artifacts/{artifact_id}/versions"
    async with httpx.AsyncClient(timeout=20.0) as client:
        response = await client.post(url, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
async def _job_done(job_id: str, note: str, meta_json: Optional[Dict] = None) -> None:
    """Mark a registry job as completed, attaching optional result metadata.

    Fire-and-forget: the response status is intentionally not checked.
    """
    body = {"note": note, "meta_json": meta_json or {}}
    async with httpx.AsyncClient(timeout=10.0) as client:
        await client.post(f"{REGISTRY_URL}/jobs/{job_id}/done", json=body)
|
|
|
|
|
|
async def _job_fail(job_id: str, error_text: str) -> None:
    """Mark a registry job as failed with a truncated error message.

    Fire-and-forget: the response status is intentionally not checked.
    """
    payload = {"error_text": _mask_error(error_text)}
    async with httpx.AsyncClient(timeout=10.0) as client:
        await client.post(f"{REGISTRY_URL}/jobs/{job_id}/fail", json=payload)
|
|
|
|
|
|
async def _rag_upsert(chunks: List[Dict]) -> Dict:
    """Upsert chunk payloads into the RAG index; returns the service response.

    Uses a generous 120s timeout since embedding large batches is slow.
    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(f"{RAG_URL}/index/upsert", json={"chunks": chunks})
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
async def _rag_delete_fingerprint(fingerprint: str) -> None:
    """Remove previously indexed chunks carrying *fingerprint* from RAG.

    Raises httpx.HTTPStatusError on a non-2xx response.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            f"{RAG_URL}/index/delete_by_fingerprint",
            json={"fingerprint": fingerprint},
        )
        response.raise_for_status()
|
|
|
|
|
|
async def _handle_job(data: Dict) -> None:
    """Process one index_doc job end to end.

    Steps:
      1. Resolve the source version in the artifact registry.
      2. Download the source object from MinIO (size-capped by MAX_FILE_BYTES).
      3. Unless ``force`` is set, skip if this fingerprint was already indexed.
      4. Parse and chunk, then store parsed.json / chunks.jsonl back to MinIO.
      5. Register both derived versions and upsert the chunks into RAG.
      6. Report done/fail to the registry job endpoints.

    Never raises: any failure after validation is reported via _job_fail.

    Fixes vs. the previous revision:
      * The MinIO ``get_object`` response is now explicitly closed and its
        pooled connection released (it was leaked before).
      * The idempotency short-circuit is bypassed when ``force`` is set —
        previously ``force`` could never reach the delete-and-reindex path
        for an already-indexed source, making it a no-op.
    """
    job_id = data.get("job_id")
    artifact_id = data.get("artifact_id")
    input_version_id = data.get("input_version_id")
    acl_ref = data.get("acl_ref")
    brand_id = data.get("brand_id")
    project_id = data.get("project_id")
    force = bool(data.get("force"))

    if not job_id or not artifact_id or not input_version_id:
        logger.error("Invalid job payload: %s", data)
        return

    try:
        versions = await _get_versions(artifact_id)
        source = next((v for v in versions if v.get("id") == input_version_id), None)
        if not source:
            await _job_fail(job_id, "Source version not found")
            return

        storage_key = source.get("storage_key")
        mime = source.get("mime", "application/octet-stream")
        if not storage_key:
            await _job_fail(job_id, "Missing storage_key")
            return

        # Download the source. Close/release the response explicitly:
        # minio's get_object holds a pooled HTTP connection until released.
        obj = minio_client.get_object(MINIO_BUCKET, storage_key)
        try:
            data_bytes = b"".join(obj.stream(32 * 1024))
        finally:
            obj.close()
            obj.release_conn()
        if len(data_bytes) > MAX_FILE_BYTES:
            await _job_fail(job_id, "File too large")
            return

        source_sha = source.get("sha256") or _sha256(data_bytes)
        index_fingerprint = _fingerprint(source_sha)

        # Idempotency: an existing version with the same fingerprint means
        # this exact source was already indexed by this exact pipeline.
        # ``force`` bypasses the check so the caller can reindex.
        if not force:
            for v in versions:
                meta = _ensure_meta_dict(v.get("meta_json"))
                if meta.get("index_fingerprint") == index_fingerprint:
                    await _job_done(job_id, "already indexed")
                    return

        source_meta = _ensure_meta_dict(source.get("meta_json"))
        parsed = _parse_by_mime(data_bytes, mime, source_meta.get("file_name"))
        parsed["metadata"] = {
            "acl_ref": acl_ref,
            "brand_id": brand_id,
            "project_id": project_id,
            "source_version_id": input_version_id,
        }
        full_text = _parsed_to_blocks_text(parsed)
        chunks = _chunk_text(full_text)

        # Store parsed.json alongside the source version.
        parsed_bytes = json.dumps(parsed, ensure_ascii=False).encode("utf-8")
        parsed_key = f"artifacts/{artifact_id}/versions/{input_version_id}/parsed.json"
        minio_client.put_object(
            MINIO_BUCKET,
            parsed_key,
            data=io.BytesIO(parsed_bytes),
            length=len(parsed_bytes),
            content_type="application/json",
        )

        # Store chunks.jsonl with the shared per-chunk metadata.
        chunks_meta = {
            "artifact_id": artifact_id,
            "source_version_id": input_version_id,
            "acl_ref": acl_ref,
            "brand_id": brand_id,
            "project_id": project_id,
        }
        chunks_text = _chunks_jsonl(chunks, chunks_meta)
        chunks_bytes = chunks_text.encode("utf-8")
        chunks_key = f"artifacts/{artifact_id}/versions/{input_version_id}/chunks.jsonl"
        minio_client.put_object(
            MINIO_BUCKET,
            chunks_key,
            data=io.BytesIO(chunks_bytes),
            length=len(chunks_bytes),
            content_type="application/x-ndjson",
        )

        # Register the parsed.json derivative in the registry.
        parsed_version = await _add_version(
            artifact_id,
            {
                "storage_key": parsed_key,
                "sha256": _sha256(parsed_bytes),
                "mime": "application/json",
                "size_bytes": len(parsed_bytes),
                "label": "parsed",
                "meta_json": {
                    "index_fingerprint": index_fingerprint,
                    "parser_version": PARSER_VERSION,
                    "chunker_version": CHUNKER_VERSION,
                    "source_version_id": input_version_id,
                    "pages": len(parsed.get("pages", [])),
                },
            },
        )

        # Register the chunks.jsonl derivative in the registry.
        chunks_version = await _add_version(
            artifact_id,
            {
                "storage_key": chunks_key,
                "sha256": _sha256(chunks_bytes),
                "mime": "application/x-ndjson",
                "size_bytes": len(chunks_bytes),
                "label": "chunks",
                "meta_json": {
                    "index_fingerprint": index_fingerprint,
                    "chunks_count": len(chunks),
                    "source_version_id": input_version_id,
                },
            },
        )

        # Index into RAG; on force, purge chunks from any previous run first
        # so the upsert does not duplicate them.
        if force:
            await _rag_delete_fingerprint(index_fingerprint)
        chunks_payload = []
        for idx, chunk in enumerate(chunks, start=1):
            chunks_payload.append({
                "content": chunk["text"],
                "meta": {
                    "acl_ref": acl_ref,
                    "brand_id": brand_id,
                    "project_id": project_id,
                    "artifact_id": artifact_id,
                    "source_version_id": input_version_id,
                    "chunk_id": f"chunk_{idx}",
                    "fingerprint": index_fingerprint,
                    "index_fingerprint": index_fingerprint,
                    "offset_start": chunk.get("offset_start"),
                    "offset_end": chunk.get("offset_end"),
                },
            })
        await _rag_upsert(chunks_payload)

        await _job_done(
            job_id,
            f"indexed chunks={len(chunks)}",
            meta_json={
                "chunks_count": len(chunks),
                "parser_version": PARSER_VERSION,
                "chunker_version": CHUNKER_VERSION,
                "embedder_version": EMBEDDER_VERSION,
                "source_version_id": input_version_id,
                "parsed_version_id": parsed_version.get("version_id"),
                "chunks_version_id": chunks_version.get("version_id"),
                "fingerprint": index_fingerprint,
            },
        )
    except Exception as e:
        logger.exception("index_doc failed")
        await _job_fail(job_id, str(e))
|
|
|
|
|
|
async def main() -> None:
    """Worker entry point: subscribe to index_doc job requests and serve forever.

    Waits (non-fatally) for RAG to become healthy, connects to NATS, then
    processes one message at a time from ``artifact.job.index_doc.requested``.
    Malformed payloads are logged and skipped.
    """
    await _check_rag_health()
    nc = NATS()
    await nc.connect(servers=[NATS_URL])
    subscription = await nc.subscribe("artifact.job.index_doc.requested")
    while True:
        try:
            message = await subscription.next_msg(timeout=60)
        except Exception:
            # next_msg raises on timeout when the queue is idle; just poll again.
            continue
        try:
            payload = json.loads(message.data.decode("utf-8"))
        except Exception:
            logger.exception("Invalid message payload")
            continue
        await _handle_job(payload)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|