## Agents Added

- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes

- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes

- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation

- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
214 lines
7.7 KiB
Python
214 lines
7.7 KiB
Python
"""
|
|
Document Store for RAG Service
|
|
Uses PostgreSQL + pgvector via Haystack
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
import psycopg2
|
|
|
|
try:
|
|
from haystack.document_stores import PGVectorDocumentStore # type: ignore
|
|
from haystack.schema import Document as HaystackDocument # type: ignore
|
|
except Exception:
|
|
PGVectorDocumentStore = None # type: ignore
|
|
HaystackDocument = None # type: ignore
|
|
|
|
|
|
@dataclass
class Document:
    """Fallback document container used when Haystack is not installed.

    Mirrors the subset of Haystack's Document interface consumed by this
    module (content, meta, embedding, id).
    """

    # Text content of the stored chunk.
    content: str
    # JSON-serializable metadata (persisted to the jsonb `meta` column).
    meta: Dict[str, Any]
    # Dense vector used for similarity search; may be absent.
    embedding: Optional[List[float]] = None
    # Primary key; a UUID is generated on write when left as None.
    id: Optional[str] = None
|
|
|
|
|
|
def _make_document(content: str, meta: Dict[str, Any], embedding: Optional[List[float]] = None, doc_id: Optional[str] = None):
    """Build a document object, preferring Haystack's class when available.

    Returns a HaystackDocument when the haystack package imported
    successfully, otherwise the local Document dataclass.
    """
    doc_cls = HaystackDocument if HaystackDocument else Document
    return doc_cls(content=content, meta=meta, embedding=embedding, id=doc_id)
|
|
|
|
# NOTE(review): this import sits mid-module in the original; kept in place
# to avoid changing import-time ordering side effects.
from app.core.config import settings

logger = logging.getLogger(__name__)

# Cached singleton document store. Annotated as Any because
# PGVectorDocumentStore is None when Haystack is not installed (the original
# annotation then degenerated to Optional[None]) and because the fallback
# SimplePGVectorStore instance may be stored here instead.
_document_store: Optional[Any] = None
|
|
|
|
|
|
def get_document_store():
    """
    Get or create the process-wide document store singleton.

    Prefers Haystack's PGVectorDocumentStore when the haystack package is
    installed; if it is missing — or its initialization fails — falls back
    to the local SimplePGVectorStore.

    Returns:
        The cached store instance (PGVectorDocumentStore or
        SimplePGVectorStore).

    Raises:
        RuntimeError: if no store could be initialized at all.
    """
    global _document_store

    if _document_store is not None:
        return _document_store

    logger.info(f"Initializing PGVectorDocumentStore: table={settings.RAG_TABLE_NAME}")
    # Log only the host/database part of the DSN so credentials never reach
    # the logs.
    logger.info(f"Connection: {settings.PG_DSN.split('@')[1] if '@' in settings.PG_DSN else 'hidden'}")

    try:
        if PGVectorDocumentStore:
            try:
                _document_store = PGVectorDocumentStore(
                    connection_string=settings.PG_DSN,
                    embedding_dim=settings.EMBED_DIM,
                    table_name=settings.RAG_TABLE_NAME,
                    search_strategy=settings.SEARCH_STRATEGY,
                    recreate_table=False,
                    similarity="cosine",
                )
                logger.info("PGVectorDocumentStore initialized successfully")
                return _document_store
            except Exception as haystack_err:
                # Bug fix: the original aborted here with RuntimeError even
                # though a working fallback exists below. Degrade gracefully
                # to the plain psycopg2 store instead.
                logger.warning(
                    f"PGVectorDocumentStore init failed, falling back to SimplePGVectorStore: {haystack_err}"
                )
                _document_store = None

        _document_store = SimplePGVectorStore(
            dsn=settings.PG_DSN,
            table_name=settings.RAG_TABLE_NAME,
            embedding_dim=settings.EMBED_DIM,
        )
        logger.info("SimplePGVectorStore initialized successfully")
        return _document_store
    except Exception as e:
        logger.error(f"Failed to initialize DocumentStore: {e}", exc_info=True)
        raise RuntimeError(f"DocumentStore initialization failed: {e}") from e
|
|
|
|
|
|
class SimplePGVectorStore:
    """Minimal pgvector-backed document store used when Haystack is absent.

    Persists documents in a single table (id, content, embedding, meta) and
    supports upserts, fingerprint-based deletion, vector similarity search
    and metadata filtering.

    NOTE(review): table and index names are interpolated into SQL with
    f-strings. This is only safe while they come from trusted configuration
    (settings), never from user input.
    """

    def __init__(self, dsn: str, table_name: str, embedding_dim: int) -> None:
        """Create the store and ensure the backing table and indexes exist.

        Args:
            dsn: PostgreSQL DSN; a SQLAlchemy-style "postgresql+psycopg2"
                scheme is normalized to plain "postgresql" for psycopg2.
            table_name: Target table (created if missing).
            embedding_dim: Dimension of stored embedding vectors.
        """
        self.dsn = dsn.replace("postgresql+psycopg2", "postgresql")
        self.table_name = table_name
        self.embedding_dim = embedding_dim
        self._ensure_table()

    def _connect(self):
        """Open a fresh psycopg2 connection (one per operation)."""
        return psycopg2.connect(self.dsn)

    def _ensure_table(self) -> None:
        """Create the table plus GIN (meta) and ivfflat (embedding) indexes."""
        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    create table if not exists {self.table_name} (
                        id text primary key,
                        content text,
                        embedding vector({self.embedding_dim}),
                        meta jsonb
                    );
                    """
                )
                cur.execute(
                    f"create index if not exists {self.table_name}_meta_gin on {self.table_name} using gin (meta);"
                )
                cur.execute(
                    f"create index if not exists {self.table_name}_embedding_idx on {self.table_name} using ivfflat (embedding vector_cosine_ops);"
                )
            conn.commit()

    def _vec(self, embedding: List[float]) -> str:
        """Render an embedding as a pgvector text literal, e.g. "[1.0,2.0]"."""
        return "[" + ",".join([str(x) for x in embedding]) + "]"

    def write_documents(self, documents: List[Any]) -> None:
        """Upsert documents by id (a new UUID is generated when absent).

        Accepts any object exposing `content` plus optional `id`, `meta`
        and `embedding` attributes (both Haystack and local Document work).

        Bug fix: documents without an embedding are now stored with a SQL
        NULL embedding; the original called _vec(None) and raised TypeError.
        """
        with self._connect() as conn:
            with conn.cursor() as cur:
                for doc in documents:
                    doc_id = getattr(doc, "id", None) or str(uuid.uuid4())
                    meta = getattr(doc, "meta", None) or {}
                    embedding = getattr(doc, "embedding", None)
                    vec_literal = self._vec(embedding) if embedding is not None else None
                    cur.execute(
                        f"""
                        insert into {self.table_name} (id, content, embedding, meta)
                        values (%s, %s, %s, %s)
                        on conflict (id) do update set
                            content = excluded.content,
                            embedding = excluded.embedding,
                            meta = excluded.meta
                        """,
                        (doc_id, doc.content, vec_literal, json.dumps(meta)),
                    )
            conn.commit()

    def delete_documents(self, filters: Optional[Dict[str, Any]] = None) -> None:
        """Delete documents whose meta `index_fingerprint` matches the filter.

        Only the `index_fingerprint` key is honored; it may be a bare value
        or a Haystack-style `{"$eq": value}` dict. Any other filter shape is
        a silent no-op (intentional best-effort behavior).
        """
        if not filters:
            return
        fingerprint = None
        if "index_fingerprint" in filters:
            value = filters["index_fingerprint"]
            if isinstance(value, dict):
                fingerprint = value.get("$eq")
            else:
                fingerprint = value
        if not fingerprint:
            return
        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"delete from {self.table_name} where meta->>'index_fingerprint' = %s",
                    (fingerprint,),
                )
            conn.commit()

    def search(self, query_embedding: List[float], top_k: int = 5, filters: Optional[Dict[str, Any]] = None, return_embedding: bool = False):
        """Return the top_k documents nearest to query_embedding.

        Uses the pgvector cosine-distance operator (<=>). `return_embedding`
        is accepted for interface parity with Haystack but ignored: results
        never carry embeddings.
        """
        where_clause, params = self._build_where(filters)
        params.append(self._vec(query_embedding))
        params.append(top_k)
        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    select content, meta
                    from {self.table_name}
                    {where_clause}
                    order by embedding <=> %s
                    limit %s
                    """,
                    params,
                )
                rows = cur.fetchall()
        return [_make_document(content=r[0], meta=r[1]) for r in rows]

    def filter_documents(self, filters: Optional[Dict[str, Any]] = None, top_k: int = 5, return_embedding: bool = False):
        """Return up to top_k documents matching the metadata filters.

        No ordering is applied; `return_embedding` is accepted for interface
        parity but ignored.
        """
        where_clause, params = self._build_where(filters)
        params.append(top_k)
        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    select content, meta
                    from {self.table_name}
                    {where_clause}
                    limit %s
                    """,
                    params,
                )
                rows = cur.fetchall()
        return [_make_document(content=r[0], meta=r[1]) for r in rows]

    def _build_where(self, filters: Optional[Dict[str, Any]]) -> tuple[str, List[Any]]:
        """Build a WHERE clause over a whitelist of meta keys.

        Only dao_id / artifact_id / brand_id / project_id / acl_ref are
        honored (the key names are interpolated, so the whitelist is what
        keeps this injection-safe). List values collapse to their first
        element; None values are skipped.

        Returns:
            (where_clause, params) — clause is "" when no filter applies.
        """
        where_parts: List[str] = []
        params: List[Any] = []
        if filters:
            for key in ["dao_id", "artifact_id", "brand_id", "project_id", "acl_ref"]:
                if key in filters and filters[key] is not None:
                    value = filters[key]
                    if isinstance(value, list):
                        value = value[0] if value else None
                    if value is not None:
                        where_parts.append(f"meta->>'{key}' = %s")
                        params.append(value)
        where_clause = f"where {' and '.join(where_parts)}" if where_parts else ""
        return where_clause, params
|
|
|
|
|
|
def reset_document_store():
    """Reset global document store instance (for testing)"""
    global _document_store
    # The next get_document_store() call will rebuild the store from settings.
    _document_store = None
|
|
|