Files
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

244 lines
7.3 KiB
Python

"""
Calendar Service Tests
"""
import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime, timedelta
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import app, CalendarToolRequest, CreateEventRequest, ConnectRequest
from storage import CalendarStorage, CalendarAccount, CalendarReminder
from calendar_client import CalDAVClient
class TestCalendarStorage:
"""Test CalendarStorage"""
def test_create_account(self):
"""Test creating calendar account"""
storage = CalendarStorage()
account = storage.create_account(
workspace_id="ws1",
user_id="user1",
provider="radicale",
username="testuser",
password="testpass",
principal_url="/testuser/",
default_calendar_id="default"
)
assert account.id.startswith("acc_")
assert account.workspace_id == "ws1"
assert account.user_id == "user1"
assert account.provider == "radicale"
assert account.username == "testuser"
def test_get_account(self):
"""Test getting account by ID"""
storage = CalendarStorage()
account = storage.create_account(
workspace_id="ws1",
user_id="user1",
provider="radicale",
username="testuser",
password="testpass"
)
retrieved = storage.get_account(account.id)
assert retrieved is not None
assert retrieved.id == account.id
def test_list_accounts(self):
"""Test listing accounts for user"""
storage = CalendarStorage()
storage.create_account("ws1", "user1", "radicale", "user1", "pass1")
storage.create_account("ws1", "user1", "google", "user1@gmail.com", "pass2")
storage.create_account("ws1", "user2", "radicale", "user2", "pass3")
accounts = storage.list_accounts("ws1", "user1")
assert len(accounts) == 2
def test_create_reminder(self):
"""Test creating reminder"""
storage = CalendarStorage()
account = storage.create_account(
workspace_id="ws1",
user_id="user1",
provider="radicale",
username="testuser",
password="testpass"
)
reminder = storage.create_reminder(
workspace_id="ws1",
user_id="user1",
account_id=account.id,
event_uid="evt123",
remind_at=(datetime.utcnow() + timedelta(hours=1)).isoformat(),
channel="inapp"
)
assert reminder.id.startswith("rem_")
assert reminder.event_uid == "evt123"
assert reminder.status == "pending"
def test_idempotency_key(self):
"""Test idempotency key storage"""
storage = CalendarStorage()
storage.store_idempotency_key(
key="unique-key-123",
workspace_id="ws1",
user_id="user1",
event_uid="evt123"
)
result = storage.get_by_idempotency_key("unique-key-123")
assert result is not None
assert result["event_uid"] == "evt123"
class TestCalDAVClient:
"""Test CalDAV Client"""
def test_client_init(self):
"""Test client initialization"""
client = CalDAVClient(
server_url="https://caldav.example.com",
username="testuser",
password="testpass"
)
assert client.server_url == "https://caldav.example.com"
assert client.username == "testuser"
assert client.principal_url is None
def test_discover_principal(self):
"""Test principal discovery"""
client = CalDAVClient(
server_url="https://caldav.example.com",
username="testuser",
password="testpass"
)
with patch.object(client, '_request') as mock_request:
mock_response = Mock()
mock_response.status_code = 207
mock_request.return_value = mock_response
principal = client.discover_principal()
assert principal == "/testuser/"
def test_build_vevent(self):
"""Test VEVENT building"""
client = CalDAVClient(
server_url="https://caldav.example.com",
username="testuser",
password="testpass"
)
vevent = client._build_vevent(
uid="test-uid-123",
title="Test Event",
start="2024-01-15T10:00:00",
end="2024-01-15T11:00:00",
timezone="Europe/Kiev",
location="Office",
description="Test description",
attendees=["test@example.com"]
)
assert "BEGIN:VCALENDAR" in vevent
assert "BEGIN:VEVENT" in vevent
assert "SUMMARY:Test Event" in vevent
assert "LOCATION:Office" in vevent
assert "ATTENDEE:mailto:test@example.com" in vevent
assert "UID:test-uid-123" in vevent
def test_parse_vevent(self):
"""Test VEVENT parsing"""
client = CalDAVClient(
server_url="https://caldav.example.com",
username="testuser",
password="testpass"
)
ics_data = """BEGIN:VCALENDAR
VERSION:2.0
BEGIN:VEVENT
UID:test-uid-456
SUMMARY:Parsed Event
DTSTART:20240115T100000
DTEND:20240115T110000
LOCATION:Home
DESCRIPTION:Test description
END:VEVENT
END:VCALENDAR"""
event = client._parse_vevent(ics_data)
assert event["uid"] == "test-uid-456"
assert event["title"] == "Parsed Event"
assert event["location"] == "Home"
class TestCalendarToolEndpoint:
"""Test calendar tool API endpoint"""
@pytest.fixture
def client(self):
"""Test client fixture"""
from fastapi.testclient import TestClient
return TestClient(app)
def test_health_check(self, client):
"""Test health endpoint"""
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_metrics(self, client):
"""Test metrics endpoint"""
response = client.get("/metrics")
assert response.status_code == 200
data = response.json()
assert "accounts_count" in data
assert "reminders_pending" in data
@patch('main.CalDAVClient')
def test_connect_radicale(self, mock_caldav_class, client):
"""Test connecting Radicale account"""
mock_client = Mock()
mock_client.list_calendars.return_value = [
{"id": "default", "display_name": "Default Calendar"}
]
mock_client.principal_url = "/testuser/"
mock_caldav_class.return_value = mock_client
response = client.post(
"/v1/calendar/connect/radicale",
json={
"workspace_id": "ws1",
"user_id": "user1",
"username": "testuser",
"password": "testpass"
}
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "connected"
assert "account_id" in data
if __name__ == "__main__":
pytest.main([__file__, "-v"])