feat(docs): add versioned document update and versions APIs

This commit is contained in:
NODA1 System
2026-02-21 16:49:24 +01:00
parent 5d52cf81c4
commit f53e71a0f4
4 changed files with 764 additions and 4 deletions

View File

@@ -6,6 +6,8 @@ Endpoints:
- POST /api/doc/parse - Parse a document
- POST /api/doc/ingest - Ingest document to RAG
- POST /api/doc/ask - Ask question about document
- POST /api/doc/update - Update existing document text (versioned)
- GET /api/doc/versions/{doc_id} - List document versions
"""
import logging
from typing import Optional, Dict, Any
@@ -17,9 +19,12 @@ from services.doc_service import (
parse_document,
ingest_document,
ask_about_document,
update_document,
list_document_versions,
get_doc_context,
ParsedResult,
IngestResult,
UpdateResult,
QAResult,
DocContext
)
@@ -52,6 +57,7 @@ class IngestDocumentRequest(BaseModel):
file_name: Optional[str] = None
dao_id: Optional[str] = None
user_id: Optional[str] = None
agent_id: str = "daarwizz"
class AskDocumentRequest(BaseModel):
@@ -61,6 +67,21 @@ class AskDocumentRequest(BaseModel):
doc_id: Optional[str] = None
dao_id: Optional[str] = None
user_id: Optional[str] = None
agent_id: str = "daarwizz"
class UpdateDocumentRequest(BaseModel):
    """Request to update existing document content.

    Either ``text`` (the new content) or ``doc_url`` (source to re-parse)
    should be supplied; missing identifiers are resolved from the
    session's stored document context by the service layer.
    """
    session_id: str  # chat/session the document belongs to
    doc_id: Optional[str] = None  # target document; falls back to session context
    doc_url: Optional[str] = None  # source URL, re-parsed when text is omitted
    file_name: Optional[str] = None
    text: Optional[str] = None  # new document text; if empty, doc_url is re-parsed
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    agent_id: str = "daarwizz"  # selects the agent's docs collection
    storage_ref: Optional[str] = None  # opaque reference to the stored source blob
    metadata: Optional[Dict[str, Any]] = None  # extra metadata merged into chunk payloads
# ========================================
@@ -167,7 +188,8 @@ async def ingest_document_endpoint(request: IngestDocumentRequest):
doc_url=request.doc_url,
file_name=request.file_name,
dao_id=request.dao_id,
user_id=request.user_id
user_id=request.user_id,
agent_id=request.agent_id,
)
if not result.success:
@@ -209,7 +231,8 @@ async def ask_about_document_endpoint(request: AskDocumentRequest):
question=request.question,
doc_id=doc_id,
dao_id=request.dao_id,
user_id=request.user_id
user_id=request.user_id,
agent_id=request.agent_id,
)
if not result.success:
@@ -227,6 +250,59 @@ async def ask_about_document_endpoint(request: AskDocumentRequest):
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/doc/update")
async def update_document_endpoint(request: UpdateDocumentRequest):
    """
    Update a document and bump its version.
    If text is omitted and doc_url exists, text is re-parsed from the source document.
    """
    try:
        outcome = await update_document(
            session_id=request.session_id,
            doc_id=request.doc_id,
            doc_url=request.doc_url,
            file_name=request.file_name,
            text=request.text,
            dao_id=request.dao_id,
            user_id=request.user_id,
            agent_id=request.agent_id,
            storage_ref=request.storage_ref,
            metadata=request.metadata,
        )
        if outcome.success:
            return {
                "ok": True,
                "doc_id": outcome.doc_id,
                "version_no": outcome.version_no,
                "version_id": outcome.version_id,
                "updated_chunks": outcome.updated_chunks,
                "status": outcome.status,
            }
        # Service reported a handled failure — surface it as a 400.
        raise HTTPException(status_code=400, detail=outcome.error)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Update document error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/doc/versions/{doc_id}")
async def list_document_versions_endpoint(doc_id: str, agent_id: str = "daarwizz", limit: int = 20):
    """
    List document versions for agent/doc pair.
    """
    try:
        payload = await list_document_versions(agent_id=agent_id, doc_id=doc_id, limit=limit)
        if payload.get("ok"):
            return payload
        raise HTTPException(status_code=400, detail=payload.get("error", "Failed to load versions"))
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"List document versions error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/doc/context/{session_id}")
async def get_document_context(session_id: str):
"""
@@ -257,4 +333,3 @@ async def get_document_context(session_id: str):
except Exception as e:
logger.error(f"Get document context error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -51,6 +51,17 @@ class IngestResult(BaseModel):
error: Optional[str] = None
class UpdateResult(BaseModel):
    """Result of document update with version bump."""
    success: bool  # True when the router accepted the update
    doc_id: Optional[str] = None  # logical document id that was updated
    version_no: Optional[int] = None  # new version number after the bump
    version_id: Optional[int] = None  # row id of the persisted version record, if any
    updated_chunks: int = 0  # number of chunks re-indexed for the new version
    status: str = "unknown"  # "updated" on success, "failed" on error paths
    error: Optional[str] = None  # human-readable error when success is False
class QAResult(BaseModel):
"""Result of RAG query about a document"""
success: bool
@@ -106,6 +117,27 @@ class DocumentService:
raise RuntimeError(f"Router error on {path}: {err}")
return body if isinstance(body, dict) else {"ok": False, "error": "Invalid router response type"}
async def _router_get_json(
    self,
    path: str,
    timeout: float = 30.0,
) -> Dict[str, Any]:
    """GET ``path`` on the router service and return the parsed JSON body.

    Raises:
        RuntimeError: when the router answers with HTTP >= 400, carrying
            the router's ``detail``/``error`` text when available.

    Non-dict JSON bodies on the success path are normalized to an
    ``{"ok": False, ...}`` dict so callers can always use ``.get()``.
    """
    import httpx
    base = ROUTER_URL.rstrip("/")
    url = f"{base}{path}"
    async with httpx.AsyncClient(timeout=timeout) as client:
        resp = await client.get(url)
        body = {}
        try:
            body = resp.json()
        except Exception:
            body = {"ok": False, "error": f"Invalid JSON from router ({resp.status_code})"}
        if resp.status_code >= 400:
            # Fix: an error body can be a JSON list/scalar; calling .get()
            # on it raised AttributeError and masked the real HTTP error.
            if isinstance(body, dict):
                err = body.get("detail") or body.get("error") or f"HTTP {resp.status_code}"
            else:
                err = f"HTTP {resp.status_code}"
            raise RuntimeError(f"Router error on {path}: {err}")
        return body if isinstance(body, dict) else {"ok": False, "error": "Invalid router response type"}
def _is_excel_filename(self, file_name: Optional[str]) -> bool:
if not file_name:
return False
@@ -572,6 +604,152 @@ class DocumentService:
success=False,
error=str(e)
)
async def update_document(
    self,
    session_id: str,
    doc_id: Optional[str] = None,
    doc_url: Optional[str] = None,
    file_name: Optional[str] = None,
    text: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None,
    agent_id: str = "daarwizz",
    storage_ref: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> UpdateResult:
    """
    Update existing document content and bump version in router memory.

    Flow: backfill missing identifiers from the stored session context;
    when ``text`` is empty, re-parse the source at ``doc_url`` to
    markdown; POST the new text to the router's /v1/documents/update;
    on success refresh the session's document context.

    Never raises — all failures are reported via ``success=False`` plus
    ``error`` on the returned UpdateResult.
    """
    try:
        # Backfill identifiers from the session's saved doc context.
        context = await self.get_doc_context(session_id)
        if context:
            if not doc_id:
                doc_id = context.doc_id
            if not doc_url:
                doc_url = context.doc_url
            if not file_name:
                file_name = context.file_name
            if not dao_id:
                dao_id = context.dao_id
        if not doc_id:
            return UpdateResult(
                success=False,
                status="failed",
                error="No document context found. Provide doc_id or parse/ingest first.",
            )
        effective_text = (text or "").strip()
        if not effective_text:
            # No inline text: re-parse the original source document.
            if not doc_url:
                return UpdateResult(
                    success=False,
                    doc_id=doc_id,
                    status="failed",
                    error="No text or doc_url provided for update",
                )
            parsed = await self.parse_document(
                session_id=session_id,
                doc_url=doc_url,
                file_name=file_name or "document",
                dao_id=dao_id or "",
                user_id=user_id or "",
                output_mode="markdown",
                metadata={"source": self._extract_source(session_id), "mode": "update"},
            )
            if not parsed.success:
                return UpdateResult(
                    success=False,
                    doc_id=doc_id,
                    status="failed",
                    error=parsed.error or "Document parse failed",
                )
            effective_text = (parsed.markdown or "").strip()
        if not effective_text:
            return UpdateResult(
                success=False,
                doc_id=doc_id,
                status="failed",
                error="No extractable text for update",
            )
        # Caller-supplied metadata wins over defaults on key collisions.
        meta = {
            "session_id": session_id,
            "source": self._extract_source(session_id),
        }
        if isinstance(metadata, dict):
            meta.update(metadata)
        response = await self._router_post_json(
            "/v1/documents/update",
            {
                "agent_id": (agent_id or "daarwizz").lower(),
                "doc_id": doc_id,
                "file_name": file_name,
                "text": effective_text,
                "dao_id": dao_id,
                "user_id": user_id,
                "storage_ref": storage_ref,
                "metadata": meta,
            },
            timeout=90.0,
        )
        if not response.get("ok"):
            return UpdateResult(
                success=False,
                doc_id=doc_id,
                status="failed",
                error=response.get("error", "Router update failed"),
            )
        # Refresh session context so follow-up questions target this doc.
        await self.save_doc_context(
            session_id=session_id,
            doc_id=doc_id,
            doc_url=doc_url,
            file_name=file_name,
            dao_id=dao_id,
            user_id=user_id,
        )
        return UpdateResult(
            success=True,
            doc_id=response.get("doc_id") or doc_id,
            # `int(... or 0) or None` maps missing/0 values to None.
            version_no=int(response.get("version_no", 0) or 0) or None,
            version_id=int(response.get("version_id", 0) or 0) or None,
            updated_chunks=int(response.get("chunks_stored", 0) or 0),
            status="updated",
        )
    except Exception as e:
        logger.error(f"Document update failed: {e}", exc_info=True)
        return UpdateResult(
            success=False,
            doc_id=doc_id,
            status="failed",
            error=str(e),
        )
async def list_document_versions(
    self,
    agent_id: str,
    doc_id: str,
    limit: int = 20,
) -> Dict[str, Any]:
    """Fetch the version history for a document from the router."""
    normalized_agent = (agent_id or "daarwizz").lower()
    normalized_doc = (doc_id or "").strip()
    if not normalized_doc:
        return {"ok": False, "error": "doc_id is required", "items": []}
    # Clamp the requested page size to the router's accepted range.
    capped_limit = max(1, min(int(limit or 20), 200))
    path = f"/v1/documents/{normalized_doc}/versions?agent_id={normalized_agent}&limit={capped_limit}"
    try:
        response = await self._router_get_json(path, timeout=30.0)
    except Exception as e:
        logger.error(f"list_document_versions failed: {e}")
        return {"ok": False, "error": str(e), "items": []}
    if isinstance(response, dict):
        return response
    return {"ok": False, "error": "invalid_response", "items": []}
async def ask_about_document(
self,
@@ -762,6 +940,42 @@ async def ask_about_document(
)
async def update_document(
    session_id: str,
    doc_id: Optional[str] = None,
    doc_url: Optional[str] = None,
    file_name: Optional[str] = None,
    text: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None,
    agent_id: str = "daarwizz",
    storage_ref: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> UpdateResult:
    """Update document chunks and bump version.

    Module-level convenience wrapper that delegates to the shared
    ``doc_service`` singleton; all parameters are passed through
    unchanged.
    """
    return await doc_service.update_document(
        session_id=session_id,
        doc_id=doc_id,
        doc_url=doc_url,
        file_name=file_name,
        text=text,
        dao_id=dao_id,
        user_id=user_id,
        agent_id=agent_id,
        storage_ref=storage_ref,
        metadata=metadata,
    )
async def list_document_versions(agent_id: str, doc_id: str, limit: int = 20) -> Dict[str, Any]:
    """List document versions from router.

    Module-level convenience wrapper around the shared ``doc_service``
    singleton.
    """
    return await doc_service.list_document_versions(
        agent_id=agent_id,
        doc_id=doc_id,
        limit=limit,
    )
async def save_doc_context(
session_id: str,
doc_id: str,

View File

@@ -1256,6 +1256,18 @@ class DocumentQueryRequest(BaseModel):
limit: int = 5
class DocumentUpdateRequest(BaseModel):
    """Update existing document text and bump version."""
    agent_id: str  # target agent; lowercased/validated by the endpoint
    doc_id: str  # logical document id whose chunks are replaced
    file_name: Optional[str] = None
    text: str  # full new document text; must be non-empty after strip
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    storage_ref: Optional[str] = None  # opaque reference to the stored source
    metadata: Optional[Dict[str, Any]] = None  # merged into chunk payload metadata
class SharedMemoryReviewRequest(BaseModel):
point_id: str
approve: bool
@@ -2976,6 +2988,7 @@ async def documents_query(request: DocumentQueryRequest):
"doc_id": c_doc_id,
"file_name": c_file,
"chunk_index": c_idx,
"version_no": ch.get("version_no"),
"score": round(c_score, 4),
}
)
@@ -3031,6 +3044,76 @@ async def documents_query(request: DocumentQueryRequest):
}
@app.post("/v1/documents/update")
async def documents_update(request: DocumentUpdateRequest):
    """
    Replace document chunks for doc_id with new text and create a new version row.
    """
    if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval:
        raise HTTPException(status_code=503, detail="Memory retrieval not available")
    # Normalize inputs once, then validate in a fixed order.
    agent_id = (request.agent_id or "").strip().lower()
    doc_id = (request.doc_id or "").strip()
    text = (request.text or "").strip()
    if not agent_id:
        raise HTTPException(status_code=400, detail="agent_id is required")
    if not doc_id:
        raise HTTPException(status_code=400, detail="doc_id is required")
    if not text:
        raise HTTPException(status_code=400, detail="text is required")
    result = await memory_retrieval.update_document_chunks(
        agent_id=agent_id,
        doc_id=doc_id,
        file_name=request.file_name,
        text=text,
        dao_id=request.dao_id,
        user_id=request.user_id,
        metadata=request.metadata,
        storage_ref=request.storage_ref,
    )
    if result.get("ok"):
        return result
    return {
        "ok": False,
        "error": result.get("error", "update_failed"),
        "doc_id": doc_id,
        "collection": result.get("collection"),
    }
@app.get("/v1/documents/{doc_id}/versions")
async def documents_versions(doc_id: str, agent_id: str, limit: int = 20):
    """
    List stored versions for a document.
    """
    if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval:
        raise HTTPException(status_code=503, detail="Memory retrieval not available")
    # Normalize both identifiers before validating.
    aid = (agent_id or "").strip().lower()
    did = (doc_id or "").strip()
    if not aid:
        raise HTTPException(status_code=400, detail="agent_id is required")
    if not did:
        raise HTTPException(status_code=400, detail="doc_id is required")
    versions = await memory_retrieval.list_document_versions(
        agent_id=aid,
        doc_id=did,
        limit=limit,
    )
    return {
        "ok": True,
        "agent_id": aid,
        "doc_id": did,
        "total": len(versions),
        "items": versions,
    }
@app.get("/v1/models")
async def list_available_models():
"""List all available models across backends"""

View File

@@ -19,10 +19,10 @@ import os
import json
import logging
import re
import hashlib
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
import httpx
import asyncpg
@@ -40,6 +40,7 @@ NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "neo4j")
PENDING_QUESTIONS_LIMIT = int(os.getenv("AGENT_PENDING_QUESTIONS_LIMIT", "5"))
SHARED_AGRO_LIBRARY_ENABLED = os.getenv("AGROMATRIX_SHARED_LIBRARY_ENABLED", "true").lower() == "true"
SHARED_AGRO_LIBRARY_REQUIRE_REVIEW = os.getenv("AGROMATRIX_SHARED_LIBRARY_REQUIRE_REVIEW", "true").lower() == "true"
DOC_VERSION_PREVIEW_CHARS = int(os.getenv("DOC_VERSION_PREVIEW_CHARS", "240"))
@dataclass
@@ -245,6 +246,26 @@ class MemoryRetrieval:
ON agent_pending_questions (agent_id, channel, chat_id, user_id, status, created_at DESC);
CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_pending_questions_unique_open
ON agent_pending_questions (agent_id, channel, chat_id, user_id, question_fingerprint, status);
CREATE TABLE IF NOT EXISTS agent_document_versions (
id BIGSERIAL PRIMARY KEY,
agent_id TEXT NOT NULL,
doc_id TEXT NOT NULL,
version_no INTEGER NOT NULL,
text_hash TEXT NOT NULL,
text_len INTEGER NOT NULL DEFAULT 0,
text_preview TEXT,
file_name TEXT,
dao_id TEXT,
user_id TEXT,
storage_ref TEXT,
source TEXT NOT NULL DEFAULT 'ingest',
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (agent_id, doc_id, version_no)
);
CREATE INDEX IF NOT EXISTS idx_agent_document_versions_latest
ON agent_document_versions (agent_id, doc_id, version_no DESC);
"""
)
except Exception as e:
@@ -1295,6 +1316,304 @@ class MemoryRetrieval:
_push_current()
return chunks
async def _next_document_version_no(
    self,
    agent_id: str,
    doc_id: str,
) -> int:
    """Return the next version number (>= 1) for an agent/doc pair.

    Primary source is the Postgres ``agent_document_versions`` table;
    when Postgres is unavailable or errors, fall back to scanning the
    chunk payloads in Qdrant for the highest ``version_no`` seen.
    Defaults to 1 when nothing is known.
    """
    if self.pg_pool:
        try:
            async with self.pg_pool.acquire() as conn:
                value = await conn.fetchval(
                    """
                    SELECT COALESCE(MAX(version_no), 0) + 1
                    FROM agent_document_versions
                    WHERE agent_id = $1
                      AND doc_id = $2
                    """,
                    (agent_id or "").lower(),
                    doc_id,
                )
                return max(1, int(value or 1))
        except Exception as e:
            logger.warning(f"next_document_version_no(pg) failed: {e}")
    # Fallback: infer from existing chunk payloads in Qdrant.
    if self.qdrant_client:
        try:
            from qdrant_client.http import models as qmodels
            collection = f"{(agent_id or 'daarwizz').lower()}_docs"
            doc_filter = qmodels.Filter(
                must=[
                    qmodels.FieldCondition(
                        key="doc_id",
                        match=qmodels.MatchValue(value=doc_id),
                    )
                ]
            )
            current_max = 0
            offset = None
            # Fix: the previous implementation read only the first
            # 256-point page and ignored the scroll offset, so documents
            # with many chunks could under-report their latest version.
            # Paginate with a hard cap (32 pages = 8192 points).
            for _ in range(32):
                points, offset = self.qdrant_client.scroll(
                    collection_name=collection,
                    scroll_filter=doc_filter,
                    offset=offset,
                    limit=256,
                    with_payload=True,
                )
                for p in points or []:
                    payload = getattr(p, "payload", {}) or {}
                    ver = payload.get("version_no")
                    if isinstance(ver, int):
                        current_max = max(current_max, ver)
                    elif isinstance(ver, str) and ver.isdigit():
                        current_max = max(current_max, int(ver))
                if not points or not offset:
                    break
            return current_max + 1 if current_max > 0 else 1
        except Exception as e:
            logger.debug(f"next_document_version_no(qdrant) fallback failed: {e}")
    return 1
async def _latest_document_version_no(
    self,
    agent_id: str,
    doc_id: str,
) -> int:
    """Return the highest known version number, or 0 when none exist."""
    upcoming = await self._next_document_version_no(agent_id=agent_id, doc_id=doc_id)
    latest = int(upcoming) - 1
    return latest if latest > 0 else 0
async def _record_document_version(
    self,
    agent_id: str,
    doc_id: str,
    version_no: int,
    text: str,
    file_name: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    source: str = "ingest",
    storage_ref: Optional[str] = None,
) -> Dict[str, Any]:
    """Upsert one row in ``agent_document_versions`` for this version.

    Stores a SHA-256 hash, length, and short preview of the text rather
    than the full body. Best-effort: returns ``ok=True`` with ``id=None``
    when Postgres is absent, and ``ok=False`` plus the error when the
    insert fails — callers treat the version row as advisory.
    """
    text_body = (text or "").strip()
    # Hash identifies content without persisting the whole document.
    text_hash = hashlib.sha256(text_body.encode("utf-8")).hexdigest() if text_body else ""
    text_len = len(text_body)
    preview = text_body[:DOC_VERSION_PREVIEW_CHARS] if text_body else ""
    payload = metadata if isinstance(metadata, dict) else {}
    if not self.pg_pool:
        # No PG: report success so ingest can still surface a version_no.
        return {"ok": True, "version_no": int(version_no), "id": None}
    try:
        async with self.pg_pool.acquire() as conn:
            # Idempotent upsert keyed on (agent_id, doc_id, version_no):
            # re-recording the same version overwrites its metadata.
            row = await conn.fetchrow(
                """
                INSERT INTO agent_document_versions
                    (agent_id, doc_id, version_no, text_hash, text_len, text_preview,
                     file_name, dao_id, user_id, storage_ref, source, metadata)
                VALUES
                    ($1, $2, $3, $4, $5, $6,
                     $7, $8, $9, $10, $11, $12::jsonb)
                ON CONFLICT (agent_id, doc_id, version_no)
                DO UPDATE SET
                    text_hash = EXCLUDED.text_hash,
                    text_len = EXCLUDED.text_len,
                    text_preview = EXCLUDED.text_preview,
                    file_name = EXCLUDED.file_name,
                    dao_id = EXCLUDED.dao_id,
                    user_id = EXCLUDED.user_id,
                    storage_ref = EXCLUDED.storage_ref,
                    source = EXCLUDED.source,
                    metadata = EXCLUDED.metadata
                RETURNING id, version_no
                """,
                (agent_id or "").lower(),
                doc_id,
                int(version_no),
                text_hash,
                int(text_len),
                preview,
                file_name,
                dao_id,
                user_id,
                storage_ref,
                source,
                json.dumps(payload),
            )
            return {
                "ok": True,
                "id": int(row["id"]) if row and row.get("id") is not None else None,
                "version_no": int(row["version_no"]) if row and row.get("version_no") is not None else int(version_no),
            }
    except Exception as e:
        logger.warning(f"record_document_version failed: {e}")
        return {"ok": False, "error": str(e), "version_no": int(version_no)}
async def list_document_versions(
    self,
    agent_id: str,
    doc_id: str,
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """Return version rows for agent/doc, newest first (limit clamped to 1..200).

    Primary source is Postgres; if it is unavailable or errors, fall back
    to aggregating distinct ``version_no`` values from Qdrant chunk
    payloads. Fallback rows carry ``id``/``text_hash``/``text_len``/
    ``text_preview`` = None since only chunk metadata is available there.
    """
    rows_out: List[Dict[str, Any]] = []
    if self.pg_pool:
        try:
            async with self.pg_pool.acquire() as conn:
                rows = await conn.fetch(
                    """
                    SELECT id, agent_id, doc_id, version_no, text_hash, text_len, text_preview,
                           file_name, dao_id, user_id, storage_ref, source, metadata, created_at
                    FROM agent_document_versions
                    WHERE agent_id = $1
                      AND doc_id = $2
                    ORDER BY version_no DESC
                    LIMIT $3
                    """,
                    (agent_id or "").lower(),
                    doc_id,
                    max(1, min(int(limit or 20), 200)),
                )
            for r in rows:
                # metadata may arrive as a dict (jsonb codec) or a raw JSON string.
                meta_raw = r["metadata"]
                if isinstance(meta_raw, dict):
                    meta_obj = meta_raw
                elif isinstance(meta_raw, str):
                    try:
                        parsed = json.loads(meta_raw)
                        meta_obj = parsed if isinstance(parsed, dict) else {"raw": parsed}
                    except Exception:
                        meta_obj = {"raw": meta_raw}
                else:
                    meta_obj = {}
                rows_out.append(
                    {
                        "id": int(r["id"]),
                        "agent_id": r["agent_id"],
                        "doc_id": r["doc_id"],
                        "version_no": int(r["version_no"]),
                        "text_hash": r["text_hash"],
                        "text_len": int(r["text_len"] or 0),
                        "text_preview": r["text_preview"],
                        "file_name": r["file_name"],
                        "dao_id": r["dao_id"],
                        "user_id": r["user_id"],
                        "storage_ref": r["storage_ref"],
                        "source": r["source"],
                        "metadata": meta_obj,
                        "created_at": r["created_at"].isoformat() if r["created_at"] else None,
                    }
                )
            return rows_out
        except Exception as e:
            # Fall through to the Qdrant-derived view below.
            logger.warning(f"list_document_versions failed: {e}")
    # PG unavailable fallback: aggregate distinct versions from Qdrant payloads.
    if self.qdrant_client:
        try:
            from qdrant_client.http import models as qmodels
            collection = f"{(agent_id or 'daarwizz').lower()}_docs"
            offset = None
            seen: Dict[int, Dict[str, Any]] = {}
            # Budget ~80 chunk points per requested version, capped at 4096.
            max_points = max(64, min(int(limit or 20) * 80, 4096))
            fetched = 0
            while fetched < max_points:
                points, next_offset = self.qdrant_client.scroll(
                    collection_name=collection,
                    scroll_filter=qmodels.Filter(
                        must=[
                            qmodels.FieldCondition(
                                key="doc_id",
                                match=qmodels.MatchValue(value=doc_id),
                            )
                        ]
                    ),
                    offset=offset,
                    limit=256,
                    with_payload=True,
                )
                if not points:
                    break
                fetched += len(points)
                for p in points:
                    payload = getattr(p, "payload", {}) or {}
                    ver_raw = payload.get("version_no")
                    if isinstance(ver_raw, int):
                        ver = ver_raw
                    elif isinstance(ver_raw, str) and ver_raw.isdigit():
                        ver = int(ver_raw)
                    else:
                        # Legacy chunks without version info count as v1.
                        ver = 1
                    existing = seen.get(ver)
                    ts = payload.get("timestamp")
                    # Keep the newest payload per version (lexicographic ISO
                    # timestamp comparison).
                    if not existing or (ts and str(ts) > str(existing.get("created_at") or "")):
                        seen[ver] = {
                            "id": None,
                            "agent_id": (agent_id or "").lower(),
                            "doc_id": doc_id,
                            "version_no": int(ver),
                            "text_hash": None,
                            "text_len": None,
                            "text_preview": None,
                            "file_name": payload.get("file_name"),
                            "dao_id": payload.get("dao_id"),
                            "user_id": payload.get("user_id"),
                            "storage_ref": payload.get("storage_ref"),
                            "source": payload.get("source") or "ingest",
                            "metadata": payload.get("metadata") or {},
                            "created_at": ts,
                        }
                if not next_offset:
                    break
                offset = next_offset
            rows_out = sorted(seen.values(), key=lambda x: int(x.get("version_no") or 0), reverse=True)[: max(1, min(int(limit or 20), 200))]
        except Exception:
            # Best-effort fallback: an empty list is acceptable here.
            pass
    return rows_out
def _build_doc_filter(
    self,
    doc_id: str,
    dao_id: Optional[str] = None,
):
    """Build a Qdrant filter matching doc_id (and dao_id when provided)."""
    from qdrant_client.http import models as qmodels
    conditions = [
        qmodels.FieldCondition(
            key="doc_id",
            match=qmodels.MatchValue(value=doc_id),
        ),
    ]
    if dao_id:
        conditions.append(
            qmodels.FieldCondition(
                key="dao_id",
                match=qmodels.MatchValue(value=dao_id),
            ),
        )
    return qmodels.Filter(must=conditions)
def _delete_document_points(
    self,
    collection: str,
    doc_id: str,
    dao_id: Optional[str] = None,
) -> bool:
    """Delete every chunk point for doc_id in collection; True on success."""
    if not self.qdrant_client:
        return False
    try:
        from qdrant_client.http import models as qmodels
        selector = qmodels.FilterSelector(
            filter=self._build_doc_filter(doc_id=doc_id, dao_id=dao_id)
        )
        self.qdrant_client.delete(
            collection_name=collection,
            points_selector=selector,
        )
    except Exception as e:
        logger.warning(f"delete_document_points failed for {collection}/{doc_id}: {e}")
        return False
    return True
async def ingest_document_chunks(
self,
agent_id: str,
@@ -1304,6 +1623,10 @@ class MemoryRetrieval:
dao_id: Optional[str] = None,
user_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
replace_existing: bool = False,
version_no: Optional[int] = None,
source: str = "ingest",
storage_ref: Optional[str] = None,
) -> Dict[str, Any]:
"""
Ingest normalized document chunks into {agent_id}_docs collection.
@@ -1341,6 +1664,7 @@ class MemoryRetrieval:
logger.info(f"✅ Created collection: {collection}")
total = len(chunks)
resolved_version_no = int(version_no or 0) or await self._next_document_version_no(agent_id=agent_id, doc_id=doc_id)
for idx, chunk in enumerate(chunks):
emb = await self.get_embedding(chunk[:2000])
if not emb:
@@ -1355,6 +1679,9 @@ class MemoryRetrieval:
"chunk_index": idx,
"chunks_total": total,
"type": "document_chunk",
"version_no": int(resolved_version_no),
"source": source,
"storage_ref": storage_ref,
"timestamp": datetime.utcnow().isoformat(),
}
if isinstance(metadata, dict) and metadata:
@@ -1370,18 +1697,70 @@ class MemoryRetrieval:
if not stored_points:
return {"ok": False, "error": "embedding_failed"}
# Keep previous versions in the same collection when updating.
# Query path will select only the latest version_no for doc_id.
self.qdrant_client.upsert(collection_name=collection, points=stored_points)
version_row = await self._record_document_version(
agent_id=agent_id,
doc_id=doc_id,
version_no=resolved_version_no,
text=body,
file_name=file_name,
dao_id=dao_id,
user_id=user_id,
metadata=metadata,
source=source,
storage_ref=storage_ref,
)
return {
"ok": True,
"doc_id": doc_id,
"version_no": int(resolved_version_no),
"version_id": version_row.get("id"),
"chunks_total": len(chunks),
"chunks_stored": len(stored_points),
"replaced_existing": bool(replace_existing),
"collection": collection,
}
except Exception as e:
logger.warning(f"ingest_document_chunks failed for {collection}: {e}")
return {"ok": False, "error": str(e)}
async def update_document_chunks(
    self,
    agent_id: str,
    doc_id: str,
    file_name: Optional[str],
    text: str,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    storage_ref: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Update existing document content with version bump.
    Keeps the same logical doc_id and replaces indexed chunks.
    """
    # Allocate the next version number first, then re-ingest under it.
    next_version = await self._next_document_version_no(agent_id=agent_id, doc_id=doc_id)
    result = await self.ingest_document_chunks(
        agent_id=agent_id,
        doc_id=doc_id,
        file_name=file_name,
        text=text,
        dao_id=dao_id,
        user_id=user_id,
        metadata=metadata,
        replace_existing=False,  # older version chunks stay in the collection
        version_no=next_version,
        source="update",
        storage_ref=storage_ref,
    )
    if result.get("ok"):
        result["updated"] = True
        # NOTE(review): reported as replaced even though replace_existing=False
        # is passed (old chunks are kept; queries pick the latest version_no)
        # — confirm this flag is intended for API consumers.
        result["replaced_existing"] = True
    return result
async def query_document_chunks(
self,
agent_id: str,
@@ -1412,12 +1791,20 @@ class MemoryRetrieval:
from qdrant_client.http import models as qmodels
must_conditions = []
if doc_id:
latest_ver = await self._latest_document_version_no(agent_id=agent_id, doc_id=doc_id)
must_conditions.append(
qmodels.FieldCondition(
key="doc_id",
match=qmodels.MatchValue(value=doc_id),
)
)
if latest_ver > 0:
must_conditions.append(
qmodels.FieldCondition(
key="version_no",
match=qmodels.MatchValue(value=int(latest_ver)),
)
)
if dao_id:
must_conditions.append(
qmodels.FieldCondition(
@@ -1455,6 +1842,7 @@ class MemoryRetrieval:
"file_name": payload.get("file_name"),
"chunk_index": payload.get("chunk_index"),
"chunks_total": payload.get("chunks_total"),
"version_no": payload.get("version_no"),
}
)