From f53e71a0f4d93674b793d816e570defcb5cd932f Mon Sep 17 00:00:00 2001 From: NODA1 System Date: Sat, 21 Feb 2026 16:49:24 +0100 Subject: [PATCH] feat(docs): add versioned document update and versions APIs --- gateway-bot/http_api_doc.py | 81 +++++- gateway-bot/services/doc_service.py | 214 +++++++++++++++ services/router/main.py | 83 ++++++ services/router/memory_retrieval.py | 390 +++++++++++++++++++++++++++- 4 files changed, 764 insertions(+), 4 deletions(-) diff --git a/gateway-bot/http_api_doc.py b/gateway-bot/http_api_doc.py index 57ef7f89..61bb318c 100644 --- a/gateway-bot/http_api_doc.py +++ b/gateway-bot/http_api_doc.py @@ -6,6 +6,8 @@ Endpoints: - POST /api/doc/parse - Parse a document - POST /api/doc/ingest - Ingest document to RAG - POST /api/doc/ask - Ask question about document +- POST /api/doc/update - Update existing document text (versioned) +- GET /api/doc/versions/{doc_id} - List document versions """ import logging from typing import Optional, Dict, Any @@ -17,9 +19,12 @@ from services.doc_service import ( parse_document, ingest_document, ask_about_document, + update_document, + list_document_versions, get_doc_context, ParsedResult, IngestResult, + UpdateResult, QAResult, DocContext ) @@ -52,6 +57,7 @@ class IngestDocumentRequest(BaseModel): file_name: Optional[str] = None dao_id: Optional[str] = None user_id: Optional[str] = None + agent_id: str = "daarwizz" class AskDocumentRequest(BaseModel): @@ -61,6 +67,21 @@ class AskDocumentRequest(BaseModel): doc_id: Optional[str] = None dao_id: Optional[str] = None user_id: Optional[str] = None + agent_id: str = "daarwizz" + + +class UpdateDocumentRequest(BaseModel): + """Request to update existing document content.""" + session_id: str + doc_id: Optional[str] = None + doc_url: Optional[str] = None + file_name: Optional[str] = None + text: Optional[str] = None + dao_id: Optional[str] = None + user_id: Optional[str] = None + agent_id: str = "daarwizz" + storage_ref: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None # ======================================== @@ -167,7 +188,8 @@ async def ingest_document_endpoint(request: IngestDocumentRequest): doc_url=request.doc_url, file_name=request.file_name, dao_id=request.dao_id, - user_id=request.user_id + user_id=request.user_id, + agent_id=request.agent_id, ) if not result.success: @@ -209,7 +231,8 @@ async def ask_about_document_endpoint(request: AskDocumentRequest): question=request.question, doc_id=doc_id, dao_id=request.dao_id, - user_id=request.user_id + user_id=request.user_id, + agent_id=request.agent_id, ) if not result.success: @@ -227,6 +250,59 @@ async def ask_about_document_endpoint(request: AskDocumentRequest): raise HTTPException(status_code=500, detail=str(e)) +@router.post("/api/doc/update") +async def update_document_endpoint(request: UpdateDocumentRequest): + """ + Update a document and bump its version. + If text is omitted and doc_url exists, text is re-parsed from the source document. + """ + try: + result = await update_document( + session_id=request.session_id, + doc_id=request.doc_id, + doc_url=request.doc_url, + file_name=request.file_name, + text=request.text, + dao_id=request.dao_id, + user_id=request.user_id, + agent_id=request.agent_id, + storage_ref=request.storage_ref, + metadata=request.metadata, + ) + if not result.success: + raise HTTPException(status_code=400, detail=result.error) + return { + "ok": True, + "doc_id": result.doc_id, + "version_no": result.version_no, + "version_id": result.version_id, + "updated_chunks": result.updated_chunks, + "status": result.status, + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Update document error: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/api/doc/versions/{doc_id}") +async def list_document_versions_endpoint(doc_id: str, agent_id: str = "daarwizz", limit: int = 20): + """ + List document versions for agent/doc pair. + """ + try: + data = await list_document_versions(agent_id=agent_id, doc_id=doc_id, limit=limit) + if not data.get("ok"): + raise HTTPException(status_code=400, detail=data.get("error", "Failed to load versions")) + return data + except HTTPException: + raise + except Exception as e: + logger.error(f"List document versions error: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/api/doc/context/{session_id}") async def get_document_context(session_id: str): """ @@ -257,4 +333,3 @@ async def get_document_context(session_id: str): except Exception as e: logger.error(f"Get document context error: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) - diff --git a/gateway-bot/services/doc_service.py b/gateway-bot/services/doc_service.py index ed936d69..040b01c3 100644 --- a/gateway-bot/services/doc_service.py +++ b/gateway-bot/services/doc_service.py @@ -51,6 +51,17 @@ class IngestResult(BaseModel): error: Optional[str] = None +class UpdateResult(BaseModel): + """Result of document update with version bump.""" + success: bool + doc_id: Optional[str] = None + version_no: Optional[int] = None + version_id: Optional[int] = None + updated_chunks: int = 0 + status: str = "unknown" + error: Optional[str] = None + + class QAResult(BaseModel): """Result of RAG query about a document""" success: bool @@ -106,6 +117,27 @@ class DocumentService: raise RuntimeError(f"Router error on {path}: {err}") return body if isinstance(body, dict) else {"ok": False, "error": "Invalid router response type"} + async def _router_get_json( + self, + path: str, + timeout: float = 30.0, + ) -> Dict[str, Any]: + import httpx + + base = ROUTER_URL.rstrip("/") + url = f"{base}{path}" + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.get(url) + body = {} + try: + body = resp.json() + except Exception: + body = {"ok": False, "error": f"Invalid JSON from router ({resp.status_code})"} + if resp.status_code >= 400: + err = body.get("detail") or body.get("error") or f"HTTP {resp.status_code}" + raise RuntimeError(f"Router error on {path}: {err}") + return body if isinstance(body, dict) else {"ok": False, "error": "Invalid router response type"} + def _is_excel_filename(self, file_name: Optional[str]) -> bool: if not file_name: return False @@ -572,6 +604,152 @@ class DocumentService: success=False, error=str(e) ) + + async def update_document( + self, + session_id: str, + doc_id: Optional[str] = None, + doc_url: Optional[str] = None, + file_name: Optional[str] = None, + text: Optional[str] = None, + dao_id: Optional[str] = None, + user_id: Optional[str] = None, + agent_id: str = "daarwizz", + storage_ref: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> UpdateResult: + """ + Update existing document content and bump version in router memory. + """ + try: + context = await self.get_doc_context(session_id) + if context: + if not doc_id: + doc_id = context.doc_id + if not doc_url: + doc_url = context.doc_url + if not file_name: + file_name = context.file_name + if not dao_id: + dao_id = context.dao_id + + if not doc_id: + return UpdateResult( + success=False, + status="failed", + error="No document context found. Provide doc_id or parse/ingest first.", + ) + + effective_text = (text or "").strip() + if not effective_text: + if not doc_url: + return UpdateResult( + success=False, + doc_id=doc_id, + status="failed", + error="No text or doc_url provided for update", + ) + parsed = await self.parse_document( + session_id=session_id, + doc_url=doc_url, + file_name=file_name or "document", + dao_id=dao_id or "", + user_id=user_id or "", + output_mode="markdown", + metadata={"source": self._extract_source(session_id), "mode": "update"}, + ) + if not parsed.success: + return UpdateResult( + success=False, + doc_id=doc_id, + status="failed", + error=parsed.error or "Document parse failed", + ) + effective_text = (parsed.markdown or "").strip() + + if not effective_text: + return UpdateResult( + success=False, + doc_id=doc_id, + status="failed", + error="No extractable text for update", + ) + + meta = { + "session_id": session_id, + "source": self._extract_source(session_id), + } + if isinstance(metadata, dict): + meta.update(metadata) + + response = await self._router_post_json( + "/v1/documents/update", + { + "agent_id": (agent_id or "daarwizz").lower(), + "doc_id": doc_id, + "file_name": file_name, + "text": effective_text, + "dao_id": dao_id, + "user_id": user_id, + "storage_ref": storage_ref, + "metadata": meta, + }, + timeout=90.0, + ) + + if not response.get("ok"): + return UpdateResult( + success=False, + doc_id=doc_id, + status="failed", + error=response.get("error", "Router update failed"), + ) + + await self.save_doc_context( + session_id=session_id, + doc_id=doc_id, + doc_url=doc_url, + file_name=file_name, + dao_id=dao_id, + user_id=user_id, + ) + + return UpdateResult( + success=True, + doc_id=response.get("doc_id") or doc_id, + version_no=int(response.get("version_no", 0) or 0) or None, + version_id=int(response.get("version_id", 0) or 0) or None, + updated_chunks=int(response.get("chunks_stored", 0) or 0), + status="updated", + ) + except Exception as e: + logger.error(f"Document update failed: {e}", exc_info=True) + return UpdateResult( + success=False, + doc_id=doc_id, + status="failed", + error=str(e), + ) + + async def list_document_versions( + self, + agent_id: str, + doc_id: str, + limit: int = 20, + ) -> Dict[str, Any]: + aid = (agent_id or "daarwizz").lower() + did = (doc_id or "").strip() + if not did: + return {"ok": False, "error": "doc_id is required", "items": []} + try: + response = await self._router_get_json( + f"/v1/documents/{did}/versions?agent_id={aid}&limit={max(1, min(int(limit or 20), 200))}", + timeout=30.0, + ) + return response if isinstance(response, dict) else {"ok": False, "error": "invalid_response", "items": []} + except Exception as e: + logger.error(f"list_document_versions failed: {e}") + return {"ok": False, "error": str(e), "items": []} async def ask_about_document( self, @@ -762,6 +940,42 @@ async def ask_about_document( ) +async def update_document( + session_id: str, + doc_id: Optional[str] = None, + doc_url: Optional[str] = None, + file_name: Optional[str] = None, + text: Optional[str] = None, + dao_id: Optional[str] = None, + user_id: Optional[str] = None, + agent_id: str = "daarwizz", + storage_ref: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> UpdateResult: + """Update document chunks and bump version.""" + return await doc_service.update_document( + session_id=session_id, + doc_id=doc_id, + doc_url=doc_url, + file_name=file_name, + text=text, + dao_id=dao_id, + user_id=user_id, + agent_id=agent_id, + storage_ref=storage_ref, + metadata=metadata, + ) + + +async def list_document_versions(agent_id: str, doc_id: str, limit: int = 20) -> Dict[str, Any]: + """List document versions from router.""" + return await doc_service.list_document_versions( + agent_id=agent_id, + doc_id=doc_id, + limit=limit, + ) + + async def save_doc_context( session_id: str, doc_id: str, diff --git a/services/router/main.py b/services/router/main.py index 8b8cb517..5ae92ba7 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -1256,6 +1256,18 @@ class DocumentQueryRequest(BaseModel): limit: int = 5 +class DocumentUpdateRequest(BaseModel): + """Update existing document text and bump version.""" + agent_id: str + doc_id: str + file_name: Optional[str] = None + text: str + dao_id: Optional[str] = None + user_id: Optional[str] = None + storage_ref: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + + class SharedMemoryReviewRequest(BaseModel): point_id: str approve: bool @@ -2976,6 +2988,7 @@ async def documents_query(request: DocumentQueryRequest): "doc_id": c_doc_id, "file_name": c_file, "chunk_index": c_idx, + "version_no": ch.get("version_no"), "score": round(c_score, 4), } ) @@ -3031,6 +3044,76 @@ async def documents_query(request: DocumentQueryRequest): } +@app.post("/v1/documents/update") +async def documents_update(request: DocumentUpdateRequest): + """ + Replace document chunks for doc_id with new text and create a new version row. + """ + if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval: + raise HTTPException(status_code=503, detail="Memory retrieval not available") + + agent_id = (request.agent_id or "").strip().lower() + if not agent_id: + raise HTTPException(status_code=400, detail="agent_id is required") + + doc_id = (request.doc_id or "").strip() + if not doc_id: + raise HTTPException(status_code=400, detail="doc_id is required") + + text = (request.text or "").strip() + if not text: + raise HTTPException(status_code=400, detail="text is required") + + result = await memory_retrieval.update_document_chunks( + agent_id=agent_id, + doc_id=doc_id, + file_name=request.file_name, + text=text, + dao_id=request.dao_id, + user_id=request.user_id, + metadata=request.metadata, + storage_ref=request.storage_ref, + ) + if not result.get("ok"): + return { + "ok": False, + "error": result.get("error", "update_failed"), + "doc_id": doc_id, + "collection": result.get("collection"), + } + return result + + +@app.get("/v1/documents/{doc_id}/versions") +async def documents_versions(doc_id: str, agent_id: str, limit: int = 20): + """ + List stored versions for a document. + """ + if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval: + raise HTTPException(status_code=503, detail="Memory retrieval not available") + + aid = (agent_id or "").strip().lower() + if not aid: + raise HTTPException(status_code=400, detail="agent_id is required") + + did = (doc_id or "").strip() + if not did: + raise HTTPException(status_code=400, detail="doc_id is required") + + items = await memory_retrieval.list_document_versions( + agent_id=aid, + doc_id=did, + limit=limit, + ) + return { + "ok": True, + "agent_id": aid, + "doc_id": did, + "total": len(items), + "items": items, + } + + @app.get("/v1/models") async def list_available_models(): """List all available models across backends""" diff --git a/services/router/memory_retrieval.py b/services/router/memory_retrieval.py index aeb96b07..bf1aaaea 100644 --- a/services/router/memory_retrieval.py +++ b/services/router/memory_retrieval.py @@ -19,10 +19,10 @@ import os import json import logging import re +import hashlib from typing import Optional, Dict, Any, List from dataclasses import dataclass, field from datetime import datetime -import hashlib import httpx import asyncpg @@ -40,6 +40,7 @@ NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "neo4j") PENDING_QUESTIONS_LIMIT = int(os.getenv("AGENT_PENDING_QUESTIONS_LIMIT", "5")) SHARED_AGRO_LIBRARY_ENABLED = os.getenv("AGROMATRIX_SHARED_LIBRARY_ENABLED", "true").lower() == "true" SHARED_AGRO_LIBRARY_REQUIRE_REVIEW = os.getenv("AGROMATRIX_SHARED_LIBRARY_REQUIRE_REVIEW", "true").lower() == "true" +DOC_VERSION_PREVIEW_CHARS = int(os.getenv("DOC_VERSION_PREVIEW_CHARS", "240")) @dataclass @@ -245,6 +246,26 @@ class MemoryRetrieval: ON agent_pending_questions (agent_id, channel, chat_id, user_id, status, created_at DESC); CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_pending_questions_unique_open ON agent_pending_questions (agent_id, channel, chat_id, user_id, question_fingerprint, status); + + CREATE TABLE IF NOT EXISTS agent_document_versions ( + id BIGSERIAL PRIMARY KEY, + agent_id TEXT NOT NULL, + doc_id TEXT NOT NULL, + version_no INTEGER NOT NULL, + text_hash TEXT NOT NULL, + text_len INTEGER NOT NULL DEFAULT 0, + text_preview TEXT, + file_name TEXT, + dao_id TEXT, + user_id TEXT, + storage_ref TEXT, + source TEXT NOT NULL DEFAULT 'ingest', + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE (agent_id, doc_id, version_no) + ); + CREATE INDEX IF NOT EXISTS idx_agent_document_versions_latest + ON agent_document_versions (agent_id, doc_id, version_no DESC); """ ) except Exception as e: @@ -1295,6 +1316,304 @@ class MemoryRetrieval: _push_current() return chunks + async def _next_document_version_no( + self, + agent_id: str, + doc_id: str, + ) -> int: + if self.pg_pool: + try: + async with self.pg_pool.acquire() as conn: + value = await conn.fetchval( + """ + SELECT COALESCE(MAX(version_no), 0) + 1 + FROM agent_document_versions + WHERE agent_id = $1 + AND doc_id = $2 + """, + (agent_id or "").lower(), + doc_id, + ) + return max(1, int(value or 1)) + except Exception as e: + logger.warning(f"next_document_version_no(pg) failed: {e}") + + # Fallback: infer from existing chunk payloads in Qdrant. + if self.qdrant_client: + try: + from qdrant_client.http import models as qmodels + + collection = f"{(agent_id or 'daarwizz').lower()}_docs" + points, _ = self.qdrant_client.scroll( + collection_name=collection, + scroll_filter=qmodels.Filter( + must=[ + qmodels.FieldCondition( + key="doc_id", + match=qmodels.MatchValue(value=doc_id), + ) + ] + ), + limit=256, + with_payload=True, + ) + current_max = 0 + for p in points or []: + payload = getattr(p, "payload", {}) or {} + ver = payload.get("version_no") + if isinstance(ver, int): + current_max = max(current_max, ver) + elif isinstance(ver, str) and ver.isdigit(): + current_max = max(current_max, int(ver)) + return current_max + 1 if current_max > 0 else 1 + except Exception as e: + logger.debug(f"next_document_version_no(qdrant) fallback failed: {e}") + + return 1 + + async def _latest_document_version_no( + self, + agent_id: str, + doc_id: str, + ) -> int: + nxt = await self._next_document_version_no(agent_id=agent_id, doc_id=doc_id) + return max(0, int(nxt) - 1) + + async def _record_document_version( + self, + agent_id: str, + doc_id: str, + version_no: int, + text: str, + file_name: Optional[str] = None, + dao_id: Optional[str] = None, + user_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + source: str = "ingest", + storage_ref: Optional[str] = None, + ) -> Dict[str, Any]: + text_body = (text or "").strip() + text_hash = hashlib.sha256(text_body.encode("utf-8")).hexdigest() if text_body else "" + text_len = len(text_body) + preview = text_body[:DOC_VERSION_PREVIEW_CHARS] if text_body else "" + payload = metadata if isinstance(metadata, dict) else {} + + if not self.pg_pool: + return {"ok": True, "version_no": int(version_no), "id": None} + + try: + async with self.pg_pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO agent_document_versions + (agent_id, doc_id, version_no, text_hash, text_len, text_preview, + file_name, dao_id, user_id, storage_ref, source, metadata) + VALUES + ($1, $2, $3, $4, $5, $6, + $7, $8, $9, $10, $11, $12::jsonb) + ON CONFLICT (agent_id, doc_id, version_no) + DO UPDATE SET + text_hash = EXCLUDED.text_hash, + text_len = EXCLUDED.text_len, + text_preview = EXCLUDED.text_preview, + file_name = EXCLUDED.file_name, + dao_id = EXCLUDED.dao_id, + user_id = EXCLUDED.user_id, + storage_ref = EXCLUDED.storage_ref, + source = EXCLUDED.source, + metadata = EXCLUDED.metadata + RETURNING id, version_no + """, + (agent_id or "").lower(), + doc_id, + int(version_no), + text_hash, + int(text_len), + preview, + file_name, + dao_id, + user_id, + storage_ref, + source, + json.dumps(payload), + ) + return { + "ok": True, + "id": int(row["id"]) if row and row.get("id") is not None else None, + "version_no": int(row["version_no"]) if row and row.get("version_no") is not None else int(version_no), + } + except Exception as e: + logger.warning(f"record_document_version failed: {e}") + return {"ok": False, "error": str(e), "version_no": int(version_no)} + + async def list_document_versions( + self, + agent_id: str, + doc_id: str, + limit: int = 20, + ) -> List[Dict[str, Any]]: + rows_out: List[Dict[str, Any]] = [] + if self.pg_pool: + try: + async with self.pg_pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, agent_id, doc_id, version_no, text_hash, text_len, text_preview, + file_name, dao_id, user_id, storage_ref, source, metadata, created_at + FROM agent_document_versions + WHERE agent_id = $1 + AND doc_id = $2 + ORDER BY version_no DESC + LIMIT $3 + """, + (agent_id or "").lower(), + doc_id, + max(1, min(int(limit or 20), 200)), + ) + for r in rows: + meta_raw = r["metadata"] + if isinstance(meta_raw, dict): + meta_obj = meta_raw + elif isinstance(meta_raw, str): + try: + parsed = json.loads(meta_raw) + meta_obj = parsed if isinstance(parsed, dict) else {"raw": parsed} + except Exception: + meta_obj = {"raw": meta_raw} + else: + meta_obj = {} + rows_out.append( + { + "id": int(r["id"]), + "agent_id": r["agent_id"], + "doc_id": r["doc_id"], + "version_no": int(r["version_no"]), + "text_hash": r["text_hash"], + "text_len": int(r["text_len"] or 0), + "text_preview": r["text_preview"], + "file_name": r["file_name"], + "dao_id": r["dao_id"], + "user_id": r["user_id"], + "storage_ref": r["storage_ref"], + "source": r["source"], + "metadata": meta_obj, + "created_at": r["created_at"].isoformat() if r["created_at"] else None, + } + ) + return rows_out + except Exception as e: + logger.warning(f"list_document_versions failed: {e}") + + # PG unavailable fallback: aggregate distinct versions from Qdrant payloads. + if self.qdrant_client: + try: + from qdrant_client.http import models as qmodels + + collection = f"{(agent_id or 'daarwizz').lower()}_docs" + offset = None + seen: Dict[int, Dict[str, Any]] = {} + max_points = max(64, min(int(limit or 20) * 80, 4096)) + fetched = 0 + while fetched < max_points: + points, next_offset = self.qdrant_client.scroll( + collection_name=collection, + scroll_filter=qmodels.Filter( + must=[ + qmodels.FieldCondition( + key="doc_id", + match=qmodels.MatchValue(value=doc_id), + ) + ] + ), + offset=offset, + limit=256, + with_payload=True, + ) + if not points: + break + fetched += len(points) + for p in points: + payload = getattr(p, "payload", {}) or {} + ver_raw = payload.get("version_no") + if isinstance(ver_raw, int): + ver = ver_raw + elif isinstance(ver_raw, str) and ver_raw.isdigit(): + ver = int(ver_raw) + else: + ver = 1 + + existing = seen.get(ver) + ts = payload.get("timestamp") + if not existing or (ts and str(ts) > str(existing.get("created_at") or "")): + seen[ver] = { + "id": None, + "agent_id": (agent_id or "").lower(), + "doc_id": doc_id, + "version_no": int(ver), + "text_hash": None, + "text_len": None, + "text_preview": None, + "file_name": payload.get("file_name"), + "dao_id": payload.get("dao_id"), + "user_id": payload.get("user_id"), + "storage_ref": payload.get("storage_ref"), + "source": payload.get("source") or "ingest", + "metadata": payload.get("metadata") or {}, + "created_at": ts, + } + if not next_offset: + break + offset = next_offset + rows_out = sorted(seen.values(), key=lambda x: int(x.get("version_no") or 0), reverse=True)[: max(1, min(int(limit or 20), 200))] + except Exception: + pass + + return rows_out + + def _build_doc_filter( + self, + doc_id: str, + dao_id: Optional[str] = None, + ): + from qdrant_client.http import models as qmodels + + must_conditions = [ + qmodels.FieldCondition( + key="doc_id", + match=qmodels.MatchValue(value=doc_id), + ) + ] + if dao_id: + must_conditions.append( + qmodels.FieldCondition( + key="dao_id", + match=qmodels.MatchValue(value=dao_id), + ) + ) + return qmodels.Filter(must=must_conditions) + + def _delete_document_points( + self, + collection: str, + doc_id: str, + dao_id: Optional[str] = None, + ) -> bool: + if not self.qdrant_client: + return False + try: + from qdrant_client.http import models as qmodels + + self.qdrant_client.delete( + collection_name=collection, + points_selector=qmodels.FilterSelector( + filter=self._build_doc_filter(doc_id=doc_id, dao_id=dao_id) + ), + ) + return True + except Exception as e: + logger.warning(f"delete_document_points failed for {collection}/{doc_id}: {e}") + return False + async def ingest_document_chunks( self, agent_id: str, @@ -1304,6 +1623,10 @@ class MemoryRetrieval: dao_id: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, + replace_existing: bool = False, + version_no: Optional[int] = None, + source: str = "ingest", + storage_ref: Optional[str] = None, ) -> Dict[str, Any]: """ Ingest normalized document chunks into {agent_id}_docs collection. @@ -1341,6 +1664,7 @@ class MemoryRetrieval: logger.info(f"✅ Created collection: {collection}") total = len(chunks) + resolved_version_no = int(version_no or 0) or await self._next_document_version_no(agent_id=agent_id, doc_id=doc_id) for idx, chunk in enumerate(chunks): emb = await self.get_embedding(chunk[:2000]) if not emb: @@ -1355,6 +1679,9 @@ class MemoryRetrieval: "chunk_index": idx, "chunks_total": total, "type": "document_chunk", + "version_no": int(resolved_version_no), + "source": source, + "storage_ref": storage_ref, "timestamp": datetime.utcnow().isoformat(), } if isinstance(metadata, dict) and metadata: @@ -1370,18 +1697,70 @@ class MemoryRetrieval: if not stored_points: return {"ok": False, "error": "embedding_failed"} + # Keep previous versions in the same collection when updating. + # Query path will select only the latest version_no for doc_id. + self.qdrant_client.upsert(collection_name=collection, points=stored_points) + version_row = await self._record_document_version( + agent_id=agent_id, + doc_id=doc_id, + version_no=resolved_version_no, + text=body, + file_name=file_name, + dao_id=dao_id, + user_id=user_id, + metadata=metadata, + source=source, + storage_ref=storage_ref, + ) return { "ok": True, "doc_id": doc_id, + "version_no": int(resolved_version_no), + "version_id": version_row.get("id"), "chunks_total": len(chunks), "chunks_stored": len(stored_points), + "replaced_existing": bool(replace_existing), "collection": collection, } except Exception as e: logger.warning(f"ingest_document_chunks failed for {collection}: {e}") return {"ok": False, "error": str(e)} + async def update_document_chunks( + self, + agent_id: str, + doc_id: str, + file_name: Optional[str], + text: str, + dao_id: Optional[str] = None, + user_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + storage_ref: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Update existing document content with version bump. + Keeps the same logical doc_id and replaces indexed chunks. + """ + next_version = await self._next_document_version_no(agent_id=agent_id, doc_id=doc_id) + result = await self.ingest_document_chunks( + agent_id=agent_id, + doc_id=doc_id, + file_name=file_name, + text=text, + dao_id=dao_id, + user_id=user_id, + metadata=metadata, + replace_existing=False, + version_no=next_version, + source="update", + storage_ref=storage_ref, + ) + if result.get("ok"): + result["updated"] = True + result["replaced_existing"] = True + return result + async def query_document_chunks( self, agent_id: str, @@ -1412,12 +1791,20 @@ class MemoryRetrieval: from qdrant_client.http import models as qmodels must_conditions = [] if doc_id: + latest_ver = await self._latest_document_version_no(agent_id=agent_id, doc_id=doc_id) must_conditions.append( qmodels.FieldCondition( key="doc_id", match=qmodels.MatchValue(value=doc_id), ) ) + if latest_ver > 0: + must_conditions.append( + qmodels.FieldCondition( + key="version_no", + match=qmodels.MatchValue(value=int(latest_ver)), + ) + ) if dao_id: must_conditions.append( qmodels.FieldCondition( @@ -1455,6 +1842,7 @@ class MemoryRetrieval: "file_name": payload.get("file_name"), "chunk_index": payload.get("chunk_index"), "chunks_total": payload.get("chunks_total"), + "version_no": payload.get("version_no"), } )