""" Docs index store — SQLite tables docs_files, docs_chunks, docs_chunks_fts (FTS5). Read-only API: search, preview, raw. Index build in docs_index.py. """ from __future__ import annotations import hashlib import logging from pathlib import Path from typing import Any, Dict, List, Optional from . import db as _db logger = logging.getLogger(__name__) # Max snippet length per fragment _SNIPPET_LEN = 120 _CHUNK_SIZE = 3500 # ~2–4KB target def _doc_type_from_path(path: str) -> str: p = path.replace("\\", "/") if "/runbook/" in p or p.startswith("runbook/"): return "runbook" if "/release/" in p or p.startswith("release/"): return "release" return "spec" if "/docs/" in p or p.startswith("docs/") else "misc" def _extract_title(content: str, path: str) -> str: """First # heading or filename.""" for line in content.splitlines(): line = line.strip() if line.startswith("# "): return line[2:].strip()[:200] return Path(path).stem.replace("-", " ").replace("_", " ").title()[:200] async def clear_docs_index() -> None: """Remove all docs_files, docs_chunks, and FTS rows.""" conn = await _db.get_db() await conn.execute("DELETE FROM docs_chunks_fts") await conn.execute("DELETE FROM docs_chunks") await conn.execute("DELETE FROM docs_files") await conn.commit() logger.info("Docs index cleared.") async def insert_docs_file(path: str, mtime: float, content: str) -> None: """Register one file and its chunks. Caller ensures path is normalized.""" conn = await _db.get_db() sha = hashlib.sha256(content.encode("utf-8")).hexdigest()[:16] title = _extract_title(content, path) doc_type = _doc_type_from_path(path) await conn.execute( "INSERT OR REPLACE INTO docs_files(path, mtime, sha, title, doc_type) VALUES (?,?,?,?,?)", (path, mtime, sha, title, doc_type), ) await conn.execute("DELETE FROM docs_chunks WHERE path = ?", (path,)) await conn.execute("DELETE FROM docs_chunks_fts WHERE path = ?", (path,)) await conn.commit() chunks = _chunk_content(content, path) for i, (heading, text) in enumerate(chunks): chunk_id = f"{path}:{i}" await conn.execute( "INSERT INTO docs_chunks(id, path, heading, chunk_index, content) VALUES (?,?,?,?,?)", (chunk_id, path, heading, i, text), ) await conn.execute( "INSERT INTO docs_chunks_fts(chunk_id, path, heading, content) VALUES (?,?,?,?)", (chunk_id, path, heading, text), ) await conn.commit() def _chunk_content(content: str, path: str) -> List[tuple]: """Split by headers, then by size. Returns [(heading, text), ...].""" sections: List[tuple] = [] current_heading = "" current_lines: List[str] = [] current_size = 0 for line in content.splitlines(): stripped = line.strip() if stripped.startswith("# "): if current_lines: sections.append((current_heading, "\n".join(current_lines))) current_heading = stripped[2:].strip()[:300] current_lines = [line] current_size = len(line) + 1 elif stripped.startswith("## "): if current_lines: sections.append((current_heading, "\n".join(current_lines))) current_heading = stripped[3:].strip()[:300] current_lines = [line] current_size = len(line) + 1 elif stripped.startswith("### "): if current_lines: sections.append((current_heading, "\n".join(current_lines))) current_heading = stripped[4:].strip()[:300] current_lines = [line] current_size = len(line) + 1 else: current_lines.append(line) current_size += len(line) + 1 if current_size >= _CHUNK_SIZE: sections.append((current_heading, "\n".join(current_lines))) current_lines = [] current_size = 0 if current_lines: sections.append((current_heading, "\n".join(current_lines))) return sections async def search_docs( q: str, doc_type: Optional[str] = None, limit: int = 10, ) -> List[Dict[str, Any]]: """FTS search. Returns [{path, title, snippet, score}]. Uses FTS5 if available.""" conn = await _db.get_db() # Sanitize FTS5 query: wrap in quotes for phrase or use simple terms q_clean = q.strip().replace("\"", " ")[:200] if not q_clean: return [] # Filter by doc_type via join with docs_files type_filter = "AND f.doc_type = ?" if doc_type else "" params: List[Any] = [q_clean] if doc_type: params.append(doc_type) params.append(limit) try: # FTS5: snippet(build, col_idx, left, right, ellipsis, max_tokens) # columns: 0=chunk_id, 1=path, 2=heading, 3=content → snippet column 3 async with conn.execute( f""" SELECT f.path, f.title, snippet(docs_chunks_fts, 3, '**', '**', '...', {_SNIPPET_LEN//5}) AS snippet FROM docs_chunks_fts AS fts JOIN docs_files f ON f.path = fts.path WHERE docs_chunks_fts MATCH ? {type_filter} LIMIT ? """, params, ) as cur: rows = await cur.fetchall() except Exception as e: logger.warning("FTS search failed, fallback to LIKE: %s", e) return await _search_docs_like(q_clean, doc_type, limit) result = [ {"path": r[0], "title": r[1] or "", "snippet": (r[2] or "").strip(), "score": 1.0} for r in (rows or []) ] if not result: return await _search_docs_like(q_clean, doc_type, limit) return result async def _search_docs_like( q: str, doc_type: Optional[str], limit: int, ) -> List[Dict[str, Any]]: """Fallback when FTS5 unavailable: LIKE on content.""" conn = await _db.get_db() like = f"%{q}%" params: List[Any] = [like] if doc_type: params.append(doc_type) params.append(limit) type_sql = "AND f.doc_type = ?" if doc_type else "" async with conn.execute( f""" SELECT DISTINCT f.path, f.title, substr(c.content, 1, {_SNIPPET_LEN}) AS snippet FROM docs_chunks c JOIN docs_files f ON f.path = c.path WHERE c.content LIKE ? {type_sql} LIMIT ? """, params, ) as cur: rows = await cur.fetchall() return [ {"path": r[0], "title": r[1] or "", "snippet": (r[2] or "").strip(), "score": 0.5} for r in (rows or []) ] async def get_preview(path: str) -> Optional[Dict[str, Any]]: """Return path, title, sections (heading + short excerpt).""" conn = await _db.get_db() async with conn.execute("SELECT path, title FROM docs_files WHERE path = ?", (path,)) as cur: row = await cur.fetchone() if not row: return None async with conn.execute( "SELECT heading, content FROM docs_chunks WHERE path = ? ORDER BY chunk_index", (path,), ) as cur: rows = await cur.fetchall() sections = [ {"heading": r[0] or "", "excerpt": (r[1] or "")[:400].strip()} for r in (rows or []) ] return {"path": row[0], "title": row[1] or "", "sections": sections} async def get_raw(path: str) -> Optional[str]: """Return full content of first chunk or concatenated chunks (best-effort).""" conn = await _db.get_db() async with conn.execute( "SELECT content FROM docs_chunks WHERE path = ? ORDER BY chunk_index", (path,), ) as cur: rows = await cur.fetchall() if not rows: return None return "\n\n".join(r[0] or "" for r in rows)