feat(docs): add standard file processing and router document ingest/query

Author: NODA1 System
Date:   2026-02-21 14:02:59 +01:00
Parent: 3e3546ea89
Commit: 5d52cf81c4

7 changed files with 755 additions and 104 deletions
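
The two new router endpoints take raw text that the standard file-processing
layer has already extracted. A minimal client sketch of the intended flow; the
base URL, doc id, and payload values are illustrative, not part of this commit:

import asyncio
import httpx

async def demo() -> None:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Ingest extracted text; if doc_id is omitted, the router derives a
        # deterministic one from agent_id + file_name + a text prefix.
        r = await client.post("/v1/documents/ingest", json={
            "agent_id": "daarwizz",
            "doc_id": "report-2026",
            "file_name": "report.pdf",
            "text": "Quarterly revenue grew 12%.\n\nHeadcount stayed flat.",
        })
        print(r.json())  # {"ok": true, "doc_id": "report-2026", ...}

        # Ask a question answered only from that document's chunks.
        r = await client.post("/v1/documents/query", json={
            "agent_id": "daarwizz",
            "question": "How did revenue change?",
            "doc_id": "report-2026",
        })
        print(r.json()["data"]["answer"])  # ends with "[source: N]"

asyncio.run(demo())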

@@ -1235,6 +1235,27 @@ class InferResponse(BaseModel):
    file_mime: Optional[str] = None


class DocumentIngestRequest(BaseModel):
    """Ingest document text into agent-specific docs collection."""
    agent_id: str
    doc_id: str
    file_name: Optional[str] = None
    text: str
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


class DocumentQueryRequest(BaseModel):
    """Query document context from agent-specific docs collection."""
    agent_id: str
    question: str
    doc_id: Optional[str] = None
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    limit: int = 5


class SharedMemoryReviewRequest(BaseModel):
    point_id: str
    approve: bool
@@ -2867,6 +2888,149 @@ async def agent_infer(agent_id: str, request: InferRequest):
    )


@app.post("/v1/documents/ingest")
async def documents_ingest(request: DocumentIngestRequest):
    """
    Ingest raw document text into Qdrant {agent_id}_docs.
    """
    if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval:
        raise HTTPException(status_code=503, detail="Memory retrieval not available")

    agent_id = (request.agent_id or "").strip().lower()
    if not agent_id:
        raise HTTPException(status_code=400, detail="agent_id is required")

    text = (request.text or "").strip()
    if not text:
        raise HTTPException(status_code=400, detail="text is required")

    doc_id = (request.doc_id or "").strip()
    if not doc_id:
        # The fallback must be deterministic for the same text + file
        seed = f"{agent_id}:{request.file_name or ''}:{text[:400]}"
        doc_id = hashlib.md5(seed.encode("utf-8")).hexdigest()[:16]

    result = await memory_retrieval.ingest_document_chunks(
        agent_id=agent_id,
        doc_id=doc_id,
        file_name=request.file_name,
        text=text,
        dao_id=request.dao_id,
        user_id=request.user_id,
        metadata=request.metadata,
    )
    if not result.get("ok"):
        return {
            "ok": False,
            "error": result.get("error", "ingest_failed"),
            "doc_id": doc_id,
            "collection": result.get("collection"),
        }
    return result


@app.post("/v1/documents/query")
async def documents_query(request: DocumentQueryRequest):
    """
    Query ingested document chunks and synthesize a source-locked answer.
    """
    if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval:
        raise HTTPException(status_code=503, detail="Memory retrieval not available")

    agent_id = (request.agent_id or "").strip().lower()
    if not agent_id:
        raise HTTPException(status_code=400, detail="agent_id is required")

    question = (request.question or "").strip()
    if not question:
        raise HTTPException(status_code=400, detail="question is required")

    lookup = await memory_retrieval.query_document_chunks(
        agent_id=agent_id,
        question=question,
        doc_id=request.doc_id,
        dao_id=request.dao_id,
        limit=request.limit,
    )
    chunks = lookup.get("chunks") or []
    if not chunks:
        return {
            "ok": False,
            "error": lookup.get("error", "no_relevant_chunks"),
            "data": {
                "answer": None,
                "citations": [],
                "doc_id": request.doc_id,
            },
        }

    citations: List[Dict[str, Any]] = []
    context_blocks: List[str] = []
    for i, ch in enumerate(chunks, start=1):
        c_doc_id = ch.get("doc_id") or request.doc_id
        c_file = ch.get("file_name")
        c_idx = ch.get("chunk_index")
        c_score = float(ch.get("score", 0.0) or 0.0)
        citations.append(
            {
                "doc_id": c_doc_id,
                "file_name": c_file,
                "chunk_index": c_idx,
                "score": round(c_score, 4),
            }
        )
        src = []
        if c_file:
            src.append(f"file={c_file}")
        if c_idx is not None:
            src.append(f"chunk={int(c_idx) + 1}")
        src_label = ", ".join(src) if src else "chunk"
        context_blocks.append(f"[{i}] ({src_label}) {str(ch.get('text') or '').strip()[:1400]}")

    answer_text = ""
    try:
        llm_req = InternalLLMRequest(
            prompt=(
                # The prompt is in Ukrainian; in English it reads:
                # "User question: ...
                #  Context from the document (ONLY it may be used): ...
                #  Answer rules:
                #  1) Answer only from the provided context.
                #  2) If the data is insufficient, say plainly: 'Insufficient data in the document'.
                #  3) End with a short source reference in the format [source: N]."
                "Питання користувача:\n"
                f"{question}\n\n"
                "Контекст із документа (дозволено використовувати ТІЛЬКИ його):\n"
                + "\n\n".join(context_blocks)
                + "\n\n"
                "Правила відповіді:\n"
                "1) Відповідай лише на основі наведеного контексту.\n"
                "2) Якщо даних недостатньо, прямо скажи: 'Недостатньо даних у документі'.\n"
                "3) В кінці додай коротке посилання на джерело у форматі [source: N].\n"
            ),
            llm_profile="reasoning",
            max_tokens=320,
            temperature=0.1,
            role_context="Document QA source-locked",
            metadata={"agent_id": agent_id, "mode": "documents_query"},
        )
        llm_resp = await internal_llm_complete(llm_req)
        answer_text = (llm_resp.text or "").strip()
    except Exception as e:
        logger.warning(f"documents_query LLM synthesis failed: {e}")

    if not answer_text:
        top = chunks[0]
        # Ukrainian fallback; in English: "I found a relevant fragment in the
        # document but could not produce a summary. Here is the key excerpt:"
        answer_text = (
            "Знайшов релевантний фрагмент у документі, але не вдалося сформувати підсумок. "
            f"Ось ключовий уривок:\n{str(top.get('text') or '').strip()[:1200]}"
        )

    return {
        "ok": True,
        "data": {
            "answer": answer_text,
            "citations": citations,
            "doc_id": request.doc_id or chunks[0].get("doc_id"),
            "chunks_used": len(chunks),
            "collection": lookup.get("collection"),
        },
    }
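
# Illustrative shape of a successful /v1/documents/query response
# (values are hypothetical, not produced by this commit):
# {
#     "ok": True,
#     "data": {
#         "answer": "Revenue grew 12% quarter over quarter. [source: 1]",
#         "citations": [
#             {"doc_id": "report-2026", "file_name": "report.pdf",
#              "chunk_index": 0, "score": 0.8312}
#         ],
#         "doc_id": "report-2026",
#         "chunks_used": 1,
#         "collection": "daarwizz_docs",
#     },
# }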


@app.get("/v1/models")
async def list_available_models():
    """List all available models across backends"""

@@ -1237,6 +1237,234 @@ class MemoryRetrieval:
            logger.warning(f"review_shared_pending_case failed: {e}")
            return {"ok": False, "error": str(e)}

    def _chunk_document_text(
        self,
        text: str,
        chunk_chars: int = 1200,
        overlap_chars: int = 180,
    ) -> List[str]:
        """
        Split document text into overlap-aware chunks for RAG indexing.
        Keeps paragraph structure when possible.
        """
        raw = re.sub(r"\r\n?", "\n", text or "").strip()
        if not raw:
            return []
        paragraphs = [p.strip() for p in re.split(r"\n{2,}", raw) if p and p.strip()]
        if not paragraphs:
            return []

        chunks: List[str] = []
        current = ""
        max_hard = max(chunk_chars, 600)

        def _push_current() -> None:
            nonlocal current
            if current and len(current.strip()) >= 20:
                chunks.append(current.strip())
            current = ""

        for para in paragraphs:
            if len(para) > max_hard * 2:
                # Oversized paragraph: slice it into fixed windows with overlap.
                _push_current()
                i = 0
                step = max_hard - max(80, min(overlap_chars, max_hard // 2))
                while i < len(para):
                    part = para[i : i + max_hard]
                    if len(part.strip()) >= 20:
                        chunks.append(part.strip())
                    i += max(1, step)
                continue
            candidate = f"{current}\n\n{para}".strip() if current else para
            if len(candidate) <= max_hard:
                current = candidate
                continue
            _push_current()
            if overlap_chars > 0 and chunks:
                # Carry a short tail of the previous chunk as overlap.
                tail = chunks[-1][-overlap_chars:]
                current = f"{tail}\n\n{para}".strip()
                if len(current) > max_hard:
                    # The tail would push the chunk over the limit; start fresh
                    # from the paragraph so it is not duplicated across chunks.
                    current = para
            else:
                current = para
        _push_current()
        return chunks
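
    # Illustrative chunker behaviour (hypothetical inputs, defaults assumed):
    #   _chunk_document_text("A" * 100 + "\n\n" + "B" * 100)
    #       -> one chunk: short paragraphs are merged while they fit chunk_chars
    #   _chunk_document_text("A" * 3000)
    #       -> 1200-char windows at a 1020-char step, i.e. ~180 chars of overlap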

    async def ingest_document_chunks(
        self,
        agent_id: str,
        doc_id: str,
        file_name: Optional[str],
        text: str,
        dao_id: Optional[str] = None,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Ingest normalized document chunks into the {agent_id}_docs collection.
        """
        if not self.qdrant_client:
            return {"ok": False, "error": "qdrant_unavailable"}
        if not COHERE_API_KEY:
            return {"ok": False, "error": "cohere_unavailable"}

        body = (text or "").strip()
        if not body:
            return {"ok": False, "error": "empty_document"}
        chunks = self._chunk_document_text(body)
        if not chunks:
            return {"ok": False, "error": "no_chunks"}

        collection = f"{(agent_id or 'daarwizz').lower()}_docs"
        stored_points = []
        try:
            from qdrant_client.http import models as qmodels
            import uuid

            # Create the per-agent docs collection on first use
            # (1024-dim cosine vectors, matching the Cohere embeddings).
            try:
                self.qdrant_client.get_collection(collection)
            except Exception:
                self.qdrant_client.create_collection(
                    collection_name=collection,
                    vectors_config=qmodels.VectorParams(
                        size=1024,
                        distance=qmodels.Distance.COSINE,
                    ),
                )
                logger.info(f"✅ Created collection: {collection}")

            total = len(chunks)
            for idx, chunk in enumerate(chunks):
                emb = await self.get_embedding(chunk[:2000])
                if not emb:
                    continue
                payload: Dict[str, Any] = {
                    "text": chunk[:6000],
                    "doc_id": doc_id,
                    "file_name": file_name,
                    "agent_id": (agent_id or "").lower(),
                    "dao_id": dao_id,
                    "user_id": user_id,
                    "chunk_index": idx,
                    "chunks_total": total,
                    "type": "document_chunk",
                    "timestamp": datetime.utcnow().isoformat(),
                }
                if isinstance(metadata, dict) and metadata:
                    payload["metadata"] = metadata
                stored_points.append(
                    qmodels.PointStruct(
                        id=str(uuid.uuid4()),
                        vector=emb,
                        payload=payload,
                    )
                )
            if not stored_points:
                return {"ok": False, "error": "embedding_failed"}
            self.qdrant_client.upsert(collection_name=collection, points=stored_points)
            return {
                "ok": True,
                "doc_id": doc_id,
                "chunks_total": len(chunks),
                "chunks_stored": len(stored_points),
                "collection": collection,
            }
        except Exception as e:
            logger.warning(f"ingest_document_chunks failed for {collection}: {e}")
            return {"ok": False, "error": str(e)}

    async def query_document_chunks(
        self,
        agent_id: str,
        question: str,
        doc_id: Optional[str] = None,
        dao_id: Optional[str] = None,
        limit: int = 5,
    ) -> Dict[str, Any]:
        """
        Retrieve the top document chunks from {agent_id}_docs for a question.
        """
        if not self.qdrant_client:
            return {"ok": False, "error": "qdrant_unavailable", "chunks": []}
        if not COHERE_API_KEY:
            return {"ok": False, "error": "cohere_unavailable", "chunks": []}
        q = (question or "").strip()
        if not q:
            return {"ok": False, "error": "empty_question", "chunks": []}
        embedding = await self.get_embedding(q[:2000])
        if not embedding:
            return {"ok": False, "error": "embedding_failed", "chunks": []}

        collection = f"{(agent_id or 'daarwizz').lower()}_docs"
        try:
            from qdrant_client.http import models as qmodels

            must_conditions = []
            if doc_id:
                must_conditions.append(
                    qmodels.FieldCondition(
                        key="doc_id",
                        match=qmodels.MatchValue(value=doc_id),
                    )
                )
            if dao_id:
                must_conditions.append(
                    qmodels.FieldCondition(
                        key="dao_id",
                        match=qmodels.MatchValue(value=dao_id),
                    )
                )
            query_filter = qmodels.Filter(must=must_conditions) if must_conditions else None
            rows = self.qdrant_client.search(
                collection_name=collection,
                query_vector=embedding,
                query_filter=query_filter,
                limit=max(1, min(int(limit or 5), 12)),
                with_payload=True,
            )
        except Exception as e:
            logger.debug(f"query_document_chunks search failed for {collection}: {e}")
            return {"ok": False, "error": "search_failed", "chunks": [], "collection": collection}

        hits: List[Dict[str, Any]] = []
        for row in rows or []:
            score = float(getattr(row, "score", 0.0) or 0.0)
            if score < 0.30:
                # Skip weak matches below the 0.30 similarity floor.
                continue
            payload = getattr(row, "payload", {}) or {}
            text = str(payload.get("text") or "").strip()
            if len(text) < 10:
                continue
            hits.append(
                {
                    "text": text,
                    "score": score,
                    "doc_id": payload.get("doc_id"),
                    "file_name": payload.get("file_name"),
                    "chunk_index": payload.get("chunk_index"),
                    "chunks_total": payload.get("chunks_total"),
                }
            )
        return {
            "ok": bool(hits),
            "chunks": hits,
            "collection": collection,
            "doc_id": doc_id,
        }

    async def store_interaction(
        self,
        channel: str,