#!/usr/bin/env python3
"""
voice_canary.py — Voice pipeline health canary.

Two modes are supported:

  --mode preflight
      Hard-fail (exit 1) if Polina/Ostap don't synthesize.
      Intended for ops/fabric_preflight.sh, run before any deployment.

  --mode runtime
      Soft-check: emit metrics, print results and alert via webhook.
      Intended for cron every 5-10 minutes to catch edge-tts degradation early.

Usage:
  python3 ops/scripts/voice_canary.py --mode preflight
  python3 ops/scripts/voice_canary.py --mode runtime --pushgateway http://localhost:9091

Environment:
  MEMORY_SERVICE_URL   default: http://localhost:8000
  SOFIIA_CONSOLE_URL   default: http://localhost:8002
  ALERT_WEBHOOK_URL    optional: Slack/Telegram webhook for runtime alerts
  PUSHGATEWAY_URL      optional: Prometheus Pushgateway for runtime metrics
  CANARY_TTS_MAX_MS    override max allowed synthesis time in ms (default: 3000)

The --memory-url, --pushgateway and --webhook command-line flags default to
the corresponding environment variables above.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from typing import Optional

# ── Config ────────────────────────────────────────────────────────────────────

# Service endpoints (overridable through the environment).
MEMORY_URL = os.environ.get("MEMORY_SERVICE_URL", "http://localhost:8000")
CONSOLE_URL = os.environ.get("SOFIIA_CONSOLE_URL", "http://localhost:8002")

# Optional runtime-mode sinks; an empty string means "disabled".
ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
PUSHGATEWAY_URL = os.environ.get("PUSHGATEWAY_URL", "")

# Latency SLO for a single synthesis request, in milliseconds.
CANARY_TTS_MAX_MS = int(os.environ.get("CANARY_TTS_MAX_MS", "3000"))

# A successful synthesis must return at least this many bytes of audio.
MIN_AUDIO_BYTES = 1000

# (voice_id sent to the TTS endpoint, human-readable name used in reports)
TEST_VOICES = [
    ("uk-UA-PolinaNeural", "Polina"),
    ("uk-UA-OstapNeural", "Ostap"),
]

# Ukrainian test phrase: "Speech synthesis test. The voice works correctly."
TEST_TEXT = "Тест синтезу мовлення. Голос працює коректно."
# ── Data model ────────────────────────────────────────────────────────────────


@dataclass
class CanaryResult:
    """Outcome of one synthesis probe for a single voice."""

    voice: str  # human-readable name, e.g. "Polina"
    voice_id: str  # voice identifier sent to /voice/tts
    ok: bool  # True only when synthesis succeeded within the latency SLO
    ms: Optional[int] = None  # measured round-trip time of the request
    audio_bytes: Optional[int] = None  # size of the response body
    error: Optional[str] = None  # "slow: ..." for SLO misses, "HTTP ..." for failures
    status_code: Optional[int] = None  # HTTP status; 0 means transport error


@dataclass
class CanaryReport:
    """Aggregated result of one canary run."""

    mode: str
    ts: str = field(default_factory=lambda: time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
    results: list[CanaryResult] = field(default_factory=list)
    overall: str = "ok"  # ok | degraded | failed
    degraded_voices: list[str] = field(default_factory=list)
    failed_voices: list[str] = field(default_factory=list)
    health_endpoint_ok: bool = False
    health_ms: Optional[int] = None


# ── HTTP helpers ──────────────────────────────────────────────────────────────


def _http_json(url: str, method: str = "GET", body: Optional[dict] = None, timeout: int = 10) -> tuple[int, dict]:
    """Send an optional JSON body and decode the JSON response.

    Returns ``(status, payload)``. For an HTTP error status the payload is
    ``{}``; for any other failure (connection error, timeout, undecodable
    body) the status is 0 and the payload is ``{"error": <message>}``.
    """
    data = json.dumps(body).encode() if body else None
    headers = {"Content-Type": "application/json"} if data else {}
    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status, json.loads(resp.read())
    except urllib.error.HTTPError as e:
        return e.code, {}
    except Exception as e:  # best-effort probe: report the failure, don't crash
        return 0, {"error": str(e)}


def _http_post_binary(url: str, body: dict, timeout: int = 15) -> tuple[int, int]:
    """POST a JSON body and return ``(status_code, response_size_bytes)``.

    The full response body is read and only its length is kept. HTTP error
    statuses are returned with size 0; transport failures return ``(0, 0)``.
    """
    data = json.dumps(body).encode()
    req = urllib.request.Request(
        url, data=data, headers={"Content-Type": "application/json"}, method="POST"
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            content = resp.read()
            return resp.status, len(content)
    except urllib.error.HTTPError as e:
        return e.code, 0
    except Exception:  # timeout / refused / DNS failure: treat as "no response"
        return 0, 0


# ── Checks ────────────────────────────────────────────────────────────────────


def check_health_endpoint(report: CanaryReport) -> None:
    """Quick probe of /voice/health on memory-service.

    Stores the probe latency in ``report.health_ms`` and whether the endpoint
    answered HTTP 200 in ``report.health_endpoint_ok``. This check does not
    modify ``report.overall``.
    """
    t0 = time.monotonic()
    status, data = _http_json(f"{MEMORY_URL}/voice/health", timeout=8)
    report.health_ms = int((time.monotonic() - t0) * 1000)
    report.health_endpoint_ok = status == 200
    if status != 200:
        print(f" [WARN] /voice/health returned HTTP {status}")
    else:
        # A 200 whose JSON body is not an object (e.g. a list) must not crash the canary.
        edge_status = data.get("edge_tts", "?") if isinstance(data, dict) else "?"
        print(f" [OK] /voice/health: edge_tts={edge_status} in {report.health_ms}ms")


def check_tts_synthesis(report: CanaryReport) -> None:
    """Perform live synthesis for each voice in TEST_VOICES.

    A voice is *failed* when the request does not return HTTP 200 with at
    least MIN_AUDIO_BYTES of body, and *degraded* when it succeeds but takes
    longer than CANARY_TTS_MAX_MS. Afterwards ``report.overall`` is set to
    "failed" if any voice failed, else "degraded" if any voice was slow;
    otherwise it is left unchanged.
    """
    # Socket timeout in seconds: at least 5 s above the SLO, so slow-but-working
    # synthesis is classified as degraded rather than as a transport failure.
    timeout_s = CANARY_TTS_MAX_MS // 1000 + 5
    for voice_id, voice_name in TEST_VOICES:
        t0 = time.monotonic()
        status, audio_bytes = _http_post_binary(
            f"{MEMORY_URL}/voice/tts",
            {"text": TEST_TEXT, "voice": voice_id, "speed": 1.0},
            timeout=timeout_s,
        )
        ms = int((time.monotonic() - t0) * 1000)

        error: Optional[str] = None
        if status != 200 or audio_bytes < MIN_AUDIO_BYTES:
            error = f"HTTP {status}, {audio_bytes}B"
            report.failed_voices.append(voice_name)
            print(f" [FAIL] {voice_name} ({voice_id}): HTTP {status}, {audio_bytes}B")
        elif ms > CANARY_TTS_MAX_MS:
            # Synthesis succeeded but too slow → degraded, not failed
            error = f"slow: {ms}ms > {CANARY_TTS_MAX_MS}ms SLO"
            report.degraded_voices.append(voice_name)
            print(f" [SLOW] {voice_name} ({voice_id}): {ms}ms > {CANARY_TTS_MAX_MS}ms, {audio_bytes}B")
        else:
            print(f" [OK] {voice_name} ({voice_id}): {ms}ms, {audio_bytes}B")

        report.results.append(CanaryResult(
            voice=voice_name,
            voice_id=voice_id,
            ok=error is None,
            ms=ms,
            audio_bytes=audio_bytes,
            error=error,
            status_code=status,
        ))

    if report.failed_voices:
        report.overall = "failed"
    elif report.degraded_voices:
        report.overall = "degraded"


# ── Reporting ─────────────────────────────────────────────────────────────────


def push_metrics(report: CanaryReport, pushgateway: str) -> None:
    """Push canary results to a Prometheus Pushgateway (best effort).

    Emits ``voice_canary_tts_ms`` and ``voice_canary_ok`` per voice plus
    ``voice_canary_health_ok``, replacing the group
    ``job=voice_canary, instance=noda2`` via HTTP PUT. Failures are printed,
    not raised.
    """
    lines = []
    for r in report.results:
        label = f'voice="{r.voice_id}"'
        if r.ms is not None:
            lines.append(f'voice_canary_tts_ms{{{label}}} {r.ms}')
        lines.append(f'voice_canary_ok{{{label}}} {1 if r.ok else 0}')
    lines.append(f'voice_canary_health_ok {1 if report.health_endpoint_ok else 0}')

    payload = "\n".join(lines) + "\n"
    url = f"{pushgateway.rstrip('/')}/metrics/job/voice_canary/instance/noda2"
    req = urllib.request.Request(
        url, data=payload.encode(), headers={"Content-Type": "text/plain"}, method="PUT"
    )
    try:
        with urllib.request.urlopen(req, timeout=5):
            print(f" [PUSH] Metrics pushed to {url}")
    except Exception as e:
        print(f" [WARN] Pushgateway push failed: {e}")


def send_alert(report: CanaryReport, webhook: str) -> None:
    """POST a human-readable summary to a webhook when the run is not "ok".

    Does nothing when ``webhook`` is empty or ``report.overall == "ok"``.
    Delivery is best effort: failures are printed, not raised.
    """
    if not webhook or report.overall == "ok":
        return

    emoji = "🔴" if report.overall == "failed" else "🟡"
    summary_lines = []
    for r in report.results:
        if r.ok:
            status = "✓"
        elif r.error and "slow" in r.error:
            status = "⚠ SLOW"
        else:
            status = "✗ FAIL"
        # `is not None` so that a genuine 0 ms measurement is not shown as N/A.
        timing = f"{r.ms}ms" if r.ms is not None else "N/A"
        summary_lines.append(f" {status} {r.voice} ({timing})")

    # Joined outside the f-string: a backslash inside an f-string expression is a
    # SyntaxError before Python 3.12, and '\\n' would yield a literal "\n" after.
    summary = "\n".join(summary_lines)
    text = (
        f"{emoji} *Voice Canary {report.overall.upper()}* `{report.ts}`\n"
        f"{summary}\n"
        f"Health endpoint: {'✓' if report.health_endpoint_ok else '✗'}\n"
        f"Degraded: {report.degraded_voices or 'none'}\n"
        f"Failed: {report.failed_voices or 'none'}"
    )
    # Payload is {"text": ...} (Slack incoming-webhook shape). No alternative
    # format is attempted. NOTE(review): confirm other webhook targets accept it.
    body = {"text": text}
    try:
        data = json.dumps(body).encode()
        req = urllib.request.Request(
            webhook, data=data, headers={"Content-Type": "application/json"}, method="POST"
        )
        with urllib.request.urlopen(req, timeout=5):
            print(f" [ALERT] Webhook sent ({report.overall})")
    except Exception as e:
        print(f" [WARN] Webhook failed: {e}")


# ── Modes ─────────────────────────────────────────────────────────────────────


def run_preflight(report: CanaryReport) -> int:
    """Preflight mode: return 1 if any voice failed synthesis, else 0.

    Slow (degraded) voices only produce a warning and do not block deployment.
    """
    print("── Voice Canary: PREFLIGHT mode ──────────────────────────────────")
    check_health_endpoint(report)
    check_tts_synthesis(report)

    if report.failed_voices:
        print(f"\n[FATAL] Preflight FAILED — voices failed synthesis: {report.failed_voices}")
        print(" Deployment blocked. Fix edge-tts / memory-service before proceeding.")
        print(" Run: docker logs dagi-memory-service-node2 --tail 50")
        print(f" Check: curl {MEMORY_URL}/voice/health")
        return 1

    if report.degraded_voices:
        # Degraded (slow) in preflight = warn but don't block
        print(f"\n[WARN] Preflight DEGRADED — voices slow: {report.degraded_voices}")
        print(" Deployment allowed (soft warning). Monitor voice_tts_compute_ms after deploy.")
        print("\n[OK] Voice preflight passed — voices operational but slower than SLO.")
        return 0

    print("\n[OK] Voice preflight passed — all voices operational.")
    return 0


def run_runtime(report: CanaryReport, pushgateway: str, webhook: str) -> int:
    """Runtime canary mode: metrics + alert + JSON snapshot; always returns 0."""
    print("── Voice Canary: RUNTIME mode ────────────────────────────────────")
    check_health_endpoint(report)
    check_tts_synthesis(report)

    if pushgateway:
        push_metrics(report, pushgateway)
    if webhook:
        send_alert(report, webhook)

    # Write result to ops/voice_canary_last.json for policy_update.py to read
    result_path = os.path.normpath(
        os.path.join(os.path.dirname(__file__), "..", "voice_canary_last.json")
    )
    snapshot = {
        "ts": report.ts,
        "overall": report.overall,
        "results": [
            {"voice": r.voice, "ok": r.ok, "ms": r.ms,
             "audio_bytes": r.audio_bytes, "error": r.error}
            for r in report.results
        ],
        "degraded_voices": report.degraded_voices,
        "failed_voices": report.failed_voices,
    }
    tmp_path = result_path + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(snapshot, f, indent=2)
        # os.replace is an atomic rename on POSIX: a concurrent reader sees
        # either the previous snapshot or the new one, never a partial file.
        os.replace(tmp_path, result_path)
        print(f" [JSON] Result saved to {result_path}")
    except Exception as e:
        print(f" [WARN] Could not save result: {e}")

    status_emoji = {"ok": "✓", "degraded": "⚠", "failed": "✗"}[report.overall]
    print(f"\n{status_emoji} Runtime canary: {report.overall.upper()}")
    return 0  # runtime never hard-fails — alerting handles escalation


def main() -> int:
    """Parse CLI arguments, run the selected mode and return its exit code."""
    _default_memory = os.getenv("MEMORY_SERVICE_URL", "http://localhost:8000")
    _default_pgw = os.getenv("PUSHGATEWAY_URL", "")
    _default_hook = os.getenv("ALERT_WEBHOOK_URL", "")

    parser = argparse.ArgumentParser(description="Voice pipeline canary check")
    parser.add_argument("--mode", choices=["preflight", "runtime"], default="preflight")
    parser.add_argument("--pushgateway", default=_default_pgw,
                        help="Prometheus Pushgateway URL (runtime mode)")
    parser.add_argument("--webhook", default=_default_hook,
                        help="Alert webhook URL (runtime mode)")
    parser.add_argument("--memory-url", default=_default_memory,
                        help=f"Memory service URL (default: {_default_memory})")
    args = parser.parse_args()

    # The check functions read the module-level MEMORY_URL, so the CLI value
    # must be written back to the global before any check runs.
    global MEMORY_URL  # noqa: PLW0603
    MEMORY_URL = args.memory_url

    report = CanaryReport(mode=args.mode)
    if args.mode == "preflight":
        return run_preflight(report)
    return run_runtime(report, args.pushgateway, args.webhook)


if __name__ == "__main__":
    sys.exit(main())