Files
microdao-daarion/tools/secure_vault/tests/test_rotate.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

119 lines
3.4 KiB
Python

"""
Test 2: Master key rotation
Demonstrates:
- Store credentials with old key
- Rotate to new master key
- Verify credentials still accessible
- Check key version
"""
import os
import sys
import tempfile
import shutil
temp_dir = tempfile.mkdtemp()
os.environ["VAULT_DIR"] = temp_dir
os.environ["VAULT_AUDIT_LOG_DIR"] = temp_dir
from secure_vault import SecureVault
def test_key_rotation():
"""Test master key rotation"""
print("=== Test: Master Key Rotation ===\n")
# Initialize vault with first key
print("1. Initializing vault with key v1...")
vault = SecureVault()
vault.init_vault("old-master-password")
# Store some credentials
print("\n2. Storing credentials with old key...")
vault.store("sofiia", "aws", "access_key", "AKIAIOSFODNN7EXAMPLE")
vault.store("sofiia", "aws", "secret_key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
vault.store("sofiia", "github", "token", "ghp_xxxxxxxxxxxx")
print(" - Stored AWS and GitHub credentials")
# Check key version
print("\n3. Checking key version...")
meta_file = os.path.join(temp_dir, ".vault_master.key.meta")
with open(meta_file) as f:
meta = eval(f.read())
print(f" Key version: {meta['key_version']}")
# Rotate to new key
print("\n4. Rotating to new master key...")
result = vault.rotate_master_key("new-master-password")
print(f" Status: {result['status']}")
print(f" New version: {result['key_version']}")
print(f" Re-encrypted: {result['credentials_reencrypted']} creds")
# Verify credentials still accessible
print("\n5. Verifying credentials after rotation...")
aws_key = vault.get("sofiia", "aws", "access_key")
print(f" AWS access key: {aws_key[:10]}...")
github_token = vault.get("sofiia", "github", "token")
print(f" GitHub token: {github_token}")
# Check new key metadata
print("\n6. Checking new key metadata...")
with open(meta_file) as f:
meta = eval(f.read())
print(f" New key version: {meta['key_version']}")
print(f" Rotation due: {meta['rotation_due'][:10]}...")
# List all services
print("\n7. Listing all services...")
services = vault.list("sofiia")
print(f" Services: {services}")
# Clean up
shutil.rmtree(temp_dir)
print("\n✅ Key rotation test passed!")
return True
def test_check_expiring():
"""Test expiring credentials"""
print("\n=== Test: Expiring Credentials ===\n")
vault = SecureVault()
vault.init_vault("test-password")
# Store with short TTL
print("1. Storing credentials with TTL...")
vault.store("sofiia", "test", "temp_token", "abc123", ttl_seconds=1)
print(" - Stored token with 1 second TTL")
# Check immediately
print("\n2. Checking before expiry...")
token = vault.get("sofiia", "test", "temp_token")
print(f" Token found: {token is not None}")
# Wait for expiry
print("\n3. Waiting for expiry...")
import time
time.sleep(2)
# Check after expiry
print("\n4. Checking after expiry...")
token = vault.get("sofiia", "test", "temp_token")
print(f" Token found: {token is not None} (should be False)")
# Clean up
shutil.rmtree(temp_dir)
print("\n✅ Expiring credentials test passed!")
return True
if __name__ == "__main__":
test_key_rotation()
test_check_expiring()