# FTS path: score = bm25(docs_chunks_fts), ORDER BY score ASC; LIKE fallback: score is null.
"""
Docs index store — SQLite tables docs_files, docs_chunks, docs_chunks_fts (FTS5).

Read-only API: search, preview, raw. Index build in docs_index.py.
"""
|
||
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from . import db as _db
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Max snippet length per fragment
|
||
_SNIPPET_LEN = 120
|
||
_CHUNK_SIZE = 3500 # ~2–4KB target
|
||
|
||
|
||
def _doc_type_from_path(path: str) -> str:
|
||
p = path.replace("\\", "/")
|
||
if "/runbook/" in p or p.startswith("runbook/"):
|
||
return "runbook"
|
||
if "/release/" in p or p.startswith("release/"):
|
||
return "release"
|
||
return "spec" if "/docs/" in p or p.startswith("docs/") else "misc"
|
||
|
||
|
||
def _extract_title(content: str, path: str) -> str:
|
||
"""First # heading or filename."""
|
||
for line in content.splitlines():
|
||
line = line.strip()
|
||
if line.startswith("# "):
|
||
return line[2:].strip()[:200]
|
||
return Path(path).stem.replace("-", " ").replace("_", " ").title()[:200]
|
||
|
||
|
||
async def clear_docs_index() -> None:
|
||
"""Remove all docs_files, docs_chunks, FTS rows, and index meta."""
|
||
conn = await _db.get_db()
|
||
await conn.execute("DELETE FROM docs_chunks_fts")
|
||
await conn.execute("DELETE FROM docs_chunks")
|
||
await conn.execute("DELETE FROM docs_files")
|
||
await conn.execute("DELETE FROM docs_index_meta")
|
||
await conn.commit()
|
||
logger.info("Docs index cleared.")
|
||
|
||
|
||
async def set_docs_index_meta(docs_root: str, last_indexed_at: str, sha: str = "") -> None:
|
||
"""Write meta after rebuild. last_indexed_at: ISO or epoch string."""
|
||
conn = await _db.get_db()
|
||
for key, value in [("docs_root", docs_root), ("last_indexed_at", last_indexed_at), ("sha", sha)]:
|
||
await conn.execute(
|
||
"INSERT OR REPLACE INTO docs_index_meta(key, value) VALUES (?,?)",
|
||
(key, value),
|
||
)
|
||
await conn.commit()
|
||
|
||
|
||
async def get_docs_index_status() -> Dict[str, Any]:
|
||
"""Return indexed_files, indexed_chunks, last_indexed_at, docs_root, fts_available."""
|
||
conn = await _db.get_db()
|
||
files_row = None
|
||
chunks_row = None
|
||
async with conn.execute("SELECT COUNT(*) FROM docs_files") as cur:
|
||
files_row = await cur.fetchone()
|
||
async with conn.execute("SELECT COUNT(*) FROM docs_chunks") as cur:
|
||
chunks_row = await cur.fetchone()
|
||
indexed_files = int(files_row[0]) if files_row else 0
|
||
indexed_chunks = int(chunks_row[0]) if chunks_row else 0
|
||
|
||
meta = {}
|
||
async with conn.execute("SELECT key, value FROM docs_index_meta") as cur:
|
||
async for row in cur:
|
||
meta[row[0]] = row[1]
|
||
last_indexed_at = meta.get("last_indexed_at") or None
|
||
docs_root = meta.get("docs_root") or ""
|
||
|
||
fts_available = False
|
||
if indexed_chunks > 0:
|
||
try:
|
||
async with conn.execute("SELECT 1 FROM docs_chunks_fts LIMIT 1") as cur:
|
||
fts_available = (await cur.fetchone()) is not None
|
||
except Exception:
|
||
pass
|
||
|
||
return {
|
||
"docs_root": docs_root,
|
||
"indexed_files": indexed_files,
|
||
"indexed_chunks": indexed_chunks,
|
||
"last_indexed_at": last_indexed_at,
|
||
"fts_available": fts_available,
|
||
}
|
||
|
||
|
||
async def insert_docs_file(path: str, mtime: float, content: str) -> None:
|
||
"""Register one file and its chunks. Caller ensures path is normalized."""
|
||
conn = await _db.get_db()
|
||
sha = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]
|
||
title = _extract_title(content, path)
|
||
doc_type = _doc_type_from_path(path)
|
||
await conn.execute(
|
||
"INSERT OR REPLACE INTO docs_files(path, mtime, sha, title, doc_type) VALUES (?,?,?,?,?)",
|
||
(path, mtime, sha, title, doc_type),
|
||
)
|
||
await conn.execute("DELETE FROM docs_chunks WHERE path = ?", (path,))
|
||
await conn.execute("DELETE FROM docs_chunks_fts WHERE path = ?", (path,))
|
||
await conn.commit()
|
||
|
||
chunks = _chunk_content(content, path)
|
||
for i, (heading, text) in enumerate(chunks):
|
||
chunk_id = f"{path}:{i}"
|
||
await conn.execute(
|
||
"INSERT INTO docs_chunks(id, path, heading, chunk_index, content) VALUES (?,?,?,?,?)",
|
||
(chunk_id, path, heading, i, text),
|
||
)
|
||
await conn.execute(
|
||
"INSERT INTO docs_chunks_fts(chunk_id, path, heading, content) VALUES (?,?,?,?)",
|
||
(chunk_id, path, heading, text),
|
||
)
|
||
await conn.commit()
|
||
|
||
|
||
def _chunk_content(content: str, path: str) -> List[tuple]:
|
||
"""Split by headers, then by size. Returns [(heading, text), ...]."""
|
||
sections: List[tuple] = []
|
||
current_heading = ""
|
||
current_lines: List[str] = []
|
||
current_size = 0
|
||
|
||
for line in content.splitlines():
|
||
stripped = line.strip()
|
||
if stripped.startswith("# "):
|
||
if current_lines:
|
||
sections.append((current_heading, "\n".join(current_lines)))
|
||
current_heading = stripped[2:].strip()[:300]
|
||
current_lines = [line]
|
||
current_size = len(line) + 1
|
||
elif stripped.startswith("## "):
|
||
if current_lines:
|
||
sections.append((current_heading, "\n".join(current_lines)))
|
||
current_heading = stripped[3:].strip()[:300]
|
||
current_lines = [line]
|
||
current_size = len(line) + 1
|
||
elif stripped.startswith("### "):
|
||
if current_lines:
|
||
sections.append((current_heading, "\n".join(current_lines)))
|
||
current_heading = stripped[4:].strip()[:300]
|
||
current_lines = [line]
|
||
current_size = len(line) + 1
|
||
else:
|
||
current_lines.append(line)
|
||
current_size += len(line) + 1
|
||
if current_size >= _CHUNK_SIZE:
|
||
sections.append((current_heading, "\n".join(current_lines)))
|
||
current_lines = []
|
||
current_size = 0
|
||
if current_lines:
|
||
sections.append((current_heading, "\n".join(current_lines)))
|
||
return sections
|
||
|
||
|
||
async def search_docs(
|
||
q: str,
|
||
doc_type: Optional[str] = None,
|
||
limit: int = 10,
|
||
) -> List[Dict[str, Any]]:
|
||
"""FTS search. Returns [{path, title, snippet, score}]. Uses FTS5 if available."""
|
||
conn = await _db.get_db()
|
||
# Sanitize FTS5 query: wrap in quotes for phrase or use simple terms
|
||
q_clean = q.strip().replace("\"", " ")[:200]
|
||
if not q_clean:
|
||
return []
|
||
|
||
# Filter by doc_type via join with docs_files
|
||
type_filter = "AND f.doc_type = ?" if doc_type else ""
|
||
params: List[Any] = [q_clean]
|
||
if doc_type:
|
||
params.append(doc_type)
|
||
params.append(limit)
|
||
|
||
try:
|
||
# FTS5: bm25 (lower = better), snippet; ORDER BY bm25 ASC for best-first
|
||
async with conn.execute(
|
||
f"""
|
||
SELECT f.path, f.title, snippet(docs_chunks_fts, 3, '**', '**', '...', {_SNIPPET_LEN//5}) AS snippet,
|
||
bm25(docs_chunks_fts) AS score
|
||
FROM docs_chunks_fts AS fts
|
||
JOIN docs_files f ON f.path = fts.path
|
||
WHERE docs_chunks_fts MATCH ? {type_filter}
|
||
ORDER BY bm25(docs_chunks_fts) ASC
|
||
LIMIT ?
|
||
""",
|
||
params,
|
||
) as cur:
|
||
rows = await cur.fetchall()
|
||
except Exception as e:
|
||
logger.warning("FTS search failed, fallback to LIKE: %s", e)
|
||
return await _search_docs_like(q_clean, doc_type, limit)
|
||
|
||
result = []
|
||
for r in (rows or []):
|
||
score_val = float(r[3]) if r[3] is not None and len(r) > 3 else 0.0
|
||
result.append({
|
||
"path": r[0],
|
||
"title": r[1] or "",
|
||
"snippet": (r[2] or "").strip(),
|
||
"score": score_val,
|
||
})
|
||
if not result:
|
||
return await _search_docs_like(q_clean, doc_type, limit)
|
||
return result
|
||
|
||
|
||
async def _search_docs_like(
|
||
q: str,
|
||
doc_type: Optional[str],
|
||
limit: int,
|
||
) -> List[Dict[str, Any]]:
|
||
"""Fallback when FTS5 unavailable: LIKE on content."""
|
||
conn = await _db.get_db()
|
||
like = f"%{q}%"
|
||
params: List[Any] = [like]
|
||
if doc_type:
|
||
params.append(doc_type)
|
||
params.append(limit)
|
||
type_sql = "AND f.doc_type = ?" if doc_type else ""
|
||
async with conn.execute(
|
||
f"""
|
||
SELECT DISTINCT f.path, f.title, substr(c.content, 1, {_SNIPPET_LEN}) AS snippet
|
||
FROM docs_chunks c
|
||
JOIN docs_files f ON f.path = c.path
|
||
WHERE c.content LIKE ? {type_sql}
|
||
LIMIT ?
|
||
""",
|
||
params,
|
||
) as cur:
|
||
rows = await cur.fetchall()
|
||
return [
|
||
{"path": r[0], "title": r[1] or "", "snippet": (r[2] or "").strip(), "score": None}
|
||
for r in (rows or [])
|
||
]
|
||
|
||
|
||
async def get_preview(path: str) -> Optional[Dict[str, Any]]:
|
||
"""Return path, title, sections (heading + short excerpt)."""
|
||
conn = await _db.get_db()
|
||
async with conn.execute("SELECT path, title FROM docs_files WHERE path = ?", (path,)) as cur:
|
||
row = await cur.fetchone()
|
||
if not row:
|
||
return None
|
||
async with conn.execute(
|
||
"SELECT heading, content FROM docs_chunks WHERE path = ? ORDER BY chunk_index",
|
||
(path,),
|
||
) as cur:
|
||
rows = await cur.fetchall()
|
||
sections = [
|
||
{"heading": r[0] or "", "excerpt": (r[1] or "")[:400].strip()}
|
||
for r in (rows or [])
|
||
]
|
||
return {"path": row[0], "title": row[1] or "", "sections": sections}
|
||
|
||
|
||
async def get_raw(path: str) -> Optional[str]:
|
||
"""Return full content of first chunk or concatenated chunks (best-effort)."""
|
||
conn = await _db.get_db()
|
||
async with conn.execute(
|
||
"SELECT content FROM docs_chunks WHERE path = ? ORDER BY chunk_index",
|
||
(path,),
|
||
) as cur:
|
||
rows = await cur.fetchall()
|
||
if not rows:
|
||
return None
|
||
return "\n\n".join(r[0] or "" for r in rows)
|