Files
microdao-daarion/services/rag-service/app/document_store.py
Apple 0c8bef82f4 feat: Add Alateya, Clan, Eonarch agents + fix gateway-router connection
## Agents Added
- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes
- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes
- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation
- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
2026-01-28 06:40:34 -08:00

214 lines
7.7 KiB
Python

"""
Document Store for RAG Service
Uses PostgreSQL + pgvector via Haystack
"""
import json
import logging
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Optional, List, Dict, Any

import psycopg2
# Haystack is an optional dependency. When it cannot be imported both names
# are set to None, and the rest of the module checks for None to decide
# whether to use Haystack or the built-in fallbacks.
try:
    from haystack.document_stores import PGVectorDocumentStore  # type: ignore
    from haystack.schema import Document as HaystackDocument  # type: ignore
except Exception:
    # NOTE(review): this catches Exception rather than ImportError, presumably
    # so that any failure raised while importing Haystack triggers the
    # fallback -- confirm before narrowing.
    PGVectorDocumentStore = None  # type: ignore
    HaystackDocument = None  # type: ignore
@dataclass
class Document:
    """Lightweight stand-in for Haystack's ``Document``.

    Used by ``_make_document`` when Haystack could not be imported.
    """

    # Text of the document (or chunk).
    content: str
    # Arbitrary metadata attached to the document.
    meta: Dict[str, Any]
    # Embedding vector, if one has been computed.
    embedding: Optional[List[float]] = None
    # Document identifier, if known.
    id: Optional[str] = None
def _make_document(
    content: str,
    meta: Dict[str, Any],
    embedding: Optional[List[float]] = None,
    doc_id: Optional[str] = None,
):
    """Build a document object of whichever type is available.

    Args:
        content: Document text.
        meta: Metadata dictionary for the document.
        embedding: Optional embedding vector.
        doc_id: Optional document identifier (passed on as ``id``).

    Returns:
        A Haystack ``Document`` when Haystack was imported successfully,
        otherwise an instance of the local ``Document`` dataclass.
    """
    # HaystackDocument is None when the optional Haystack import failed.
    if HaystackDocument is not None:
        return HaystackDocument(content=content, meta=meta, embedding=embedding, id=doc_id)
    return Document(content=content, meta=meta, embedding=embedding, id=doc_id)
from app.core.config import settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Global document store instance: created on first use by
# get_document_store() and cleared by reset_document_store().
# When Haystack is not importable PGVectorDocumentStore is None, and the
# instance stored here is a SimplePGVectorStore instead.
_document_store: Optional[PGVectorDocumentStore] = None
def get_document_store():
    """Return the shared document store, creating it on first call.

    The instance is cached in the module-level ``_document_store`` variable,
    so later calls return the same object.

    Returns:
        A Haystack ``PGVectorDocumentStore`` when Haystack is importable,
        otherwise a ``SimplePGVectorStore``. Both are configured from
        ``settings`` (``PG_DSN``, ``RAG_TABLE_NAME``, ``EMBED_DIM``).

    Raises:
        RuntimeError: If the store cannot be constructed. The original
            exception is attached as ``__cause__``.
    """
    global _document_store

    if _document_store is not None:
        return _document_store

    logger.info("Initializing PGVectorDocumentStore: table=%s", settings.RAG_TABLE_NAME)

    # Log only what follows the LAST "@" (host/port/database). Splitting on
    # the first "@" would log part of the password whenever the password
    # itself contains an "@".
    dsn = settings.PG_DSN
    target = dsn.rsplit("@", 1)[1] if "@" in dsn else "hidden"
    logger.info("Connection: %s", target)

    try:
        if PGVectorDocumentStore is not None:
            _document_store = PGVectorDocumentStore(
                connection_string=settings.PG_DSN,
                embedding_dim=settings.EMBED_DIM,
                table_name=settings.RAG_TABLE_NAME,
                search_strategy=settings.SEARCH_STRATEGY,
                recreate_table=False,
                similarity="cosine",
            )
            logger.info("PGVectorDocumentStore initialized successfully")
            return _document_store

        # Haystack is unavailable: fall back to the psycopg2-based store.
        _document_store = SimplePGVectorStore(
            dsn=settings.PG_DSN,
            table_name=settings.RAG_TABLE_NAME,
            embedding_dim=settings.EMBED_DIM,
        )
        logger.info("SimplePGVectorStore initialized successfully")
        return _document_store
    except Exception as e:
        logger.error("Failed to initialize DocumentStore: %s", e, exc_info=True)
        raise RuntimeError(f"DocumentStore initialization failed: {e}") from e
class SimplePGVectorStore:
    """Minimal pgvector document store that talks to PostgreSQL via psycopg2.

    Rows live in one table with the columns ``id`` (text primary key),
    ``content`` (text), ``embedding`` (vector) and ``meta`` (jsonb).

    NOTE(review): the table name is interpolated directly into SQL, so it
    must come from trusted configuration. The ``vector`` column type also
    assumes the pgvector extension is already installed -- confirm.
    """

    # Metadata keys that search() / filter_documents() can filter on. Any
    # other key in a ``filters`` dict is ignored.
    _FILTER_KEYS = ("dao_id", "artifact_id", "brand_id", "project_id", "acl_ref")

    def __init__(self, dsn: str, table_name: str, embedding_dim: int) -> None:
        """Remember the connection settings and make sure the table exists.

        Args:
            dsn: PostgreSQL connection string.
            table_name: Name of the table holding the documents.
            embedding_dim: Dimension of the ``embedding`` vector column.
        """
        # Strip the SQLAlchemy-style "+psycopg2" driver suffix before the DSN
        # is handed to psycopg2.connect().
        self.dsn = dsn.replace("postgresql+psycopg2", "postgresql")
        self.table_name = table_name
        self.embedding_dim = embedding_dim
        self._ensure_table()

    def _connect(self):
        """Open a new psycopg2 connection to ``self.dsn``."""
        return psycopg2.connect(self.dsn)

    @contextmanager
    def _cursor(self):
        """Yield a cursor inside a transaction, then close the connection.

        psycopg2's ``with connection`` block only commits (or rolls back on
        error); it does not close the connection. The ``finally`` clause
        closes it explicitly so each call does not leak a connection.
        """
        conn = self._connect()
        try:
            with conn, conn.cursor() as cur:
                yield cur
        finally:
            conn.close()

    def _ensure_table(self) -> None:
        """Create the document table and its indexes if they do not exist."""
        table = self.table_name
        with self._cursor() as cur:
            cur.execute(
                f"""
                create table if not exists {table} (
                    id text primary key,
                    content text,
                    embedding vector({self.embedding_dim}),
                    meta jsonb
                );
                """
            )
            # GIN index for lookups on the jsonb metadata.
            cur.execute(
                f"create index if not exists {table}_meta_gin on {table} using gin (meta);"
            )
            # Approximate nearest-neighbour index for cosine distance.
            cur.execute(
                f"create index if not exists {table}_embedding_idx on {table} "
                f"using ivfflat (embedding vector_cosine_ops);"
            )

    def _vec(self, embedding: Optional[List[float]]) -> Optional[str]:
        """Format an embedding as a pgvector text literal, e.g. ``[0.1,0.2]``.

        Returns ``None`` for a missing embedding so that it is stored as SQL
        NULL instead of raising while iterating ``None``.
        """
        if embedding is None:
            return None
        return "[" + ",".join(str(x) for x in embedding) + "]"

    def write_documents(self, documents: List[Any]) -> None:
        """Insert documents, overwriting rows that share the same id.

        Each document must expose ``content``; ``id``, ``meta`` and
        ``embedding`` are optional attributes. A missing id is replaced by a
        random UUID, missing meta by ``{}``, and a missing embedding is
        stored as NULL. All rows are written in a single transaction.
        """
        if not documents:
            # Nothing to write: avoid opening a connection at all.
            return
        upsert_sql = f"""
            insert into {self.table_name} (id, content, embedding, meta)
            values (%s, %s, %s, %s)
            on conflict (id) do update set
                content = excluded.content,
                embedding = excluded.embedding,
                meta = excluded.meta
        """
        with self._cursor() as cur:
            for doc in documents:
                doc_id = getattr(doc, "id", None) or str(uuid.uuid4())
                meta = getattr(doc, "meta", None) or {}
                embedding = getattr(doc, "embedding", None)
                cur.execute(
                    upsert_sql,
                    (doc_id, doc.content, self._vec(embedding), json.dumps(meta)),
                )

    def delete_documents(self, filters: Optional[Dict[str, Any]] = None) -> None:
        """Delete every row whose ``meta.index_fingerprint`` matches.

        Only the ``index_fingerprint`` filter is supported, given either as a
        plain value or as ``{"$eq": value}``. With no filters, without that
        key, or with an empty value, the call does nothing.
        """
        if not filters:
            return
        fingerprint = None
        if "index_fingerprint" in filters:
            value = filters["index_fingerprint"]
            fingerprint = value.get("$eq") if isinstance(value, dict) else value
        if not fingerprint:
            return
        with self._cursor() as cur:
            cur.execute(
                f"delete from {self.table_name} where meta->>'index_fingerprint' = %s",
                (fingerprint,),
            )

    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        return_embedding: bool = False,
    ):
        """Return the ``top_k`` documents closest to ``query_embedding``.

        Rows are ordered with pgvector's ``<=>`` operator, matching the
        ``vector_cosine_ops`` index created in ``_ensure_table``.

        Args:
            query_embedding: Query vector.
            top_k: Maximum number of documents to return.
            filters: Optional metadata filters (see ``_build_where``).
            return_embedding: Accepted for interface compatibility; currently
                ignored, returned documents carry no embedding.

        Returns:
            A list of documents built by ``_make_document``, with ``content``,
            ``meta`` and the stored ``id``.

        Raises:
            TypeError: If ``query_embedding`` is ``None``.
        """
        if query_embedding is None:
            # Fail before touching the database; ordering by a NULL vector
            # would silently return arbitrary rows.
            raise TypeError("query_embedding must be a list of floats, got None")
        where_clause, params = self._build_where(filters)
        params.append(self._vec(query_embedding))
        params.append(top_k)
        with self._cursor() as cur:
            cur.execute(
                f"""
                select id, content, meta
                from {self.table_name}
                {where_clause}
                order by embedding <=> %s
                limit %s
                """,
                params,
            )
            rows = cur.fetchall()
        return [
            _make_document(content=content, meta=meta, doc_id=doc_id)
            for doc_id, content, meta in rows
        ]

    def filter_documents(
        self,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 5,
        return_embedding: bool = False,
    ):
        """Return up to ``top_k`` documents matching the metadata filters.

        No ordering is applied, so which rows are returned is up to the
        database. ``return_embedding`` is accepted for interface
        compatibility and is currently ignored.

        Returns:
            A list of documents built by ``_make_document``, with ``content``,
            ``meta`` and the stored ``id``.
        """
        where_clause, params = self._build_where(filters)
        params.append(top_k)
        with self._cursor() as cur:
            cur.execute(
                f"""
                select id, content, meta
                from {self.table_name}
                {where_clause}
                limit %s
                """,
                params,
            )
            rows = cur.fetchall()
        return [
            _make_document(content=content, meta=meta, doc_id=doc_id)
            for doc_id, content, meta in rows
        ]

    def _build_where(self, filters: Optional[Dict[str, Any]]) -> tuple[str, List[Any]]:
        """Translate supported metadata filters into a WHERE clause.

        Only the keys in ``_FILTER_KEYS`` are considered. A list value
        contributes only its first element; ``None`` values and empty lists
        are skipped.

        Returns:
            ``(where_clause, params)``: the clause is an empty string when no
            filter applies, and ``params`` holds the values for its ``%s``
            placeholders in order.
        """
        where_parts: List[str] = []
        params: List[Any] = []
        if filters:
            for key in self._FILTER_KEYS:
                value = filters.get(key)
                if isinstance(value, list):
                    value = value[0] if value else None
                if value is None:
                    continue
                # ``key`` comes from the fixed allowlist above, so it is safe
                # to interpolate; the value always goes through a placeholder.
                where_parts.append(f"meta->>'{key}' = %s")
                params.append(value)
        where_clause = f"where {' and '.join(where_parts)}" if where_parts else ""
        return where_clause, params
def reset_document_store():
    """Reset the global document store instance (for testing).

    Clears the cached instance so the next ``get_document_store()`` call
    builds a new one. The previous instance is only dereferenced here.
    """
    global _document_store
    _document_store = None