"""1OK CRM adapter: a FastAPI service that stores CRM entities in SQLite."""
import json
|
|
import os
|
|
import sqlite3
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
from fastapi import Depends, FastAPI, Header, HTTPException, Query
|
|
|
|
# Location of the SQLite database file; overridable via ONEOK_CRM_DB_PATH.
DB_PATH = Path(os.environ.get("ONEOK_CRM_DB_PATH", "/data/oneok_crm.sqlite"))

# Bearer token expected in the Authorization header (surrounding whitespace
# is stripped). Read once at import time from ONEOK_ADAPTER_API_KEY.
API_KEY = os.environ.get("ONEOK_ADAPTER_API_KEY", "").strip()

# The ASGI application object that all route decorators below attach to.
app = FastAPI(title="1OK CRM Adapter", version="1.0.0")
|
|
|
|
|
|
def _utc_now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
|
|
|
|
|
|
def _connect() -> sqlite3.Connection:
    """Open a fresh SQLite connection to ``DB_PATH``.

    The parent directory of the database file is created when missing, and
    rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The caller is responsible for closing the returned connection.
    """
    parent_dir = DB_PATH.parent
    parent_dir.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def _init_db() -> None:
    """Create the CRM tables if they do not exist yet.

    Issues one ``CREATE TABLE IF NOT EXISTS`` statement per entity table
    (clients, sites, window_units, quotes, jobs) on a single connection,
    commits once, and always closes the connection.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS clients (
            id TEXT PRIMARY KEY,
            full_name TEXT,
            phone TEXT,
            email TEXT,
            preferred_contact TEXT,
            notes TEXT,
            payload_json TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sites (
            id TEXT PRIMARY KEY,
            client_id TEXT,
            address_text TEXT,
            payload_json TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS window_units (
            id TEXT PRIMARY KEY,
            site_id TEXT,
            label TEXT,
            payload_json TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS quotes (
            id TEXT PRIMARY KEY,
            client_id TEXT,
            site_id TEXT,
            payload_json TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            site_id TEXT,
            job_type TEXT,
            status TEXT,
            payload_json TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
        """,
    )

    db = _connect()
    try:
        cursor = db.cursor()
        for ddl in schema_statements:
            cursor.execute(ddl)
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def _auth(authorization: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency enforcing the bearer-token check.

    When ``API_KEY`` is empty the check is skipped entirely. Otherwise the
    ``Authorization`` header must equal ``"Bearer <API_KEY>"`` exactly.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    if not API_KEY:
        return

    expected = f"Bearer {API_KEY}".encode("utf-8")
    supplied = (authorization or "").encode("utf-8")

    # compare_digest runs in time independent of where the inputs differ, so
    # the response time does not leak how much of the token was guessed.
    # Bytes are used because the str form only accepts ASCII.
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=401,
            detail="Unauthorized",
            # A 401 response should carry the challenge the client must answer.
            headers={"WWW-Authenticate": "Bearer"},
        )
|
|
|
|
|
|
# Allow-list of tables `_upsert_generic` may write to. Each entry maps the
# table's indexed columns (in INSERT order) to the value stored when the
# caller supplies nothing truthy for that column.
_UPSERT_COLUMNS: Dict[str, Dict[str, str]] = {
    "clients": {
        "full_name": "",
        "phone": "",
        "email": "",
        "preferred_contact": "",
        "notes": "",
    },
    "sites": {"client_id": "", "address_text": ""},
    "window_units": {"site_id": "", "label": ""},
    "quotes": {"client_id": "", "site_id": ""},
    "jobs": {"site_id": "", "job_type": "", "status": "new"},
}


def _upsert_generic(
    table: str,
    payload: Dict[str, Any],
    *,
    id_field: str = "id",
    indexed_fields: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Insert or update one entity row and report whether it was new.

    The whole payload (with its id filled in) is stored as JSON in
    ``payload_json``; the values in ``indexed_fields`` are additionally
    written to the table's dedicated columns as strings. On conflict the
    indexed columns, ``payload_json`` and ``updated_at`` are overwritten
    while ``created_at`` keeps its original value.

    Args:
        table: One of the keys of ``_UPSERT_COLUMNS``.
        payload: Entity data. Not mutated; a shallow copy is stored.
        id_field: Payload key holding the entity id. A random UUID is
            generated when the value is missing or falsy.
        indexed_fields: Values for the table's indexed columns. Keys that
            are not columns of ``table`` are ignored; missing or falsy
            values fall back to the column default.

    Returns:
        ``{"id": <entity id as str>, "created": <True if the row was new>}``.

    Raises:
        ValueError: If ``table`` is not a supported table name.
    """
    # Validate before touching the database: the table name is interpolated
    # into SQL below, and an unknown name must not silently skip the write.
    columns = _UPSERT_COLUMNS.get(table)
    if columns is None:
        raise ValueError(f"unsupported table: {table!r}")

    entity_id = str(payload.get(id_field) or uuid.uuid4())
    now = _utc_now()
    merged = dict(payload)
    merged[id_field] = entity_id
    idx = indexed_fields or {}

    insert_cols = ["id", *columns, "payload_json", "created_at", "updated_at"]
    # created_at is deliberately absent so an update keeps the original value.
    update_cols = [*columns, "payload_json", "updated_at"]
    placeholders = ", ".join("?" for _ in insert_cols)
    assignments = ", ".join(f"{col}=excluded.{col}" for col in update_cols)
    upsert_sql = (
        f"INSERT INTO {table}({', '.join(insert_cols)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT(id) DO UPDATE SET {assignments}"
    )
    params = (
        entity_id,
        *(str(idx.get(col) or default) for col, default in columns.items()),
        json.dumps(merged, ensure_ascii=False),
        now,
        now,
    )

    conn = _connect()
    try:
        cur = conn.cursor()
        # Probe first so the caller can be told whether the row was created.
        cur.execute(f"SELECT id FROM {table} WHERE id = ?", (entity_id,))
        exists = cur.fetchone() is not None
        cur.execute(upsert_sql, params)
        conn.commit()
    finally:
        conn.close()
    return {"id": entity_id, "created": not exists}
|
|
|
|
|
|
@app.on_event("startup")
def _startup() -> None:
    """Startup hook: make sure the database schema exists before serving."""
    _init_db()
|
|
|
|
|
|
@app.get("/health")
def health() -> Dict[str, Any]:
    """Liveness probe; returns a fixed status document and needs no auth."""
    status_doc: Dict[str, Any] = {"status": "ok", "service": "oneok-crm-adapter"}
    return status_doc
|
|
|
|
|
|
@app.get("/crm/search_client")
def search_client(query: str = Query(...), _: None = Depends(_auth)) -> Dict[str, Any]:
    """Find clients whose name, phone or email contains ``query``.

    Matching is a case-insensitive substring test. At most 20 clients are
    returned, most recently updated first. A blank query matches every
    client.

    Returns:
        ``{"count": <n>, "items": [<stored payload with "id" set>, ...]}``.
    """
    needle = query.strip().lower()
    # Escape LIKE metacharacters so the user's text is matched literally.
    # The backslash must be handled first or it would re-escape the others.
    for special in ("\\", "%", "_"):
        needle = needle.replace(special, "\\" + special)
    pattern = f"%{needle}%"

    conn = _connect()
    try:
        # SQLite's built-in lower() only folds ASCII, while the query above is
        # folded with Python's Unicode-aware str.lower(). Use the same folding
        # on both sides so non-ASCII names can match.
        conn.create_function(
            "unicode_lower",
            1,
            lambda value: value.lower() if isinstance(value, str) else value,
        )
        cur = conn.cursor()
        cur.execute(
            """
            SELECT id, payload_json
            FROM clients
            WHERE unicode_lower(full_name) LIKE ? ESCAPE '\\'
               OR unicode_lower(phone) LIKE ? ESCAPE '\\'
               OR unicode_lower(email) LIKE ? ESCAPE '\\'
            ORDER BY updated_at DESC
            LIMIT 20
            """,
            (pattern, pattern, pattern),
        )
        rows = cur.fetchall()
    finally:
        conn.close()

    # The row id is authoritative and overrides any "id" inside the payload.
    items = [{**json.loads(row["payload_json"]), "id": row["id"]} for row in rows]
    return {"count": len(items), "items": items}
|
|
|
|
|
|
@app.post("/crm/upsert_client")
def upsert_client(client_payload: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
    """Create or update a client record.

    The indexed name is taken from ``full_name``, falling back to ``label``.

    Returns:
        ``{"client_id": <id>, "created": <True if the client was new>}``.
    """
    display_name = client_payload.get("full_name") or client_payload.get("label")
    searchable: Dict[str, Any] = {"full_name": display_name}
    for key in ("phone", "email", "preferred_contact", "notes"):
        searchable[key] = client_payload.get(key)

    outcome = _upsert_generic("clients", client_payload, indexed_fields=searchable)
    return {"client_id": outcome["id"], "created": outcome["created"]}
|
|
|
|
|
|
@app.post("/crm/upsert_site")
def upsert_site(site_payload: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
    """Create or update a site record.

    Returns:
        ``{"site_id": <id>, "created": <True if the site was new>}``.
    """
    searchable = {key: site_payload.get(key) for key in ("client_id", "address_text")}
    outcome = _upsert_generic("sites", site_payload, indexed_fields=searchable)
    return {"site_id": outcome["id"], "created": outcome["created"]}
|
|
|
|
|
|
@app.post("/crm/upsert_window_unit")
def upsert_window_unit(window_payload: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
    """Create or update a window-unit record.

    The indexed label is taken from ``label``, falling back to ``room``.

    Returns:
        ``{"window_id": <id>, "created": <True if the unit was new>}``.
    """
    unit_label = window_payload.get("label") or window_payload.get("room")
    searchable = {"site_id": window_payload.get("site_id"), "label": unit_label}
    outcome = _upsert_generic("window_units", window_payload, indexed_fields=searchable)
    return {"window_id": outcome["id"], "created": outcome["created"]}
|
|
|
|
|
|
@app.post("/crm/create_quote")
def create_quote(quote_payload: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
    """Store a quote.

    The write is an upsert: when the payload carries the id of an existing
    quote, that quote is overwritten and ``created`` is False.

    Returns:
        ``{"quote_id": <id>, "created": <True if the quote was new>}``.
    """
    searchable = {key: quote_payload.get(key) for key in ("client_id", "site_id")}
    outcome = _upsert_generic("quotes", quote_payload, indexed_fields=searchable)
    return {"quote_id": outcome["id"], "created": outcome["created"]}
|
|
|
|
|
|
@app.patch("/crm/update_quote")
def update_quote(body: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
    """Shallow-merge ``body["patch"]`` into an existing quote's payload.

    Args:
        body: Must contain ``quote_id`` (truthy) and ``patch`` (a dict).

    Returns:
        ``{"quote_id": <id>, "updated": True}``.

    Raises:
        HTTPException: 400 when ``quote_id`` or ``patch`` is missing or
            malformed; 404 when no quote has that id.
    """
    quote_id = body.get("quote_id")
    patch = body.get("patch")
    if not quote_id or not isinstance(patch, dict):
        raise HTTPException(status_code=400, detail="quote_id and patch required")

    conn = _connect()
    try:
        cur = conn.cursor()
        cur.execute("SELECT payload_json FROM quotes WHERE id = ?", (quote_id,))
        row = cur.fetchone()
    finally:
        conn.close()

    if not row:
        raise HTTPException(status_code=404, detail="quote not found")

    payload = json.loads(row["payload_json"])
    payload.update(patch)
    # A patch must never retarget the write: without this, an "id" key in the
    # patch would make the upsert below overwrite or create a different quote.
    payload["id"] = quote_id

    result = _upsert_generic(
        "quotes",
        payload,
        indexed_fields={"client_id": payload.get("client_id"), "site_id": payload.get("site_id")},
    )
    return {"quote_id": result["id"], "updated": True}
|
|
|
|
|
|
@app.post("/crm/create_job")
def create_job(job_payload: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
    """Store a job; the indexed status is ``"new"`` when none is supplied.

    The write is an upsert: when the payload carries the id of an existing
    job, that job is overwritten and ``created`` is False.

    Returns:
        ``{"job_id": <id>, "created": <True if the job was new>}``.
    """
    job_status = job_payload.get("status") or "new"
    searchable = {
        "site_id": job_payload.get("site_id"),
        "job_type": job_payload.get("job_type"),
        "status": job_status,
    }
    outcome = _upsert_generic("jobs", job_payload, indexed_fields=searchable)
    return {"job_id": outcome["id"], "created": outcome["created"]}
|