New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
336 lines
10 KiB
Python
336 lines
10 KiB
Python
"""
|
|
Tests for RepoTool - Read-only repository access
|
|
"""
|
|
|
|
import pytest
|
|
import os
|
|
import tempfile
|
|
import asyncio
|
|
from pathlib import Path
|
|
|
|
# Put the parent of this file's directory on sys.path so the project import below resolves
|
|
import sys
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from services.router.tool_manager import ToolManager, ToolResult
|
|
|
|
|
|
class TestRepoToolSecurity:
    """Security tests for RepoTool: traversal, symlink, absolute and missing paths."""

    @pytest.fixture
    def temp_repo(self, monkeypatch):
        """Build a throwaway repo tree and point REPO_ROOT at it.

        The tree holds README.md, src/main.py, a .env file with fake secrets,
        an empty subdir/ and a symlink (escape_link -> /tmp) that leads
        outside the repo.  REPO_ROOT is set through ``monkeypatch`` so the
        variable is restored after each test instead of leaking into the rest
        of the test session.

        Yields:
            The temporary repository root as a string path.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create structure
            os.makedirs(os.path.join(tmpdir, "src"))
            os.makedirs(os.path.join(tmpdir, "subdir"))

            # Create files
            with open(os.path.join(tmpdir, "README.md"), "w") as f:
                f.write("# Test Repo\n")

            with open(os.path.join(tmpdir, "src", "main.py"), "w") as f:
                f.write("print('hello')\n")

            with open(os.path.join(tmpdir, ".env"), "w") as f:
                f.write("SECRET_KEY=mysecret123\nAPI_TOKEN=testtoken\n")

            # Symlink pointing outside the repo, used by the escape test
            os.symlink("/tmp", os.path.join(tmpdir, "escape_link"))

            # Every test in this class runs against this root; monkeypatch
            # undoes the change on teardown.
            monkeypatch.setenv("REPO_ROOT", tmpdir)

            yield tmpdir

    @pytest.mark.asyncio
    async def test_path_traversal_blocked(self, temp_repo):
        """A ``..`` path that climbs out of the repo must be rejected."""
        tool_mgr = ToolManager({})

        # Try to access file outside repo
        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "../../../etc/passwd",
        })

        assert result.success is False
        assert "traversal" in result.error.lower() or "denied" in result.error.lower()

    @pytest.mark.asyncio
    async def test_symlink_escape_blocked(self, temp_repo):
        """Reading through a symlink that leaves the repo must be rejected."""
        tool_mgr = ToolManager({})

        # Try to access via symlink
        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "escape_link/some_file",
        })

        assert result.success is False
        assert "symlink" in result.error.lower() or "denied" in result.error.lower()

    @pytest.mark.asyncio
    async def test_absolute_path_blocked(self, temp_repo):
        """An absolute path must be rejected."""
        tool_mgr = ToolManager({})

        # Try absolute path
        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "/etc/passwd",
        })

        assert result.success is False

    @pytest.mark.asyncio
    async def test_read_nonexistent_file(self, temp_repo):
        """Reading a file that is not in the repo reports 'does not exist'."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "nonexistent.py",
        })

        assert result.success is False
        assert "does not exist" in result.error.lower()
|
|
|
|
|
|
class TestRepoToolFunctionality:
    """Functional tests for the RepoTool actions: tree, read, search, metadata."""

    @pytest.fixture
    def temp_repo(self, monkeypatch):
        """Build a small repo tree and point REPO_ROOT at it.

        The tree holds README.md, src/main.py and docs/guide.md.  REPO_ROOT is
        set through ``monkeypatch`` so it is restored after each test rather
        than leaking into later tests.

        Yields:
            The temporary repository root as a string path.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create structure
            os.makedirs(os.path.join(tmpdir, "src"))
            os.makedirs(os.path.join(tmpdir, "docs"))

            # Create files
            with open(os.path.join(tmpdir, "README.md"), "w") as f:
                f.write("# Test Repo\n")

            with open(os.path.join(tmpdir, "src", "main.py"), "w") as f:
                f.write("def hello():\n print('hello')\n")

            with open(os.path.join(tmpdir, "docs", "guide.md"), "w") as f:
                f.write("# Guide\nHello world\n")

            # Scoped to the test; undone automatically on teardown.
            monkeypatch.setenv("REPO_ROOT", tmpdir)

            yield tmpdir

    @pytest.mark.asyncio
    async def test_tree_action(self, temp_repo):
        """The tree action succeeds and lists the src directory."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "tree",
            "path": ".",
            "depth": 2,
        })

        assert result.success is True
        assert "tree" in result.result
        assert "src" in result.result["tree"]

    @pytest.mark.asyncio
    async def test_read_action(self, temp_repo):
        """The read action returns the file content."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "README.md",
        })

        assert result.success is True
        assert "Test Repo" in result.result["content"]

    @pytest.mark.asyncio
    async def test_read_with_line_limits(self, temp_repo):
        """A read with start_line/end_line echoes the requested range."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "src/main.py",
            "start_line": 1,
            "end_line": 1,
        })

        assert result.success is True
        assert result.result["start_line"] == 1
        assert result.result["end_line"] == 1

    @pytest.mark.asyncio
    async def test_search_action(self, temp_repo):
        """The search action finds at least one match for text in the repo."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "search",
            "query": "hello",
            "path": ".",
        })

        assert result.success is True
        assert result.result["count"] >= 1

    @pytest.mark.asyncio
    async def test_metadata_action(self, temp_repo):
        """The metadata action succeeds and reports a repo_root entry."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "metadata",
            "path": ".",
        })

        assert result.success is True
        assert "repo_root" in result.result

    @pytest.mark.asyncio
    async def test_unknown_action(self, temp_repo):
        """An unrecognised action fails with an 'unknown' error."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "unknown_action",
        })

        assert result.success is False
        assert "unknown" in result.error.lower()
|
|
|
|
|
|
class TestRepoToolSecretMasking:
    """Tests that RepoTool masks secrets in the content it returns."""

    @pytest.fixture
    def temp_repo(self, monkeypatch):
        """Build a repo holding a .env file and a config.py with fake secrets.

        REPO_ROOT is set through ``monkeypatch`` so it is restored after each
        test rather than leaking into later tests.

        Yields:
            The temporary repository root as a string path.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create env file with secrets
            with open(os.path.join(tmpdir, ".env"), "w") as f:
                f.write(
                    "SECRET_KEY=mysecret123\n"
                    "API_TOKEN=abc123def456\n"
                    "DATABASE_PASSWORD=pass123\n"
                    "JWT_BEARER=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0\n"
                )

            # Create regular file with secrets inline
            with open(os.path.join(tmpdir, "config.py"), "w") as f:
                f.write(
                    "# Regular config\n"
                    'API_KEY = "regular-key-123"\n'
                    'SECRET = "regular-secret"\n'
                )

            # Scoped to the test; undone automatically on teardown.
            monkeypatch.setenv("REPO_ROOT", tmpdir)

            yield tmpdir

    @pytest.mark.asyncio
    async def test_env_file_masked(self, temp_repo):
        """Reading .env is flagged as masked and exposes none of its values."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": ".env",
        })

        assert result.success is True
        assert result.result.get("masked") is True

        content = result.result["content"]
        assert "MASKED" in content

        # The raw values written to .env by the fixture must not come back.
        assert "mysecret123" not in content
        assert "abc123def456" not in content
        assert "pass123" not in content

    @pytest.mark.asyncio
    async def test_inline_secrets_masked(self, temp_repo):
        """Secret-looking assignments inside a regular file are masked."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "config.py",
        })

        assert result.success is True
        content = result.result["content"]

        # Check the literals that are actually in config.py; asserting on the
        # .env values here would pass trivially because they never appear in
        # this file.
        assert "regular-key-123" not in content
        assert "regular-secret" not in content
        assert "***" in content or "MASKED" in content
|
|
|
|
|
|
class TestRepoToolLimits:
    """Tests for the size and depth limits applied by RepoTool."""

    @pytest.fixture
    def temp_repo(self, monkeypatch):
        """Build a repo containing one 300,000-character file, large.txt.

        REPO_ROOT is set through ``monkeypatch`` so it is restored after each
        test rather than leaking into later tests.

        Yields:
            The temporary repository root as a string path.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create large file
            with open(os.path.join(tmpdir, "large.txt"), "w") as f:
                f.write("x" * 300000)  # 300KB

            # Scoped to the test; undone automatically on teardown.
            monkeypatch.setenv("REPO_ROOT", tmpdir)

            yield tmpdir

    @pytest.mark.asyncio
    async def test_max_bytes_limit(self, temp_repo):
        """A file larger than the requested max_bytes is rejected as too large."""
        tool_mgr = ToolManager({})

        result = await tool_mgr._repo_tool({
            "action": "read",
            "path": "large.txt",
            "max_bytes": 1000,  # 1KB limit
        })

        assert result.success is False
        assert "too large" in result.error.lower()

    @pytest.mark.asyncio
    async def test_depth_limit(self, temp_repo):
        """An excessive tree depth is accepted rather than causing a failure.

        NOTE(review): the intent is that depth 100 is capped (to 10, per the
        original comment), but only success is asserted here; the cap itself
        is not verified because the result shape is not visible from this
        file.
        """
        tool_mgr = ToolManager({})

        # Try depth 100 (should be capped to 10)
        result = await tool_mgr._repo_tool({
            "action": "tree",
            "path": ".",
            "depth": 100,
        })

        assert result.success is True
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this file directly.  Forward pytest's exit status so a
    # failing run ends the process with a non-zero code instead of always 0.
    raise SystemExit(pytest.main([__file__, "-v"]))
|