Files
microdao-daarion/services/oneok-schedule-adapter/app.py

68 lines
2.1 KiB
Python

import os
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from fastapi import Depends, FastAPI, Header, HTTPException
API_KEY = os.getenv("ONEOK_ADAPTER_API_KEY", "").strip()
DEFAULT_TZ = os.getenv("ONEOK_SCHEDULE_TZ", "Europe/Kyiv")
app = FastAPI(title="1OK Schedule Adapter", version="1.0.0")
def _auth(authorization: Optional[str] = Header(default=None)) -> None:
if not API_KEY:
return
expected = f"Bearer {API_KEY}"
if authorization != expected:
raise HTTPException(status_code=401, detail="Unauthorized")
def _next_work_slots(count: int = 3) -> List[Dict[str, Any]]:
now = datetime.utcnow()
slots: List[Dict[str, Any]] = []
d = now
while len(slots) < count:
d += timedelta(days=1)
if d.weekday() >= 5:
continue
day = d.strftime("%Y-%m-%d")
for hour in (10, 13, 16):
slots.append(
{
"slot_id": str(uuid.uuid4()),
"start_local": f"{day}T{hour:02d}:00:00",
"end_local": f"{day}T{hour+1:02d}:00:00",
"timezone": DEFAULT_TZ,
}
)
if len(slots) >= count:
break
return slots
@app.get("/health")
def health() -> Dict[str, Any]:
return {"status": "ok", "service": "oneok-schedule-adapter", "timezone": DEFAULT_TZ}
@app.post("/schedule/propose_slots")
def propose_slots(params: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
count = int(params.get("count") or 3)
count = 3 if count < 1 else min(count, 8)
return {"slots": _next_work_slots(count=count)}
@app.post("/schedule/confirm_slot")
def confirm_slot(body: Dict[str, Any], _: None = Depends(_auth)) -> Dict[str, Any]:
job_id = body.get("job_id")
slot = body.get("slot")
if not job_id or slot is None:
raise HTTPException(status_code=400, detail="job_id and slot required")
return {
"job_id": job_id,
"confirmed_slot": slot,
"status": "confirmed",
}