diff --git a/docker-compose.node1.yml b/docker-compose.node1.yml index 7dcb1a13..dbcec849 100644 --- a/docker-compose.node1.yml +++ b/docker-compose.node1.yml @@ -46,6 +46,8 @@ services: - ONEOK_SCHEDULE_BASE_URL=http://oneok-schedule-adapter:8091 - ONEOK_ADAPTER_API_KEY=${ONEOK_ADAPTER_API_KEY} - ROUTER_TOOL_MAX_ROUNDS=${ROUTER_TOOL_MAX_ROUNDS:-10} + - AGROMATRIX_REVIEW_AUTH_MODE=${AGROMATRIX_REVIEW_AUTH_MODE:-bearer} + - AGROMATRIX_REVIEW_BEARER_TOKENS=${AGROMATRIX_REVIEW_BEARER_TOKENS:-mentor-review-dev-token-change-me} volumes: - ${DEPLOY_ROOT:-.}/services/router/router_config.yaml:/app/router_config.yaml:ro - ${DEPLOY_ROOT:-.}/services/router/router-config.yml:/app/router-config.yml:ro @@ -220,6 +222,8 @@ services: - WEB_SEARCH_SERVICE_URL=http://swapper-service:8890 - REDIS_URL=redis://redis:6379/0 - CREWAI_SERVICE_URL=http://dagi-staging-crewai-service:9010 + - AGROMATRIX_REVIEW_AUTH_MODE=${AGROMATRIX_REVIEW_AUTH_MODE:-bearer} + - AGROMATRIX_REVIEW_BEARER_TOKENS=${AGROMATRIX_REVIEW_BEARER_TOKENS:-mentor-review-dev-token-change-me} volumes: - ${DEPLOY_ROOT:-.}/gateway-bot:/app/gateway-bot:ro - ${DEPLOY_ROOT:-.}/logs:/app/logs diff --git a/gateway-bot/daarion_facade/invoke_api.py b/gateway-bot/daarion_facade/invoke_api.py index 2be0fc32..6cab2b54 100644 --- a/gateway-bot/daarion_facade/invoke_api.py +++ b/gateway-bot/daarion_facade/invoke_api.py @@ -1,10 +1,12 @@ import asyncio from datetime import datetime, timezone +import hmac import json import os import uuid from typing import Any, Dict, List +import httpx from fastapi import APIRouter, HTTPException, Request, status from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field @@ -17,6 +19,14 @@ router = APIRouter(prefix="/v1", tags=["daarion-facade"]) EVENT_TERMINAL_STATUSES = {"done", "failed"} EVENT_KNOWN_STATUSES = {"queued", "running", "done", "failed"} EVENT_POLL_SECONDS = float(os.getenv("DAARION_JOB_EVENTS_POLL_SECONDS", "0.5")) +ROUTER_URL = os.getenv("ROUTER_URL", "http://router:8000").rstrip("/") +ROUTER_REVIEW_TIMEOUT = float(os.getenv("DAARION_ROUTER_REVIEW_TIMEOUT_SECONDS", "20")) +AGROMATRIX_REVIEW_AUTH_MODE = os.getenv("AGROMATRIX_REVIEW_AUTH_MODE", "bearer").strip().lower() +AGROMATRIX_REVIEW_BEARER_TOKENS = [ + part.strip() + for part in os.getenv("AGROMATRIX_REVIEW_BEARER_TOKENS", "").replace(";", ",").split(",") + if part.strip() +] class InvokeInput(BaseModel): @@ -36,6 +46,69 @@ class InvokeResponse(BaseModel): status_url: str +class SharedMemoryReviewRequest(BaseModel): + point_id: str + approve: bool + reviewer: str | None = None + note: str | None = None + + +def _extract_bearer_token(request: Request) -> str: + auth_header = request.headers.get("Authorization", "") + if not auth_header.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Missing Bearer token") + token = auth_header[len("Bearer ") :].strip() + if not token: + raise HTTPException(status_code=401, detail="Empty Bearer token") + return token + + +def _require_mentor_auth(request: Request) -> str: + mode = AGROMATRIX_REVIEW_AUTH_MODE + if mode in {"off", "none", "disabled"}: + return "" + if mode != "bearer": + raise HTTPException(status_code=500, detail=f"Unsupported AGROMATRIX_REVIEW_AUTH_MODE={mode}") + if not AGROMATRIX_REVIEW_BEARER_TOKENS: + raise HTTPException(status_code=503, detail="Review auth is not configured") + token = _extract_bearer_token(request) + if not any(hmac.compare_digest(token, candidate) for candidate in AGROMATRIX_REVIEW_BEARER_TOKENS): + raise HTTPException(status_code=403, detail="Invalid mentor token") + return token + + +async def _router_json( + method: str, + path: str, + *, + payload: Dict[str, Any] | None = None, + params: Dict[str, Any] | None = None, + authorization: str | None = None, +) -> Dict[str, Any]: + headers: Dict[str, str] = {} + if authorization: + headers["Authorization"] = authorization + url = f"{ROUTER_URL}{path}" + + try: + async with httpx.AsyncClient(timeout=ROUTER_REVIEW_TIMEOUT) as client: + resp = await client.request(method, url, json=payload, params=params, headers=headers) + except httpx.TimeoutException: + raise HTTPException(status_code=504, detail="Router timeout") + except Exception as e: + raise HTTPException(status_code=502, detail=f"Router unavailable: {e}") + + try: + body = resp.json() + except Exception: + body = {"raw": resp.text} + + if resp.status_code >= 400: + detail = body.get("detail") if isinstance(body, dict) else body + raise HTTPException(status_code=resp.status_code, detail=detail or f"Router error {resp.status_code}") + return body if isinstance(body, dict) else {"data": body} + + def _sse_message(event: str, payload: Dict[str, Any]) -> str: return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n" @@ -116,3 +189,24 @@ async def job_events(job_id: str, request: Request) -> StreamingResponse: "X-Accel-Buffering": "no", }, ) + + +@router.get("/agromatrix/shared-memory/pending") +async def agromatrix_shared_pending(limit: int = 50) -> Dict[str, Any]: + return await _router_json( + "GET", + "/v1/agromatrix/shared-memory/pending", + params={"limit": max(1, min(limit, 200))}, + ) + + +@router.post("/agromatrix/shared-memory/review") +async def agromatrix_shared_review(req: SharedMemoryReviewRequest, request: Request) -> Dict[str, Any]: + token = _require_mentor_auth(request) + auth_header = f"Bearer {token}" if token else None + return await _router_json( + "POST", + "/v1/agromatrix/shared-memory/review", + payload=req.model_dump(), + authorization=auth_header, + ) diff --git a/scripts/node1/agromatrix_regression_smoke.py b/scripts/node1/agromatrix_regression_smoke.py index e672bf46..2e205f27 100755 --- a/scripts/node1/agromatrix_regression_smoke.py +++ b/scripts/node1/agromatrix_regression_smoke.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import argparse import json +import os import sys import urllib.error import urllib.request @@ -12,13 +13,13 @@ TINY_PNG_DATA_URL = ( ) -def http_json(method: str, url: str, payload=None): +def http_json(method: str, url: str, payload=None, headers=None): data = None - headers = {} + req_headers = dict(headers or {}) if payload is not None: data = json.dumps(payload).encode("utf-8") - headers["Content-Type"] = "application/json" - req = urllib.request.Request(url, data=data, headers=headers, method=method) + req_headers.setdefault("Content-Type", "application/json") + req = urllib.request.Request(url, data=data, headers=req_headers, method=method) try: with urllib.request.urlopen(req, timeout=60) as resp: body = resp.read().decode("utf-8", errors="replace") @@ -46,6 +47,14 @@ def main() -> int: parser.add_argument("--chat-id", default="smoke-agromatrix") parser.add_argument("--user-id", default="smoke-user") parser.add_argument("--skip-review-404", action="store_true") + parser.add_argument( + "--mentor-token", + default=( + os.getenv("AGROMATRIX_REVIEW_BEARER_TOKEN") + or (os.getenv("AGROMATRIX_REVIEW_BEARER_TOKENS", "").split(",")[0].strip()) + or "" + ), + ) args = parser.parse_args() ok_all = True @@ -95,6 +104,9 @@ def main() -> int: ok_all &= check(status == 200 and pending_shape, "shared_pending_endpoint", f"total={pending.get('total')}") if not args.skip_review_404: + req_headers = {} + if args.mentor_token: + req_headers["Authorization"] = f"Bearer {args.mentor_token}" status, review = http_json( "POST", f"{args.base_url}/v1/agromatrix/shared-memory/review", @@ -104,8 +116,10 @@ def main() -> int: "reviewer": "smoke", "note": "nonexistent id check", }, + headers=req_headers, ) - ok_all &= check(status == 404, "shared_review_not_found_contract", str(review)) + expected = 404 if args.mentor_token else 401 + ok_all &= check(status == expected, "shared_review_not_found_contract", str(review)) return 0 if ok_all else 1 diff --git a/services/router/main.py b/services/router/main.py index 737475ba..3e16ad9b 100644 --- a/services/router/main.py +++ b/services/router/main.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request from fastapi.responses import Response from pydantic import BaseModel from typing import Literal, Optional, Dict, Any, List @@ -10,6 +10,7 @@ import yaml import httpx import logging import hashlib +import hmac import time # For latency metrics from difflib import SequenceMatcher @@ -888,6 +889,12 @@ CLAN_RUNTIME_CONSENT_EVENT_SCHEMA_PATH = os.getenv( NEO4J_URI = os.getenv("NEO4J_BOLT_URL", "bolt://neo4j:7687") NEO4J_USER = os.getenv("NEO4J_USER", "neo4j") NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "DaarionNeo4j2026!") +AGROMATRIX_REVIEW_AUTH_MODE = os.getenv("AGROMATRIX_REVIEW_AUTH_MODE", "bearer").strip().lower() +AGROMATRIX_REVIEW_BEARER_TOKENS = [ + part.strip() + for part in os.getenv("AGROMATRIX_REVIEW_BEARER_TOKENS", "").replace(";", ",").split(",") + if part.strip() +] # HTTP client for backend services http_client: Optional[httpx.AsyncClient] = None @@ -1235,6 +1242,30 @@ class SharedMemoryReviewRequest(BaseModel): note: Optional[str] = None +def _require_agromatrix_review_auth(request: Request) -> None: + mode = AGROMATRIX_REVIEW_AUTH_MODE + if mode in {"off", "none", "disabled"}: + return + + if mode != "bearer": + raise HTTPException(status_code=500, detail=f"Unsupported AGROMATRIX_REVIEW_AUTH_MODE={mode}") + + if not AGROMATRIX_REVIEW_BEARER_TOKENS: + logger.error("AGROMATRIX_REVIEW_AUTH_MODE=bearer but AGROMATRIX_REVIEW_BEARER_TOKENS is empty") + raise HTTPException(status_code=503, detail="Review auth is not configured") + + auth_header = request.headers.get("Authorization", "") + if not auth_header.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Missing Bearer token") + + token = auth_header[len("Bearer ") :].strip() + if not token: + raise HTTPException(status_code=401, detail="Empty Bearer token") + + if not any(hmac.compare_digest(token, candidate) for candidate in AGROMATRIX_REVIEW_BEARER_TOKENS): + raise HTTPException(status_code=403, detail="Invalid mentor token") + + # ========================================================================= @@ -2889,8 +2920,10 @@ async def agromatrix_shared_pending(limit: int = 50): @app.post("/v1/agromatrix/shared-memory/review") -async def agromatrix_shared_review(req: SharedMemoryReviewRequest): +async def agromatrix_shared_review(req: SharedMemoryReviewRequest, request: Request): """Approve or reject a pending shared agronomy memory case.""" + _require_agromatrix_review_auth(request) + if not MEMORY_RETRIEVAL_AVAILABLE or not memory_retrieval: raise HTTPException(status_code=503, detail="Memory retrieval not available") if not hasattr(memory_retrieval, "review_shared_pending_case"):