diff --git a/ops/monitor_notify_sofiia.sh b/ops/monitor_notify_sofiia.sh old mode 100644 new mode 100755 index cb9d0c88..6fca6bf9 --- a/ops/monitor_notify_sofiia.sh +++ b/ops/monitor_notify_sofiia.sh @@ -7,6 +7,7 @@ ROUTER_URL="${ROUTER_URL:-http://127.0.0.1:9102}" REPORT_ENABLED="${SOFIIA_REPORTS_ENABLED:-true}" REPORT_MODE="${SOFIIA_REPORT_MODE:-fail_only}" # fail_only | always REPORT_TIMEOUT="${SOFIIA_REPORT_TIMEOUT:-180}" +REPORT_MAX_TOKENS="${SOFIIA_REPORT_MAX_TOKENS:-900}" REPORT_CHAT_ID="${SOFIIA_REPORT_CHAT_ID:-ops-monitor-sofiia}" REPORT_USER_ID="${SOFIIA_REPORT_USER_ID:-ops-monitor-agent}" REPORT_USERNAME="${SOFIIA_REPORT_USERNAME:-monitor-agent}" @@ -23,7 +24,7 @@ if [[ ! -f "$STATUS_JSON" ]]; then exit 0 fi -python3 - "$STATUS_JSON" "$ROOT" "$ROUTER_URL" "$REPORT_MODE" "$REPORT_TIMEOUT" "$REPORT_CHAT_ID" "$REPORT_USER_ID" "$REPORT_USERNAME" "$REPORT_TELEGRAM_CHAT_ID" "$SOFIIA_BOT_TOKEN" <<'PY' +python3 - "$STATUS_JSON" "$ROOT" "$ROUTER_URL" "$REPORT_MODE" "$REPORT_TIMEOUT" "$REPORT_MAX_TOKENS" "$REPORT_CHAT_ID" "$REPORT_USER_ID" "$REPORT_USERNAME" "$REPORT_TELEGRAM_CHAT_ID" "$SOFIIA_BOT_TOKEN" <<'PY' import json import sys from pathlib import Path @@ -35,11 +36,12 @@ root = Path(sys.argv[2]) router_url = sys.argv[3].rstrip('/') report_mode = sys.argv[4] timeout_s = int(sys.argv[5]) -chat_id = sys.argv[6] -user_id = sys.argv[7] -username = sys.argv[8] -tg_chat_id = sys.argv[9].strip() -tg_token = sys.argv[10].strip() +max_tokens = int(sys.argv[6]) +chat_id = sys.argv[7] +user_id = sys.argv[8] +username = sys.argv[9] +tg_chat_id = sys.argv[10].strip() +tg_token = sys.argv[11].strip() payload = json.loads(status_json.read_text(encoding='utf-8')) status = str(payload.get('status', 'unknown')).lower() @@ -70,7 +72,7 @@ prompt = ( body = { 'prompt': prompt, - 'max_tokens': 400, + 'max_tokens': max_tokens, 'temperature': 0.1, 'metadata': { 'source': 'ops-monitor-canary', @@ -99,26 +101,42 @@ try: print(f"[OK] sofiia report sent: backend={data.get('backend')} 
model={data.get('model')} preview={short!r}") if tg_chat_id and tg_token and text: - msg = ( + def chunk_text(value: str, limit: int = 3500): + chunks = [] + remaining = value + while remaining: + if len(remaining) <= limit: + chunks.append(remaining) + break + split_at = remaining.rfind('\n', 0, limit) + if split_at < max(1, limit // 2): + split_at = limit + chunks.append(remaining[:split_at].rstrip()) + remaining = remaining[split_at:].lstrip() + return chunks or [value] + + header = ( "[NODE1 Monitor]\n" f"status={payload.get('status')} exit_code={payload.get('exit_code')}\n\n" - f"{text[:3500]}" ) - tg_req = urlreq.Request( - url=f"https://api.telegram.org/bot{tg_token}/sendMessage", - data=json.dumps({"chat_id": tg_chat_id, "text": msg}).encode('utf-8'), - headers={'Content-Type': 'application/json'}, - method='POST', - ) - try: + parts = chunk_text(text, 3500 - len("(99/99)\n")) + total = len(parts) + delivered = 0 + for idx, part in enumerate(parts, start=1): + prefix = f"({idx}/{total})\n" if total > 1 else "" + msg = f"{header}{prefix}{part}" if idx == 1 else f"{prefix}{part}" + tg_req = urlreq.Request( + url=f"https://api.telegram.org/bot{tg_token}/sendMessage", + data=json.dumps({"chat_id": tg_chat_id, "text": msg}).encode('utf-8'), + headers={'Content-Type': 'application/json'}, + method='POST', + ) with urlreq.urlopen(tg_req, timeout=20) as tg_resp: tg_data = json.loads(tg_resp.read().decode('utf-8', errors='ignore')) - if tg_data.get('ok'): - print(f"[OK] telegram report delivered: chat_id={tg_chat_id}") - else: - print(f"[WARN] telegram send not ok: {tg_data}") - except Exception as tg_e: - print(f"[WARN] telegram send failed: {tg_e}") + if not tg_data.get('ok'): + raise RuntimeError(f"telegram send not ok: {tg_data}") + delivered += 1 + print(f"[OK] telegram report delivered: chat_id={tg_chat_id} parts={delivered}") else: print('[INFO] telegram delivery skipped (missing SOFIIA_REPORT_TELEGRAM_CHAT_ID or token or empty text)') except HTTPError as e: 
diff --git a/services/agent-e2e-prober/main.py b/services/agent-e2e-prober/main.py index c57d864c..1ac11eae 100644 --- a/services/agent-e2e-prober/main.py +++ b/services/agent-e2e-prober/main.py @@ -16,9 +16,16 @@ logger = logging.getLogger(__name__) # Configuration GATEWAY_URL = os.getenv("GATEWAY_URL", "http://gateway:9300") +ROUTER_URL = os.getenv("ROUTER_URL", "http://router:8000") PROBE_INTERVAL = int(os.getenv("PROBE_INTERVAL", "60")) # seconds PROBE_TIMEOUT = int(os.getenv("PROBE_TIMEOUT", "30")) # seconds +SEMANTIC_TIMEOUT = int(os.getenv("SEMANTIC_TIMEOUT", "45")) # seconds METRICS_PORT = int(os.getenv("METRICS_PORT", "9108")) +SEMANTIC_PROBE_ENABLED = os.getenv("SEMANTIC_PROBE_ENABLED", "true").lower() == "true" +SEMANTIC_AGENTS = [a.strip() for a in os.getenv("SEMANTIC_AGENTS", "clan,sofiia,monitor").split(",") if a.strip()] +SEMANTIC_PROMPT = os.getenv("SEMANTIC_PROMPT", "Коротко: хто такий DAARWIZZ?") +SEMANTIC_EXPECT_KEYWORD = os.getenv("SEMANTIC_EXPECT_KEYWORD", "daarwizz").lower() +MONITOR_EXPECT_LOCAL = os.getenv("MONITOR_EXPECT_LOCAL", "true").lower() == "true" # Prometheus metrics agent_e2e_success = Gauge('agent_e2e_success', 'Whether last E2E probe succeeded', ['target']) @@ -42,7 +49,7 @@ async def probe_gateway_health() -> tuple[bool, float, str]: async with httpx.AsyncClient(timeout=PROBE_TIMEOUT) as client: resp = await client.get(f"{GATEWAY_URL}/health") latency = time.time() - start - + if resp.status_code == 200: data = resp.json() if data.get("status") == "healthy": @@ -67,7 +74,7 @@ async def probe_agent_ping() -> tuple[bool, float, str]: json={"probe": True, "timestamp": datetime.utcnow().isoformat()} ) latency = time.time() - start - + if resp.status_code == 200: data = resp.json() if data.get("success"): @@ -100,7 +107,7 @@ async def probe_webhook_echo() -> tuple[bool, float, str]: "text": "/health" # Simple health check command } } - + async with httpx.AsyncClient(timeout=PROBE_TIMEOUT) as client: # Use helion webhook as it's the 
most tested resp = await client.post( @@ -108,7 +115,7 @@ async def probe_webhook_echo() -> tuple[bool, float, str]: json=test_update ) latency = time.time() - start - + if resp.status_code == 200: return True, latency, "" else: @@ -119,53 +126,102 @@ async def probe_webhook_echo() -> tuple[bool, float, str]: return False, time.time() - start, f"error: {str(e)[:50]}" +async def probe_agent_semantic(agent_id: str) -> tuple[bool, float, str]: + """Probe semantic response via router infer and assert DAARWIZZ awareness.""" + start = time.time() + try: + payload = { + "prompt": SEMANTIC_PROMPT, + "max_tokens": 180, + "temperature": 0.1, + "metadata": { + "agent_id": agent_id, + "user_id": "tg:0", + "chat_id": "0", + "username": "e2e-prober", + "raw_user_text": SEMANTIC_PROMPT, + }, + } + async with httpx.AsyncClient(timeout=SEMANTIC_TIMEOUT) as client: + resp = await client.post(f"{ROUTER_URL}/v1/agents/{agent_id}/infer", json=payload) + latency = time.time() - start + if resp.status_code != 200: + return False, latency, f"http_{resp.status_code}" + + data = resp.json() + answer = str(data.get("response") or "") + backend = str(data.get("backend") or "") + model = str(data.get("model") or "") + + answer_lc = answer.lower() + if SEMANTIC_EXPECT_KEYWORD not in answer_lc and "даар" not in answer_lc: + return False, latency, "no_daarwizz_in_answer" + + if MONITOR_EXPECT_LOCAL and agent_id == "monitor": + local_ok = ("ollama" in backend.lower()) or model.lower().startswith("qwen") + if not local_ok: + return False, latency, f"monitor_nonlocal_backend:{backend}:{model}" + + return True, latency, "" + except httpx.TimeoutException: + return False, time.time() - start, "timeout" + except Exception as e: + return False, time.time() - start, f"error: {str(e)[:50]}" + + +def record_probe(target: str, success: bool, latency: float, reason: str): + """Record probe metrics and log line.""" + agent_e2e_runs_total.labels(target=target).inc() + 
agent_e2e_success.labels(target=target).set(1 if success else 0) + agent_e2e_latency.labels(target=target).set(latency) + agent_e2e_latency_histogram.labels(target=target).observe(latency) + if not success: + agent_e2e_failures_total.labels(target=target, reason=reason).inc() + logger.info(f"{target}: success={success}, latency={latency:.3f}s, reason={reason}") + + async def run_probes(): """Run all probes and update metrics""" # Probe 1: Gateway health success, latency, reason = await probe_gateway_health() - agent_e2e_runs_total.labels(target="gateway_health").inc() - agent_e2e_success.labels(target="gateway_health").set(1 if success else 0) - agent_e2e_latency.labels(target="gateway_health").set(latency) - agent_e2e_latency_histogram.labels(target="gateway_health").observe(latency) - if not success: - agent_e2e_failures_total.labels(target="gateway_health", reason=reason).inc() - logger.info(f"gateway_health: success={success}, latency={latency:.3f}s, reason={reason}") - + record_probe("gateway_health", success, latency, reason) + # Probe 2: Agent ping (if endpoint exists) success, latency, reason = await probe_agent_ping() - agent_e2e_runs_total.labels(target="agent_ping").inc() - agent_e2e_success.labels(target="agent_ping").set(1 if success else 0) - agent_e2e_latency.labels(target="agent_ping").set(latency) - agent_e2e_latency_histogram.labels(target="agent_ping").observe(latency) - if not success: - agent_e2e_failures_total.labels(target="agent_ping", reason=reason).inc() - logger.info(f"agent_ping: success={success}, latency={latency:.3f}s, reason={reason}") - + record_probe("agent_ping", success, latency, reason) + # Probe 3: Webhook E2E (full path test) success, latency, reason = await probe_webhook_echo() - agent_e2e_runs_total.labels(target="webhook_e2e").inc() - agent_e2e_success.labels(target="webhook_e2e").set(1 if success else 0) - agent_e2e_latency.labels(target="webhook_e2e").set(latency) - 
agent_e2e_latency_histogram.labels(target="webhook_e2e").observe(latency) - if not success: - agent_e2e_failures_total.labels(target="webhook_e2e", reason=reason).inc() - logger.info(f"webhook_e2e: success={success}, latency={latency:.3f}s, reason={reason}") + record_probe("webhook_e2e", success, latency, reason) + + # Probe 4+: semantic checks for selected agents (parallel) + if SEMANTIC_PROBE_ENABLED and SEMANTIC_AGENTS: + results = await asyncio.gather(*(probe_agent_semantic(agent_id) for agent_id in SEMANTIC_AGENTS)) + matrix = [] + for agent_id, (success, latency, reason) in zip(SEMANTIC_AGENTS, results): + record_probe(f"semantic_{agent_id}", success, latency, reason) + matrix.append(f"{agent_id}:{'PASS' if success else 'FAIL'}") + logger.info("semantic_matrix: " + " | ".join(matrix)) async def main(): - logger.info(f"Starting E2E Agent Prober") + logger.info("Starting E2E Agent Prober") logger.info(f" GATEWAY_URL: {GATEWAY_URL}") + logger.info(f" ROUTER_URL: {ROUTER_URL}") logger.info(f" PROBE_INTERVAL: {PROBE_INTERVAL}s") logger.info(f" PROBE_TIMEOUT: {PROBE_TIMEOUT}s") logger.info(f" METRICS_PORT: {METRICS_PORT}") - + logger.info(f" SEMANTIC_TIMEOUT: {SEMANTIC_TIMEOUT}s") + logger.info(f" SEMANTIC_PROBE_ENABLED: {SEMANTIC_PROBE_ENABLED}") + logger.info(f" SEMANTIC_AGENTS: {','.join(SEMANTIC_AGENTS)}") + # Start Prometheus metrics server start_http_server(METRICS_PORT) logger.info(f"Prometheus metrics available at :{METRICS_PORT}/metrics") - + # Initial probe await run_probes() - + # Continuous probing while True: await asyncio.sleep(PROBE_INTERVAL) diff --git a/services/plant-vision-node1/Dockerfile b/services/plant-vision-node1/Dockerfile new file mode 100644 index 00000000..fcf80226 --- /dev/null +++ b/services/plant-vision-node1/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY main.py . 
+ +EXPOSE 8085 + +HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8085/health')" + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8085"] diff --git a/services/plant-vision-node1/main.py b/services/plant-vision-node1/main.py new file mode 100644 index 00000000..568b7df9 --- /dev/null +++ b/services/plant-vision-node1/main.py @@ -0,0 +1,238 @@ +import json +import os +import re +import shlex +import subprocess +import tempfile +from pathlib import Path +from typing import Any, Dict, List, Optional + +import httpx +from fastapi import FastAPI, File, HTTPException, UploadFile +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field + +app = FastAPI(title="plant-vision-node1", version="0.1.1") + + +class IdentifyRequest(BaseModel): + image_url: Optional[str] = None + top_k: int = Field(default=3, ge=1, le=10) + + +def _normalize_predictions(raw: Any, top_k: int) -> List[Dict[str, Any]]: + preds: List[Dict[str, Any]] = [] + if isinstance(raw, dict): + for key in ("predictions", "results", "candidates"): + if isinstance(raw.get(key), list): + raw = raw[key] + break + if isinstance(raw, list): + for item in raw[:top_k]: + if not isinstance(item, dict): + continue + name = ( + item.get("scientific_name") + or item.get("scientificName") + or item.get("label") + or item.get("name") + or "unknown" + ) + common = item.get("common_name") or item.get("commonName") or item.get("common") or "-" + score = item.get("score", item.get("confidence", 0.0)) + try: + score_f = float(score) + except Exception: + score_f = 0.0 + preds.append({"scientific_name": str(name), "common_name": str(common), "score": score_f}) + return preds[:top_k] + + +def _parse_text_output(text: str, top_k: int) -> List[Dict[str, Any]]: + """ + Parse only model score lines, e.g.: + 97.6% Persicaria amphibia + 86.1% 
Canada Goldenrod (Solidago canadensis) + Ignore service lines like "Read ..." or "Classification of ...". + """ + preds: List[Dict[str, Any]] = [] + for raw_line in (text or "").splitlines(): + line = raw_line.strip() + if not line or "%" not in line: + continue + + m = re.match(r"^\s*(\d+(?:\.\d+)?)%\s+(.+)$", line) + if not m: + continue + + score_str, name_part = m.groups() + try: + score = float(score_str) + except ValueError: + continue + + name = name_part.strip() + if not name: + continue + + common_name = "-" + scientific_name = name + + # If output is "Common Name (Scientific name)", preserve both. + paren = re.match(r"^(.*?)\s*\(([^()]+)\)\s*$", name) + if paren: + common, scientific = paren.groups() + common = common.strip() + scientific = scientific.strip() + if common: + common_name = common + if scientific: + scientific_name = scientific + + preds.append( + { + "scientific_name": scientific_name, + "common_name": common_name, + "score": score, + } + ) + + preds.sort(key=lambda x: float(x.get("score", 0.0)), reverse=True) + return preds[:top_k] + + +def _extract_inference_time(stdout: str) -> Optional[float]: + m = re.search(r"took\s+(\d+(?:\.\d+)?)\s+secs", stdout or "") + if not m: + return None + try: + return float(m.group(1)) + except Exception: + return None + + +def _run_nature_id_cli(image_path: str, top_k: int) -> Dict[str, Any]: + cmd_tmpl = (os.getenv("NATURE_ID_CMD") or "").strip() + timeout_s = int(os.getenv("NATURE_ID_TIMEOUT", "40")) + + if not cmd_tmpl: + raise RuntimeError("NATURE_ID_CMD is not configured") + + cmd = cmd_tmpl.replace("{image_path}", image_path) + proc = subprocess.run( + shlex.split(cmd), + capture_output=True, + text=True, + timeout=timeout_s, + check=False, + ) + if proc.returncode != 0: + raise RuntimeError(f"nature-id cli failed rc={proc.returncode}: {proc.stderr.strip()[:240]}") + + out = (proc.stdout or "").strip() + inference_time_sec = _extract_inference_time(out) + if not out: + return {"predictions": [], 
"inference_time_sec": inference_time_sec} + + try: + parsed = json.loads(out) + preds = _normalize_predictions(parsed, top_k) + except Exception: + preds = _parse_text_output(out, top_k) + + return {"predictions": preds, "inference_time_sec": inference_time_sec} + + +async def _download_image(image_url: str) -> str: + timeout_s = float(os.getenv("DOWNLOAD_TIMEOUT", "20")) + async with httpx.AsyncClient(timeout=timeout_s) as client: + resp = await client.get(image_url) + resp.raise_for_status() + data = resp.content + + with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as f: + f.write(data) + return f.name + + +def _response_payload(result: Dict[str, Any]) -> Dict[str, Any]: + preds = result.get("predictions") or [] + top_k = [ + { + "confidence": float(p.get("score", 0.0)), + "name": str((p.get("common_name") if p.get("common_name") not in (None, "", "-") else p.get("scientific_name")) or "unknown"), + "scientific_name": str(p.get("scientific_name") or "unknown"), + } + for p in preds + ] + return { + "status": "success", + "model": "aiy_plants_V1", + "source": "nature-id-cli", + "count": len(preds), + "inference_time_sec": result.get("inference_time_sec"), + "predictions": preds, + "top_k": top_k, + } + + +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(_, exc: RequestValidationError): + # Avoid leaking raw multipart bytes in validation responses. 
+ errs: List[Dict[str, Any]] = [] + for e in exc.errors() or []: + errs.append({"loc": e.get("loc"), "msg": e.get("msg"), "type": e.get("type")}) + return JSONResponse(status_code=422, content={"detail": errs}) + + +@app.get("/health") +def health() -> Dict[str, Any]: + cmd = (os.getenv("NATURE_ID_CMD") or "").strip() + return { + "status": "healthy", + "nature_id_cmd_configured": bool(cmd), + "nature_id_cmd": cmd, + } + + +@app.post("/identify") +async def identify(payload: IdentifyRequest) -> Dict[str, Any]: + if not payload.image_url: + raise HTTPException(status_code=400, detail="image_url is required") + + tmp_path = "" + try: + tmp_path = await _download_image(payload.image_url) + result = _run_nature_id_cli(tmp_path, payload.top_k) + return _response_payload(result) + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=503, detail=f"identify_failed: {e}") + finally: + if tmp_path: + try: + Path(tmp_path).unlink(missing_ok=True) + except Exception: + pass + + +@app.post("/identify-file") +async def identify_file(file: UploadFile = File(...), top_k: int = 3) -> Dict[str, Any]: + top_k = max(1, min(top_k, 10)) + tmp_path = "" + try: + with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as f: + f.write(await file.read()) + tmp_path = f.name + result = _run_nature_id_cli(tmp_path, top_k) + return _response_payload(result) + except Exception as e: + raise HTTPException(status_code=503, detail=f"identify_failed: {e}") + finally: + if tmp_path: + try: + Path(tmp_path).unlink(missing_ok=True) + except Exception: + pass diff --git a/services/plant-vision-node1/requirements.txt b/services/plant-vision-node1/requirements.txt new file mode 100644 index 00000000..dccabe92 --- /dev/null +++ b/services/plant-vision-node1/requirements.txt @@ -0,0 +1,8 @@ +fastapi==0.115.5 +uvicorn[standard]==0.32.1 +httpx==0.28.1 +python-multipart==0.0.17 +Pillow==11.1.0 +requests==2.32.3 +tflite-runtime==2.14.0 +numpy==1.26.4