Files
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

758 lines
27 KiB
Python

"""
sofiia-console — Projects, Documents, Sessions, Dialog Map endpoints.
All endpoints are mounted on the main FastAPI app in main.py via:
app.include_router(docs_router)
Features:
- File upload with sha256, mime detection, size limits
- Projects CRUD
- Documents per project with keyword search
- Sessions with persistence (aiosqlite)
- Messages with branching (parent_msg_id)
- Dialog map (nodes + edges JSON)
- Session fork
"""
import hashlib
import io
import json
import logging
import mimetypes
import os
import re
import uuid
from pathlib import Path
from typing import List, Optional
import httpx
from fastapi import APIRouter, HTTPException, Query, Request, UploadFile, File
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from . import db as _db
logger = logging.getLogger(__name__)

# All routes below are mounted under /api on the main FastAPI app (see module docstring).
docs_router = APIRouter(prefix="/api", tags=["projects-docs-sessions"])

# ── Config ────────────────────────────────────────────────────────────────────
# Data root (container path by default); uploaded files live in a subdirectory.
_DATA_DIR = Path(os.getenv("SOFIIA_DATA_DIR", "/app/data"))
_UPLOADS_DIR = _DATA_DIR / "uploads"
# Base URL of the router service, used for OCR and embedding-ingest side calls.
_ROUTER_URL = os.getenv("ROUTER_URL", "http://router:8000")
# Upload size caps in megabytes, per broad MIME category (image / video / everything else).
_MAX_IMAGE_MB = int(os.getenv("UPLOAD_MAX_IMAGE_MB", "10"))
_MAX_VIDEO_MB = int(os.getenv("UPLOAD_MAX_VIDEO_MB", "200"))
_MAX_DOC_MB = int(os.getenv("UPLOAD_MAX_DOC_MB", "50"))
# Feature flags: image OCR via the router "fabric" capability, and embedding ingest.
_USE_FABRIC_OCR = os.getenv("USE_FABRIC_OCR", "false").lower() == "true"
_USE_EMBEDDINGS = os.getenv("USE_EMBEDDINGS", "false").lower() == "true"
# Whitelist of MIME types accepted by /files/upload (checked after byte sniffing).
_ALLOWED_MIMES = {
    # images
    "image/jpeg", "image/png", "image/gif", "image/webp", "image/bmp",
    # video
    "video/mp4", "video/mpeg", "video/webm", "video/quicktime",
    # documents
    "application/pdf",
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.ms-excel",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.ms-powerpoint",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "text/plain", "text/markdown", "text/csv",
    "application/json",
    "application/zip",
}
def _safe_filename(name: str) -> str:
"""Remove path traversal attempts and dangerous chars."""
name = os.path.basename(name)
name = re.sub(r"[^\w\-_.()]", "_", name)
return name[:128] or "upload"
def _size_limit_mb(mime: str) -> int:
    """Return the upload size cap in MB for the given MIME type's category."""
    for prefix, cap in (("image/", _MAX_IMAGE_MB), ("video/", _MAX_VIDEO_MB)):
        if mime.startswith(prefix):
            return cap
    # Anything that is not an image or a video counts as a document.
    return _MAX_DOC_MB
def _detect_mime(filename: str, data: bytes) -> str:
"""Detect MIME by magic bytes first, fall back to extension."""
try:
import magic
return magic.from_buffer(data[:2048], mime=True)
except Exception:
pass
guessed, _ = mimetypes.guess_type(filename)
return guessed or "application/octet-stream"
def _extract_text_simple(filename: str, data: bytes, mime: str) -> str:
"""Best-effort text extraction without external services."""
try:
if mime == "text/plain" or filename.endswith((".txt", ".md", ".markdown")):
return data.decode("utf-8", errors="replace")[:4096]
if mime == "application/json":
return data.decode("utf-8", errors="replace")[:4096]
if mime == "application/pdf":
try:
import pypdf
reader = pypdf.PdfReader(io.BytesIO(data))
text = "\n".join(p.extract_text() or "" for p in reader.pages[:10])
return text[:4096]
except Exception:
pass
if mime in (
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
):
try:
import docx
doc = docx.Document(io.BytesIO(data))
return "\n".join(p.text for p in doc.paragraphs)[:4096]
except Exception:
pass
except Exception as e:
logger.debug("extract_text_simple failed: %s", e)
return ""
# ── Projects ──────────────────────────────────────────────────────────────────
class ProjectCreate(BaseModel):
    """Request body for POST /projects."""
    name: str
    description: str = ""


class ProjectUpdate(BaseModel):
    """Request body for PATCH /projects/{project_id}; None fields are left untouched."""
    name: Optional[str] = None
    description: Optional[str] = None
@docs_router.get("/projects")
async def list_projects():
    """Return every project known to the store."""
    projects = await _db.list_projects()
    return projects
# Strong references to fire-and-forget bootstrap tasks: without them the event
# loop may garbage-collect a pending task before it runs (documented asyncio pitfall).
_BOOTSTRAP_TASKS: set = set()


@docs_router.post("/projects", status_code=201)
async def create_project(body: ProjectCreate):
    """Create a project and asynchronously bootstrap its Portfolio graph.

    Raises 400 when the name is blank. The snapshot/signal computation is
    fire-and-forget and best-effort: it never blocks or fails the request.
    """
    if not body.name.strip():
        raise HTTPException(status_code=400, detail="name is required")
    result = await _db.create_project(body.name.strip(), body.description)

    async def _bootstrap_project(pid: str) -> None:
        # Best-effort: populate the Portfolio view; each step ignores failure
        # independently so a snapshot error does not skip the signal pass.
        try:
            await _db.compute_graph_snapshot(project_id=pid, window="7d")
        except Exception:
            pass
        try:
            await _db.recompute_graph_signals(project_id=pid, window="7d", dry_run=False)
        except Exception:
            pass

    project_id = result.get("project_id", "")
    if project_id:  # skip the bootstrap entirely when the DB layer returned no id
        import asyncio

        task = asyncio.ensure_future(_bootstrap_project(project_id))
        _BOOTSTRAP_TASKS.add(task)
        task.add_done_callback(_BOOTSTRAP_TASKS.discard)
    return result
@docs_router.get("/projects/{project_id}")
async def get_project(project_id: str):
    """Fetch a single project; 404 when it does not exist."""
    project = await _db.get_project(project_id)
    if project:
        return project
    raise HTTPException(status_code=404, detail="Project not found")


@docs_router.patch("/projects/{project_id}")
async def update_project(project_id: str, body: ProjectUpdate):
    """Apply a partial update; 404 when the project is missing or nothing changed."""
    updated = await _db.update_project(
        project_id, name=body.name, description=body.description
    )
    if not updated:
        raise HTTPException(status_code=404, detail="Project not found or no changes")
    return {"ok": True}
# ── File Upload ───────────────────────────────────────────────────────────────
async def _fabric_ocr(data: bytes, filename: str) -> str:
    """Best-effort OCR of an image via the router's capability endpoint.

    Returns extracted text capped at 4096 chars, or "" on any failure —
    the upload flow must never fail because OCR did.
    """
    try:
        import base64
        async with httpx.AsyncClient(timeout=30.0) as client:
            r = await client.post(
                f"{_ROUTER_URL}/v1/capability/ocr",
                json={"image_b64": base64.b64encode(data).decode(), "filename": filename},
            )
        if r.status_code == 200:
            return r.json().get("text", "")[:4096]
    except Exception as e:
        logger.debug("Fabric OCR failed (skipping): %s", e)
    return ""


async def _ingest_embeddings(doc_id: str, project_id: str, text: str,
                             filename: str, mime: str, tags: List[str]) -> None:
    """Best-effort push of extracted text to the router for Qdrant ingest."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            await client.post(f"{_ROUTER_URL}/v1/documents/ingest", json={
                "agent_id": "sofiia",
                "text": text,
                "doc_id": doc_id,
                "project_id": project_id,
                "filename": filename,
                "mime": mime,
                "tags": tags,
            })
    except Exception as e:
        logger.debug("Doc ingest (best-effort) failed: %s", e)


@docs_router.post("/files/upload")
async def upload_file(
    request: Request,
    project_id: str = Query("default"),
    title: str = Query(""),
    tags: str = Query(""),  # comma-separated
    file: UploadFile = File(...),
):
    """Upload a file: validate type/size, store content-addressed, extract text.

    Returns: {file_id, doc_id, sha256, mime, size_bytes, filename, preview_text,
    storage_path}. Raises 415 for disallowed MIME types and 413 over the size cap.
    Uses the module-level _ROUTER_URL for side calls (was previously re-read from
    the environment in two places).
    """
    raw_name = _safe_filename(file.filename or "upload")
    data = await file.read()
    # Trust the bytes, not the client-declared content type.
    mime = _detect_mime(raw_name, data)
    if mime not in _ALLOWED_MIMES:
        raise HTTPException(status_code=415, detail=f"Unsupported file type: {mime}")
    size_mb = len(data) / (1024 * 1024)
    limit_mb = _size_limit_mb(mime)
    if size_mb > limit_mb:
        raise HTTPException(
            status_code=413,
            detail=f"File too large: {size_mb:.1f}MB > {limit_mb}MB limit for {mime}",
        )
    # Content-addressed storage: two-char sha shard dir, dedup on identical bytes.
    sha = hashlib.sha256(data).hexdigest()
    _UPLOADS_DIR.mkdir(parents=True, exist_ok=True)
    dest = _UPLOADS_DIR / sha[:2] / f"{sha}_{raw_name}"
    dest.parent.mkdir(parents=True, exist_ok=True)
    if not dest.exists():
        dest.write_bytes(data)
    file_id = sha[:16]  # short reference used by /files/{file_id}/download
    extracted = _extract_text_simple(raw_name, data, mime)
    # OCR fallback for images when enabled and local extraction found nothing.
    if _USE_FABRIC_OCR and mime.startswith("image/") and not extracted:
        extracted = await _fabric_ocr(data, raw_name)
    tag_list = [t.strip() for t in tags.split(",") if t.strip()]
    # Unknown projects fall back to "default" rather than failing the upload.
    if not await _db.get_project(project_id):
        project_id = "default"
    doc = await _db.create_document(
        project_id=project_id,
        file_id=file_id,
        sha256=sha,
        mime=mime,
        size_bytes=len(data),
        filename=raw_name,
        title=title or raw_name,
        tags=tag_list,
        extracted_text=extracted,
    )
    # Async ingest to Qdrant via the router (best-effort, never raises).
    if _USE_EMBEDDINGS and extracted:
        await _ingest_embeddings(doc["doc_id"], project_id, extracted, raw_name, mime, tag_list)
    return {
        **doc,
        "preview_text": extracted[:300],
        "storage_path": str(dest.relative_to(_DATA_DIR)),
    }
# ── Documents ─────────────────────────────────────────────────────────────────
@docs_router.get("/projects/{project_id}/documents")
async def list_documents(project_id: str, limit: int = Query(50, ge=1, le=200)):
    """List up to *limit* documents belonging to a project."""
    docs = await _db.list_documents(project_id, limit=limit)
    return docs


@docs_router.get("/projects/{project_id}/documents/{doc_id}")
async def get_document(project_id: str, doc_id: str):
    """Fetch one document; 404 when missing or owned by another project."""
    found = await _db.get_document(doc_id)
    if not found or found["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Document not found")
    return found
@docs_router.post("/projects/{project_id}/search")
async def search_project(project_id: str, request: Request):
    """Keyword search over a project's documents.

    Body: {"query": str, "limit": int?}. Returns 400 on a missing query or a
    malformed JSON body (previously a malformed body bubbled up as a 500).
    The "sessions" list is a Phase 2 placeholder for semantic session search.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body")
    query = str(body.get("query", "")).strip()
    if not query:
        raise HTTPException(status_code=400, detail="query is required")
    # Clamp the caller-supplied limit to a sane range; fall back to 20 on junk.
    try:
        limit = max(1, min(int(body.get("limit", 20)), 200))
    except (TypeError, ValueError):
        limit = 20
    docs = await _db.search_documents(project_id, query, limit=limit)
    sessions = []  # Phase 2: semantic session search
    return {"query": query, "documents": docs, "sessions": sessions}
@docs_router.get("/files/{file_id}/download")
async def download_file(file_id: str):
    """Download a stored upload by file_id (first 16 hex chars of its sha256).

    The id is validated against the expected hex shape before touching the
    filesystem, so glob metacharacters (e.g. "*") from the URL can never widen
    the rglob search to arbitrary uploads.
    """
    if not re.fullmatch(r"[0-9a-f]{16}", file_id):
        raise HTTPException(status_code=404, detail="File not found")
    matches = list(_UPLOADS_DIR.rglob(f"{file_id}_*"))
    if not matches:
        raise HTTPException(status_code=404, detail="File not found")
    path = matches[0]
    return FileResponse(str(path), filename=path.name)
# ── Sessions ──────────────────────────────────────────────────────────────────
@docs_router.get("/sessions")
async def list_sessions(
    project_id: str = Query("default"),
    limit: int = Query(30, ge=1, le=100),
):
    """List sessions for a project, capped at *limit*."""
    return await _db.list_sessions(project_id, limit=limit)


@docs_router.get("/sessions/{session_id}")
async def get_session(session_id: str):
    """Fetch a single session; 404 when it does not exist."""
    session = await _db.get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    return session


@docs_router.patch("/sessions/{session_id}/title")
async def update_session_title(session_id: str, request: Request):
    """Set a session's title from a raw JSON body: {"title": str}."""
    payload = await request.json()
    new_title = payload.get("title", "").strip()
    await _db.update_session_title(session_id, new_title)
    return {"ok": True}
# ── Chat History ──────────────────────────────────────────────────────────────
@docs_router.get("/chat/history")
async def get_chat_history(
    session_id: str = Query(...),
    limit: int = Query(50, ge=1, le=200),
    branch_label: Optional[str] = Query(None),
):
    """Return persisted messages for a session so the UI can restore on reload."""
    messages = await _db.list_messages(session_id, limit=limit, branch_label=branch_label)
    return {"session_id": session_id, "messages": messages, "count": len(messages)}
# ── Dialog Map ────────────────────────────────────────────────────────────────
@docs_router.get("/sessions/{session_id}/map")
async def get_dialog_map(session_id: str):
    """Nodes + edges JSON for the session's dialog-map visualization."""
    return await _db.get_dialog_map(session_id)


class ForkRequest(BaseModel):
    """Body for POST /sessions/{session_id}/fork."""
    from_msg_id: str
    new_title: str = ""
    project_id: str = "default"


@docs_router.post("/sessions/{session_id}/fork")
async def fork_session(session_id: str, body: ForkRequest):
    """Fork a session at a message: a new session carrying the ancestor messages."""
    forked = await _db.fork_session(
        source_session_id=session_id,
        from_msg_id=body.from_msg_id,
        new_title=body.new_title,
        project_id=body.project_id,
    )
    return forked
# ── Delete endpoints ───────────────────────────────────────────────────────────
@docs_router.delete("/projects/{project_id}")
async def delete_project(project_id: str):
    """Delete a project row. The 'default' project is protected (400).

    Now 404s for a nonexistent project instead of silently returning ok
    (matching every other endpoint in this module).
    NOTE(review): child rows (documents, sessions, tasks) are not cascaded
    here — presumably handled by FK constraints in the db layer; verify.
    """
    if project_id == "default":
        raise HTTPException(status_code=400, detail="Cannot delete default project")
    if not await _db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")
    db = await _db.get_db()
    await db.execute("DELETE FROM projects WHERE project_id=?", (project_id,))
    await db.commit()
    return {"ok": True}


@docs_router.delete("/projects/{project_id}/documents/{doc_id}")
async def delete_document(project_id: str, doc_id: str):
    """Delete a document row (the content-addressed file stays on disk)."""
    doc = await _db.get_document(doc_id)
    if not doc or doc["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Document not found")
    db = await _db.get_db()
    await db.execute("DELETE FROM documents WHERE doc_id=?", (doc_id,))
    await db.commit()
    return {"ok": True}
# ── Tasks (Kanban) ─────────────────────────────────────────────────────────────
class TaskCreate(BaseModel):
    """Body for POST /projects/{project_id}/tasks."""
    title: str
    description: str = ""
    # Kanban status column (defaults to "backlog").
    status: str = "backlog"
    priority: str = "normal"
    labels: List[str] = []
    assignees: List[str] = []
    # Due timestamp as a string — presumably ISO-8601; TODO confirm against db layer.
    due_at: Optional[str] = None
    created_by: str = ""


class TaskUpdate(BaseModel):
    """Body for PATCH .../tasks/{task_id}; None fields are skipped (exclude_none)."""
    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[str] = None
    priority: Optional[str] = None
    labels: Optional[List[str]] = None
    assignees: Optional[List[str]] = None
    due_at: Optional[str] = None
    # Fractional ordering key — assumed to drive board positioning; verify in UI.
    sort_key: Optional[float] = None
@docs_router.get("/projects/{project_id}/tasks")
async def list_tasks(
    project_id: str,
    status: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=500),
):
    """List a project's tasks, optionally restricted to one status column."""
    tasks = await _db.list_tasks(project_id, status=status, limit=limit)
    return tasks
@docs_router.post("/projects/{project_id}/tasks", status_code=201)
async def create_task(project_id: str, body: TaskCreate):
    """Create a task and mirror it as a 'task' node on the project dialog map.

    Raises 400 for a blank title and 404 for an unknown project.
    """
    clean_title = body.title.strip()
    if not clean_title:
        raise HTTPException(status_code=400, detail="title is required")
    if not await _db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")
    task = await _db.create_task(
        project_id=project_id,
        title=clean_title,
        description=body.description,
        status=body.status,
        priority=body.priority,
        labels=body.labels,
        assignees=body.assignees,
        due_at=body.due_at,
        created_by=body.created_by,
    )
    # Keep the dialog map in sync with the newly created task.
    await _db.upsert_dialog_node(
        project_id=project_id,
        node_type="task",
        ref_id=task["task_id"],
        title=task["title"],
        summary=task["description"][:200],
        props={"status": task["status"], "priority": task["priority"]},
    )
    return task
@docs_router.get("/projects/{project_id}/tasks/{task_id}")
async def get_task(project_id: str, task_id: str):
    """Fetch one task; 404 when absent or belonging to another project."""
    found = await _db.get_task(task_id)
    if not found or found["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Task not found")
    return found


@docs_router.patch("/projects/{project_id}/tasks/{task_id}")
async def update_task(project_id: str, task_id: str, body: TaskUpdate):
    """Partially update a task; a status change is mirrored to the dialog map."""
    existing = await _db.get_task(task_id)
    if not existing or existing["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Task not found")
    changes = body.model_dump(exclude_none=True)
    ok = await _db.update_task(task_id, **changes)
    if ok and "status" in changes:
        # Only status changes are reflected on the map node.
        await _db.upsert_dialog_node(
            project_id=project_id,
            node_type="task",
            ref_id=task_id,
            title=existing["title"],
            props={"status": changes["status"]},
        )
    return {"ok": ok}


@docs_router.delete("/projects/{project_id}/tasks/{task_id}")
async def delete_task(project_id: str, task_id: str):
    """Delete a task after verifying project ownership."""
    existing = await _db.get_task(task_id)
    if not existing or existing["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Task not found")
    return {"ok": await _db.delete_task(task_id)}
# ── Meetings ───────────────────────────────────────────────────────────────────
class MeetingCreate(BaseModel):
    """Body for POST /projects/{project_id}/meetings."""
    title: str
    # Start time as a string — presumably ISO-8601; TODO confirm against db layer.
    starts_at: str
    agenda: str = ""
    duration_min: int = 30
    location: str = ""
    attendees: List[str] = []
    created_by: str = ""


class MeetingUpdate(BaseModel):
    """Body for PATCH .../meetings/{meeting_id}; None fields are skipped (exclude_none)."""
    title: Optional[str] = None
    agenda: Optional[str] = None
    starts_at: Optional[str] = None
    duration_min: Optional[int] = None
    location: Optional[str] = None
    attendees: Optional[List[str]] = None
@docs_router.get("/projects/{project_id}/meetings")
async def list_meetings(project_id: str, limit: int = Query(50, ge=1, le=200)):
    """List up to *limit* meetings for a project."""
    return await _db.list_meetings(project_id, limit=limit)


@docs_router.post("/projects/{project_id}/meetings", status_code=201)
async def create_meeting(project_id: str, body: MeetingCreate):
    """Create a meeting and mirror it as a 'meeting' node on the dialog map.

    Raises 400 for a blank title or missing starts_at, 404 for an unknown project.
    """
    clean_title = body.title.strip()
    if not clean_title:
        raise HTTPException(status_code=400, detail="title is required")
    if not body.starts_at:
        raise HTTPException(status_code=400, detail="starts_at is required")
    if not await _db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")
    meeting = await _db.create_meeting(
        project_id=project_id,
        title=clean_title,
        starts_at=body.starts_at,
        agenda=body.agenda,
        duration_min=body.duration_min,
        location=body.location,
        attendees=body.attendees,
        created_by=body.created_by,
    )
    # Keep the project dialog map aware of scheduled meetings.
    await _db.upsert_dialog_node(
        project_id=project_id,
        node_type="meeting",
        ref_id=meeting["meeting_id"],
        title=meeting["title"],
        summary=meeting["agenda"][:200],
        props={"starts_at": meeting["starts_at"], "duration_min": meeting["duration_min"]},
    )
    return meeting
@docs_router.get("/projects/{project_id}/meetings/{meeting_id}")
async def get_meeting(project_id: str, meeting_id: str):
    """Fetch one meeting; 404 when absent or owned by another project."""
    meeting = await _db.get_meeting(meeting_id)
    if not meeting or meeting["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Meeting not found")
    return meeting


@docs_router.patch("/projects/{project_id}/meetings/{meeting_id}")
async def update_meeting(project_id: str, meeting_id: str, body: MeetingUpdate):
    """Partially update a meeting (only non-None body fields are applied)."""
    meeting = await _db.get_meeting(meeting_id)
    if not meeting or meeting["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Meeting not found")
    changes = body.model_dump(exclude_none=True)
    return {"ok": await _db.update_meeting(meeting_id, **changes)}


@docs_router.delete("/projects/{project_id}/meetings/{meeting_id}")
async def delete_meeting(project_id: str, meeting_id: str):
    """Delete a meeting after verifying project ownership."""
    meeting = await _db.get_meeting(meeting_id)
    if not meeting or meeting["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Meeting not found")
    return {"ok": await _db.delete_meeting(meeting_id)}
# ── Dialog Map (Project-level graph) ─────────────────────────────────────────
@docs_router.get("/projects/{project_id}/dialog-map")
async def get_project_dialog_map(project_id: str):
    """Canonical project-wide dialog graph: all entity nodes plus their edges."""
    graph = await _db.get_project_dialog_map(project_id)
    return graph
class LinkCreate(BaseModel):
    """Body for POST /projects/{project_id}/dialog/link."""
    # Entity reference on each side of the edge: type + id.
    from_type: str
    from_id: str
    to_type: str
    to_id: str
    edge_type: str = "references"
    # Arbitrary edge metadata stored alongside the relation.
    props: dict = {}
    created_by: str = ""
@docs_router.post("/projects/{project_id}/dialog/link", status_code=201)
async def create_dialog_link(project_id: str, body: LinkCreate):
    """Create a dialog edge between two entities (auto-resolves/creates nodes).

    Both endpoint nodes are upserted first (duplicated inline code replaced by
    one resolver), the graph edge is created, and the relation is additionally
    mirrored into the flat entity_link table. Raises 404 for an unknown project.
    """
    if not await _db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    async def _resolve_node(node_type: str, ref_id: str):
        # Upsert guarantees the node exists; default title is "<type>:<id prefix>".
        return await _db.upsert_dialog_node(
            project_id=project_id,
            node_type=node_type,
            ref_id=ref_id,
            title=f"{node_type}:{ref_id[:8]}",
            created_by=body.created_by,
        )

    from_node = await _resolve_node(body.from_type, body.from_id)
    to_node = await _resolve_node(body.to_type, body.to_id)
    edge = await _db.create_dialog_edge(
        project_id=project_id,
        from_node_id=from_node["node_id"],
        to_node_id=to_node["node_id"],
        edge_type=body.edge_type,
        props=body.props,
        created_by=body.created_by,
    )
    # Also persist as entity_link for non-graph queries.
    await _db.create_entity_link(
        project_id=project_id,
        from_type=body.from_type, from_id=body.from_id,
        to_type=body.to_type, to_id=body.to_id,
        link_type=body.edge_type,
        created_by=body.created_by,
    )
    return {
        "ok": True,
        "from_node": from_node,
        "to_node": to_node,
        "edge": edge,
    }
@docs_router.get("/projects/{project_id}/dialog/views")
async def list_dialog_views(project_id: str):
    """List saved dialog-map views for a project."""
    return await _db.list_dialog_views(project_id)


class DialogViewSave(BaseModel):
    """Body for PUT .../dialog/views/{name}: filter + layout state to persist."""
    name: str
    filters: dict = {}
    layout: dict = {}


@docs_router.put("/projects/{project_id}/dialog/views/{name}")
async def save_dialog_view(project_id: str, name: str, body: DialogViewSave):
    """Create or overwrite a named dialog-map view (upsert semantics)."""
    saved = await _db.upsert_dialog_view(
        project_id=project_id,
        name=name,
        filters=body.filters,
        layout=body.layout,
    )
    return saved
# ── Doc Versions ──────────────────────────────────────────────────────────────
class DocUpdateRequest(BaseModel):
    """Body for POST .../documents/{doc_id}/update."""
    # Full replacement markdown content for the document.
    content_md: str
    author_id: str = "system"
    # Free-text audit note; echoed back in the response.
    reason: str = ""
    # When True, compute hash + diff preview without writing anything.
    dry_run: bool = False
@docs_router.post("/projects/{project_id}/documents/{doc_id}/update")
async def update_document_version(project_id: str, doc_id: str, body: DocUpdateRequest):
    """Update document text and create a new version (idempotent by content hash).

    With dry_run=True — or when the content is byte-identical to the latest
    stored version — nothing is written; the response carries the computed
    version_hash plus a unified-diff preview (capped at 80 lines) against the
    latest version. Raises 404 for an unknown/foreign document.
    """
    # Local import is fine; the redundant `import hashlib` that shadowed the
    # module-level import has been removed.
    import difflib

    doc = await _db.get_document(doc_id)
    if not doc or doc["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Document not found")
    content = body.content_md.strip()
    # The short content hash doubles as an idempotency key for callers.
    version_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
    # Latest stored version (if any) is the diff baseline.
    existing = await _db.list_doc_versions(doc_id, limit=1)
    prev_content = ""
    if existing:
        prev_content = (await _db.get_doc_version_content(existing[0]["version_id"])) or ""
    diff_lines = list(difflib.unified_diff(
        prev_content.splitlines(), content.splitlines(),
        fromfile="previous", tofile="updated", lineterm="", n=3,
    ))
    diff_text = "\n".join(diff_lines[:80])  # cap for response
    will_change = content != prev_content
    if body.dry_run or not will_change:
        return {
            "ok": True,
            "dry_run": body.dry_run,
            "will_change": will_change,
            "version_hash": version_hash,
            "diff_text": diff_text,
        }
    new_ver = await _db.save_doc_version(doc_id, content, author_id=body.author_id)
    return {
        "ok": True,
        "dry_run": False,
        "will_change": True,
        "version_hash": version_hash,
        "version_id": new_ver["version_id"],
        "created_at": new_ver["created_at"],
        "diff_text": diff_text,
        "reason": body.reason,
    }
@docs_router.get("/projects/{project_id}/documents/{doc_id}/versions")
async def list_doc_versions(project_id: str, doc_id: str, limit: int = Query(20)):
    """List stored versions of a document (ownership-checked, 404 otherwise)."""
    doc = await _db.get_document(doc_id)
    if not doc or doc["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Document not found")
    versions = await _db.list_doc_versions(doc_id, limit=limit)
    return versions
class DocVersionRestore(BaseModel):
    """Body for POST .../documents/{doc_id}/restore."""
    version_id: str
    author_id: str = "system"


@docs_router.post("/projects/{project_id}/documents/{doc_id}/restore")
async def restore_doc_version(project_id: str, doc_id: str, body: DocVersionRestore):
    """Restore an older version by saving its content as a brand-new version."""
    doc = await _db.get_document(doc_id)
    if not doc or doc["project_id"] != project_id:
        raise HTTPException(status_code=404, detail="Document not found")
    content = await _db.get_doc_version_content(body.version_id)
    if content is None:
        raise HTTPException(status_code=404, detail="Version not found")
    restored = await _db.save_doc_version(doc_id, content, author_id=body.author_id)
    return {"ok": True, "new_version": restored, "restored_from": body.version_id}