Files
microdao-daarion/docs/cursor/channel_agnostic_doc_flow_task.md
Apple 4601c6fca8 feat: add Vision Encoder service + Vision RAG implementation
- Vision Encoder Service (OpenCLIP ViT-L/14, GPU-accelerated)
  - FastAPI app with text/image embedding endpoints (768-dim)
  - Docker support with NVIDIA GPU runtime
  - Port 8001, health checks, model info API

- Qdrant Vector Database integration
  - Port 6333/6334 (HTTP/gRPC)
  - Image embeddings storage (768-dim, Cosine distance)
  - Auto collection creation

- Vision RAG implementation
  - VisionEncoderClient (Python client for API)
  - Image Search module (text-to-image, image-to-image)
  - Vision RAG routing in DAGI Router (mode: image_search)
  - VisionEncoderProvider integration

- Documentation (5000+ lines)
  - SYSTEM-INVENTORY.md - Complete system inventory
  - VISION-ENCODER-STATUS.md - Service status
  - VISION-RAG-IMPLEMENTATION.md - Implementation details
  - vision_encoder_deployment_task.md - Deployment checklist
  - services/vision-encoder/README.md - Deployment guide
  - Updated WARP.md, INFRASTRUCTURE.md, Jupyter Notebook

- Testing
  - test-vision-encoder.sh - Smoke tests (6 tests)
  - Unit tests for client, image search, routing

- Services: 17 total (added Vision Encoder + Qdrant)
- AI Models: 3 (qwen3:8b, OpenCLIP ViT-L/14, BAAI/bge-m3)
- GPU Services: 2 (Vision Encoder, Ollama)
- VRAM Usage: ~10 GB (concurrent)

Status: Production Ready 
2025-11-17 05:24:36 -08:00

14 KiB
Raw Blame History

Task: Channel-agnostic document workflow (PDF + RAG)

Goal

Make the document (PDF) parsing + RAG workflow channel-agnostic, so it can be reused by:

  • Telegram bots (DAARWIZZ, Helion)
  • Web applications
  • Mobile apps
  • Any other client via HTTP API

This task defines a shared doc_service, HTTP endpoints for non-Telegram clients, and integration of Telegram handlers with this shared layer.

NOTE: If this task is re-run on a repo where it is already implemented, it should be treated as a validation/refinement task. Existing structures (services, endpoints) SHOULD NOT be removed, only improved if necessary.


Context

Existing components (expected state)

  • Repo root: microdao-daarion/
  • Gateway service: gateway-bot/

Key files:

  • gateway-bot/http_api.py

    • Telegram handlers for DAARWIZZ (/telegram/webhook) and Helion (/helion/telegram/webhook).
    • Voice → STT flow (Whisper via STT_SERVICE_URL).
    • Discord handler.
    • Helper functions: get_telegram_file_path, send_telegram_message.
  • gateway-bot/memory_client.py

    • MemoryClient with methods:
      • get_context, save_chat_turn, create_dialog_summary, upsert_fact.
  • gateway-bot/app.py

    • FastAPI app, includes http_api.router as gateway_router.
    • CORS configuration.

Router + parser (already implemented in router project):

  • DAGI Router supports:
    • mode: "doc_parse" with provider parser → OCRProvider → parser-service (DotsOCR).
    • mode: "rag_query" for RAG questions.
  • parser-service is available at http://parser-service:9400.

The goal of this task is to:

  1. Add channel-agnostic document service into gateway-bot.
  2. Add /api/doc/* HTTP endpoints for web/mobile.
  3. Refactor Telegram handlers to use this service for PDF, /ingest, and RAG follow-ups.
  4. Store document context in Memory Service via fact_key = "doc_context:{session_id}".

Changes to implement

1. Create service: gateway-bot/services/doc_service.py

Create a new directory and file:

  • gateway-bot/services/__init__.py
  • gateway-bot/services/doc_service.py

1.1. Pydantic models

Define models:

  • QAItem — single Q&A pair
  • ParsedResult — result of document parsing
  • IngestResult — result of ingestion into RAG
  • QAResult — result of RAG query about a document
  • DocContext — stored document context

Example fields (can be extended as needed):

  • QAItem: question: str, answer: str
  • ParsedResult:
    • success: bool
    • doc_id: Optional[str]
    • qa_pairs: Optional[List[QAItem]]
    • markdown: Optional[str]
    • chunks_meta: Optional[Dict[str, Any]] (e.g., {"count": int, "chunks": [...]})
    • raw: Optional[Dict[str, Any]] (full payload from router)
    • error: Optional[str]
  • IngestResult:
    • success: bool
    • doc_id: Optional[str]
    • ingested_chunks: int
    • status: str
    • error: Optional[str]
  • QAResult:
    • success: bool
    • answer: Optional[str]
    • doc_id: Optional[str]
    • sources: Optional[List[Dict[str, Any]]]
    • error: Optional[str]
  • DocContext:
    • doc_id: str
    • dao_id: Optional[str]
    • user_id: Optional[str]
    • doc_url: Optional[str]
    • file_name: Optional[str]
    • saved_at: Optional[str]

1.2. DocumentService class

Implement DocumentService using router_client.send_to_router and memory_client:

Methods:

  • async def save_doc_context(session_id, doc_id, doc_url=None, file_name=None, dao_id=None) -> bool

    • Uses memory_client.upsert_fact with:
      • fact_key = f"doc_context:{session_id}"
      • fact_value_json = {"doc_id", "doc_url", "file_name", "dao_id", "saved_at"}.
    • Extract user_id from session_id (e.g., telegram:123user_id="123").
  • async def get_doc_context(session_id) -> Optional[DocContext]

    • Uses memory_client.get_fact(user_id, fact_key).
    • If fact_value_json exists, return DocContext(**fact_value_json).
  • async def parse_document(session_id, doc_url, file_name, dao_id, user_id, output_mode="qa_pairs", metadata=None) -> ParsedResult

    • Builds router request:
      • mode: "doc_parse"
      • agent: "parser"
      • metadata: includes source (derived from session_id), dao_id, user_id, session_id and optional metadata.
      • payload: includes doc_url, file_name, output_mode, dao_id, user_id.
    • Calls send_to_router.
    • On success:
      • Extract doc_id from response.
      • Call save_doc_context.
      • Map qa_pairs, markdown, chunks into ParsedResult.
  • async def ingest_document(session_id, doc_id=None, doc_url=None, file_name=None, dao_id=None, user_id=None) -> IngestResult

    • If doc_id is None, load from get_doc_context.
    • Build router request with mode: "doc_parse", payload.output_mode="chunks", payload.ingest=True and doc_url / doc_id.
    • Return IngestResult with ingested_chunks based on chunks length.
  • async def ask_about_document(session_id, question, doc_id=None, dao_id=None, user_id=None) -> QAResult

    • If doc_id is None, load from get_doc_context.
    • Build router request with mode: "rag_query" and payload containing question, dao_id, user_id, doc_id.
    • Return QAResult with answer and optional sources.

Provide small helper method:

  • _extract_source(session_id: str) -> str → returns first segment before : (e.g. "telegram", "web").

At bottom of the file, export convenience functions:

  • doc_service = DocumentService()
  • Top-level async wrappers:
    • parse_document(...), ingest_document(...), ask_about_document(...), save_doc_context(...), get_doc_context(...).

IMPORTANT: No Telegram-specific logic (emoji, message length, /ingest hints) in this file.


2. Extend MemoryClient: gateway-bot/memory_client.py

Add method:

async def get_fact(self, user_id: str, fact_key: str, team_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Get single fact by key"""
  • Use Memory Service HTTP API, e.g.:
    • GET {base_url}/facts/{fact_key} with user_id and optional team_id in query params.
    • Return response.json() on 200, else None.

This method will be used by doc_service.get_doc_context.

Do not change existing public methods.


3. HTTP API for web/mobile: gateway-bot/http_api_doc.py

Create gateway-bot/http_api_doc.py with:

  • APIRouter() named router.
  • Import from services.doc_service:
    • parse_document, ingest_document, ask_about_document, get_doc_context, and models.

Endpoints:

  1. POST /api/doc/parse

    Request (JSON body, Pydantic model ParseDocumentRequest):

    • session_id: str
    • doc_url: str
    • file_name: str
    • dao_id: str
    • user_id: str
    • output_mode: str = "qa_pairs"
    • metadata: Optional[Dict[str, Any]]

    Behaviour:

    • Call parse_document(...) from doc_service.
    • On failure → HTTPException(status_code=400, detail=result.error).
    • On success → JSON with doc_id, qa_pairs (as list of dict), markdown, chunks_meta, raw.
  2. POST /api/doc/ingest

    Request (IngestDocumentRequest):

    • session_id: str
    • doc_id: Optional[str]
    • doc_url: Optional[str]
    • file_name: Optional[str]
    • dao_id: Optional[str]
    • user_id: Optional[str]

    Behaviour:

    • If doc_id is missing, use get_doc_context(session_id).
    • Call ingest_document(...).
    • Return doc_id, ingested_chunks, status.
  3. POST /api/doc/ask

    Request (AskDocumentRequest):

    • session_id: str
    • question: str
    • doc_id: Optional[str]
    • dao_id: Optional[str]
    • user_id: Optional[str]

    Behaviour:

    • If doc_id is missing, use get_doc_context(session_id).
    • Call ask_about_document(...).
    • Return answer, doc_id, and sources (if any).
  4. GET /api/doc/context/{session_id}

    Behaviour:

    • Use get_doc_context(session_id).
    • If missing → 404.
    • Else return doc_id, dao_id, user_id, doc_url, file_name, saved_at.

Optional: POST /api/doc/parse/upload stub for future file-upload handling (currently can return 501 with note to use doc_url).


4. Wire API into app: gateway-bot/app.py

Update app.py:

  • Import both routers:

    from http_api import router as gateway_router
    from http_api_doc import router as doc_router
    
  • Include them:

    app.include_router(gateway_router, prefix="", tags=["gateway"])
    app.include_router(doc_router, prefix="", tags=["docs"])
    
  • Update root endpoint / to list new endpoints:

    • "POST /api/doc/parse"
    • "POST /api/doc/ingest"
    • "POST /api/doc/ask"
    • "GET /api/doc/context/{session_id}"

5. Refactor Telegram handlers: gateway-bot/http_api.py

Update http_api.py so Telegram uses doc_service for PDF/ingest/RAG, keeping existing chat/voice flows.

5.1. Imports and constants

  • Add imports:

    from services.doc_service import (
        parse_document,
        ingest_document,
        ask_about_document,
        get_doc_context,
    )
    
  • Define Telegram length limits:

    TELEGRAM_MAX_MESSAGE_LENGTH = 4096
    TELEGRAM_SAFE_LENGTH = 3500
    

5.2. DAARWIZZ /telegram/webhook

Inside telegram_webhook:

  1. /ingest command

    • Check text from message: if starts with /ingest:
      • session_id = f"telegram:{chat_id}".
      • If message also contains a PDF document:
        • Use get_telegram_file_path(file_id) and correct bot token to build file_url.
        • await send_telegram_message(chat_id, "📥 Імпортую документ у RAG...").
        • Call ingest_document(session_id, doc_url=file_url, file_name=file_name, dao_id, user_id=f"tg:{user_id}").
      • Else:
        • Call ingest_document(session_id, dao_id=dao_id, user_id=f"tg:{user_id}") and rely on stored context.
      • Send success/failure message.
  2. PDF detection

    • Check document = update.message.get("document").
    • Determine is_pdf via mime_type and/or file_name.endswith(".pdf").
    • If PDF:
      • Log file info.
      • Get file_path via get_telegram_file_path(file_id) + correct token → file_url.
      • Send "📄 Обробляю PDF-документ...".
      • session_id = f"telegram:{chat_id}".
      • Call parse_document(session_id, doc_url=file_url, file_name=file_name, dao_id, user_id=f"tg:{user_id}", output_mode="qa_pairs", metadata={"username": username, "chat_id": chat_id}).
      • On success, format:
        • Prefer Q&A (result.qa_pairs) → format_qa_response(...).
        • Else markdown → format_markdown_response(...).
        • Else chunks → format_chunks_response(...).
      • Append hint: "\n\n💡 _Використай /ingest для імпорту документа у RAG_".
      • Send response via send_telegram_message.
  3. RAG follow-up questions

    • After computing text (from voice or direct text), before regular chat routing:
      • session_id = f"telegram:{chat_id}".
      • Load doc_context = await get_doc_context(session_id).
      • If doc_context.doc_id exists and text looks like a question (contains ? or Ukrainian question words):
        • Call ask_about_document(session_id, question=text, doc_id=doc_context.doc_id, dao_id=dao_id or doc_context.dao_id, user_id=f"tg:{user_id}").
        • If success, truncate answer to TELEGRAM_SAFE_LENGTH and send as Telegram message.
        • If RAG fails → fall back to normal chat routing.
  4. Keep voice + normal chat flows

    • Existing STT flow and chat→router logic should remain as fallback for non-PDF / non-ingest / non-RAG messages.

5.3. Helion /helion/telegram/webhook

Mirror the same behaviours for Helion handler:

  • /ingest command support.
  • PDF detection and parse_document usage.
  • RAG follow-up via ask_about_document.
  • Use HELION_TELEGRAM_BOT_TOKEN for file download and message sending.
  • Preserve existing chat→router behaviour when doc flow does not apply.

5.4. Formatting helpers

Add helper functions at the bottom of http_api.py (Telegram-specific):

  • format_qa_response(qa_pairs: list, max_pairs: int = 5) -> str
    • Adds header, enumerates Q&A pairs, truncates long answers, respects TELEGRAM_SAFE_LENGTH.
  • format_markdown_response(markdown: str) -> str
    • Wraps markdown with header; truncates to TELEGRAM_SAFE_LENGTH and appends hint about /ingest if truncated.
  • format_chunks_response(chunks: list) -> str
    • Shows summary about number of chunks and previews first ~3.

IMPORTANT: These helpers handle Telegram-specific constraints and SHOULD NOT be moved into doc_service.


Acceptance criteria

  1. gateway-bot/services/doc_service.py exists and provides:

    • parse_document, ingest_document, ask_about_document, save_doc_context, get_doc_context.
    • Uses DAGI Router and Memory Service, with session_id-based context.
  2. gateway-bot/http_api_doc.py exists and defines:

    • POST /api/doc/parse
    • POST /api/doc/ingest
    • POST /api/doc/ask
    • GET /api/doc/context/{session_id}
  3. gateway-bot/app.py:

    • Includes both http_api.router and http_api_doc.router.
    • Root / lists new /api/doc/* endpoints.
  4. gateway-bot/memory_client.py:

    • Includes get_fact(...) and existing methods still work.
    • doc_service uses upsert_fact + get_fact for doc_context:{session_id}.
  5. gateway-bot/http_api.py:

    • Telegram handlers use doc_service for:
      • PDF parsing,
      • /ingest command,
      • RAG follow-up questions.
    • Continue to support existing voice→STT→chat flow and regular chat routing when doc flow isnt triggered.
  6. Web/mobile clients can call /api/doc/* to:

    • Parse documents via doc_url.
    • Ingest into RAG.
    • Ask questions about the last parsed document for given session_id.

How to run this task with Cursor

From repo root (microdao-daarion):

cursor task < docs/cursor/channel_agnostic_doc_flow_task.md

Cursor should then:

  • Create/modify the files listed above.
  • Ensure implementation matches the described architecture and acceptance criteria.