Files
microdao-daarion/site/cursor/rag_ingestion_events_task/index.html

1003 lines
38 KiB
HTML
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<!doctype html>
<html lang="en" class="no-js">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<link rel="canonical" href="https://IvanTytar.github.io/microdao-daarion/cursor/rag_ingestion_events_task/">
<link rel="icon" href="../../assets/images/favicon.png">
<meta name="generator" content="mkdocs-1.5.3, mkdocs-material-9.5.18">
<title>Task: Wire message.created and doc.upsert events into the RAG ingestion worker - DAARION Documentation</title>
<link rel="stylesheet" href="../../assets/stylesheets/main.66ac8b77.min.css">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto:300,300i,400,400i,700,700i%7CRoboto+Mono:400,400i,700,700i&display=fallback">
<style>:root{--md-text-font:"Roboto";--md-code-font:"Roboto Mono"}</style>
<script>__md_scope=new URL("../..",location),__md_hash=e=>[...e].reduce((e,_)=>(e<<5)-e+_.charCodeAt(0),0),__md_get=(e,_=localStorage,t=__md_scope)=>JSON.parse(_.getItem(t.pathname+"."+e)),__md_set=(e,_,t=localStorage,a=__md_scope)=>{try{t.setItem(a.pathname+"."+e,JSON.stringify(_))}catch(e){}}</script>
</head>
<body dir="ltr">
<input class="md-toggle" data-md-toggle="drawer" type="checkbox" id="__drawer" autocomplete="off">
<input class="md-toggle" data-md-toggle="search" type="checkbox" id="__search" autocomplete="off">
<label class="md-overlay" for="__drawer"></label>
<div data-md-component="skip">
<a href="#task-wire-messagecreated-and-docupsert-events-into-the-rag-ingestion-worker" class="md-skip">
Skip to content
</a>
</div>
<div data-md-component="announce">
</div>
<header class="md-header md-header--shadow" data-md-component="header">
<nav class="md-header__inner md-grid" aria-label="Header">
<a href="../.." title="DAARION Documentation" class="md-header__button md-logo" aria-label="DAARION Documentation" data-md-component="logo">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M12 8a3 3 0 0 0 3-3 3 3 0 0 0-3-3 3 3 0 0 0-3 3 3 3 0 0 0 3 3m0 3.54C9.64 9.35 6.5 8 3 8v11c3.5 0 6.64 1.35 9 3.54 2.36-2.19 5.5-3.54 9-3.54V8c-3.5 0-6.64 1.35-9 3.54Z"/></svg>
</a>
<label class="md-header__button md-icon" for="__drawer">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M3 6h18v2H3V6m0 5h18v2H3v-2m0 5h18v2H3v-2Z"/></svg>
</label>
<div class="md-header__title" data-md-component="header-title">
<div class="md-header__ellipsis">
<div class="md-header__topic">
<span class="md-ellipsis">
DAARION Documentation
</span>
</div>
<div class="md-header__topic" data-md-component="header-topic">
<span class="md-ellipsis">
Task: Wire message.created and doc.upsert events into the RAG ingestion worker
</span>
</div>
</div>
</div>
<script>var media,input,key,value,palette=__md_get("__palette");if(palette&&palette.color){"(prefers-color-scheme)"===palette.color.media&&(media=matchMedia("(prefers-color-scheme: light)"),input=document.querySelector(media.matches?"[data-md-color-media='(prefers-color-scheme: light)']":"[data-md-color-media='(prefers-color-scheme: dark)']"),palette.color.media=input.getAttribute("data-md-color-media"),palette.color.scheme=input.getAttribute("data-md-color-scheme"),palette.color.primary=input.getAttribute("data-md-color-primary"),palette.color.accent=input.getAttribute("data-md-color-accent"));for([key,value]of Object.entries(palette.color))document.body.setAttribute("data-md-color-"+key,value)}</script>
<label class="md-header__button md-icon" for="__search">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M9.5 3A6.5 6.5 0 0 1 16 9.5c0 1.61-.59 3.09-1.56 4.23l.27.27h.79l5 5-1.5 1.5-5-5v-.79l-.27-.27A6.516 6.516 0 0 1 9.5 16 6.5 6.5 0 0 1 3 9.5 6.5 6.5 0 0 1 9.5 3m0 2C7 5 5 7 5 9.5S7 14 9.5 14 14 12 14 9.5 12 5 9.5 5Z"/></svg>
</label>
<div class="md-search" data-md-component="search" role="dialog">
<label class="md-search__overlay" for="__search"></label>
<div class="md-search__inner" role="search">
<form class="md-search__form" name="search">
<input type="text" class="md-search__input" name="query" aria-label="Search" placeholder="Search" autocapitalize="off" autocorrect="off" autocomplete="off" spellcheck="false" data-md-component="search-query" required>
<label class="md-search__icon md-icon" for="__search">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M9.5 3A6.5 6.5 0 0 1 16 9.5c0 1.61-.59 3.09-1.56 4.23l.27.27h.79l5 5-1.5 1.5-5-5v-.79l-.27-.27A6.516 6.516 0 0 1 9.5 16 6.5 6.5 0 0 1 3 9.5 6.5 6.5 0 0 1 9.5 3m0 2C7 5 5 7 5 9.5S7 14 9.5 14 14 12 14 9.5 12 5 9.5 5Z"/></svg>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M20 11v2H8l5.5 5.5-1.42 1.42L4.16 12l7.92-7.92L13.5 5.5 8 11h12Z"/></svg>
</label>
<nav class="md-search__options" aria-label="Search">
<button type="reset" class="md-search__icon md-icon" title="Clear" aria-label="Clear" tabindex="-1">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M19 6.41 17.59 5 12 10.59 6.41 5 5 6.41 10.59 12 5 17.59 6.41 19 12 13.41 17.59 19 19 17.59 13.41 12 19 6.41Z"/></svg>
</button>
</nav>
</form>
<div class="md-search__output">
<div class="md-search__scrollwrap" data-md-scrollfix>
<div class="md-search-result" data-md-component="search-result">
<div class="md-search-result__meta">
Initializing search
</div>
<ol class="md-search-result__list" role="presentation"></ol>
</div>
</div>
</div>
</div>
</div>
</nav>
</header>
<div class="md-container" data-md-component="container">
<main class="md-main" data-md-component="main">
<div class="md-main__inner md-grid">
<div class="md-sidebar md-sidebar--primary" data-md-component="sidebar" data-md-type="navigation" >
<div class="md-sidebar__scrollwrap">
<div class="md-sidebar__inner">
<nav class="md-nav md-nav--primary" aria-label="Navigation" data-md-level="0">
<label class="md-nav__title" for="__drawer">
<a href="../.." title="DAARION Documentation" class="md-nav__button md-logo" aria-label="DAARION Documentation" data-md-component="logo">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M12 8a3 3 0 0 0 3-3 3 3 0 0 0-3-3 3 3 0 0 0-3 3 3 3 0 0 0 3 3m0 3.54C9.64 9.35 6.5 8 3 8v11c3.5 0 6.64 1.35 9 3.54 2.36-2.19 5.5-3.54 9-3.54V8c-3.5 0-6.64 1.35-9 3.54Z"/></svg>
</a>
DAARION Documentation
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item">
<a href="../../public/" class="md-nav__link">
<span class="md-ellipsis">
Home
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../public/getting-started/" class="md-nav__link">
<span class="md-ellipsis">
Getting Started
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../public/architecture-overview/" class="md-nav__link">
<span class="md-ellipsis">
Architecture
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../public/daiS_daos_overview/" class="md-nav__link">
<span class="md-ellipsis">
DAIS & DAOS
</span>
</a>
</li>
<li class="md-nav__item md-nav__item--section md-nav__item--nested">
<input class="md-nav__toggle md-toggle " type="checkbox" id="__nav_5" >
<label class="md-nav__link" for="__nav_5" id="__nav_5_label" tabindex="">
<span class="md-ellipsis">
Internal
</span>
<span class="md-nav__icon md-icon"></span>
</label>
<nav class="md-nav" data-md-level="1" aria-labelledby="__nav_5_label" aria-expanded="false">
<label class="md-nav__title" for="__nav_5">
<span class="md-nav__icon md-icon"></span>
Internal
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item md-nav__item--nested">
<input class="md-nav__toggle md-toggle " type="checkbox" id="__nav_5_1" >
<label class="md-nav__link" for="__nav_5_1" id="__nav_5_1_label" tabindex="0">
<span class="md-ellipsis">
Infra
</span>
<span class="md-nav__icon md-icon"></span>
</label>
<nav class="md-nav" data-md-level="2" aria-labelledby="__nav_5_1_label" aria-expanded="false">
<label class="md-nav__title" for="__nav_5_1">
<span class="md-nav__icon md-icon"></span>
Infra
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item">
<a href="../../internal/infra/INFRA_AUTOMATION_PACK_V1/" class="md-nav__link">
<span class="md-ellipsis">
Infra Automation Pack v1
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../internal/infra/monitoring_overview/" class="md-nav__link">
<span class="md-ellipsis">
Monitoring Overview
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../internal/infra/nodes_registry_v0/" class="md-nav__link">
<span class="md-ellipsis">
Nodes Registry v0
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item md-nav__item--nested">
<input class="md-nav__toggle md-toggle " type="checkbox" id="__nav_5_2" >
<label class="md-nav__link" for="__nav_5_2" id="__nav_5_2_label" tabindex="0">
<span class="md-ellipsis">
Specs
</span>
<span class="md-nav__icon md-icon"></span>
</label>
<nav class="md-nav" data-md-level="2" aria-labelledby="__nav_5_2_label" aria-expanded="false">
<label class="md-nav__title" for="__nav_5_2">
<span class="md-nav__icon md-icon"></span>
Specs
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item">
<a href="../../internal/specs/matrix_presence_aggregator/" class="md-nav__link">
<span class="md-ellipsis">
Matrix Presence Aggregator
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../internal/specs/city_map_spec/" class="md-nav__link">
<span class="md-ellipsis">
City Map Spec
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../internal/specs/node_join_protocol_draft/" class="md-nav__link">
<span class="md-ellipsis">
Node Join Protocol (Draft)
</span>
</a>
</li>
</ul>
</nav>
</li>
</ul>
</nav>
</li>
</ul>
</nav>
</div>
</div>
</div>
<div class="md-sidebar md-sidebar--secondary" data-md-component="sidebar" data-md-type="toc" >
<div class="md-sidebar__scrollwrap">
<div class="md-sidebar__inner">
<nav class="md-nav md-nav--secondary" aria-label="Table of contents">
<label class="md-nav__title" for="__toc">
<span class="md-nav__icon md-icon"></span>
Table of contents
</label>
<ul class="md-nav__list" data-md-component="toc" data-md-scrollfix>
<li class="md-nav__item">
<a href="#goal" class="md-nav__link">
<span class="md-ellipsis">
Goal
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#context" class="md-nav__link">
<span class="md-ellipsis">
Context
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#1-messagecreated" class="md-nav__link">
<span class="md-ellipsis">
1. Подія message.created
</span>
</a>
<nav class="md-nav" aria-label="1. Подія message.created">
<ul class="md-nav__list">
<li class="md-nav__item">
<a href="#11" class="md-nav__link">
<span class="md-ellipsis">
1.1. Очікуваний формат події
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#12-ingestchunk" class="md-nav__link">
<span class="md-ellipsis">
1.2. Нормалізація у IngestChunk
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#13-consumer" class="md-nav__link">
<span class="md-ellipsis">
1.3. Інтеграція в consumer
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item">
<a href="#2-docupsert" class="md-nav__link">
<span class="md-ellipsis">
2. Подія doc.upsert
</span>
</a>
<nav class="md-nav" aria-label="2. Подія doc.upsert">
<ul class="md-nav__list">
<li class="md-nav__item">
<a href="#21" class="md-nav__link">
<span class="md-ellipsis">
2.1. Очікуваний формат події
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#22-ingestchunk" class="md-nav__link">
<span class="md-ellipsis">
2.2. Нормалізація у IngestChunk
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#23-consumer" class="md-nav__link">
<span class="md-ellipsis">
2.3. Інтеграція в consumer
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item">
<a href="#3" class="md-nav__link">
<span class="md-ellipsis">
3. Ідемпотентність
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#4" class="md-nav__link">
<span class="md-ellipsis">
4. Тестування
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#files-to-touch-suggested" class="md-nav__link">
<span class="md-ellipsis">
Files to touch (suggested)
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#acceptance-criteria" class="md-nav__link">
<span class="md-ellipsis">
Acceptance criteria
</span>
</a>
</li>
</ul>
</nav>
</div>
</div>
</div>
<div class="md-content" data-md-component="content">
<article class="md-content__inner md-typeset">
<h1 id="task-wire-messagecreated-and-docupsert-events-into-the-rag-ingestion-worker">Task: Wire <code>message.created</code> and <code>doc.upsert</code> events into the RAG ingestion worker<a class="headerlink" href="#task-wire-messagecreated-and-docupsert-events-into-the-rag-ingestion-worker" title="Permanent link">&para;</a></h1>
<h2 id="goal">Goal<a class="headerlink" href="#goal" title="Permanent link">&para;</a></h2>
<p>Підключити реальні доменні події до RAG ingestion воркера так, щоб:</p>
<ul>
<li>Події <code>message.created</code> та <code>doc.upsert</code> автоматично потрапляли в RAG ingestion pipeline.</li>
<li>Вони нормалізувались у <code>IngestChunk</code> (текст + метадані).</li>
<li>Чанки індексувались в Milvus (векторний стор) і за потреби в Neo4j (граф контексту).</li>
<li>Обробка була <strong>ідемпотентною</strong> та стабільною (повтор подій не ламає індекс).</li>
</ul>
<p>Це продовження <code>rag_ingestion_worker_task.md</code>: там ми описали воркер, тут — як реально підвести його до подій <code>message.created</code> і <code>doc.upsert</code>.</p>
<hr />
<h2 id="context">Context<a class="headerlink" href="#context" title="Permanent link">&para;</a></h2>
<ul>
<li>Root: <code>microdao-daarion/</code></li>
<li>Ingestion worker: <code>services/rag-ingest-worker/</code> (згідно попередньої таски).</li>
<li>Event catalog: <code>docs/cursor/42_nats_event_streams_and_event_catalog.md</code> (описує NATS streams / subjects / event types).</li>
</ul>
<p>Ми вважаємо, що:</p>
<ul>
<li>Існує NATS (або інший) event bus.</li>
<li>Є події:</li>
<li><code>message.created</code> — створення повідомлення в чаті/каналі.</li>
<li><code>doc.upsert</code> — створення/оновлення документа (wiki, spec, тощо).</li>
<li>RAG ingestion worker вже має базові пайплайни (<code>normalization</code>, <code>embedding</code>, <code>index_milvus</code>, <code>index_neo4j</code>) — хоча б як скелет.</li>
</ul>
<p>Мета цієї задачі — <strong>підʼєднатися до реальних подій</strong> і забезпечити endtoend шлях:</p>
<p><code>event → IngestChunk → embedding → Milvus (+ Neo4j)</code>.</p>
<hr />
<h2 id="1-messagecreated">1. Подія <code>message.created</code><a class="headerlink" href="#1-messagecreated" title="Permanent link">&para;</a></h2>
<h3 id="11">1.1. Очікуваний формат події<a class="headerlink" href="#11" title="Permanent link">&para;</a></h3>
<p>Орієнтуючись на Event Catalog, нормальний payload для <code>message.created</code> має виглядати приблизно так (приклад, можна адаптувати до фактичного формату):</p>
<div class="codehilite"><pre><span></span><code><span class="p">{</span>
<span class="w"> </span><span class="nt">&quot;event_type&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;message.created&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;event_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;evt_123&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;occurred_at&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;2024-11-17T10:00:00Z&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;team_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;dao_greenfood&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;channel_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;tg:12345&quot;</span><span class="w"> </span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;user_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;tg:67890&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;agent_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;daarwizz&quot;</span><span class="p">,</span><span class="w"> </span>
<span class="w"> </span><span class="nt">&quot;payload&quot;</span><span class="p">:</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="nt">&quot;message_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;msg_abc&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;text&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;Текст повідомлення...&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;attachments&quot;</span><span class="p">:</span><span class="w"> </span><span class="p">[],</span>
<span class="w"> </span><span class="nt">&quot;tags&quot;</span><span class="p">:</span><span class="w"> </span><span class="p">[</span><span class="s2">&quot;onboarding&quot;</span><span class="p">,</span><span class="w"> </span><span class="s2">&quot;spec&quot;</span><span class="p">],</span>
<span class="w"> </span><span class="nt">&quot;visibility&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;public&quot;</span>
<span class="w"> </span><span class="p">}</span>
<span class="p">}</span>
</code></pre></div>
<p>Якщо реальний формат інший — <strong>не міняти продакшн‑події</strong>, а в нормалізації підлаштуватись під нього.</p>
<h3 id="12-ingestchunk">1.2. Нормалізація у <code>IngestChunk</code><a class="headerlink" href="#12-ingestchunk" title="Permanent link">&para;</a></h3>
<p>У <code>services/rag-ingest-worker/pipeline/normalization.py</code> додати/оновити функцію:</p>
<div class="codehilite"><pre><span></span><code><span class="k">async</span> <span class="k">def</span><span class="w"> </span><span class="nf">normalize_message_created</span><span class="p">(</span><span class="n">event</span><span class="p">:</span> <span class="nb">dict</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">list</span><span class="p">[</span><span class="n">IngestChunk</span><span class="p">]:</span>
<span class="o">...</span>
</code></pre></div>
<p>Правила:</p>
<ul>
<li>Якщо <code>payload.text</code> порожній — можна або пропустити chunk, або створити chunk тільки з метаданими (краще пропустити).</li>
<li>Створити один або кілька <code>IngestChunk</code> (якщо треба розбити довгі повідомлення).</li>
</ul>
<p>Поля для <code>IngestChunk</code> (мінімум):</p>
<ul>
<li><code>chunk_id</code> — детермінований, напр.:</li>
<li><code>f"msg:{event['team_id']}:{payload['message_id']}:{chunk_index}"</code> і потім захешувати.</li>
<li><code>team_id</code> = <code>event.team_id</code>.</li>
<li><code>channel_id</code> = <code>event.channel_id</code>.</li>
<li><code>agent_id</code> = <code>event.agent_id</code> (якщо є).</li>
<li><code>source_type</code> = <code>"message"</code>.</li>
<li><code>source_id</code> = <code>payload.message_id</code>.</li>
<li><code>text</code> = фрагмент тексту.</li>
<li><code>tags</code> = <code>payload.tags</code> (якщо є) + можна додати автоматику (наприклад, <code>"chat"</code>).</li>
<li><code>visibility</code> = <code>payload.visibility</code> або <code>"public"</code> за замовчуванням.</li>
<li><code>created_at</code> = <code>event.occurred_at</code>.</li>
</ul>
<p>Ця функція <strong>не повинна знати</strong> про Milvus/Neo4j — лише повертати список <code>IngestChunk</code>.</p>
<h3 id="13-consumer">1.3. Інтеграція в consumer<a class="headerlink" href="#13-consumer" title="Permanent link">&para;</a></h3>
<p>У <code>services/rag-ingest-worker/events/consumer.py</code> (або де знаходиться логіка підписки на NATS):</p>
<ul>
<li>Додати підписку на subject / stream, де живуть <code>message.created</code>.</li>
<li>У callbackі:</li>
<li>Парсити JSON event.</li>
<li>Якщо <code>event_type == "message.created"</code>:<ul>
<li>Викликати <code>normalize_message_created(event)</code><code>chunks</code>.</li>
<li>Якщо <code>chunks</code> непорожні:</li>
<li>Пустити їх через <code>embedding.embed_chunks(chunks)</code>.</li>
<li>Далі через <code>index_milvus.upsert_chunks_to_milvus(...)</code>.</li>
<li>(Опційно) якщо потрібно, зробити <code>index_neo4j.update_graph_for_event(event, chunks)</code>.</li>
</ul>
</li>
</ul>
<p>Додати логи:</p>
<ul>
<li><code>logger.info("Ingested message.created", extra={"team_id": ..., "chunks": len(chunks)})</code>.</li>
</ul>
<p>Уважно обробити винятки (catch, log, ack або nack за обраною семантикою).</p>
<hr />
<h2 id="2-docupsert">2. Подія <code>doc.upsert</code><a class="headerlink" href="#2-docupsert" title="Permanent link">&para;</a></h2>
<h3 id="21">2.1. Очікуваний формат події<a class="headerlink" href="#21" title="Permanent link">&para;</a></h3>
<p>Аналогічно, з Event Catalog, <code>doc.upsert</code> може виглядати так:</p>
<div class="codehilite"><pre><span></span><code><span class="p">{</span>
<span class="w"> </span><span class="nt">&quot;event_type&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;doc.upsert&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;event_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;evt_456&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;occurred_at&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;2024-11-17T10:05:00Z&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;team_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;dao_greenfood&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;user_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;user:abc&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;agent_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;doc_agent&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;payload&quot;</span><span class="p">:</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="nt">&quot;doc_id&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;doc_123&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;title&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;Spec RAG Gateway&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;text&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;Довгий текст документа...&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;url&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;https://daarion.city/docs/doc_123&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;tags&quot;</span><span class="p">:</span><span class="w"> </span><span class="p">[</span><span class="s2">&quot;rag&quot;</span><span class="p">,</span><span class="w"> </span><span class="s2">&quot;architecture&quot;</span><span class="p">],</span>
<span class="w"> </span><span class="nt">&quot;visibility&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;public&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="nt">&quot;doc_type&quot;</span><span class="p">:</span><span class="w"> </span><span class="s2">&quot;wiki&quot;</span>
<span class="w"> </span><span class="p">}</span>
<span class="p">}</span>
</code></pre></div>
<h3 id="22-ingestchunk">2.2. Нормалізація у <code>IngestChunk</code><a class="headerlink" href="#22-ingestchunk" title="Permanent link">&para;</a></h3>
<p>У <code>pipeline/normalization.py</code> додати/оновити:</p>
<div class="codehilite"><pre><span></span><code><span class="k">async</span> <span class="k">def</span><span class="w"> </span><span class="nf">normalize_doc_upsert</span><span class="p">(</span><span class="n">event</span><span class="p">:</span> <span class="nb">dict</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">list</span><span class="p">[</span><span class="n">IngestChunk</span><span class="p">]:</span>
<span class="o">...</span>
</code></pre></div>
<p>Правила:</p>
<ul>
<li>Якщо <code>payload.text</code> дуже довгий — розбити на чанки (наприклад, по 5121024 токени/символи).</li>
<li>
<p>Для кожного чанку створити <code>IngestChunk</code>:</p>
</li>
<li>
<p><code>chunk_id</code> = <code>f"doc:{team_id}:{doc_id}:{chunk_index}"</code> → захешувати.</p>
</li>
<li><code>team_id</code> = <code>event.team_id</code>.</li>
<li><code>source_type</code> = <code>payload.doc_type</code> або <code>"doc"</code>.</li>
<li><code>source_id</code> = <code>payload.doc_id</code>.</li>
<li><code>text</code> = текст чанку.</li>
<li><code>tags</code> = <code>payload.tags</code> + <code>payload.doc_type</code>.</li>
<li><code>visibility</code> = <code>payload.visibility</code>.</li>
<li><code>created_at</code> = <code>event.occurred_at</code>.</li>
<li>За бажанням додати <code>project_id</code> / <code>channel_id</code>, якщо вони є.</li>
</ul>
<p>Ця функція також <strong>не індексує</strong> нічого безпосередньо, лише повертає список чанків.</p>
<h3 id="23-consumer">2.3. Інтеграція в consumer<a class="headerlink" href="#23-consumer" title="Permanent link">&para;</a></h3>
<p>В <code>events/consumer.py</code> (або еквівалентному модулі):</p>
<ul>
<li>Додати обробку <code>event_type == "doc.upsert"</code> аналогічно до <code>message.created</code>:</li>
<li><code>normalize_doc_upsert(event)</code><code>chunks</code>.</li>
<li><code>embed_chunks(chunks)</code> → вектори.</li>
<li><code>upsert_chunks_to_milvus(...)</code>.</li>
<li><code>update_graph_for_event(event, chunks)</code> — створити/оновити вузол <code>(:Doc)</code> і звʼязки, наприклад:<ul>
<li><code>(:Doc {doc_id})-[:MENTIONS]-&gt;(:Topic)</code></li>
<li><code>(:Doc)-[:BELONGS_TO]-&gt;(:MicroDAO)</code> тощо.</li>
</ul>
</li>
</ul>
<hr />
<h2 id="3">3. Ідемпотентність<a class="headerlink" href="#3" title="Permanent link">&para;</a></h2>
<p>Для обох подій (<code>message.created</code>, <code>doc.upsert</code>) забезпечити, щоб <strong>повторне програвання</strong> тієї ж події не створювало дублікатів:</p>
<ul>
<li>Використовувати <code>chunk_id</code> як primary key в Milvus (idempotent upsert).</li>
<li>Для Neo4j використовувати <code>MERGE</code> на основі унікальних ключів вузлів/ребер (наприклад, <code>doc_id</code>, <code>team_id</code>, <code>source_type</code>, <code>source_id</code>, <code>chunk_index</code>).</li>
</ul>
<p>Якщо вже закладено idempotent behavior в <code>index_milvus.py</code> / <code>index_neo4j.py</code>, просто використати ці поля.</p>
<hr />
<h2 id="4">4. Тестування<a class="headerlink" href="#4" title="Permanent link">&para;</a></h2>
<p>Перед тим, як вважати інтеграцію готовою, бажано:</p>
<ol>
<li>Написати мінімальні unitтести / doctestи для <code>normalize_message_created</code> і <code>normalize_doc_upsert</code> (навіть якщо без повноцінної CI):</li>
<li>
<p>Вхідний event → список <code>IngestChunk</code> з очікуваними полями.</p>
</li>
<li>
<p>Зробити простий manual test:</p>
</li>
<li>Опублікувати штучну <code>message.created</code> у devstream.</li>
<li>Переконатися по логах воркера, що:<ul>
<li>нормалізація відбулась,</li>
<li>чанк(и) відправлені в embedding і Milvus,</li>
<li>запис зʼявився в Milvus/Neo4j (якщо є доступ).</li>
</ul>
</li>
</ol>
<hr />
<h2 id="files-to-touch-suggested">Files to touch (suggested)<a class="headerlink" href="#files-to-touch-suggested" title="Permanent link">&para;</a></h2>
<blockquote>
<p>Шлях та назви можна адаптувати до фактичної структури, але головна ідея — рознести відповідальності.</p>
</blockquote>
<ul>
<li><code>services/rag-ingest-worker/events/consumer.py</code></li>
<li>Додати підписки/обробники для <code>message.created</code> і <code>doc.upsert</code>.</li>
<li>
<p>Виклики до <code>normalize_message_created</code> / <code>normalize_doc_upsert</code> + пайплайн embedding/indexing.</p>
</li>
<li>
<p><code>services/rag-ingest-worker/pipeline/normalization.py</code></p>
</li>
<li>
<p>Додати/оновити функції:</p>
<ul>
<li><code>normalize_message_created(event)</code></li>
<li><code>normalize_doc_upsert(event)</code></li>
</ul>
</li>
<li>
<p>(Опційно) <code>services/rag-ingest-worker/pipeline/index_neo4j.py</code></p>
</li>
<li>
<p>Додати/оновити логіку побудови графових вузлів/ребер для <code>Doc</code>, <code>Topic</code>, <code>Channel</code>, <code>MicroDAO</code> тощо.</p>
</li>
<li>
<p>Тести / приклади (якщо є тестовий пакет для сервісу).</p>
</li>
</ul>
<hr />
<h2 id="acceptance-criteria">Acceptance criteria<a class="headerlink" href="#acceptance-criteria" title="Permanent link">&para;</a></h2>
<ol>
<li>
<p>RAGingest worker підписаний на події типу <code>message.created</code> і <code>doc.upsert</code> (через NATS або інший bus), принаймні в devконфігурації.</p>
</li>
<li>
<p>Для <code>message.created</code> та <code>doc.upsert</code> існують функції нормалізації, які повертають <code>IngestChunk</code> з коректними полями (<code>team_id</code>, <code>source_type</code>, <code>source_id</code>, <code>visibility</code>, <code>tags</code>, <code>created_at</code>, тощо).</p>
</li>
<li>
<p>Чанки для цих подій проходять через embeddingпайплайн і індексуються в Milvus з ідемпотентною семантикою.</p>
</li>
<li>
<p>(За можливості) для <code>doc.upsert</code> оновлюється Neo4j граф (вузол <code>Doc</code> + базові звʼязки).</p>
</li>
<li>
<p>Повторне надсилання однієї й тієї ж події не створює дублікатів у Milvus/Neo4j (idempotent behavior).</p>
</li>
<li>
<p>Можна побачити в логах воркера, що події споживаються і конвеєр відпрацьовує (інформаційні логи з team_id, event_type, chunks_count).</p>
</li>
<li>
<p>Цей файл (<code>docs/cursor/rag_ingestion_events_task.md</code>) можна виконати через Cursor:</p>
</li>
</ol>
<p><code>bash
cursor task &lt; docs/cursor/rag_ingestion_events_task.md</code></p>
<p>і Cursor буде використовувати його як єдине джерело правди для інтеграції подій <code>message.created</code>/<code>doc.upsert</code> у ingestionворкер.</p>
</article>
</div>
<script>var target=document.getElementById(location.hash.slice(1));target&&target.name&&(target.checked=target.name.startsWith("__tabbed_"))</script>
</div>
</main>
<footer class="md-footer">
<div class="md-footer-meta md-typeset">
<div class="md-footer-meta__inner md-grid">
<div class="md-copyright">
Made with
<a href="https://squidfunk.github.io/mkdocs-material/" target="_blank" rel="noopener">
Material for MkDocs
</a>
</div>
</div>
</div>
</footer>
</div>
<div class="md-dialog" data-md-component="dialog">
<div class="md-dialog__inner md-typeset"></div>
</div>
<script id="__config" type="application/json">{"base": "../..", "features": ["navigation.sections", "navigation.instant", "content.code.copy"], "search": "../../assets/javascripts/workers/search.b8dbb3d2.min.js", "translations": {"clipboard.copied": "Copied to clipboard", "clipboard.copy": "Copy to clipboard", "search.result.more.one": "1 more on this page", "search.result.more.other": "# more on this page", "search.result.none": "No matching documents", "search.result.one": "1 matching document", "search.result.other": "# matching documents", "search.result.placeholder": "Type to start searching", "search.result.term.missing": "Missing", "select.version": "Select version"}}</script>
<script src="../../assets/javascripts/bundle.3220b9d7.min.js"></script>
</body>
</html>