New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
163 lines
4.1 KiB
Python
163 lines
4.1 KiB
Python
"""
|
|
Security tests for SafeCodeExecutor
|
|
"""
|
|
|
|
import sys
|
|
sys.path.insert(0, "..")
|
|
|
|
from safe_code_executor import SafeCodeExecutor
|
|
|
|
|
|
def test_fork_bomb_blocked():
    """Check that a never-ending loop is reported with a ``timeout`` status."""
    # A busy loop that never exits on its own; the run is given a 1000 ms timeout.
    endless_loop = "while True: pass"
    time_cap = {"timeout_ms": 1000}

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=endless_loop, limits=time_cap)

    assert outcome["status"] == "timeout"
    print("✓ Fork bomb / infinite loop blocked by timeout")
|
|
|
|
|
|
def test_memory_exhaustion_blocked():
    """Check that a huge list allocation ends as timeout, failed, or killed."""
    # Requests a list of one billion elements under a 128 MB / 3000 ms budget.
    greedy_alloc = "x = [0] * 1000000000"
    budget = {"timeout_ms": 3000, "max_memory_mb": 128}

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=greedy_alloc, limits=budget)

    # Any of these statuses is accepted: the memory cap or the timeout may trip.
    accepted_statuses = ["timeout", "failed", "killed"]
    assert outcome["status"] in accepted_statuses
    print("✓ Memory exhaustion blocked")
|
|
|
|
|
|
def test_read_proc_blocked():
    """Check that reading ``/proc/self/environ`` yields a ``failed`` status."""
    proc_read = "print(open('/proc/self/environ').read())"

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=proc_read)

    assert outcome["status"] == "failed"
    print("✓ /proc access blocked")
|
|
|
|
|
|
def test_read_env_blocked():
    """Check that a snippet reading ``os.environ`` yields a ``failed`` status."""
    env_probe = "import os; print(os.environ.get('SECRET'))"

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=env_probe)

    assert outcome["status"] == "failed"
    print("✓ Environment access blocked")
|
|
|
|
|
|
def test_subprocess_blocked():
    """Check that using ``subprocess`` fails and the error names the module."""
    spawn_attempt = "import subprocess; subprocess.run(['cat', '/etc/passwd'])"

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=spawn_attempt)

    assert outcome["status"] == "failed"

    # The error text is compared case-insensitively.
    error_text = outcome["error"].lower()
    assert "subprocess" in error_text
    print("✓ Subprocess blocked")
|
|
|
|
|
|
def test_requests_blocked():
    """Check that a snippet importing ``requests`` yields a ``failed`` status."""
    http_attempt = "import requests; print(requests.get('http://evil.com'))"

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=http_attempt)

    assert outcome["status"] == "failed"
    print("✓ Requests library blocked")
|
|
|
|
|
|
def test_pickle_blocked():
    """Check that a snippet calling ``pickle.loads`` yields a ``failed`` status."""
    unpickle_attempt = "import pickle; pickle.loads(b'...')"

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=unpickle_attempt)

    assert outcome["status"] == "failed"
    print("✓ Pickle blocked")
|
|
|
|
|
|
def test_yaml_blocked():
    """Check that a snippet calling ``yaml.load`` yields a ``failed`` status."""
    yaml_attempt = "import yaml; yaml.load('!!python/object/apply:os.system', Loader=None)"

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=yaml_attempt)

    assert outcome["status"] == "failed"
    print("✓ YAML blocked")
|
|
|
|
|
|
def test_output_limit():
    """Check that captured stdout stays within 1100 characters under a 1000-byte cap."""
    # The snippet prints 100000 characters, far more than the requested cap.
    noisy_snippet = "print('x' * 100000)"
    output_cap = {"max_stdout_bytes": 1000}

    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="python", code=noisy_snippet, limits=output_cap)

    # A missing "stdout" key counts as empty output.
    # NOTE(review): the bound is 1100 rather than 1000 — presumably headroom for a
    # truncation notice; confirm against the executor.
    captured = outcome.get("stdout", "")
    assert len(captured) <= 1100
    print("✓ Output limit enforced")
|
|
|
|
|
|
def test_input_validation():
    """Check that requesting the ``ruby`` language fails with an "Unsupported" error."""
    sandbox = SafeCodeExecutor()
    outcome = sandbox.execute(language="ruby", code="puts 'hello'")

    assert outcome["status"] == "failed"

    # This match is case-sensitive.
    error_text = outcome["error"]
    assert "Unsupported" in error_text
    print("✓ Language validation works")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run every security test in order; the first failing
    # assertion aborts the run before the success banner is printed.
    print("=== Running Security Tests ===\n")

    ordered_checks = (
        test_fork_bomb_blocked,
        test_memory_exhaustion_blocked,
        test_read_proc_blocked,
        test_read_env_blocked,
        test_subprocess_blocked,
        test_requests_blocked,
        test_pickle_blocked,
        test_yaml_blocked,
        test_output_limit,
        test_input_validation,
    )
    for run_check in ordered_checks:
        run_check()

    print("\n✅ All security tests passed!")
|