""" Dependency & Supply Chain Scanner. Scans Python and Node.js dependencies for: 1. Known vulnerabilities (via OSV.dev API or offline cache) 2. Outdated packages (lockfile_only mode, using OSV fixed_versions) 3. License policy enforcement (optional, MVP: offline-only) Ecosystems supported: Python → poetry.lock, pipfile.lock, requirements*.txt, pyproject.toml Node → package-lock.json, pnpm-lock.yaml, yarn.lock, package.json Pass rule: pass=false if any vuln with severity in fail_on (default: CRITICAL, HIGH). MEDIUM → warning (not blocking by default). UNKNOWN → warning if not in fail_on. Security: - Read-only: no file writes except cache update (explicit) - Evidence masked for secrets - Payload not logged; only hash + counts - Max files/deps enforced via limits - Timeout via deadline """ from __future__ import annotations import csv import fnmatch import hashlib import json import logging import os import re import time import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple logger = logging.getLogger(__name__) # ─── Constants ──────────────────────────────────────────────────────────────── EXCLUDED_DIRS: FrozenSet[str] = frozenset({ "node_modules", ".git", "dist", "build", "vendor", ".venv", "venv", "venv_models", "sofia_venv", "__pycache__", ".pytest_cache", "rollback_backups", "docs/consolidation", }) OSV_API_URL = "https://api.osv.dev/v1/querybatch" OSV_BATCH_SIZE = 100 # max per request OSV_TIMEOUT_SEC = 15.0 # OSV ecosystems ECOSYSTEM_PYPI = "PyPI" ECOSYSTEM_NPM = "npm" SEVERITY_ORDER = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1, "UNKNOWN": 0} # ─── Data Structures ────────────────────────────────────────────────────────── @dataclass class Package: name: str version: str # empty string = unresolved/unpinned ecosystem: str # "PyPI" | "npm" source_file: str pinned: bool = True @property def normalized_name(self) -> str: return self.name.lower().replace("_", "-") 
@property def cache_key(self) -> str: return f"{self.ecosystem}:{self.normalized_name}:{self.version}" @dataclass class Vulnerability: osv_id: str ecosystem: str package: str version: str severity: str # CRITICAL | HIGH | MEDIUM | LOW | UNKNOWN fixed_versions: List[str] aliases: List[str] # CVE-XXXX-XXXX etc. evidence: Dict[str, str] recommendation: str @dataclass class OutdatedPackage: ecosystem: str package: str current: str latest: Optional[str] notes: str @dataclass class LicenseFinding: package: str license: str policy: str # "deny" | "warn" | "ok" | "unknown" recommendation: str @dataclass class ScanResult: pass_: bool summary: str stats: Dict[str, Any] vulnerabilities: List[Dict] outdated: List[Dict] licenses: List[Dict] recommendations: List[str] # ─── Helpers ────────────────────────────────────────────────────────────────── _SECRET_PAT = re.compile( r'(?i)(api[_-]?key|token|secret|password|bearer|jwt|private[_-]?key)' r'[\s=:]+[\'"`]?([a-zA-Z0-9_\-\.]{8,})[\'"`]?' ) def _redact(text: str) -> str: return _SECRET_PAT.sub(lambda m: f"{m.group(1)}=***REDACTED***", text or "") def _is_excluded(path: str) -> bool: parts = Path(path).parts return any(p in EXCLUDED_DIRS for p in parts) def _read_file(path: str, max_bytes: int = 524288) -> str: try: size = os.path.getsize(path) with open(path, "r", errors="replace") as f: return f.read(min(size, max_bytes)) except Exception: return "" def _normalize_pkg_name(name: str) -> str: """Normalize: lowercase, underscores → dashes.""" return name.strip().lower().replace("_", "-") def _compare_versions(v1: str, v2: str) -> int: """ Simple version comparison. Returns -1 / 0 / 1. Handles semver and PEP 440 in a best-effort way. 
""" def _parts(v: str) -> List[int]: nums = re.findall(r'\d+', v.split("+")[0].split("-")[0]) return [int(x) for x in nums] if nums else [0] p1, p2 = _parts(v1), _parts(v2) # Pad to equal length max_len = max(len(p1), len(p2)) p1 += [0] * (max_len - len(p1)) p2 += [0] * (max_len - len(p2)) if p1 < p2: return -1 if p1 > p2: return 1 return 0 # ─── Python Parsers ─────────────────────────────────────────────────────────── def _parse_poetry_lock(content: str, source_file: str) -> List[Package]: """Parse poetry.lock [[package]] sections.""" packages = [] # Split on [[package]] headers sections = re.split(r'\[\[package\]\]', content) for section in sections[1:]: name_m = re.search(r'^name\s*=\s*"([^"]+)"', section, re.MULTILINE) ver_m = re.search(r'^version\s*=\s*"([^"]+)"', section, re.MULTILINE) if name_m and ver_m: packages.append(Package( name=name_m.group(1), version=ver_m.group(1), ecosystem=ECOSYSTEM_PYPI, source_file=source_file, pinned=True, )) return packages def _parse_pipfile_lock(content: str, source_file: str) -> List[Package]: """Parse Pipfile.lock JSON.""" packages = [] try: data = json.loads(content) for section in ("default", "develop"): for pkg_name, pkg_info in (data.get(section) or {}).items(): version = pkg_info.get("version", "") # Pipfile.lock versions are like "==2.28.0" version = re.sub(r'^==', '', version) if version: packages.append(Package( name=pkg_name, version=version, ecosystem=ECOSYSTEM_PYPI, source_file=source_file, pinned=True, )) except Exception as e: logger.debug(f"Could not parse Pipfile.lock: {e}") return packages _REQ_LINE_PAT = re.compile( r'^([A-Za-z0-9_\-\.]+)(?:\[.*?\])?\s*==\s*([^\s;#]+)', re.MULTILINE, ) _REQ_UNPINNED_PAT = re.compile( r'^([A-Za-z0-9_\-\.]+)(?:\[.*?\])?\s*[> List[Package]: """ Parse requirements.txt. Only pinned (==) lines yield concrete versions. Unpinned are recorded with empty version (unresolved). 
""" packages = [] seen: Set[str] = set() for m in _REQ_LINE_PAT.finditer(content): name, version = m.group(1), m.group(2).strip() key = _normalize_pkg_name(name) if key not in seen: packages.append(Package( name=name, version=version, ecosystem=ECOSYSTEM_PYPI, source_file=source_file, pinned=True, )) seen.add(key) # Record unpinned for reporting (no vuln scan) for m in _REQ_UNPINNED_PAT.finditer(content): name = m.group(1) key = _normalize_pkg_name(name) if key not in seen: packages.append(Package( name=name, version="", ecosystem=ECOSYSTEM_PYPI, source_file=source_file, pinned=False, )) seen.add(key) return packages def _parse_pyproject_toml(content: str, source_file: str) -> List[Package]: """Extract declared deps from pyproject.toml (without resolving versions).""" packages = [] # [tool.poetry.dependencies] or [project.dependencies] dep_section = re.search( r'\[(?:tool\.poetry\.dependencies|project)\]([^\[]*)', content, re.DOTALL ) if not dep_section: return packages block = dep_section.group(1) for m in re.finditer(r'^([A-Za-z0-9_\-\.]+)\s*=', block, re.MULTILINE): name = m.group(1).strip() if name.lower() in ("python", "python-version"): continue packages.append(Package( name=name, version="", ecosystem=ECOSYSTEM_PYPI, source_file=source_file, pinned=False, )) return packages # ─── Node Parsers ───────────────────────────────────────────────────────────── def _parse_package_lock_json(content: str, source_file: str) -> List[Package]: """Parse package-lock.json (npm v2/v3 format).""" packages = [] try: data = json.loads(content) # v2/v3: flat packages object pkg_map = data.get("packages") or {} for path_key, info in pkg_map.items(): if path_key == "" or not path_key.startswith("node_modules/"): continue # Extract package name from path name = path_key.replace("node_modules/", "").split("/node_modules/")[-1] version = info.get("version", "") if name and version: packages.append(Package( name=name, version=version, ecosystem=ECOSYSTEM_NPM, source_file=source_file, 
pinned=True, )) # v1 fallback: nested dependencies if not packages: for name, info in (data.get("dependencies") or {}).items(): version = info.get("version", "") if version: packages.append(Package( name=name, version=version, ecosystem=ECOSYSTEM_NPM, source_file=source_file, pinned=True, )) except Exception as e: logger.debug(f"Could not parse package-lock.json: {e}") return packages def _parse_pnpm_lock(content: str, source_file: str) -> List[Package]: """Parse pnpm-lock.yaml packages section.""" packages = [] # Pattern: /package@version: for m in re.finditer(r'^/([^@\s]+)@([^\s:]+):', content, re.MULTILINE): name, version = m.group(1), m.group(2) packages.append(Package( name=name, version=version, ecosystem=ECOSYSTEM_NPM, source_file=source_file, pinned=True, )) return packages def _parse_yarn_lock(content: str, source_file: str) -> List[Package]: """Parse yarn.lock v1 format.""" packages = [] # Yarn.lock block: "package@version":\n version "X.Y.Z" block_pat = re.compile( r'^"?([^@"\s]+)@[^:]+:\n(?:\s+.*\n)*?\s+version "([^"]+)"', re.MULTILINE, ) seen: Set[str] = set() for m in block_pat.finditer(content): name, version = m.group(1), m.group(2) key = f"{name}@{version}" if key not in seen: packages.append(Package( name=name, version=version, ecosystem=ECOSYSTEM_NPM, source_file=source_file, pinned=True, )) seen.add(key) return packages def _parse_package_json(content: str, source_file: str) -> List[Package]: """Extract declared deps from package.json (no lock = unresolved).""" packages = [] try: data = json.loads(content) for section in ("dependencies", "devDependencies"): for name in (data.get(section) or {}): packages.append(Package( name=name, version="", ecosystem=ECOSYSTEM_NPM, source_file=source_file, pinned=False, )) except Exception: pass return packages # ─── Dependency Discovery ───────────────────────────────────────────────────── _PYTHON_MANIFESTS = ( "poetry.lock", "Pipfile.lock", ) _PYTHON_REQUIREMENTS = ("requirements",) # matched via endswith 
_PYTHON_PYPROJECT = ("pyproject.toml",)
_NODE_MANIFESTS = (
    "package-lock.json",
    "pnpm-lock.yaml",
    "yarn.lock",
    "package.json",
)


def _find_and_parse_deps(
    repo_root: str,
    targets: List[str],
    max_files: int,
    deadline: float,
) -> List[Package]:
    """Walk the repo and extract all packages from manifest files.

    Args:
        repo_root: directory to walk
        targets: subset of ["python", "node"] selecting which manifests count
        max_files: hard cap on manifest files parsed
        deadline: time.monotonic() value after which the walk aborts

    Returns:
        De-duplicated packages (one per ecosystem+normalized name; a pinned
        entry always wins over an unpinned one).
    """
    all_packages: List[Package] = []
    files_scanned = 0
    budget_exhausted = False

    for dirpath, dirnames, filenames in os.walk(repo_root):
        # Prune excluded/hidden directories in place so os.walk skips them.
        dirnames[:] = [
            d for d in dirnames
            if d not in EXCLUDED_DIRS and not d.startswith(".")
        ]
        if time.monotonic() > deadline:
            logger.warning("dependency_scanner: walk timeout")
            break
        if budget_exhausted:
            # Stop the whole walk once max_files is hit (previously only the
            # inner loop broke, so the walk kept traversing directories).
            break
        for fname in filenames:
            if files_scanned >= max_files:
                budget_exhausted = True
                break
            full = os.path.join(dirpath, fname)
            if _is_excluded(full):
                continue
            rel = os.path.relpath(full, repo_root)

            if "python" in targets:
                if fname in _PYTHON_MANIFESTS:
                    content = _read_file(full)
                    if fname == "poetry.lock":
                        all_packages.extend(_parse_poetry_lock(content, rel))
                    elif fname == "Pipfile.lock":
                        all_packages.extend(_parse_pipfile_lock(content, rel))
                    files_scanned += 1
                elif fname.endswith(".txt") and "requirements" in fname.lower():
                    content = _read_file(full)
                    all_packages.extend(_parse_requirements_txt(content, rel))
                    files_scanned += 1
                elif fname in _PYTHON_PYPROJECT:
                    content = _read_file(full)
                    all_packages.extend(_parse_pyproject_toml(content, rel))
                    files_scanned += 1

            if "node" in targets and fname in _NODE_MANIFESTS:
                # Skip package.json when a lockfile sibling exists — the
                # lockfile gives resolved versions, the manifest does not.
                if fname == "package.json":
                    lock_exists = (
                        os.path.exists(os.path.join(dirpath, "package-lock.json"))
                        or os.path.exists(os.path.join(dirpath, "yarn.lock"))
                        or os.path.exists(os.path.join(dirpath, "pnpm-lock.yaml"))
                    )
                    if lock_exists:
                        continue
                content = _read_file(full)
                if fname == "package-lock.json":
                    all_packages.extend(_parse_package_lock_json(content, rel))
                elif fname == "pnpm-lock.yaml":
                    all_packages.extend(_parse_pnpm_lock(content, rel))
                elif fname == "yarn.lock":
                    all_packages.extend(_parse_yarn_lock(content, rel))
                elif fname == "package.json":
                    all_packages.extend(_parse_package_json(content, rel))
                files_scanned += 1

    # Deduplicate: prefer pinned over unpinned; first seen otherwise wins.
    seen: Dict[str, Package] = {}
    for pkg in all_packages:
        key = f"{pkg.ecosystem}:{pkg.normalized_name}"
        if key not in seen or (not seen[key].pinned and pkg.pinned):
            seen[key] = pkg
    return list(seen.values())

# ─── OSV Cache ────────────────────────────────────────────────────────────────


def _load_osv_cache(cache_path: str) -> Dict[str, Any]:
    """Load offline OSV cache entries from a JSON file ({} on any failure)."""
    if not cache_path or not os.path.exists(cache_path):
        return {}
    try:
        with open(cache_path, "r") as f:
            data = json.load(f)
        return data.get("entries", {})
    except Exception as e:
        logger.warning(f"Could not load OSV cache {cache_path}: {e}")
        return {}


def _save_osv_cache(cache_path: str, entries: Dict[str, Any]) -> None:
    """Merge *entries* into the on-disk cache and persist it.

    This is the only write the scanner performs (explicit cache update).
    """
    os.makedirs(os.path.dirname(os.path.abspath(cache_path)), exist_ok=True)
    existing = {}
    if os.path.exists(cache_path):
        try:
            with open(cache_path, "r") as f:
                existing = json.load(f)
        except Exception:
            pass  # corrupt/unreadable cache → rebuild from scratch
    existing_entries = existing.get("entries", {})
    existing_entries.update(entries)
    import datetime
    output = {
        "version": 1,
        "updated_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "entries": existing_entries,
    }
    with open(cache_path, "w") as f:
        json.dump(output, f, indent=2)

# ─── OSV API ──────────────────────────────────────────────────────────────────


def _query_osv_online(
    packages: List[Package],
    new_cache: Dict[str, Any],
    deadline: float,
) -> Dict[str, List[Dict]]:
    """
    Query OSV.dev /v1/querybatch in batches of OSV_BATCH_SIZE.

    Side effect: successful results are also written into *new_cache*
    (mutated in place) so the caller can persist them.

    Returns {cache_key: [vuln_objects]}.
    """
    try:
        import httpx
    except ImportError:
        logger.warning("httpx not available for OSV online query")
        return {}

    results: Dict[str, List[Dict]] = {}
    batches = [
        packages[i:i + OSV_BATCH_SIZE]
        for i in range(0, len(packages), OSV_BATCH_SIZE)
    ]
    for batch in batches:
        if time.monotonic() > deadline:
            break
        queries = []
        batch_keys = []
        for pkg in batch:
            # Unpinned packages have no concrete version to query.
            if not pkg.pinned or not pkg.version:
                continue
            queries.append({
                "package": {"name": pkg.normalized_name, "ecosystem": pkg.ecosystem},
                "version": pkg.version,
            })
            batch_keys.append(pkg.cache_key)
        if not queries:
            continue
        try:
            # Clamp the HTTP timeout to the remaining overall budget.
            remaining = max(1.0, deadline - time.monotonic())
            timeout = min(OSV_TIMEOUT_SEC, remaining)
            with httpx.Client(timeout=timeout) as client:
                resp = client.post(OSV_API_URL, json={"queries": queries})
                resp.raise_for_status()
                data = resp.json()
        except Exception as e:
            logger.warning(f"OSV query failed: {e}")
            continue
        # results[] is positionally aligned with queries[].
        for key, result in zip(batch_keys, data.get("results", [])):
            vulns = result.get("vulns") or []
            results[key] = vulns
            new_cache[key] = {"vulns": vulns, "cached_at": _now_iso()}
    return results


def _parse_osv_severity(vuln: Dict) -> str:
    """Extract best-effort severity from an OSV vuln object.

    Checks database_specific.severity, then numeric CVSS scores in
    severity[], then ecosystem_specific.severity. Returns "UNKNOWN"
    when nothing usable is found.
    """
    # Try database_specific.severity (many databases provide this)
    db_specific = vuln.get("database_specific", {})
    sev = (db_specific.get("severity") or "").upper()
    if sev in SEVERITY_ORDER:
        return sev
    # Try severity[] scores. NOTE: a CVSS *vector* string like
    # "CVSS:3.1/AV:N/AC:L/..." does not embed a base score — naively
    # regex-matching a float there grabs the spec version "3.1" and
    # misclassifies everything as LOW. Only accept bare numeric scores.
    for sev_entry in (vuln.get("severity") or []):
        score_str = str(sev_entry.get("score", "")).strip()
        if score_str.upper().startswith("CVSS"):
            continue  # vector string: base score not directly available
        try:
            score = float(score_str)
        except ValueError:
            continue
        # Standard CVSS v3 rating bands.
        if score >= 9.0:
            return "CRITICAL"
        if score >= 7.0:
            return "HIGH"
        if score >= 4.0:
            return "MEDIUM"
        if score > 0:
            return "LOW"
    # Try ecosystem_specific
    eco_specific = vuln.get("ecosystem_specific", {})
    sev = (eco_specific.get("severity") or "").upper()
    if sev in SEVERITY_ORDER:
        return sev
    return "UNKNOWN"


def _extract_fixed_versions(vuln: Dict, pkg_name: str, ecosystem: str) -> List[str]:
    """Extract fixed versions from OSV affected[].ranges[].events."""
    fixed = []
    for affected in (vuln.get("affected") or []):
        pkg = affected.get("package", {})
        if (pkg.get("ecosystem") or "").lower() != ecosystem.lower():
            continue
        if _normalize_pkg_name(pkg.get("name", "")) != _normalize_pkg_name(pkg_name):
            continue
        for rng in (affected.get("ranges") or []):
            for event in (rng.get("events") or []):
                if "fixed" in event:
                    fixed.append(event["fixed"])
    return sorted(set(fixed))


def _lookup_vulnerability(
    pkg: Package,
    osv_vulns: List[Dict],
) -> List[Vulnerability]:
    """Convert raw OSV vulns → Vulnerability objects for one package."""
    results = []
    for vuln in osv_vulns:
        osv_id = vuln.get("id", "UNKNOWN")
        aliases = [a for a in (vuln.get("aliases") or []) if a.startswith("CVE")]
        severity = _parse_osv_severity(vuln)
        fixed = _extract_fixed_versions(vuln, pkg.name, pkg.ecosystem)
        if fixed:
            # Pick the numerically smallest fix, not the lexicographically
            # smallest ("1.10.0" sorts before "1.9.0" as a string).
            best_fix = sorted(
                fixed,
                key=lambda v: [int(x) for x in re.findall(r'\d+', v)] or [0],
            )[0]
            rec = f"Upgrade {pkg.name} from {pkg.version} to {best_fix}"
        else:
            rec = f"No fix available for {pkg.name}@{pkg.version}. Monitor {osv_id}."
        results.append(Vulnerability(
            osv_id=osv_id,
            ecosystem=pkg.ecosystem,
            package=pkg.name,
            version=pkg.version,
            severity=severity,
            fixed_versions=fixed,
            aliases=aliases,
            evidence={
                "file": _redact(pkg.source_file),
                "details": f"{pkg.name}=={pkg.version} in {pkg.source_file}",
            },
            recommendation=rec,
        ))
    return results

# ─── Outdated Analysis ────────────────────────────────────────────────────────


def _analyze_outdated(
    packages: List[Package],
    vuln_results: Dict[str, List[Dict]],
) -> List[OutdatedPackage]:
    """
    Lockfile-only outdated analysis. Uses fixed_versions from OSV results
    as a hint for "newer version available" (no registry queries).
    """
    outdated = []
    for pkg in packages:
        if not pkg.pinned or not pkg.version:
            continue
        key = pkg.cache_key
        vulns = vuln_results.get(key, [])
        for vuln in vulns:
            fixed = _extract_fixed_versions(vuln, pkg.name, pkg.ecosystem)
            if not fixed:
                continue
            # Find the smallest fixed version > current
            upgrades = [v for v in fixed if _compare_versions(v, pkg.version) > 0]
            if upgrades:
                min_fix = sorted(
                    upgrades,
                    key=lambda v: [int(x) for x in re.findall(r'\d+', v)],
                )[0]
                outdated.append(OutdatedPackage(
                    ecosystem=pkg.ecosystem,
                    package=pkg.name,
                    current=pkg.version,
                    latest=min_fix,
                    notes=f"Security fix available (vuln: {vuln.get('id', '?')})",
                ))
                break  # One entry per package
    return outdated

# ─── License Policy ───────────────────────────────────────────────────────────


def _apply_license_policy(
    packages: List[Package],
    policy_cfg: Dict,
) -> List[LicenseFinding]:
    """MVP: license data is rarely in lock files, so most will be UNKNOWN.

    NOTE(review): with license_str hard-coded to "UNKNOWN" every package is
    skipped, so this currently always returns [] — the deny/warn logic below
    is a placeholder until a license source (metadata/API) is wired in.
    """
    if not policy_cfg.get("enabled", False):
        return []
    deny_list = {l.upper() for l in (policy_cfg.get("deny") or [])}
    warn_list = {l.upper() for l in (policy_cfg.get("warn") or [])}
    findings = []
    for pkg in packages:
        # In MVP there's no way to get license from lockfile without network
        license_str = "UNKNOWN"
        if license_str == "UNKNOWN":
            continue  # skip unknown in MVP
        policy = "ok"
        if license_str.upper() in deny_list:
            policy = "deny"
        elif license_str.upper() in warn_list:
            policy = "warn"
        findings.append(LicenseFinding(
            package=pkg.name,
            license=license_str,
            policy=policy,
            recommendation=f"Review license {license_str} for {pkg.name}."
            if policy != "ok" else "",
        ))
    return findings

# ─── Main Scanner ─────────────────────────────────────────────────────────────


def scan_dependencies(
    repo_root: str,
    targets: Optional[List[str]] = None,
    vuln_sources: Optional[Dict] = None,
    license_policy: Optional[Dict] = None,
    severity_thresholds: Optional[Dict] = None,
    outdated_cfg: Optional[Dict] = None,
    limits: Optional[Dict] = None,
    timeout_sec: float = 40.0,
) -> ScanResult:
    """
    Scan repo dependencies for vulnerabilities, outdated packages, license issues.

    Args:
        repo_root: absolute path to repo root
        targets: ["python", "node"] (default: both)
        vuln_sources: {"osv": {"enabled": true, "mode": "online|offline_cache",
            "cache_path": "..."}}
        license_policy: {"enabled": false, "deny": [...], "warn": [...]}
        severity_thresholds: {"fail_on": ["CRITICAL", "HIGH"], "warn_on": ["MEDIUM"]}
        outdated_cfg: {"enabled": true, "mode": "lockfile_only"}
        limits: {"max_files": 80, "max_deps": 2000, "max_vulns": 500}
        timeout_sec: hard deadline

    Returns:
        ScanResult with pass/fail verdict
    """
    deadline = time.monotonic() + timeout_sec
    targets = targets or ["python", "node"]
    vuln_sources = vuln_sources or {
        "osv": {"enabled": True, "mode": "offline_cache",
                "cache_path": "ops/cache/osv_cache.json"}
    }
    license_policy = license_policy or {"enabled": False}
    severity_thresholds = severity_thresholds or {
        "fail_on": ["CRITICAL", "HIGH"], "warn_on": ["MEDIUM"]
    }
    outdated_cfg = outdated_cfg or {"enabled": True, "mode": "lockfile_only"}
    limits = limits or {"max_files": 80, "max_deps": 2000, "max_vulns": 500}

    fail_on = {s.upper() for s in (severity_thresholds.get("fail_on") or ["CRITICAL", "HIGH"])}
    warn_on = {s.upper() for s in (severity_thresholds.get("warn_on") or ["MEDIUM"])}

    # ── Step 1: Extract dependencies ─────────────────────────────────────────
    all_packages = _find_and_parse_deps(
        repo_root,
        targets,
        max_files=limits.get("max_files", 80),
        deadline=deadline,
    )
    # Apply dep count limit
    max_deps = limits.get("max_deps", 2000)
    if len(all_packages) > max_deps:
        logger.warning(f"Dep count {len(all_packages)} > max {max_deps}, truncating")
        all_packages = all_packages[:max_deps]

    pinned = [p for p in all_packages if p.pinned and p.version]
    unpinned = [p for p in all_packages if not p.pinned or not p.version]

    # ── Step 2: Vulnerability lookup ─────────────────────────────────────────
    osv_cfg = vuln_sources.get("osv", {})
    osv_enabled = osv_cfg.get("enabled", True)
    osv_mode = osv_cfg.get("mode", "offline_cache")
    # Resolve cache path (absolute or relative to repo_root)
    cache_path_raw = osv_cfg.get("cache_path", "ops/cache/osv_cache.json")
    cache_path = (
        cache_path_raw if os.path.isabs(cache_path_raw)
        else os.path.join(repo_root, cache_path_raw)
    )
    cache_entries = _load_osv_cache(cache_path) if osv_enabled else {}
    new_cache: Dict[str, Any] = {}
    # Sentinel convention: a key mapped to None means "no data for this
    # package" (cache miss) as opposed to [] meaning "known clean".
    vuln_results: Dict[str, List[Dict]] = {}

    if osv_enabled:
        # Populate from cache first
        cache_miss: List[Package] = []
        for pkg in pinned:
            key = pkg.cache_key
            if key in cache_entries:
                vuln_results[key] = (cache_entries[key] or {}).get("vulns", [])
            else:
                cache_miss.append(pkg)

        if osv_mode == "online" and cache_miss and time.monotonic() < deadline:
            # Online query for cache misses
            online_results = _query_osv_online(cache_miss, new_cache, deadline)
            vuln_results.update(online_results)
            # Mark remaining misses as UNKNOWN (no cache entry)
            for pkg in cache_miss:
                if pkg.cache_key not in vuln_results:
                    vuln_results[pkg.cache_key] = None  # type: ignore[assignment]
        else:
            # Offline: cache misses → UNKNOWN
            for pkg in cache_miss:
                vuln_results[pkg.cache_key] = None  # type: ignore[assignment]

    # Persist new cache entries if online mode
    if new_cache and osv_mode == "online":
        try:
            _save_osv_cache(cache_path, new_cache)
        except Exception as e:
            logger.warning(f"Could not save OSV cache: {e}")

    # ── Step 3: Build vulnerability findings ─────────────────────────────────
    all_vulns: List[Vulnerability] = []
    cache_miss_pkgs: List[Package] = []
    for pkg in pinned:
        raw_vulns = vuln_results.get(pkg.cache_key)
        if raw_vulns is None:
            cache_miss_pkgs.append(pkg)
            continue
        all_vulns.extend(_lookup_vulnerability(pkg, raw_vulns))

    # Apply vuln limit, then sort by severity desc
    max_vulns = limits.get("max_vulns", 500)
    all_vulns = all_vulns[:max_vulns]
    all_vulns.sort(key=lambda v: SEVERITY_ORDER.get(v.severity, 0), reverse=True)

    # ── Step 4: Outdated ─────────────────────────────────────────────────────
    outdated: List[OutdatedPackage] = []
    if outdated_cfg.get("enabled", True):
        outdated = _analyze_outdated(pinned, {
            k: v for k, v in vuln_results.items() if v is not None
        })

    # ── Step 5: License policy ───────────────────────────────────────────────
    licenses = _apply_license_policy(all_packages, license_policy)

    # ── Step 6: Compute pass/fail ────────────────────────────────────────────
    by_severity: Dict[str, int] = {s: 0 for s in SEVERITY_ORDER}
    for v in all_vulns:
        by_severity[v.severity] = by_severity.get(v.severity, 0) + 1

    blocking_count = sum(by_severity.get(s, 0) for s in fail_on)
    warning_count = sum(by_severity.get(s, 0) for s in warn_on)

    # License denials also block
    denied_licenses = [lf for lf in licenses if lf.policy == "deny"]
    if denied_licenses:
        blocking_count += len(denied_licenses)

    pass_ = blocking_count == 0

    # ── Step 7: Build recommendations ────────────────────────────────────────
    recommendations: List[str] = []
    if blocking_count > 0:
        top_crit = [v for v in all_vulns if v.severity in fail_on][:3]
        for v in top_crit:
            recommendations.append(v.recommendation)
    if warning_count > 0:
        recommendations.append(
            f"{warning_count} MEDIUM severity vulnerabilities found — review and upgrade where possible."
        )
    if cache_miss_pkgs:
        recommendations.append(
            f"{len(cache_miss_pkgs)} packages have no OSV cache entry (severity UNKNOWN). "
            "Run in online mode to populate cache: mode=online."
        )
    if unpinned:
        recommendations.append(
            f"{len(unpinned)} unpinned dependencies detected — cannot check for vulnerabilities. "
            "Pin versions in requirements.txt/lock files."
        )

    # ── Step 8: Summary ──────────────────────────────────────────────────────
    ecosystems_found = sorted({p.ecosystem for p in all_packages})
    # Start time was (deadline - timeout_sec); this recovers wall time spent.
    elapsed_ms = round((time.monotonic() - (deadline - timeout_sec)) * 1000, 1)

    if pass_:
        summary = (
            f"✅ Dependency scan PASSED. "
            f"{len(pinned)} deps scanned, {len(all_vulns)} vulns found "
            f"({by_severity.get('CRITICAL', 0)} critical, {by_severity.get('HIGH', 0)} high)."
        )
    else:
        summary = (
            f"❌ Dependency scan FAILED. "
            f"{blocking_count} blocking issue(s): "
            f"{by_severity.get('CRITICAL', 0)} CRITICAL, {by_severity.get('HIGH', 0)} HIGH"
            + (f", {len(denied_licenses)} denied licenses" if denied_licenses else "")
            + "."
        )

    stats = {
        "ecosystems": ecosystems_found,
        "files_scanned": len(set(p.source_file for p in all_packages)),
        "deps_total": len(all_packages),
        "deps_pinned": len(pinned),
        "deps_unresolved": len(cache_miss_pkgs),
        "vulns_total": len(all_vulns),
        "by_severity": by_severity,
        "outdated_total": len(outdated),
        "elapsed_ms": elapsed_ms,
    }
    return ScanResult(
        pass_=pass_,
        summary=summary,
        stats=stats,
        vulnerabilities=[_vuln_to_dict(v) for v in all_vulns],
        outdated=[_outdated_to_dict(o) for o in outdated],
        licenses=[_license_to_dict(lf) for lf in licenses],
        recommendations=list(dict.fromkeys(recommendations)),  # dedupe, keep order
    )


def scan_dependencies_dict(repo_root: str, **kwargs) -> Dict:
    """Convenience wrapper returning plain dict for ToolResult."""
    result = scan_dependencies(repo_root, **kwargs)
    return {
        "pass": result.pass_,
        "summary": result.summary,
        "stats": result.stats,
        "vulnerabilities": result.vulnerabilities,
        "outdated": result.outdated,
        "licenses": result.licenses,
        "recommendations": result.recommendations,
    }

# ─── Serializers ──────────────────────────────────────────────────────────────


def _vuln_to_dict(v: Vulnerability) -> Dict:
    """Serialize a Vulnerability; evidence values are redacted defensively."""
    return {
        "id": v.osv_id,
        "ecosystem": v.ecosystem,
        "package": v.package,
        "version": v.version,
        "severity": v.severity,
        "fixed_versions": v.fixed_versions,
        "aliases": v.aliases,
        "evidence": {k: _redact(val) for k, val in v.evidence.items()},
        "recommendation": v.recommendation,
    }


def _outdated_to_dict(o: OutdatedPackage) -> Dict:
    """Serialize an OutdatedPackage to a plain dict."""
    return {
        "ecosystem": o.ecosystem,
        "package": o.package,
        "current": o.current,
        "latest": o.latest,
        "notes": o.notes,
    }


def _license_to_dict(lf: LicenseFinding) -> Dict:
    """Serialize a LicenseFinding to a plain dict."""
    return {
        "package": lf.package,
        "license": lf.license,
        "policy": lf.policy,
        "recommendation": lf.recommendation,
    }


def _now_iso() -> str:
    """Current UTC time as an ISO-8601 string (for cache timestamps)."""
    import datetime
    return datetime.datetime.now(datetime.timezone.utc).isoformat()