New router intelligence modules (26 files): alert_ingest/store, audit_store, architecture_pressure, backlog_generator/store, cost_analyzer, data_governance, dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment, platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files), signature_state_store, sofiia_auto_router, tool_governance New services: - sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static - memory-service: integration_endpoints, integrations, voice_endpoints, static UI - aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents) - sofiia-supervisor: new supervisor service - aistalk-bridge-lite: Telegram bridge lite - calendar-service: CalDAV calendar service with reminders - mlx-stt-service / mlx-tts-service: Apple Silicon speech services - binance-bot-monitor: market monitor service - node-worker: STT/TTS memory providers New tools (9): agent_email, browser_tool, contract_tool, observability_tool, oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault New crews: agromatrix_crew (10 modules: depth_classifier, doc_facts, doc_focus, farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine, session_context, style_adapter, telemetry) Tests: 85+ test files for all new modules Made-with: Cursor
844 lines
33 KiB
Python
"""
|
|
Tests for dependency_scanner.py
|
|
|
|
Uses tempfile.TemporaryDirectory fixtures — no dependency on the real repo.
|
|
All tests are self-contained and deterministic.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Dict, Any
|
|
from unittest.mock import MagicMock, AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
# ─── Path bootstrap ───────────────────────────────────────────────────────────
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "services" / "router"))
|
|
|
|
from dependency_scanner import (
|
|
scan_dependencies,
|
|
scan_dependencies_dict,
|
|
_parse_poetry_lock,
|
|
_parse_pipfile_lock,
|
|
_parse_requirements_txt,
|
|
_parse_pyproject_toml,
|
|
_parse_package_lock_json,
|
|
_parse_pnpm_lock,
|
|
_parse_yarn_lock,
|
|
_parse_package_json,
|
|
_compare_versions,
|
|
_normalize_pkg_name,
|
|
_redact,
|
|
ECOSYSTEM_PYPI,
|
|
ECOSYSTEM_NPM,
|
|
)
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def make_cache(entries: Dict[str, Any]) -> Dict:
    """Wrap *entries* in an osv_cache.json-compatible envelope."""
    return {
        "version": 1,
        "updated_at": "2026-01-01T00:00:00+00:00",
        "entries": entries,
    }
def write_cache(tmpdir: str, entries: Dict[str, Any]) -> str:
    """Serialize an osv_cache envelope for *entries* into tmpdir/osv_cache.json.

    Returns the path of the written cache file.
    """
    cache_path = os.path.join(tmpdir, "osv_cache.json")
    Path(cache_path).write_text(json.dumps(make_cache(entries)))
    return cache_path
def vuln_entry(osv_id: str, pkg: str, ecosystem: str, severity: str, fixed: str) -> Dict:
    """Build a minimal OSV vulnerability record for *pkg* in *ecosystem*.

    The record carries a single affected range running from "0" up to *fixed*,
    plus a synthetic CVE alias derived from the last four chars of *osv_id*.
    """
    affected = {
        "package": {"name": pkg, "ecosystem": ecosystem},
        "ranges": [
            {"type": "ECOSYSTEM", "events": [{"introduced": "0"}, {"fixed": fixed}]}
        ],
    }
    return {
        "id": osv_id,
        "aliases": [f"CVE-2024-{osv_id[-4:]}"],
        "database_specific": {"severity": severity},
        "summary": f"Test vuln in {pkg}",
        "affected": [affected],
    }


# ─── Unit: version comparison ─────────────────────────────────────────────────
class TestVersionCompare:
    """Unit checks for _compare_versions and _normalize_pkg_name."""

    def test_equal(self):
        assert _compare_versions("1.2.3", "1.2.3") == 0

    def test_newer(self):
        assert _compare_versions("2.0.0", "1.9.9") > 0

    def test_older(self):
        assert _compare_versions("1.0.0", "2.0.0") < 0

    def test_patch(self):
        assert _compare_versions("4.17.21", "4.17.20") > 0

    def test_semver_prerelease_digit_appended(self):
        # The simple parser reads "1.0.0b1" as [1, 0, 0, 1] vs [1, 0, 0], so the
        # pre-release compares >= the release. Known limitation — full semver
        # ordering is not required here.
        outcome = _compare_versions("1.0.0b1", "1.0.0")
        assert outcome >= 0  # beta suffix appended as digit, not less than release

    def test_normalize_name(self):
        assert _normalize_pkg_name("Requests_lib") == "requests-lib"
        assert _normalize_pkg_name("PyYAML") == "pyyaml"


# ─── Unit: secret redaction ───────────────────────────────────────────────────
class TestRedact:
    """_redact must mask credential-looking substrings and leave clean text alone."""

    def test_masks_api_key(self):
        redacted = _redact("api_key = 'abc12345678'")
        assert "***REDACTED***" in redacted

    def test_masks_token(self):
        redacted = _redact("token: Bearer eyJhbGciOiJIUzI1NiJ9")
        assert "***REDACTED***" in redacted

    def test_leaves_clean_text(self):
        clean = "requests==2.31.0"
        assert _redact(clean) == clean


# ─── Unit: Python parsers ─────────────────────────────────────────────────────
class TestPoetryLockParser:
    """_parse_poetry_lock extracts pinned packages from poetry.lock TOML."""

    def test_basic_parse(self):
        content = '''
[[package]]
name = "requests"
version = "2.31.0"
description = "HTTP library"
optional = false

[[package]]
name = "pyyaml"
version = "6.0.1"
description = "YAML library"
optional = false
'''
        parsed = _parse_poetry_lock(content, "poetry.lock")
        assert len(parsed) == 2
        seen = {pkg.name for pkg in parsed}
        assert "requests" in seen
        assert "pyyaml" in seen
        assert all(pkg.ecosystem == ECOSYSTEM_PYPI for pkg in parsed)
        assert all(pkg.pinned for pkg in parsed)

    def test_empty_content(self):
        assert _parse_poetry_lock("", "poetry.lock") == []

    def test_version_extracted(self):
        parsed = _parse_poetry_lock(
            '[[package]]\nname = "fastapi"\nversion = "0.104.1"\n', "poetry.lock"
        )
        assert parsed[0].version == "0.104.1"
class TestPipfileLockParser:
    """_parse_pipfile_lock reads both 'default' and 'develop' sections."""

    def test_basic_parse(self):
        payload = {
            "default": {
                "requests": {"version": "==2.31.0"},
                "flask": {"version": "==2.3.0"},
            },
            "develop": {
                "pytest": {"version": "==7.4.0"},
            }
        }
        parsed = _parse_pipfile_lock(json.dumps(payload), "Pipfile.lock")
        by_name = {pkg.name: pkg.version for pkg in parsed}
        assert by_name["requests"] == "2.31.0"
        assert by_name["flask"] == "2.3.0"
        assert by_name["pytest"] == "7.4.0"

    def test_invalid_json_returns_empty(self):
        assert _parse_pipfile_lock("not json", "Pipfile.lock") == []
class TestRequirementsTxtParser:
    """_parse_requirements_txt: pins, extras, comments, includes, dedup."""

    def test_pinned_extracted(self):
        text = "requests==2.31.0\nflask==2.3.0\n# comment\n"
        parsed = _parse_requirements_txt(text, "requirements.txt")
        pinned = {pkg.name: pkg.version for pkg in parsed if pkg.pinned}
        assert pinned["requests"] == "2.31.0"
        assert pinned["flask"] == "2.3.0"

    def test_unpinned_recorded_but_no_version(self):
        parsed = _parse_requirements_txt("requests>=2.28.0\n", "requirements.txt")
        assert len(parsed) == 1
        only = parsed[0]
        assert only.version == ""
        assert not only.pinned

    def test_extras_stripped(self):
        parsed = _parse_requirements_txt("uvicorn[standard]==0.24.0\n", "requirements.txt")
        pinned = [pkg for pkg in parsed if pkg.pinned]
        assert len(pinned) == 1
        assert pinned[0].name == "uvicorn"
        assert pinned[0].version == "0.24.0"

    def test_git_and_comment_skipped(self):
        text = "# comment\n-r other.txt\ngit+https://github.com/foo/bar.git\n"
        assert _parse_requirements_txt(text, "requirements.txt") == []

    def test_deduplication(self):
        parsed = _parse_requirements_txt(
            "requests==2.31.0\nrequests==2.31.0\n", "requirements.txt"
        )
        assert len([pkg for pkg in parsed if pkg.name == "requests"]) == 1
class TestPyprojectParser:
    """_parse_pyproject_toml reads [tool.poetry.dependencies], skipping python itself."""

    def test_poetry_deps(self):
        content = """
[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.104"
pydantic = "^2.5"
"""
        parsed = _parse_pyproject_toml(content, "pyproject.toml")
        seen = {pkg.name for pkg in parsed}
        assert "fastapi" in seen
        assert "pydantic" in seen
        # The interpreter constraint is not a package.
        assert "python" not in seen

    def test_no_deps_section_returns_empty(self):
        assert _parse_pyproject_toml("[build-system]\n", "pyproject.toml") == []


# ─── Unit: Node parsers ───────────────────────────────────────────────────────
class TestPackageLockParser:
    """_parse_package_lock_json handles lockfile v2, v1 fallback, and scopes."""

    def test_v2_format(self):
        payload = {
            "lockfileVersion": 2,
            "packages": {
                "": {"name": "my-app"},
                "node_modules/lodash": {"version": "4.17.20", "resolved": "https://..."},
                "node_modules/axios": {"version": "1.7.2", "resolved": "https://..."},
            }
        }
        parsed = _parse_package_lock_json(json.dumps(payload), "package-lock.json")
        by_name = {pkg.name: pkg.version for pkg in parsed}
        assert by_name["lodash"] == "4.17.20"
        assert by_name["axios"] == "1.7.2"
        # Root package skipped
        assert "" not in by_name

    def test_v1_fallback(self):
        payload = {
            "lockfileVersion": 1,
            "dependencies": {
                "lodash": {"version": "4.17.21"},
            }
        }
        parsed = _parse_package_lock_json(json.dumps(payload), "package-lock.json")
        assert parsed[0].version == "4.17.21"

    def test_scoped_package(self):
        payload = {
            "lockfileVersion": 2,
            "packages": {
                "node_modules/@babel/core": {"version": "7.23.0"},
            }
        }
        parsed = _parse_package_lock_json(json.dumps(payload), "package-lock.json")
        assert parsed[0].name == "@babel/core"
class TestPnpmLockParser:
    """_parse_pnpm_lock splits the /name@version path keys of pnpm-lock.yaml."""

    def test_basic_parse(self):
        lockfile = "/lodash@4.17.21:\n resolution: {integrity: sha512-xxx}\n dev: false\n"
        parsed = _parse_pnpm_lock(lockfile, "pnpm-lock.yaml")
        assert parsed[0].name == "lodash"
        assert parsed[0].version == "4.17.21"
class TestYarnLockParser:
    """_parse_yarn_lock reads classic yarn.lock blocks and dedupes merged specs."""

    # NOTE(review): field lines inside the fixtures use yarn's standard
    # two-space indent — confirm this matches what the parser expects.

    def test_basic_parse(self):
        lockfile = '''lodash@^4.17.11:
  version "4.17.21"
  resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.21.tgz"
  integrity sha512-xxx

axios@^1.7.2:
  version "1.7.2"
  resolved "https://registry.yarnpkg.com/axios/-/axios-1.7.2.tgz"
  integrity sha512-yyy
'''
        parsed = _parse_yarn_lock(lockfile, "yarn.lock")
        by_name = {pkg.name: pkg.version for pkg in parsed}
        assert by_name["lodash"] == "4.17.21"
        assert by_name["axios"] == "1.7.2"

    def test_deduplication(self):
        lockfile = '''lodash@^4.17.11, lodash@^4.17.4:
  version "4.17.21"
  resolved "https://..."
  integrity sha512-xxx
'''
        parsed = _parse_yarn_lock(lockfile, "yarn.lock")
        assert len([pkg for pkg in parsed if pkg.name == "lodash"]) == 1
class TestPackageJsonParser:
    """_parse_package_json records names only — version ranges are not pins."""

    def test_deps_and_dev_deps(self):
        payload = {
            "dependencies": {"axios": "^1.7.2", "nats": "^2.28.2"},
            "devDependencies": {"jest": "^29.0.0"}
        }
        parsed = _parse_package_json(json.dumps(payload), "package.json")
        seen = {pkg.name for pkg in parsed}
        assert {"axios", "nats", "jest"}.issubset(seen)
        # Semver ranges carry no concrete version.
        assert all(pkg.version == "" for pkg in parsed)


# ─── Integration: scan with offline cache ─────────────────────────────────────
class TestScanWithOfflineCache:
    """End-to-end scans of a synthetic repo against an offline OSV cache."""

    @staticmethod
    def _offline(cache_path):
        """vuln_sources config pointing the scanner at an offline cache file."""
        return {"osv": {"enabled": True, "mode": "offline_cache",
                        "cache_path": cache_path}}

    def test_python_pinned_high_vuln_fails(self):
        """requirements.txt with pinned requests; cache has HIGH vuln → pass=False."""
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("requests==2.28.0\n")
            cache = write_cache(repo, {
                "PyPI:requests:2.28.0": {
                    "vulns": [vuln_entry("GHSA-001", "requests", "PyPI", "HIGH", "2.31.0")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo,
                targets=["python"],
                vuln_sources=self._offline(cache),
                severity_thresholds={"fail_on": ["CRITICAL", "HIGH"], "warn_on": ["MEDIUM"]},
            )
            assert result.pass_ is False
            assert result.stats["vulns_total"] == 1
            assert result.stats["by_severity"]["HIGH"] == 1
            assert len(result.vulnerabilities) == 1
            finding = result.vulnerabilities[0]
            assert finding["package"] == "requests"
            assert "2.31.0" in finding["recommendation"]

    def test_python_critical_vuln_fails(self):
        """CRITICAL vulnerability must fail the gate."""
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("cryptography==38.0.0\n")
            cache = write_cache(repo, {
                "PyPI:cryptography:38.0.0": {
                    "vulns": [vuln_entry("GHSA-CRIT-001", "cryptography", "PyPI", "CRITICAL", "41.0.6")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo,
                targets=["python"],
                vuln_sources=self._offline(cache),
            )
            assert result.pass_ is False
            assert result.stats["by_severity"]["CRITICAL"] == 1

    def test_python_medium_only_passes(self):
        """MEDIUM vuln should not block release (not in fail_on)."""
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("pyyaml==5.3.0\n")
            cache = write_cache(repo, {
                "PyPI:pyyaml:5.3.0": {
                    "vulns": [vuln_entry("GHSA-MED-001", "pyyaml", "PyPI", "MEDIUM", "6.0")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo,
                targets=["python"],
                vuln_sources=self._offline(cache),
                severity_thresholds={"fail_on": ["CRITICAL", "HIGH"], "warn_on": ["MEDIUM"]},
            )
            assert result.pass_ is True
            assert result.stats["by_severity"]["MEDIUM"] == 1
            # MEDIUM should appear in recommendations
            joined = " ".join(result.recommendations).lower()
            assert "medium" in joined

    def test_node_package_lock_high_fails(self):
        """package-lock.json with lodash@4.17.20; cache has HIGH → fail."""
        with tempfile.TemporaryDirectory() as repo:
            lock_payload = {
                "lockfileVersion": 2,
                "packages": {
                    "node_modules/lodash": {"version": "4.17.20"}
                }
            }
            (Path(repo) / "package-lock.json").write_text(json.dumps(lock_payload))
            cache = write_cache(repo, {
                "npm:lodash:4.17.20": {
                    "vulns": [vuln_entry("GHSA-NPM-001", "lodash", "npm", "HIGH", "4.17.21")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo,
                targets=["node"],
                vuln_sources=self._offline(cache),
            )
            assert result.pass_ is False
            assert result.stats["vulns_total"] == 1

    def test_cache_miss_unknown_severity_passes_by_default(self):
        """Dep exists but has no cache entry → severity UNKNOWN → pass=True (not in fail_on)."""
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("newlib==1.0.0\n")
            cache = write_cache(repo, {})  # empty cache

            result = scan_dependencies(
                repo_root=repo,
                targets=["python"],
                vuln_sources=self._offline(cache),
                severity_thresholds={"fail_on": ["CRITICAL", "HIGH"], "warn_on": ["MEDIUM"]},
            )
            assert result.pass_ is True
            assert result.stats["deps_unresolved"] == 1
            # Should recommend populating cache
            joined = " ".join(result.recommendations)
            assert "cache" in joined.lower() or "Update cache" in joined

    def test_no_pinned_deps_passes_with_recommendation(self):
        """Unpinned deps cannot be checked → pass=True + recommendation to pin."""
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("requests>=2.28.0\nflask>=2.0\n")
            cache = write_cache(repo, {})

            result = scan_dependencies(
                repo_root=repo,
                targets=["python"],
                vuln_sources=self._offline(cache),
            )
            assert result.pass_ is True
            joined = " ".join(result.recommendations).lower()
            assert "unpinned" in joined or "pin" in joined

    def test_no_vulns_passes_cleanly(self):
        """All deps in cache with empty vuln list → clean pass."""
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("requests==2.31.0\npyyaml==6.0.1\n")
            cache = write_cache(repo, {
                "PyPI:requests:2.31.0": {"vulns": [], "cached_at": "2026-01-01T00:00:00+00:00"},
                "PyPI:pyyaml:6.0.1": {"vulns": [], "cached_at": "2026-01-01T00:00:00+00:00"},
            })
            result = scan_dependencies(
                repo_root=repo,
                targets=["python"],
                vuln_sources=self._offline(cache),
            )
            assert result.pass_ is True
            assert result.stats["vulns_total"] == 0
            assert result.recommendations == []
class TestScanStructure:
    """Shape checks for the scan result object and its dict wrapper."""

    @staticmethod
    def _offline(cache_path):
        """vuln_sources config pointing the scanner at an offline cache file."""
        return {"osv": {"enabled": True, "mode": "offline_cache",
                        "cache_path": cache_path}}

    def test_stats_structure(self):
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("flask==2.3.0\n")
            cache = write_cache(repo, {
                "PyPI:flask:2.3.0": {"vulns": [], "cached_at": "2026-01-01T00:00:00+00:00"},
            })
            result = scan_dependencies(
                repo_root=repo, targets=["python"],
                vuln_sources=self._offline(cache),
            )
            stats = result.stats
            for key in ("ecosystems", "files_scanned", "deps_total",
                        "deps_pinned", "vulns_total", "by_severity"):
                assert key in stats
            assert set(stats["by_severity"].keys()) >= {"CRITICAL", "HIGH", "MEDIUM", "LOW", "UNKNOWN"}

    def test_vuln_object_structure(self):
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("vuln-pkg==1.0.0\n")
            cache = write_cache(repo, {
                "PyPI:vuln-pkg:1.0.0": {
                    "vulns": [vuln_entry("GHSA-TEST-001", "vuln-pkg", "PyPI", "HIGH", "2.0.0")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo, targets=["python"],
                vuln_sources=self._offline(cache),
            )
            assert result.vulnerabilities
            finding = result.vulnerabilities[0]
            expected = {"id", "ecosystem", "package", "version", "severity",
                        "fixed_versions", "aliases", "evidence", "recommendation"}
            assert expected.issubset(finding.keys())

    def test_scan_dependencies_dict_wrapper(self):
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("requests==2.31.0\n")
            cache = write_cache(repo, {
                "PyPI:requests:2.31.0": {"vulns": [], "cached_at": "2026-01-01T00:00:00+00:00"},
            })
            wrapped = scan_dependencies_dict(
                repo_root=repo, targets=["python"],
                vuln_sources=self._offline(cache),
            )
            assert isinstance(wrapped, dict)
            for key in ("pass", "summary", "stats", "vulnerabilities", "recommendations"):
                assert key in wrapped
class TestPoetryLockIntegration:
    """Full scan driven by a poetry.lock instead of requirements.txt."""

    def test_poetry_lock_full_scan(self):
        with tempfile.TemporaryDirectory() as repo:
            lock_content = """[[package]]
name = "requests"
version = "2.31.0"
description = "Python HTTP"
optional = false

[[package]]
name = "cryptography"
version = "41.0.0"
description = "Crypto"
optional = false
"""
            (Path(repo) / "poetry.lock").write_text(lock_content)
            cache = write_cache(repo, {
                "PyPI:requests:2.31.0": {"vulns": [], "cached_at": "2026-01-01T00:00:00+00:00"},
                "PyPI:cryptography:41.0.0": {
                    "vulns": [vuln_entry("GHSA-CRYPTO", "cryptography", "PyPI", "HIGH", "42.0.0")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo, targets=["python"],
                vuln_sources={"osv": {"enabled": True, "mode": "offline_cache",
                                      "cache_path": cache}},
            )
            assert result.pass_ is False
            assert result.stats["deps_pinned"] == 2
            assert result.stats["vulns_total"] == 1
class TestOutdatedAnalysis:
    """Outdated-package reporting derived from the 'fixed' version in the cache."""

    def test_outdated_detected_from_fixed_version(self):
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("pyyaml==5.4.1\n")
            cache = write_cache(repo, {
                "PyPI:pyyaml:5.4.1": {
                    "vulns": [vuln_entry("GHSA-YAML", "pyyaml", "PyPI", "MEDIUM", "6.0")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo, targets=["python"],
                vuln_sources={"osv": {"enabled": True, "mode": "offline_cache",
                                      "cache_path": cache}},
                outdated_cfg={"enabled": True, "mode": "lockfile_only"},
            )
            assert result.stats["outdated_total"] == 1
            entry = result.outdated[0]
            assert entry["package"] == "pyyaml"
            assert entry["current"] == "5.4.1"
            assert entry["latest"] == "6.0"

    def test_outdated_disabled(self):
        with tempfile.TemporaryDirectory() as repo:
            (Path(repo) / "requirements.txt").write_text("pyyaml==5.4.1\n")
            cache = write_cache(repo, {
                "PyPI:pyyaml:5.4.1": {
                    "vulns": [vuln_entry("GHSA-YAML", "pyyaml", "PyPI", "MEDIUM", "6.0")],
                    "cached_at": "2026-01-01T00:00:00+00:00",
                }
            })
            result = scan_dependencies(
                repo_root=repo, targets=["python"],
                vuln_sources={"osv": {"enabled": True, "mode": "offline_cache",
                                      "cache_path": cache}},
                outdated_cfg={"enabled": False},
            )
            assert result.stats["outdated_total"] == 0
            assert result.outdated == []
class TestExcludedPaths:
    """Manifests under excluded directories (node_modules) must be ignored."""

    def test_node_modules_excluded(self):
        with tempfile.TemporaryDirectory() as repo:
            # Legitimate top-level requirements.txt
            (Path(repo) / "requirements.txt").write_text("requests==2.31.0\n")
            # Planted manifest inside node_modules (should be skipped)
            nested = Path(repo) / "node_modules" / "some-pkg"
            nested.mkdir(parents=True)
            (nested / "requirements.txt").write_text("evil-pkg==0.0.1\n")

            cache = write_cache(repo, {
                "PyPI:requests:2.31.0": {"vulns": [], "cached_at": "2026-01-01T00:00:00+00:00"},
            })
            result = scan_dependencies(
                repo_root=repo, targets=["python"],
                vuln_sources={"osv": {"enabled": True, "mode": "offline_cache",
                                      "cache_path": cache}},
            )
            flagged = [v["package"] for v in result.vulnerabilities]
            assert "evil-pkg" not in flagged
class TestLimitsEnforced:
    """The limits config caps how many dependencies are processed."""

    def test_max_deps_truncation(self):
        with tempfile.TemporaryDirectory() as repo:
            # Ten pinned deps, but max_deps caps processing at three.
            body = "\n".join(f"pkg{i}=={i}.0.0" for i in range(10))
            (Path(repo) / "requirements.txt").write_text(body + "\n")
            cache = write_cache(repo, {})

            result = scan_dependencies(
                repo_root=repo, targets=["python"],
                vuln_sources={"osv": {"enabled": True, "mode": "offline_cache",
                                      "cache_path": cache}},
                limits={"max_files": 80, "max_deps": 3, "max_vulns": 500},
            )
            assert result.stats["deps_total"] <= 3


# ─── Integration: release_check with dependency_scan gate ─────────────────────
def _run(coro):
|
|
return asyncio.run(coro)
|
|
|
|
|
|
class TestReleaseCheckWithDeps:
    """Ensure the dependency_scan gate correctly influences release_check."""

    def _make_tool_manager(self, scan_result: Dict) -> MagicMock:
        """Build a minimal tool_manager mock for release_check tests."""
        tm = MagicMock()

        async def execute_tool(tool_name, args, **kwargs):
            outcome = MagicMock()
            outcome.success = True
            outcome.error = None
            if tool_name == "dependency_scanner_tool":
                outcome.result = scan_result
            else:
                # Every other gate reports a clean pass.
                outcome.result = {
                    "status": "pass",
                    "findings": [],
                    "summary": "ok",
                    "verdict": "PASS",
                    "pass": True,
                }
            return outcome

        tm.execute_tool = execute_tool
        return tm

    def test_dep_scan_fail_blocks_release(self):
        """HIGH vuln in deps → release_check pass=False."""
        from release_check_runner import run_release_check

        failing_scan = {
            "pass": False,
            "summary": "❌ HIGH vuln found",
            "stats": {
                "vulns_total": 1,
                "by_severity": {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 0, "LOW": 0, "UNKNOWN": 0},
                "deps_total": 10,
            },
            "vulnerabilities": [{
                "id": "GHSA-TEST",
                "package": "requests",
                "version": "2.28.0",
                "severity": "HIGH",
                "fixed_versions": ["2.31.0"],
                "recommendation": "Upgrade requests to 2.31.0",
                "aliases": [],
                "evidence": {},
            }],
            "outdated": [],
            "licenses": [],
            "recommendations": ["Upgrade requests to 2.31.0"],
        }
        tm = self._make_tool_manager(failing_scan)
        report = _run(run_release_check(tm, {
            "service_name": "router",
            "diff": "--- a/req.txt\n+++ b/req.txt\n",
            "run_deps": True,
            "deps_vuln_mode": "offline_cache",
        }, agent_id="sofiia"))

        assert report["pass"] is False
        names = [gate["name"] for gate in report["gates"]]
        assert "dependency_scan" in names
        dep_gate = next(gate for gate in report["gates"] if gate["name"] == "dependency_scan")
        assert dep_gate["status"] == "fail"

    def test_dep_scan_pass_allows_release(self):
        """No vulns → gate passes, release_check can continue."""
        from release_check_runner import run_release_check

        clean_scan = {
            "pass": True,
            "summary": "✅ No vulns",
            "stats": {
                "vulns_total": 0,
                "by_severity": {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "UNKNOWN": 0},
                "deps_total": 50,
            },
            "vulnerabilities": [],
            "outdated": [],
            "licenses": [],
            "recommendations": [],
        }
        tm = self._make_tool_manager(clean_scan)
        report = _run(run_release_check(tm, {
            "service_name": "router",
            "diff": "",
            "run_deps": True,
        }, agent_id="sofiia"))

        dep_gate = next(
            (gate for gate in report["gates"] if gate["name"] == "dependency_scan"), None
        )
        assert dep_gate is not None
        assert dep_gate["status"] == "pass"

    def test_dep_scan_disabled(self):
        """run_deps=False → dependency_scan gate not in report."""
        from release_check_runner import run_release_check

        tm = self._make_tool_manager({"pass": True, "stats": {}, "vulnerabilities": [],
                                      "outdated": [], "licenses": [], "recommendations": [],
                                      "summary": ""})
        report = _run(run_release_check(tm, {
            "service_name": "router",
            "diff": "",
            "run_deps": False,
        }, agent_id="sofiia"))

        names = [gate["name"] for gate in report["gates"]]
        assert "dependency_scan" not in names

    def test_dep_scan_fail_fast(self):
        """fail_fast=True → report is returned immediately after dep scan failure."""
        from release_check_runner import run_release_check

        failing_scan = {
            "pass": False,
            "summary": "CRITICAL vuln",
            "stats": {
                "vulns_total": 1,
                "by_severity": {"CRITICAL": 1, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "UNKNOWN": 0},
                "deps_total": 5,
            },
            "vulnerabilities": [{
                "id": "GHSA-CRIT",
                "package": "oldlib",
                "version": "0.1.0",
                "severity": "CRITICAL",
                "fixed_versions": ["1.0.0"],
                "recommendation": "Upgrade oldlib",
                "aliases": [],
                "evidence": {},
            }],
            "outdated": [],
            "licenses": [],
            "recommendations": ["Upgrade oldlib"],
        }
        calls = {"n": 0}
        tm = MagicMock()

        async def execute_tool(tool_name, args, **kwargs):
            calls["n"] += 1
            outcome = MagicMock()
            outcome.success = True
            if tool_name == "dependency_scanner_tool":
                outcome.result = failing_scan
            else:
                outcome.result = {"pass": True, "findings": [], "summary": "ok"}
                outcome.error = None
            return outcome

        tm.execute_tool = execute_tool

        report = _run(run_release_check(tm, {
            "service_name": "router",
            "diff": "",
            "run_deps": True,
            "fail_fast": True,
        }, agent_id="sofiia"))

        assert report["pass"] is False
        # pr_review + config_lint + dependency_scan = 3 calls; no further tools called
        assert calls["n"] <= 3


# ─── RBAC Tests ───────────────────────────────────────────────────────────────
class TestDepsRBAC:
    """Verify RBAC guards for dependency_scanner_tool."""

    def test_agent_without_deps_read_is_denied(self):
        from tool_governance import check_rbac

        allowed, reason = check_rbac("agent_media", "dependency_scanner_tool", "scan")
        assert not allowed
        lowered = reason.lower()
        assert "denied" in lowered or "entitlement" in lowered

    def test_agent_cto_has_deps_read(self):
        from tool_governance import check_rbac

        allowed, _ = check_rbac("sofiia", "dependency_scanner_tool", "scan")
        assert allowed

    def test_agent_oncall_has_deps_read(self):
        from tool_governance import check_rbac

        allowed, _ = check_rbac("helion", "dependency_scanner_tool", "scan")
        assert allowed

    def test_agent_media_no_deps_gate(self):
        from tool_governance import check_rbac

        allowed, _ = check_rbac("agent_media", "dependency_scanner_tool", "gate")
        assert not allowed