""" Tests for RepoTool - Read-only repository access """ import pytest import os import tempfile import asyncio from pathlib import Path # Mock the tool_manager imports import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from services.router.tool_manager import ToolManager, ToolResult class TestRepoToolSecurity: """Test security features of RepoTool""" @pytest.fixture def temp_repo(self): """Create a temporary repo for testing""" with tempfile.TemporaryDirectory() as tmpdir: # Create structure os.makedirs(os.path.join(tmpdir, "src")) os.makedirs(os.path.join(tmpdir, "subdir")) # Create files with open(os.path.join(tmpdir, "README.md"), "w") as f: f.write("# Test Repo\n") with open(os.path.join(tmpdir, "src", "main.py"), "w") as f: f.write("print('hello')\n") with open(os.path.join(tmpdir, ".env"), "w") as f: f.write("SECRET_KEY=mysecret123\nAPI_TOKEN=testtoken\n") # Create symlink to test escape os.symlink("/tmp", os.path.join(tmpdir, "escape_link")) yield tmpdir @pytest.mark.asyncio async def test_path_traversal_blocked(self, temp_repo): """Test that path traversal .. is blocked""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) # Try to access file outside repo result = await tool_mgr._repo_tool({ "action": "read", "path": "../../../etc/passwd" }) assert result.success is False assert "traversal" in result.error.lower() or "denied" in result.error.lower() @pytest.mark.asyncio async def test_symlink_escape_blocked(self, temp_repo): """Test that symlink escape is blocked""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) # Try to access via symlink result = await tool_mgr._repo_tool({ "action": "read", "path": "escape_link/some_file" }) assert result.success is False assert "symlink" in result.error.lower() or "denied" in result.error.lower() @pytest.mark.asyncio async def test_absolute_path_blocked(self, temp_repo): """Test that absolute paths are blocked""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) # Try absolute path result = await tool_mgr._repo_tool({ "action": "read", "path": "/etc/passwd" }) assert result.success is False @pytest.mark.asyncio async def test_read_nonexistent_file(self, temp_repo): """Test reading non-existent file""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "read", "path": "nonexistent.py" }) assert result.success is False assert "does not exist" in result.error.lower() class TestRepoToolFunctionality: """Test functionality of RepoTool""" @pytest.fixture def temp_repo(self): """Create a temporary repo for testing""" with tempfile.TemporaryDirectory() as tmpdir: # Create structure os.makedirs(os.path.join(tmpdir, "src")) os.makedirs(os.path.join(tmpdir, "docs")) # Create files with open(os.path.join(tmpdir, "README.md"), "w") as f: f.write("# Test Repo\n") with open(os.path.join(tmpdir, "src", "main.py"), "w") as f: f.write("def hello():\n print('hello')\n") with open(os.path.join(tmpdir, "docs", "guide.md"), "w") as f: f.write("# Guide\nHello world\n") yield tmpdir @pytest.mark.asyncio async def test_tree_action(self, temp_repo): """Test tree action returns directory structure""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "tree", "path": ".", "depth": 2 }) assert result.success is True assert "tree" in result.result assert "src" in result.result["tree"] @pytest.mark.asyncio async def test_read_action(self, temp_repo): """Test read action returns file content""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "read", "path": "README.md" }) assert result.success is True assert "Test Repo" in result.result["content"] @pytest.mark.asyncio async def test_read_with_line_limits(self, temp_repo): """Test read with line limits""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "read", "path": "src/main.py", "start_line": 1, "end_line": 1 }) assert result.success is True assert result.result["start_line"] == 1 assert result.result["end_line"] == 1 @pytest.mark.asyncio async def test_search_action(self, temp_repo): """Test search action finds text""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "search", "query": "hello", "path": "." }) assert result.success is True assert result.result["count"] >= 1 @pytest.mark.asyncio async def test_metadata_action(self, temp_repo): """Test metadata action returns git info""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "metadata", "path": "." }) assert result.success is True assert "repo_root" in result.result @pytest.mark.asyncio async def test_unknown_action(self, temp_repo): """Test unknown action returns error""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "unknown_action" }) assert result.success is False assert "unknown" in result.error.lower() class TestRepoToolSecretMasking: """Test secret masking in RepoTool""" @pytest.fixture def temp_repo(self): """Create a temporary repo with secrets""" with tempfile.TemporaryDirectory() as tmpdir: # Create env file with secrets with open(os.path.join(tmpdir, ".env"), "w") as f: f.write("""SECRET_KEY=mysecret123 API_TOKEN=abc123def456 DATABASE_PASSWORD=pass123 JWT_BEARER=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0 """) # Create regular file with secrets inline with open(os.path.join(tmpdir, "config.py"), "w") as f: f.write("""# Regular config API_KEY = "regular-key-123" SECRET = "regular-secret" """) yield tmpdir @pytest.mark.asyncio async def test_env_file_masked(self, temp_repo): """Test that .env files are completely masked""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "read", "path": ".env" }) assert result.success is True assert result.result.get("masked") is True assert "MASKED" in result.result["content"] @pytest.mark.asyncio async def test_inline_secrets_masked(self, temp_repo): """Test that inline secrets are masked""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "read", "path": "config.py" }) assert result.success is True content = result.result["content"] # Secrets should be masked assert "mysecret123" not in content assert "abc123def456" not in content assert "pass123" not in content assert "***" in content or "MASKED" in content class TestRepoToolLimits: """Test limits in RepoTool""" @pytest.fixture def temp_repo(self): """Create a temporary repo""" with tempfile.TemporaryDirectory() as tmpdir: # Create large file with open(os.path.join(tmpdir, "large.txt"), "w") as f: f.write("x" * 300000) # 300KB yield tmpdir @pytest.mark.asyncio async def test_max_bytes_limit(self, temp_repo): """Test that large files are rejected""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) result = await tool_mgr._repo_tool({ "action": "read", "path": "large.txt", "max_bytes": 1000 # 1KB limit }) assert result.success is False assert "too large" in result.error.lower() @pytest.mark.asyncio async def test_depth_limit(self, temp_repo): """Test that depth is limited""" os.environ["REPO_ROOT"] = temp_repo tool_mgr = ToolManager({}) # Try depth 100 (should be capped to 10) result = await tool_mgr._repo_tool({ "action": "tree", "path": ".", "depth": 100 }) assert result.success is True if __name__ == "__main__": pytest.main([__file__, "-v"])