Config policies (16 files): alert_routing, architecture_pressure, backlog, cost_weights, data_governance, incident_escalation, incident_intelligence, network_allowlist, nodes_registry, observability_sources, rbac_tools_matrix, release_gate, risk_attribution, risk_policy, slo_policy, tool_limits, tools_rollout Ops (22 files): Caddyfile, calendar compose, grafana voice dashboard, deployments/incidents logs, runbooks for alerts/audit/backlog/incidents/sofiia/voice, cron jobs, scripts (alert_triage, audit_cleanup, migrate_*, governance, schedule), task_registry, voice alerts/ha/latency/policy Docs (30+ files): HUMANIZED_STEPAN v2.7-v3 changelogs and runbooks, NODA1/NODA2 status and setup, audit index and traces, backlog, incident, supervisor, tools, voice, opencode, release, risk, aistalk, spacebot Made-with: Cursor
294 lines
12 KiB
Python
294 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
voice_canary.py — Voice pipeline health canary.
|
|
|
|
Two modes:
|
|
--mode preflight Hard-fail (exit 1) if Polina/Ostap don't synthesize.
|
|
Used in ops/fabric_preflight.sh before any deployment.
|
|
--mode runtime Soft-check: emit metrics + print results + alert via webhook.
|
|
Used by cron every 5-10 minutes to catch edge-tts degradation early.
|
|
|
|
Usage:
|
|
python3 ops/scripts/voice_canary.py --mode preflight
|
|
python3 ops/scripts/voice_canary.py --mode runtime --pushgateway http://localhost:9091
|
|
|
|
Environment:
|
|
MEMORY_SERVICE_URL default: http://localhost:8000
|
|
SOFIIA_CONSOLE_URL default: http://localhost:8002
|
|
ALERT_WEBHOOK_URL optional: Slack/Telegram webhook for runtime alerts
|
|
PUSHGATEWAY_URL optional: Prometheus Pushgateway for runtime metrics
|
|
CANARY_TTS_MAX_MS override max allowed synthesis time (default: 3000)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import urllib.error
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
# Endpoints and the latency threshold are read from the environment once, at
# import time (see the module docstring for the variable names).
MEMORY_URL = os.environ.get("MEMORY_SERVICE_URL", "http://localhost:8000")
# NOTE(review): CONSOLE_URL is not referenced by any check in this file.
CONSOLE_URL = os.environ.get("SOFIIA_CONSOLE_URL", "http://localhost:8002")
# NOTE(review): main() re-reads ALERT_WEBHOOK_URL / PUSHGATEWAY_URL for its CLI
# defaults; these two module constants are not referenced below.
ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
PUSHGATEWAY_URL = os.environ.get("PUSHGATEWAY_URL", "")

# Latency SLO for a single synthesis request, in milliseconds.
CANARY_TTS_MAX_MS = int(os.environ.get("CANARY_TTS_MAX_MS", "3000"))
# A TTS reply smaller than this many bytes counts as "no usable audio".
MIN_AUDIO_BYTES = 1_000

# Pairs of (voice id sent to /voice/tts, short display name).
TEST_VOICES = [
    ("uk-UA-PolinaNeural", "Polina"),
    ("uk-UA-OstapNeural", "Ostap"),
]
# Ukrainian test phrase: "Speech synthesis test. The voice works correctly."
TEST_TEXT = "Тест синтезу мовлення. Голос працює коректно."


@dataclass
class CanaryResult:
    """Outcome of one live TTS synthesis attempt for a single voice."""

    # Short display name, e.g. "Polina".
    voice: str
    # Voice identifier sent to the TTS endpoint, e.g. "uk-UA-PolinaNeural".
    voice_id: str
    # Pass/fail flag; check_tts_synthesis sets it only for a fast, valid reply.
    ok: bool
    # Wall-clock duration of the request, in milliseconds.
    ms: Optional[int] = field(default=None)
    # Size of the returned body, in bytes.
    audio_bytes: Optional[int] = field(default=None)
    # Human-readable reason for a failed or slow attempt; None when fine.
    error: Optional[str] = field(default=None)
    # HTTP status of the request (0 when no HTTP response was obtained).
    status_code: Optional[int] = field(default=None)


@dataclass
class CanaryReport:
    """Aggregated state of one canary run: health probe plus every test voice."""

    # Run mode: "preflight" or "runtime" (the CLI choices in main()).
    mode: str
    # Creation time in UTC, formatted like 2024-01-31T12:00:00Z.
    ts: str = field(
        default_factory=lambda: time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    )
    # One CanaryResult per attempted voice, in the order they were checked.
    results: list[CanaryResult] = field(default_factory=list)
    # Overall verdict: "ok" | "degraded" | "failed".
    overall: str = field(default="ok")
    # Names of voices that produced audio but exceeded the latency SLO.
    degraded_voices: list[str] = field(default_factory=list)
    # Names of voices that did not produce usable audio.
    failed_voices: list[str] = field(default_factory=list)
    # Whether the /voice/health probe answered HTTP 200.
    health_endpoint_ok: bool = field(default=False)
    # Latency of the /voice/health probe, in milliseconds.
    health_ms: Optional[int] = field(default=None)


def _http_json(url: str, method: str = "GET", body: Optional[dict] = None,
               timeout: int = 10) -> tuple[int, dict]:
    """Send an HTTP request and decode its JSON reply, without raising on failure.

    Args:
        url: Absolute URL to call.
        method: HTTP method.
        body: Optional JSON-serializable dict sent as the request body. An empty
            dict is still sent (as ``{}``); only ``None`` means "no body".
        timeout: Socket timeout in seconds.

    Returns:
        ``(status, payload)``:

        * reply received: the response status and the decoded JSON object. A
          reply that is valid JSON but not an object yields ``{}`` so callers
          can always use ``payload.get(...)``;
        * HTTP error status: ``(code, {})``; the error body is discarded;
        * anything else (URL without a scheme, connection/DNS failure, timeout,
          reply that is not valid JSON): ``(0, {"error": "<message>"})``.
    """
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    try:
        # Built inside the try: Request() itself raises ValueError for a URL
        # with no scheme (e.g. an empty or mistyped MEMORY_SERVICE_URL).
        req = urllib.request.Request(url, data=data, headers=headers, method=method)
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            status = resp.status
            payload = json.loads(resp.read())
    except urllib.error.HTTPError as e:
        return e.code, {}
    except Exception as e:
        return 0, {"error": str(e)}
    return status, (payload if isinstance(payload, dict) else {})


def _http_post_binary(url: str, body: dict, timeout: int = 15) -> tuple[int, int]:
    """POST *body* as JSON and report how many bytes came back.

    Used for endpoints that answer with a binary payload (here: synthesized
    audio from /voice/tts) where only the size matters to the canary.

    Returns:
        ``(status_code, content_length_bytes)``. On an HTTP error status the
        byte count is 0. When no HTTP response is obtained at all (URL without
        a scheme, connection/DNS failure, timeout) the result is ``(0, 0)``.
    """
    data = json.dumps(body).encode()
    try:
        # Request() raises ValueError for a URL without a scheme; constructing it
        # inside the try reports that case as (0, 0) like any other failure.
        req = urllib.request.Request(url, data=data,
                                     headers={"Content-Type": "application/json"},
                                     method="POST")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status, len(resp.read())
    except urllib.error.HTTPError as e:
        return e.code, 0
    except Exception:
        return 0, 0


def check_health_endpoint(report: CanaryReport) -> None:
    """Probe /voice/health on memory-service and record the outcome on *report*.

    Sets ``report.health_ms`` (wall-clock latency of the probe, in ms) and
    ``report.health_endpoint_ok`` (True only for HTTP 200), then prints a
    one-line status. The result does not change ``report.overall``.
    """
    t0 = time.monotonic()
    status, data = _http_json(f"{MEMORY_URL}/voice/health", timeout=8)
    report.health_ms = int((time.monotonic() - t0) * 1000)
    report.health_endpoint_ok = (status == 200)

    # Guard: the endpoint could answer with valid JSON that is not an object.
    if not isinstance(data, dict):
        data = {}

    if status == 0:
        # _http_json uses status 0 when no HTTP response was obtained at all;
        # print the transport error instead of a misleading "HTTP 0".
        reason = data.get("error", "no response")
        print(f" [WARN] /voice/health unreachable: {reason}")
    elif status != 200:
        print(f" [WARN] /voice/health returned HTTP {status}")
    else:
        edge_status = data.get("edge_tts", "?")
        print(f" [OK] /voice/health: edge_tts={edge_status} in {report.health_ms}ms")


def check_tts_synthesis(report: CanaryReport) -> None:
    """Synthesize TEST_TEXT with every voice in TEST_VOICES and grade each one.

    Verdict per voice:
      * OK   - HTTP 200, at least MIN_AUDIO_BYTES of audio, within CANARY_TTS_MAX_MS;
      * SLOW - usable audio but slower than CANARY_TTS_MAX_MS
               (name added to report.degraded_voices);
      * FAIL - any other status or too little audio
               (name added to report.failed_voices).

    Every attempt appends a CanaryResult to report.results. Afterwards
    report.overall becomes "failed" if any voice failed, otherwise "degraded"
    if any voice was slow; if neither, it is left unchanged.
    """
    tts_url = f"{MEMORY_URL}/voice/tts"
    # Whole-second SLO budget plus 5 s headroom; presumably so a slow-but-valid
    # reply is still received and graded SLOW instead of timing out.
    http_timeout = CANARY_TTS_MAX_MS // 1000 + 5

    for voice_id, voice_name in TEST_VOICES:
        started = time.monotonic()
        status, audio_bytes = _http_post_binary(
            tts_url,
            {"text": TEST_TEXT, "voice": voice_id, "speed": 1.0},
            timeout=http_timeout,
        )
        elapsed_ms = int((time.monotonic() - started) * 1000)

        delivered = status == 200 and audio_bytes >= MIN_AUDIO_BYTES
        if not delivered:
            error = f"HTTP {status}, {audio_bytes}B"
            report.failed_voices.append(voice_name)
            print(f" [FAIL] {voice_name} ({voice_id}): HTTP {status}, {audio_bytes}B")
        elif elapsed_ms > CANARY_TTS_MAX_MS:
            # Audio arrived but over the SLO: degraded, not failed.
            error = f"slow: {elapsed_ms}ms > {CANARY_TTS_MAX_MS}ms SLO"
            report.degraded_voices.append(voice_name)
            print(f" [SLOW] {voice_name} ({voice_id}): {elapsed_ms}ms > {CANARY_TTS_MAX_MS}ms, {audio_bytes}B")
        else:
            error = None
            print(f" [OK] {voice_name} ({voice_id}): {elapsed_ms}ms, {audio_bytes}B")

        report.results.append(
            CanaryResult(
                voice=voice_name,
                voice_id=voice_id,
                ok=error is None,
                ms=elapsed_ms,
                audio_bytes=audio_bytes,
                error=error,
                status_code=status,
            )
        )

    if report.failed_voices:
        report.overall = "failed"
    elif report.degraded_voices:
        report.overall = "degraded"


def _prom_label_value(value: str) -> str:
    """Escape *value* for use inside a double-quoted Prometheus label value."""
    # Backslash first, otherwise the escapes added for '"' and newline would be doubled.
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def _render_push_payload(report: CanaryReport) -> str:
    """Render *report* as a Prometheus text-exposition payload.

    Metrics (all gauges):
      voice_canary_tts_ms{voice}  synthesis time per voice (only when measured)
      voice_canary_ok{voice}      1 if the voice's result is ok, else 0
      voice_canary_health_ok      1 if report.health_endpoint_ok, else 0

    Each metric family is emitted as one contiguous group headed by a single
    ``# TYPE`` line, as the text format requires; families without samples are
    omitted.
    """
    tts_ms_samples = []
    ok_samples = []
    for r in report.results:
        label = f'voice="{_prom_label_value(r.voice_id)}"'
        if r.ms is not None:
            tts_ms_samples.append(f"voice_canary_tts_ms{{{label}}} {r.ms}")
        ok_samples.append(f"voice_canary_ok{{{label}}} {1 if r.ok else 0}")

    lines = []
    if tts_ms_samples:
        lines += ["# TYPE voice_canary_tts_ms gauge", *tts_ms_samples]
    if ok_samples:
        lines += ["# TYPE voice_canary_ok gauge", *ok_samples]
    lines += [
        "# TYPE voice_canary_health_ok gauge",
        f"voice_canary_health_ok {1 if report.health_endpoint_ok else 0}",
    ]
    return "\n".join(lines) + "\n"


def push_metrics(report: CanaryReport, pushgateway: str, instance: str = "noda2") -> None:
    """Push canary results to a Prometheus Pushgateway (best effort).

    Sends an HTTP PUT to ``/metrics/job/voice_canary/instance/<instance>``;
    with PUT the Pushgateway replaces every metric previously pushed under that
    grouping key. Any failure, including a malformed URL, is printed and never
    raised.

    Args:
        report: Completed canary report.
        pushgateway: Base Pushgateway URL; trailing slashes are ignored.
        instance: Value of the ``instance`` grouping label (default "noda2").
            It is inserted into the URL path verbatim, so it must be path-safe.
    """
    url = f"{pushgateway.rstrip('/')}/metrics/job/voice_canary/instance/{instance}"
    try:
        # Request() raises ValueError for a URL without a scheme; keep it inside
        # the try so a misconfigured Pushgateway URL cannot abort the run.
        req = urllib.request.Request(url, data=_render_push_payload(report).encode(),
                                     headers={"Content-Type": "text/plain"},
                                     method="PUT")
        with urllib.request.urlopen(req, timeout=5):
            print(f" [PUSH] Metrics pushed to {url}")
    except Exception as e:
        print(f" [WARN] Pushgateway push failed: {e}")


def _format_alert_text(report: CanaryReport) -> str:
    """Render the chat message describing a non-ok *report*.

    One line per voice ("✓", "⚠ SLOW" or "✗ FAIL" plus its timing), followed by
    the health-endpoint status and the degraded / failed voice names.
    """
    emoji = "🔴" if report.overall == "failed" else "🟡"
    summary_lines = []
    for r in report.results:
        if r.ok:
            status = "✓"
        elif r.error and "slow" in r.error:
            status = "⚠ SLOW"
        else:
            status = "✗ FAIL"
        # `is not None`: a 0 ms timing is a real measurement, not "N/A".
        timing = f"{r.ms}ms" if r.ms is not None else "N/A"
        summary_lines.append(f" {status} {r.voice} ({timing})")

    # Joined outside the f-string: a backslash inside an f-string expression is a
    # SyntaxError before Python 3.12, and there '\\n' is a literal backslash-n.
    summary = "\n".join(summary_lines)
    degraded = ", ".join(report.degraded_voices) or "none"
    failed = ", ".join(report.failed_voices) or "none"
    health = "✓" if report.health_endpoint_ok else "✗"
    return (
        f"{emoji} *Voice Canary {report.overall.upper()}* `{report.ts}`\n"
        f"{summary}\n"
        f"Health endpoint: {health}\n"
        f"Degraded: {degraded}\n"
        f"Failed: {failed}"
    )


def send_alert(report: CanaryReport, webhook: str) -> None:
    """POST a summary of *report* to *webhook* unless the run was fully ok.

    The body is ``{"text": <message>}`` (Slack incoming-webhook shape).
    NOTE(review): the module docs also mention Telegram; confirm the target
    accepts this payload. Best effort: delivery errors are printed, never raised.
    """
    if not webhook or report.overall == "ok":
        return
    body = {"text": _format_alert_text(report)}
    try:
        req = urllib.request.Request(webhook, data=json.dumps(body).encode(),
                                     headers={"Content-Type": "application/json"},
                                     method="POST")
        with urllib.request.urlopen(req, timeout=5):
            print(f" [ALERT] Webhook sent ({report.overall})")
    except Exception as e:
        print(f" [WARN] Webhook failed: {e}")


def run_preflight(report: CanaryReport) -> int:
    """Deployment gate: run every check and decide whether a deploy may proceed.

    Returns 1 (deployment blocked) when at least one voice failed synthesis.
    Voices that were merely slow produce a warning and the function returns 0.
    The /voice/health probe is printed but does not influence the return value.
    """
    print("── Voice Canary: PREFLIGHT mode ──────────────────────────────────")
    check_health_endpoint(report)
    check_tts_synthesis(report)

    if not report.failed_voices:
        if report.degraded_voices:
            # Slow voices are tolerated in preflight: warn, do not block.
            print(
                f"\n[WARN] Preflight DEGRADED — voices slow: {report.degraded_voices}",
                " Deployment allowed (soft warning). Monitor voice_tts_compute_ms after deploy.",
                sep="\n",
            )
        print("\n[OK] Voice preflight passed — all voices operational.")
        return 0

    print(
        f"\n[FATAL] Preflight FAILED — voices failed synthesis: {report.failed_voices}",
        " Deployment blocked. Fix edge-tts / memory-service before proceeding.",
        " Run: docker logs dagi-memory-service-node2 --tail 50",
        f" Check: curl {MEMORY_URL}/voice/health",
        sep="\n",
    )
    return 1


def _report_to_dict(report: CanaryReport) -> dict:
    """Build the JSON-serializable summary persisted after a runtime run."""
    return {
        "ts": report.ts,
        "overall": report.overall,
        "results": [
            {"voice": r.voice, "ok": r.ok, "ms": r.ms,
             "audio_bytes": r.audio_bytes, "error": r.error}
            for r in report.results
        ],
        "degraded_voices": report.degraded_voices,
        "failed_voices": report.failed_voices,
    }


def _write_json_atomic(path: str, payload: dict) -> None:
    """Write *payload* as indented JSON to *path* through a temp file + os.replace.

    The temp file lives next to *path* (same filesystem), so the final rename is
    atomic and a concurrent reader sees either the old file or the complete new
    one, never a truncated document. On failure the temp file is removed and
    the exception is re-raised.
    """
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def run_runtime(report: CanaryReport, pushgateway: str, webhook: str) -> int:
    """Runtime canary mode: run all checks, publish metrics/alerts, save a summary.

    Args:
        report: Fresh report to fill in.
        pushgateway: Pushgateway base URL, or "" to skip pushing metrics.
        webhook: Alert webhook URL, or "" to skip alerting.

    Returns:
        Always 0: runtime never hard-fails; alerting handles escalation.
    """
    print("── Voice Canary: RUNTIME mode ────────────────────────────────────")
    check_health_endpoint(report)
    check_tts_synthesis(report)

    if pushgateway:
        push_metrics(report, pushgateway)
    if webhook:
        send_alert(report, webhook)

    # Write result to ops/voice_canary_last.json (one level above this script's
    # directory) for policy_update.py to read; written atomically so that reader
    # never observes a half-written file.
    result_path = os.path.normpath(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "voice_canary_last.json")
    )
    try:
        _write_json_atomic(result_path, _report_to_dict(report))
        print(f" [JSON] Result saved to {result_path}")
    except Exception as e:
        print(f" [WARN] Could not save result: {e}")

    status_emoji = {"ok": "✓", "degraded": "⚠", "failed": "✗"}.get(report.overall, "?")
    print(f"\n{status_emoji} Runtime canary: {report.overall.upper()}")
    return 0  # runtime never hard-fails — alerting handles escalation


def main() -> int:
    """Command-line entry point: parse arguments and run the selected canary mode.

    Returns the process exit code: run_preflight's result in preflight mode
    (1 when a voice failed synthesis), and run_runtime's result (always 0) in
    runtime mode.
    """
    _default_memory = os.getenv("MEMORY_SERVICE_URL", "http://localhost:8000")
    _default_pgw = os.getenv("PUSHGATEWAY_URL", "")
    _default_hook = os.getenv("ALERT_WEBHOOK_URL", "")

    parser = argparse.ArgumentParser(description="Voice pipeline canary check")
    parser.add_argument("--mode", choices=["preflight", "runtime"], default="preflight")
    parser.add_argument("--pushgateway", default=_default_pgw,
                        help="Prometheus Pushgateway URL (runtime mode)")
    parser.add_argument("--webhook", default=_default_hook,
                        help="Alert webhook URL (runtime mode)")
    parser.add_argument("--memory-url", default=_default_memory,
                        help=f"Memory service URL (default: {_default_memory})")
    args = parser.parse_args()

    # The check functions read the module-level MEMORY_URL, so the CLI value is
    # installed there. Trailing slashes are stripped so f"{MEMORY_URL}/voice/..."
    # never produces a "//voice/..." path (push_metrics normalizes the
    # Pushgateway URL the same way).
    global MEMORY_URL  # noqa: PLW0603
    MEMORY_URL = args.memory_url.rstrip("/")

    report = CanaryReport(mode=args.mode)
    if args.mode == "preflight":
        return run_preflight(report)
    return run_runtime(report, args.pushgateway, args.webhook)


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)