Task: Channel-agnostic document workflow (PDF + RAG)

Goal

Make the document (PDF) parsing + RAG workflow channel-agnostic, so it can be reused by:

  • Telegram bots (DAARWIZZ, Helion)
  • Web applications
  • Mobile apps
  • Any other client via HTTP API

This task defines a shared doc_service, HTTP endpoints for non-Telegram clients, and integration of Telegram handlers with this shared layer.

NOTE: If this task is re-run on a repo where it is already implemented, it should be treated as a validation/refinement task. Existing structures (services, endpoints) SHOULD NOT be removed, only improved if necessary.


Context

Existing components (expected state)

  • Repo root: microdao-daarion/
  • Gateway service: gateway-bot/

Key files:

  • gateway-bot/http_api.py
    • Telegram handlers for DAARWIZZ (/telegram/webhook) and Helion (/helion/telegram/webhook).
    • Voice → STT flow (Whisper via STT_SERVICE_URL).
    • Discord handler.
    • Helper functions: get_telegram_file_path, send_telegram_message.
  • gateway-bot/memory_client.py
    • MemoryClient with methods: get_context, save_chat_turn, create_dialog_summary, upsert_fact.
  • gateway-bot/app.py
    • FastAPI app; includes http_api.router as gateway_router.
    • CORS configuration.

Router + parser (already implemented in the router project):

  • DAGI Router supports:
    • mode: "doc_parse" with provider parser → OCRProvider → parser-service (DotsOCR).
    • mode: "rag_query" for RAG questions.
  • parser-service is available at http://parser-service:9400.

The goal of this task is to:

  1. Add a channel-agnostic document service to gateway-bot.
  2. Add /api/doc/* HTTP endpoints for web/mobile.
  3. Refactor Telegram handlers to use this service for PDF, /ingest, and RAG follow-ups.
  4. Store document context in Memory Service via fact_key = "doc_context:{session_id}".

Changes to implement

1. Create service: gateway-bot/services/doc_service.py

Create a new package directory with two files:

  • gateway-bot/services/__init__.py
  • gateway-bot/services/doc_service.py

1.1. Pydantic models

Define models:

  • QAItem — single Q&A pair
  • ParsedResult — result of document parsing
  • IngestResult — result of ingestion into RAG
  • QAResult — result of RAG query about a document
  • DocContext — stored document context

Example fields (can be extended as needed):

  • QAItem: question: str, answer: str
  • ParsedResult:
    • success: bool
    • doc_id: Optional[str]
    • qa_pairs: Optional[List[QAItem]]
    • markdown: Optional[str]
    • chunks_meta: Optional[Dict[str, Any]] (e.g., {"count": int, "chunks": [...]})
    • raw: Optional[Dict[str, Any]] (full payload from router)
    • error: Optional[str]
  • IngestResult:
    • success: bool
    • doc_id: Optional[str]
    • ingested_chunks: int
    • status: str
    • error: Optional[str]
  • QAResult:
    • success: bool
    • answer: Optional[str]
    • doc_id: Optional[str]
    • sources: Optional[List[Dict[str, Any]]]
    • error: Optional[str]
  • DocContext:
    • doc_id: str
    • dao_id: Optional[str]
    • user_id: Optional[str]
    • doc_url: Optional[str]
    • file_name: Optional[str]
    • saved_at: Optional[str]
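A minimal sketch of these models with Pydantic, assuming every Optional field defaults to None and that IngestResult.ingested_chunks and IngestResult.status get defaults of 0 and "unknown" (the list above gives only the types, so those defaults are an assumption):

```python
from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class QAItem(BaseModel):
    """Single question/answer pair."""
    question: str
    answer: str


class ParsedResult(BaseModel):
    """Result of document parsing (mode="doc_parse")."""
    success: bool
    doc_id: Optional[str] = None
    qa_pairs: Optional[List[QAItem]] = None
    markdown: Optional[str] = None
    chunks_meta: Optional[Dict[str, Any]] = None  # e.g. {"count": 3, "chunks": [...]}
    raw: Optional[Dict[str, Any]] = None          # full payload from the router
    error: Optional[str] = None


class IngestResult(BaseModel):
    """Result of ingestion into RAG."""
    success: bool
    doc_id: Optional[str] = None
    ingested_chunks: int = 0
    status: str = "unknown"  # default value is an assumption
    error: Optional[str] = None


class QAResult(BaseModel):
    """Result of a RAG query about a document."""
    success: bool
    answer: Optional[str] = None
    doc_id: Optional[str] = None
    sources: Optional[List[Dict[str, Any]]] = None
    error: Optional[str] = None


class DocContext(BaseModel):
    """Document context stored under doc_context:{session_id}."""
    doc_id: str
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    saved_at: Optional[str] = None
```

In Pydantic v2 an Optional[...] annotation without a default is still a required field, so the explicit `= None` defaults matter if failure results should be constructible from `success` and `error` alone.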

1.2. DocumentService class

Implement DocumentService using router_client.send_to_router and memory_client.

Methods:

  • async def save_doc_context(session_id, doc_id, doc_url=None, file_name=None, dao_id=None) -> bool
    • Uses memory_client.upsert_fact with:
      • fact_key = f"doc_context:{session_id}"
      • fact_value_json = a dict with the keys "doc_id", "doc_url", "file_name", "dao_id", "saved_at".
    • Extracts user_id from session_id (e.g., telegram:123 → user_id="123").
  • async def get_doc_context(session_id) -> Optional[DocContext]
    • Uses memory_client.get_fact(user_id, fact_key).
    • If fact_value_json exists, returns DocContext(**fact_value_json).
  • async def parse_document(session_id, doc_url, file_name, dao_id, user_id, output_mode="qa_pairs", metadata=None) -> ParsedResult
    • Builds a router request:
      • mode: "doc_parse"
      • agent: "parser"
      • metadata: includes source (derived from session_id), dao_id, user_id, session_id, and the optional metadata.
      • payload: includes doc_url, file_name, output_mode, dao_id, user_id.
    • Calls send_to_router.
    • On success:
      • Extracts doc_id from the response.
      • Calls save_doc_context.
      • Maps qa_pairs, markdown, and chunks into ParsedResult.
  • async def ingest_document(session_id, doc_id=None, doc_url=None, file_name=None, dao_id=None, user_id=None) -> IngestResult
    • If doc_id is None, loads it via get_doc_context.
    • Builds a router request with mode: "doc_parse", payload.output_mode="chunks", payload.ingest=True, and doc_url / doc_id.
    • Returns IngestResult with ingested_chunks set to the number of returned chunks.
  • async def ask_about_document(session_id, question, doc_id=None, dao_id=None, user_id=None) -> QAResult
    • If doc_id is None, loads it via get_doc_context.
    • Builds a router request with mode: "rag_query" and a payload containing question, dao_id, user_id, doc_id.
    • Returns QAResult with the answer and optional sources.
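The three router requests can be sketched as pure builder functions, which keeps them easy to unit-test apart from send_to_router. The builder names are hypothetical; the doc_parse fields follow the bullets above, while the "agent": "parser" on the ingest request and the metadata blocks on the ingest and RAG requests are assumptions, since the router's full schema is not spelled out here.

```python
from typing import Any, Dict, Optional


def _source(session_id: str) -> str:
    # "telegram:42" -> "telegram"
    return session_id.split(":", 1)[0]


def build_parse_request(
    session_id: str,
    doc_url: str,
    file_name: str,
    dao_id: Optional[str],
    user_id: Optional[str],
    output_mode: str = "qa_pairs",
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Router request for mode="doc_parse"."""
    return {
        "mode": "doc_parse",
        "agent": "parser",
        # Caller metadata is merged first so it cannot override the core keys.
        "metadata": {
            **(metadata or {}),
            "source": _source(session_id),
            "dao_id": dao_id,
            "user_id": user_id,
            "session_id": session_id,
        },
        "payload": {
            "doc_url": doc_url,
            "file_name": file_name,
            "output_mode": output_mode,
            "dao_id": dao_id,
            "user_id": user_id,
        },
    }


def build_ingest_request(
    session_id: str,
    doc_id: Optional[str] = None,
    doc_url: Optional[str] = None,
    file_name: Optional[str] = None,
    dao_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Same doc_parse mode, but requesting chunks and ingestion into RAG."""
    payload: Dict[str, Any] = {"output_mode": "chunks", "ingest": True, "dao_id": dao_id, "user_id": user_id}
    # Send whichever document reference is available.
    if doc_id:
        payload["doc_id"] = doc_id
    if doc_url:
        payload["doc_url"] = doc_url
        payload["file_name"] = file_name
    return {
        "mode": "doc_parse",
        "agent": "parser",
        "metadata": {"source": _source(session_id), "session_id": session_id},
        "payload": payload,
    }


def build_rag_request(session_id: str, question: str, doc_id: str,
                      dao_id: Optional[str] = None, user_id: Optional[str] = None) -> Dict[str, Any]:
    """Router request for mode="rag_query", scoped to one document."""
    return {
        "mode": "rag_query",
        "metadata": {"source": _source(session_id), "session_id": session_id},
        "payload": {"question": question, "dao_id": dao_id, "user_id": user_id, "doc_id": doc_id},
    }
```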

Provide a small helper method:

  • _extract_source(session_id: str) -> str → returns the first segment before ":" (e.g. "telegram", "web").

At the bottom of the file, export convenience functions:

  • doc_service = DocumentService()
  • Top-level async wrappers:
    • parse_document(...), ingest_document(...), ask_about_document(...), save_doc_context(...), get_doc_context(...).
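The module-level wrappers can be thin delegates to one shared instance, so both the Telegram handlers and http_api_doc.py import plain functions. A minimal sketch with a stub service; the stub body is a placeholder rather than the real logic, and only one wrapper is shown.

```python
import asyncio
from typing import Any, Dict, Optional


class DocumentService:
    """Stub standing in for the real service; only one method is shown."""

    async def ask_about_document(self, session_id: str, question: str, doc_id: Optional[str] = None,
                                 dao_id: Optional[str] = None, user_id: Optional[str] = None) -> Dict[str, Any]:
        # Placeholder: the real method builds a "rag_query" request and calls send_to_router.
        return {"session_id": session_id, "question": question, "doc_id": doc_id}


# Single shared instance, created once at import time.
doc_service = DocumentService()


async def ask_about_document(session_id: str, question: str, doc_id: Optional[str] = None,
                             dao_id: Optional[str] = None, user_id: Optional[str] = None) -> Dict[str, Any]:
    """Module-level wrapper that forwards every argument to the shared instance."""
    return await doc_service.ask_about_document(
        session_id, question, doc_id=doc_id, dao_id=dao_id, user_id=user_id
    )


if __name__ == "__main__":
    print(asyncio.run(ask_about_document("web:1", "What is X?", doc_id="doc-1")))
```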

IMPORTANT: No Telegram-specific logic (emoji, message length, /ingest hints) in this file.


2. Extend MemoryClient: gateway-bot/memory_client.py

Add a method:

```python
# In class MemoryClient (needs: from typing import Any, Dict, Optional)
async def get_fact(self, user_id: str, fact_key: str, team_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Get a single fact by key."""
```

  • Use the Memory Service HTTP API, e.g.:
    • GET {base_url}/facts/{fact_key} with user_id and optional team_id as query params.
  • Return response.json() on 200, else None.
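The URL construction and the "200 → JSON, else None" rule for get_fact can be sketched as two pure standard-library helpers. This assumes the Memory Service decodes a percent-encoded fact_key in the path, so that doc_context:telegram:42 stays one path segment. The helper names and the host in the test are hypothetical; the real method would send the request with whatever HTTP client MemoryClient already uses.

```python
import json
from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode


def build_get_fact_url(base_url: str, fact_key: str, user_id: str, team_id: Optional[str] = None) -> str:
    """GET {base_url}/facts/{fact_key}?user_id=...[&team_id=...]"""
    params = {"user_id": user_id}
    if team_id is not None:
        params["team_id"] = team_id
    # safe="" also encodes ":" and "/", so any session_id fits in a single path segment.
    return f"{base_url.rstrip('/')}/facts/{quote(fact_key, safe='')}?{urlencode(params)}"


def parse_get_fact_response(status_code: int, body: str) -> Optional[Dict[str, Any]]:
    """Return the decoded JSON body on 200; anything else (e.g. 404) means 'no fact'."""
    if status_code != 200:
        return None
    return json.loads(body)
```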

This method will be used by doc_service.get_doc_context.
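To show how save_doc_context and get_doc_context fit together over upsert_fact and get_fact, here is a self-contained round trip against an in-memory fake. FakeMemoryClient, its keyword-argument upsert_fact signature, and the {"fact_value_json": ...} response shape are assumptions. A dataclass stands in for the Pydantic DocContext to keep the sketch dependency-free, and the service methods are written as free functions that take the client explicitly.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple


@dataclass
class DocContext:  # dataclass stand-in for the Pydantic model
    doc_id: str
    dao_id: Optional[str] = None
    user_id: Optional[str] = None
    doc_url: Optional[str] = None
    file_name: Optional[str] = None
    saved_at: Optional[str] = None


class FakeMemoryClient:
    """In-memory stand-in for MemoryClient (hypothetical upsert_fact signature)."""

    def __init__(self) -> None:
        self._facts: Dict[Tuple[str, str], Dict[str, Any]] = {}

    async def upsert_fact(self, user_id: str, fact_key: str, fact_value_json: Dict[str, Any]) -> None:
        self._facts[(user_id, fact_key)] = {"fact_key": fact_key, "fact_value_json": fact_value_json}

    async def get_fact(self, user_id: str, fact_key: str, team_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        return self._facts.get((user_id, fact_key))


def _user_id(session_id: str) -> str:
    return session_id.split(":", 1)[-1]  # "telegram:123" -> "123"


async def save_doc_context(memory, session_id: str, doc_id: str, doc_url=None, file_name=None, dao_id=None) -> bool:
    value = {
        "doc_id": doc_id,
        "doc_url": doc_url,
        "file_name": file_name,
        "dao_id": dao_id,
        "saved_at": datetime.now(timezone.utc).isoformat(),
    }
    try:
        await memory.upsert_fact(user_id=_user_id(session_id), fact_key=f"doc_context:{session_id}", fact_value_json=value)
        return True
    except Exception:
        return False  # the spec's -> bool contract: report failure instead of raising


async def get_doc_context(memory, session_id: str) -> Optional[DocContext]:
    fact = await memory.get_fact(_user_id(session_id), f"doc_context:{session_id}")
    if fact and fact.get("fact_value_json"):
        return DocContext(**fact["fact_value_json"])
    return None


if __name__ == "__main__":
    async def demo() -> None:
        memory = FakeMemoryClient()
        await save_doc_context(memory, "telegram:123", "doc-9", file_name="report.pdf")
        print(await get_doc_context(memory, "telegram:123"))

    asyncio.run(demo())
```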

Do not change existing public methods.


3. HTTP API for web/mobile: gateway-bot/http_api_doc.py

Create gateway-bot/http_api_doc.py with:

  • An APIRouter() instance named router.
  • Imports from services.doc_service:
    • parse_document, ingest_document, ask_about_document, get_doc_context, and the models.

Endpoints:

  1. POST /api/doc/parse

Request (JSON body, Pydantic model ParseDocumentRequest):

  • session_id: str
  • doc_url: str
  • file_name: str
  • dao_id: str
  • user_id: str
  • output_mode: str = "qa_pairs"
  • metadata: Optional[Dict[str, Any]]

Behaviour:

  • Call parse_document(...) from doc_service.
  • On failure → HTTPException(status_code=400, detail=result.error).
  • On success → JSON with doc_id, qa_pairs (as a list of dicts), markdown, chunks_meta, raw.
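From the web/mobile side, a call to this endpoint is a plain JSON POST. A minimal standard-library sketch that builds the request without sending it; GATEWAY_URL and the function name are hypothetical, and the body mirrors ParseDocumentRequest.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

GATEWAY_URL = "http://localhost:9300"  # hypothetical; use the real gateway-bot address


def make_parse_http_request(
    session_id: str,
    doc_url: str,
    file_name: str,
    dao_id: str,
    user_id: str,
    output_mode: str = "qa_pairs",
    metadata: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST /api/doc/parse request."""
    body = {
        "session_id": session_id,
        "doc_url": doc_url,
        "file_name": file_name,
        "dao_id": dao_id,
        "user_id": user_id,
        "output_mode": output_mode,
        "metadata": metadata,
    }
    return urllib.request.Request(
        f"{GATEWAY_URL}/api/doc/parse",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending it with `urllib.request.urlopen(req)` and `json.load(resp)` yields the success JSON; the 400 returned on parse failure surfaces as `urllib.error.HTTPError`.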

  2. POST /api/doc/ingest

Request (IngestDocumentRequest):

  • session_id: str
  • doc_id: Optional[str]
  • doc_url: Optional[str]
  • file_name: Optional[str]
  • dao_id: Optional[str]
  • user_id: Optional[str]

Behaviour:

  • If doc_id is missing, use get_doc_context(session_id).
  • Call ingest_document(...).
  • Return doc_id, ingested_chunks, status.
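The "use the request's doc_id, else fall back to the stored context" rule is shared by the ingest and ask endpoints, so it can live in one helper. A minimal sketch; resolve_doc_id is a hypothetical name, and load_context is any async callable such as get_doc_context. What the endpoint returns when the result is None is not specified above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def resolve_doc_id(
    session_id: str,
    doc_id: Optional[str],
    load_context: Callable[[str], Awaitable[Any]],
) -> Optional[str]:
    """Prefer the doc_id from the request; otherwise use the stored context, if any."""
    if doc_id:
        return doc_id
    ctx = await load_context(session_id)  # e.g. services.doc_service.get_doc_context
    return getattr(ctx, "doc_id", None)   # None when no context was stored


if __name__ == "__main__":
    async def _no_context(session_id: str) -> None:
        return None

    print(asyncio.run(resolve_doc_id("web:1", "doc-1", _no_context)))
```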

  3. POST /api/doc/ask

Request (AskDocumentRequest):

  • session_id: str
  • question: str
  • doc_id: Optional[str]
  • dao_id: Optional[str]
  • user_id: Optional[str]

Behaviour:

  • If doc_id is missing, use get_doc_context(session_id).
  • Call ask_about_document(...).
  • Return answer, doc_id, and sources (if any).

  4. GET /api/doc/context/{session_id}

Behaviour:

  • Use get_doc_context(session_id).
  • If missing → 404.
  • Else return doc_id, dao_id, user_id, doc_url, file_name, saved_at.

Optional: a POST /api/doc/parse/upload stub for future file-upload handling (for now it can return 501 with a note to use doc_url instead).


4. Wire API into app: gateway-bot/app.py

Update app.py:

  • Import both routers:

```python
from http_api import router as gateway_router
from http_api_doc import router as doc_router
```

  • Include them:

```python
app.include_router(gateway_router, prefix="", tags=["gateway"])
app.include_router(doc_router, prefix="", tags=["docs"])
```

  • Update root endpoint / to list new endpoints:
    • "POST /api/doc/parse"
    • "POST /api/doc/ingest"
    • "POST /api/doc/ask"
    • "GET /api/doc/context/{session_id}"

5. Refactor Telegram handlers: gateway-bot/http_api.py

Update http_api.py so Telegram uses doc_service for PDF/ingest/RAG, keeping existing chat/voice flows.

5.1. Imports and constants

  • Add imports:

```python
from services.doc_service import (
    parse_document,
    ingest_document,
    ask_about_document,
    get_doc_context,
)
```

  • Define Telegram length limits:

```python
TELEGRAM_MAX_MESSAGE_LENGTH = 4096  # hard limit for a Telegram message text
TELEGRAM_SAFE_LENGTH = 3500         # leaves room for headers and hints
```

5.2. DAARWIZZ /telegram/webhook

Inside telegram_webhook:

  1. /ingest command
    • Check the message text; if it starts with /ingest:
      • session_id = f"telegram:{chat_id}".
      • If the message also contains a PDF document:
        • Use get_telegram_file_path(file_id) and the correct bot token to build file_url.
        • await send_telegram_message(chat_id, "📥 Імпортую документ у RAG...") ("Importing the document into RAG...").
        • Call ingest_document(session_id, doc_url=file_url, file_name=file_name, dao_id=dao_id, user_id=f"tg:{user_id}").
      • Else:
        • Call ingest_document(session_id, dao_id=dao_id, user_id=f"tg:{user_id}") and rely on the stored context.
      • Send a success/failure message.
  2. PDF detection
    • Check document = update.message.get("document").
    • Determine is_pdf via mime_type and/or file_name.endswith(".pdf").
    • If it is a PDF:
      • Log the file info.
      • Get file_path via get_telegram_file_path(file_id) + the correct token → file_url.
      • Send "📄 Обробляю PDF-документ..." ("Processing the PDF document...").
      • session_id = f"telegram:{chat_id}".
      • Call parse_document(session_id, doc_url=file_url, file_name=file_name, dao_id=dao_id, user_id=f"tg:{user_id}", output_mode="qa_pairs", metadata={"username": username, "chat_id": chat_id}).
      • On success, format the result:
        • Prefer Q&A (result.qa_pairs) → format_qa_response(...).
        • Else markdown → format_markdown_response(...).
        • Else chunks → format_chunks_response(...).
        • Append the hint "\n\n💡 _Використай /ingest для імпорту документа у RAG_" ("Use /ingest to import the document into RAG").
      • Send the response via send_telegram_message.
  3. RAG follow-up questions
    • After computing text (from voice or direct text), before regular chat routing:
      • session_id = f"telegram:{chat_id}".
      • Load doc_context = await get_doc_context(session_id).
      • If doc_context is not None, doc_context.doc_id is set, and the text looks like a question (contains ? or Ukrainian question words):
        • Call ask_about_document(session_id, question=text, doc_id=doc_context.doc_id, dao_id=dao_id or doc_context.dao_id, user_id=f"tg:{user_id}").
        • If it succeeds, truncate the answer to TELEGRAM_SAFE_LENGTH and send it as a Telegram message.
      • If RAG fails → fall back to normal chat routing.
  4. Keep voice + normal chat flows
    • The existing STT flow and chat→router logic remain the fallback for non-PDF / non-ingest / non-RAG messages.
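The three checks this handler relies on (PDF detection, building the download URL, and the "looks like a question" test) can be sketched as small pure functions. The function names and the exact word list are assumptions. The URL form `https://api.telegram.org/file/bot<token>/<file_path>` is the Bot API's download scheme for a file_path returned by getFile, which is presumably what get_telegram_file_path returns.

```python
import re
from typing import Any, Dict

# Example list of Ukrainian interrogatives; extend as needed (assumption, not from the spec).
UK_QUESTION_WORDS = {"що", "як", "чому", "навіщо", "коли", "де", "куди", "хто",
                     "який", "яка", "яке", "які", "скільки", "чи"}


def is_pdf(document: Dict[str, Any]) -> bool:
    """True if a Telegram `document` object looks like a PDF (MIME type or extension)."""
    mime = (document.get("mime_type") or "").lower()
    name = (document.get("file_name") or "").lower()
    return mime == "application/pdf" or name.endswith(".pdf")


def build_file_url(bot_token: str, file_path: str) -> str:
    """Download URL for a file_path obtained via the Bot API getFile method."""
    return f"https://api.telegram.org/file/bot{bot_token}/{file_path}"


def looks_like_question(text: str) -> bool:
    """Heuristic: the text contains "?" or any Ukrainian question word."""
    if "?" in text:
        return True
    words = re.findall(r"\w+", text.lower())  # \w matches Cyrillic letters in Python 3
    return any(word in UK_QUESTION_WORDS for word in words)
```

Matching question words anywhere in the text, as the spec says, is deliberately loose; restricting the check to the first word would reduce false positives on ordinary chat messages.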

5.3. Helion /helion/telegram/webhook

Mirror the same behaviours for the Helion handler:

  • /ingest command support.
  • PDF detection and parse_document usage.
  • RAG follow-up via ask_about_document.
  • Use HELION_TELEGRAM_BOT_TOKEN for file download and message sending.
  • Preserve existing chat→router behaviour when doc flow does not apply.
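One way to avoid duplicating the document flow between the two webhooks is to parameterize a shared handler by bot. A minimal sketch of the per-bot configuration; TelegramBot, BOTS, and bot_token are hypothetical names, and reading the DAARWIZZ token from TELEGRAM_BOT_TOKEN is an assumed variable name (only HELION_TELEGRAM_BOT_TOKEN appears above).

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TelegramBot:
    """Per-bot settings a shared doc-flow handler could be parameterized with."""
    name: str
    token_env: str      # environment variable holding the bot token
    webhook_path: str


BOTS = {
    # TELEGRAM_BOT_TOKEN for DAARWIZZ is an assumed variable name.
    "daarwizz": TelegramBot("DAARWIZZ", "TELEGRAM_BOT_TOKEN", "/telegram/webhook"),
    "helion": TelegramBot("Helion", "HELION_TELEGRAM_BOT_TOKEN", "/helion/telegram/webhook"),
}


def bot_token(bot: TelegramBot, env: Mapping[str, str] = os.environ) -> str:
    """Token used for both file downloads and message sending for this bot."""
    token = env.get(bot.token_env)
    if not token:
        raise RuntimeError(f"{bot.token_env} is not set for {bot.name}")
    return token
```

One point to check when mirroring: a private chat's chat_id equals the user's Telegram id for every bot, so if both handlers use session_id = f"telegram:{chat_id}", DAARWIZZ and Helion will share the same doc_context for that user. A per-bot prefix would separate them if that is not intended.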

5.4. Formatting helpers

Add helper functions at the bottom of http_api.py (Telegram-specific):

  • format_qa_response(qa_pairs: list, max_pairs: int = 5) -> str
    • Adds a header, enumerates Q&A pairs, truncates long answers, and respects TELEGRAM_SAFE_LENGTH.
  • format_markdown_response(markdown: str) -> str
    • Wraps the markdown with a header; truncates to TELEGRAM_SAFE_LENGTH and appends a hint about /ingest if truncated.
  • format_chunks_response(chunks: list) -> str
    • Shows a summary with the number of chunks and previews the first ~3.
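A minimal sketch of the three helpers. The header texts, the 400/200-character per-item cut-offs, and the assumption that each chunk is a dict with a "text" key are placeholders; qa_pairs may be dicts or QAItem-like objects.

```python
from typing import Any, List

TELEGRAM_SAFE_LENGTH = 3500
INGEST_HINT = "\n\n💡 _Використай /ingest для імпорту документа у RAG_"


def _cut(text: str, limit: int) -> str:
    """Trim text to at most `limit` characters, marking the cut with an ellipsis."""
    return text if len(text) <= limit else text[: limit - 1] + "…"


def _field(item: Any, name: str) -> str:
    """Read a field from either a dict or a QAItem-like object."""
    return str(item[name] if isinstance(item, dict) else getattr(item, name))


def format_qa_response(qa_pairs: List[Any], max_pairs: int = 5) -> str:
    parts = [f"📄 Q&A ({len(qa_pairs)}):"]  # placeholder header text
    for i, pair in enumerate(qa_pairs[:max_pairs], start=1):
        parts.append(f"\n{i}. ❓ {_field(pair, 'question')}\n💬 {_cut(_field(pair, 'answer'), 400)}")
    return _cut("\n".join(parts), TELEGRAM_SAFE_LENGTH)


def format_markdown_response(markdown: str) -> str:
    text = "📄 Document:\n\n" + markdown
    if len(text) <= TELEGRAM_SAFE_LENGTH:
        return text
    # Reserve room for the hint so the whole message stays within the safe limit.
    return _cut(text, TELEGRAM_SAFE_LENGTH - len(INGEST_HINT)) + INGEST_HINT


def format_chunks_response(chunks: List[Any], preview: int = 3) -> str:
    parts = [f"📄 Chunks: {len(chunks)}"]
    for i, chunk in enumerate(chunks[:preview], start=1):
        body = chunk.get("text", "") if isinstance(chunk, dict) else str(chunk)
        parts.append(f"\n{i}. {_cut(body, 200)}")
    return _cut("\n".join(parts), TELEGRAM_SAFE_LENGTH)
```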

IMPORTANT: These helpers handle Telegram-specific constraints and SHOULD NOT be moved into doc_service.


Acceptance criteria

  1. gateway-bot/services/doc_service.py exists and:
    • Provides parse_document, ingest_document, ask_about_document, save_doc_context, get_doc_context.
    • Uses DAGI Router and Memory Service, with session_id-based context.
  2. gateway-bot/http_api_doc.py exists and defines:
    • POST /api/doc/parse
    • POST /api/doc/ingest
    • POST /api/doc/ask
    • GET /api/doc/context/{session_id}
  3. gateway-bot/app.py:
    • Includes both http_api.router and http_api_doc.router.
    • Root / lists the new /api/doc/* endpoints.
  4. gateway-bot/memory_client.py:
    • Includes get_fact(...), and existing methods still work.
    • doc_service uses upsert_fact + get_fact for doc_context:{session_id}.
  5. gateway-bot/http_api.py:
    • Telegram handlers use doc_service for:
      • PDF parsing,
      • the /ingest command,
      • RAG follow-up questions.
    • Continues to support the existing voice→STT→chat flow and regular chat routing when the doc flow isn't triggered.
  6. Web/mobile clients can call /api/doc/* to:
    • Parse documents via doc_url.
    • Ingest them into RAG.
    • Ask questions about the last parsed document for a given session_id.

How to run this task with Cursor

From repo root (microdao-daarion):

cursor task < docs/cursor/channel_agnostic_doc_flow_task.md

Cursor should then:

  • Create/modify the files listed above.
  • Ensure implementation matches the described architecture and acceptance criteria.