New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
140 lines
4.4 KiB
Python
140 lines
4.4 KiB
Python
"""
|
|
Reminder Worker - Background worker for calendar reminders
|
|
Polls for pending reminders and sends notifications
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
import threading
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ReminderWorker:
|
|
"""
|
|
Background worker that processes calendar reminders.
|
|
|
|
Runs in background thread, polling for pending reminders
|
|
and sending notifications via configured channels.
|
|
"""
|
|
|
|
def __init__(self, storage, poll_interval: int = 60):
|
|
self.storage = storage
|
|
self.poll_interval = poll_interval
|
|
self.running = False
|
|
self.thread = None
|
|
self.notification_handler = None
|
|
|
|
# Stats
|
|
self.processed_count = 0
|
|
self.failed_count = 0
|
|
self.last_run = None
|
|
|
|
def start(self):
|
|
"""Start the worker thread"""
|
|
if self.running:
|
|
logger.warning("Worker already running")
|
|
return
|
|
|
|
self.running = True
|
|
self.thread = threading.Thread(target=self._run_loop, daemon=True)
|
|
self.thread.start()
|
|
|
|
logger.info("Reminder worker started")
|
|
|
|
def stop(self):
|
|
"""Stop the worker thread"""
|
|
self.running = False
|
|
if self.thread:
|
|
self.thread.join(timeout=5)
|
|
|
|
logger.info("Reminder worker stopped")
|
|
|
|
def _run_loop(self):
|
|
"""Main worker loop"""
|
|
while self.running:
|
|
try:
|
|
self._process_reminders()
|
|
except Exception as e:
|
|
logger.error(f"Error in reminder loop: {e}")
|
|
|
|
# Sleep until next poll
|
|
time.sleep(self.poll_interval)
|
|
|
|
def _process_reminders(self):
|
|
"""Process pending reminders"""
|
|
pending = self.storage.get_pending_reminders()
|
|
|
|
if not pending:
|
|
return
|
|
|
|
logger.info(f"Processing {len(pending)} pending reminders")
|
|
|
|
for reminder in pending:
|
|
try:
|
|
self._send_reminder(reminder)
|
|
self.storage.update_reminder_status(reminder.id, "sent")
|
|
self.processed_count += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to send reminder {reminder.id}: {e}")
|
|
self.storage.update_reminder_status(
|
|
reminder.id,
|
|
"failed" if reminder.attempts >= 3 else "pending",
|
|
str(e)
|
|
)
|
|
self.failed_count += 1
|
|
|
|
self.last_run = datetime.utcnow()
|
|
|
|
def _send_reminder(self, reminder):
|
|
"""Send reminder via appropriate channel"""
|
|
# Get event details
|
|
# In production, fetch event from CalDAV
|
|
|
|
event_info = {
|
|
"uid": reminder.event_uid,
|
|
"user_id": reminder.user_id,
|
|
"workspace_id": reminder.workspace_id
|
|
}
|
|
|
|
if reminder.channel == "inapp":
|
|
self._send_inapp(reminder, event_info)
|
|
elif reminder.channel == "telegram":
|
|
self._send_telegram(reminder, event_info)
|
|
elif reminder.channel == "email":
|
|
self._send_email(reminder, event_info)
|
|
else:
|
|
logger.warning(f"Unknown channel: {reminder.channel}")
|
|
|
|
def _send_inapp(self, reminder, event_info):
|
|
"""Send in-app notification"""
|
|
# In production, would send to notification service
|
|
logger.info(f"[INAPP] Reminder for event {reminder.event_uid}")
|
|
|
|
def _send_telegram(self, reminder, event_info):
|
|
"""Send Telegram notification"""
|
|
# In production, use Telegram bot API
|
|
logger.info(f"[TELEGRAM] Reminder for event {reminder.event_uid}")
|
|
|
|
def _send_email(self, reminder, event_info):
|
|
"""Send email notification"""
|
|
# In production, use email service
|
|
logger.info(f"[EMAIL] Reminder for event {reminder.event_uid}")
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get worker status"""
|
|
return {
|
|
"running": self.running,
|
|
"processed_count": self.processed_count,
|
|
"failed_count": self.failed_count,
|
|
"last_run": self.last_run.isoformat() if self.last_run else None,
|
|
"poll_interval": self.poll_interval
|
|
}
|
|
|
|
def set_notification_handler(self, handler):
|
|
"""Set custom notification handler"""
|
|
self.notification_handler = handler
|