"""SQLite-backed CRM adapter exposed over HTTP with FastAPI.

Every entity (client, site, window unit, quote, job) is stored as one row:
the full request payload serialised to JSON in ``payload_json`` plus a few
plain-text columns copied out of the payload so they can be queried.
"""

import hmac
import json
import os
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from fastapi import Depends, FastAPI, Header, HTTPException, Query

DB_PATH = Path(os.getenv("ONEOK_CRM_DB_PATH", "/data/oneok_crm.sqlite"))
API_KEY = os.getenv("ONEOK_ADAPTER_API_KEY", "").strip()

app = FastAPI(title="1OK CRM Adapter", version="1.0.0")

# Queryable text columns of each table, in storage order.  Every table also
# has ``id``, ``payload_json``, ``created_at`` and ``updated_at``.  This is the
# single source of truth for both the schema and the upsert statements, and
# doubles as the whitelist of table names allowed into generated SQL.
_INDEXED_COLUMNS: Dict[str, Tuple[str, ...]] = {
    "clients": ("full_name", "phone", "email", "preferred_contact", "notes"),
    "sites": ("client_id", "address_text"),
    "window_units": ("site_id", "label"),
    "quotes": ("client_id", "site_id"),
    "jobs": ("site_id", "job_type", "status"),
}

# Value stored when an indexed field is missing or falsy; "" unless listed.
_COLUMN_FALLBACKS: Dict[Tuple[str, str], str] = {("jobs", "status"): "new"}

# Columns present on every table after the indexed ones.
_TRAILING_COLUMNS: Tuple[str, ...] = ("payload_json", "created_at", "updated_at")

# Character used to escape LIKE wildcards in user-supplied search terms.
_LIKE_ESCAPE = "\\"


def _utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _connect() -> sqlite3.Connection:
    """Open a connection to ``DB_PATH``, creating its directory if needed.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The caller is responsible for closing the connection.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def _init_db() -> None:
    """Create every table listed in ``_INDEXED_COLUMNS`` if it is missing."""
    with closing(_connect()) as conn:
        with conn:  # commit on success, roll back on error
            for table, columns in _INDEXED_COLUMNS.items():
                column_defs = ["id TEXT PRIMARY KEY"]
                column_defs.extend(f"{column} TEXT" for column in columns)
                column_defs.extend(
                    f"{column} TEXT NOT NULL" for column in _TRAILING_COLUMNS
                )
                conn.execute(
                    f"CREATE TABLE IF NOT EXISTS {table} "
                    f"({', '.join(column_defs)})"
                )


def _auth(authorization: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency enforcing ``Authorization: Bearer <API_KEY>``.

    When ``API_KEY`` is empty, authentication is disabled and every request
    is accepted.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    if not API_KEY:
        return
    expected = f"Bearer {API_KEY}".encode("utf-8")
    supplied = (authorization or "").encode("utf-8")
    # Constant-time comparison so response timing does not leak how much of
    # the key a guess got right.  Bytes are used because compare_digest
    # rejects non-ASCII ``str`` arguments.
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")


def _build_upsert_sql(table: str, columns: Tuple[str, ...]) -> str:
    """Build the INSERT ... ON CONFLICT(id) DO UPDATE statement for *table*."""
    insert_columns = ("id", *columns, *_TRAILING_COLUMNS)
    placeholders = ", ".join("?" for _ in insert_columns)
    # created_at is deliberately left out so it keeps its original value.
    assignments = ", ".join(
        f"{column}=excluded.{column}"
        for column in (*columns, "payload_json", "updated_at")
    )
    return (
        f"INSERT INTO {table}({', '.join(insert_columns)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT(id) DO UPDATE SET {assignments}"
    )


def _upsert_generic(
    table: str,
    payload: Dict[str, Any],
    *,
    id_field: str = "id",
    indexed_fields: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Insert or update one entity row in *table*.

    Args:
        table: Target table; must be a key of ``_INDEXED_COLUMNS``.
        payload: Entity data.  It is stored whole as JSON, with
            ``payload[id_field]`` set to the row id.
        id_field: Payload key holding the entity id.  When the value is
            missing or falsy a new UUID4 is generated.
        indexed_fields: Values for the table's queryable columns.  Missing or
            falsy values are stored as ``""`` (``"new"`` for ``jobs.status``).

    Returns:
        ``{"id": <row id>, "created": <True if no such row existed before>}``.

    Raises:
        ValueError: If *table* is not a known table.
    """
    try:
        columns = _INDEXED_COLUMNS[table]
    except KeyError:
        # The table name is interpolated into SQL below, so only whitelisted
        # names may get that far.
        raise ValueError(f"unknown table: {table!r}") from None

    entity_id = str(payload.get(id_field) or uuid.uuid4())
    now = _utc_now()
    merged = dict(payload)  # copy so the caller's dict is not mutated
    merged[id_field] = entity_id
    idx = indexed_fields or {}

    indexed_values = [
        str(idx.get(column) or _COLUMN_FALLBACKS.get((table, column), ""))
        for column in columns
    ]
    params = (
        entity_id,
        *indexed_values,
        json.dumps(merged, ensure_ascii=False),
        now,  # created_at (ignored by the UPDATE branch)
        now,  # updated_at
    )

    with closing(_connect()) as conn:
        with conn:  # commit on success, roll back on error
            existing = conn.execute(
                f"SELECT 1 FROM {table} WHERE id = ?", (entity_id,)
            ).fetchone()
            conn.execute(_build_upsert_sql(table, columns), params)

    return {"id": entity_id, "created": existing is None}


def _unicode_lower(value: Any) -> Any:
    """Lower-case *value* with Python's Unicode rules; pass non-text through."""
    return value.lower() if isinstance(value, str) else value


def _like_contains(term: str) -> str:
    """Return a LIKE pattern matching *term* literally anywhere in a value."""
    escaped = (
        term.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", _LIKE_ESCAPE + "%")
        .replace("_", _LIKE_ESCAPE + "_")
    )
    return f"%{escaped}%"


@app.on_event("startup")
def _startup() -> None:
    """Make sure the database schema exists before requests are served."""
    _init_db()


@app.get("/health")
def health() -> Dict[str, Any]:
    """Liveness probe; does not touch the database."""
    return {"status": "ok", "service": "oneok-crm-adapter"}


@app.get("/crm/search_client")
def search_client(
    query: str = Query(...), _: None = Depends(_auth)
) -> Dict[str, Any]:
    """Search clients by case-insensitive substring of name, phone or email.

    Returns at most 20 matches, most recently updated first, as
    ``{"count": <n>, "items": [<stored payload with "id" set>, ...]}``.
    A blank query matches every client.
    """
    pattern = _like_contains(query.strip().lower())
    with closing(_connect()) as conn:
        # SQLite's built-in lower() folds ASCII only, while the pattern is
        # folded by Python with full Unicode rules.  Using the same Python
        # function on the SQL side keeps non-ASCII names searchable.
        conn.create_function("unicode_lower", 1, _unicode_lower)
        rows = conn.execute(
            """
            SELECT id, full_name, phone, email, payload_json, updated_at
            FROM clients
            WHERE unicode_lower(full_name) LIKE ? ESCAPE '\\'
               OR unicode_lower(phone) LIKE ? ESCAPE '\\'
               OR unicode_lower(email) LIKE ? ESCAPE '\\'
            ORDER BY updated_at DESC
            LIMIT 20
            """,
            (pattern, pattern, pattern),
        ).fetchall()

    items = []
    for row in rows:
        payload = json.loads(row["payload_json"])
        payload["id"] = row["id"]
        items.append(payload)
    return {"count": len(items), "items": items}


@app.post("/crm/upsert_client")
def upsert_client(
    client_payload: Dict[str, Any], _: None = Depends(_auth)
) -> Dict[str, Any]:
    """Create or update a client; ``label`` is used when ``full_name`` is absent."""
    result = _upsert_generic(
        "clients",
        client_payload,
        indexed_fields={
            "full_name": client_payload.get("full_name")
            or client_payload.get("label"),
            "phone": client_payload.get("phone"),
            "email": client_payload.get("email"),
            "preferred_contact": client_payload.get("preferred_contact"),
            "notes": client_payload.get("notes"),
        },
    )
    return {"client_id": result["id"], "created": result["created"]}


@app.post("/crm/upsert_site")
def upsert_site(
    site_payload: Dict[str, Any], _: None = Depends(_auth)
) -> Dict[str, Any]:
    """Create or update a site."""
    result = _upsert_generic(
        "sites",
        site_payload,
        indexed_fields={
            "client_id": site_payload.get("client_id"),
            "address_text": site_payload.get("address_text"),
        },
    )
    return {"site_id": result["id"], "created": result["created"]}


@app.post("/crm/upsert_window_unit")
def upsert_window_unit(
    window_payload: Dict[str, Any], _: None = Depends(_auth)
) -> Dict[str, Any]:
    """Create or update a window unit; ``room`` is used when ``label`` is absent."""
    result = _upsert_generic(
        "window_units",
        window_payload,
        indexed_fields={
            "site_id": window_payload.get("site_id"),
            "label": window_payload.get("label") or window_payload.get("room"),
        },
    )
    return {"window_id": result["id"], "created": result["created"]}


@app.post("/crm/create_quote")
def create_quote(
    quote_payload: Dict[str, Any], _: None = Depends(_auth)
) -> Dict[str, Any]:
    """Create a quote, or overwrite the existing one if the payload carries its id."""
    result = _upsert_generic(
        "quotes",
        quote_payload,
        indexed_fields={
            "client_id": quote_payload.get("client_id"),
            "site_id": quote_payload.get("site_id"),
        },
    )
    return {"quote_id": result["id"], "created": result["created"]}


@app.patch("/crm/update_quote")
def update_quote(body: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
    """Shallow-merge ``body["patch"]`` into the stored quote ``body["quote_id"]``.

    Raises:
        HTTPException: 400 when ``quote_id`` is missing or ``patch`` is not an
            object; 404 when no quote has that id.
    """
    quote_id = body.get("quote_id")
    patch = body.get("patch")
    if not quote_id or not isinstance(patch, dict):
        raise HTTPException(status_code=400, detail="quote_id and patch required")

    with closing(_connect()) as conn:
        row = conn.execute(
            "SELECT id, payload_json FROM quotes WHERE id = ?", (quote_id,)
        ).fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="quote not found")

    payload = json.loads(row["payload_json"])
    payload.update(patch)
    # A patch must not be able to change the id: otherwise the upsert below
    # would write to (or create) a different quote and leave this one as is.
    payload["id"] = row["id"]

    result = _upsert_generic(
        "quotes",
        payload,
        indexed_fields={
            "client_id": payload.get("client_id"),
            "site_id": payload.get("site_id"),
        },
    )
    return {"quote_id": result["id"], "updated": True}


@app.post("/crm/create_job")
def create_job(
    job_payload: Dict[str, Any], _: None = Depends(_auth)
) -> Dict[str, Any]:
    """Create a job (or overwrite one by id); ``status`` defaults to ``"new"``."""
    result = _upsert_generic(
        "jobs",
        job_payload,
        indexed_fields={
            "site_id": job_payload.get("site_id"),
            "job_type": job_payload.get("job_type"),
            "status": job_payload.get("status") or "new",
        },
    )
    return {"job_id": result["id"], "created": result["created"]}