fix(deps): harden input sanitization, expand test coverage, bump version

Security:
- Add _INLINE_DANGEROUS_OPTIONS regex to catch pip options after package
  names (--find-links, --constraint, --requirement, --editable, --trusted-host,
  --global-option, --install-option and short forms)
- Stage index URLs in pending_urls, commit only after full line validation
  to prevent URL injection from rejected lines

Tests:
- Add 50 new tests: inline sanitization, false-positive guards, parse
  helpers (_parse_conflicts, _parse_install_output), exception paths
  (91 → 141 total, all pass)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dr.Lt.Data 2026-03-07 05:35:23 +09:00
parent ca8698533d
commit b11aee7c1e
3 changed files with 251 additions and 5 deletions

View File

@ -94,13 +94,30 @@ class ResolveResult:
_TMP_PREFIX = "comfyui_resolver_"
# Security: reject dangerous requirement patterns.
# Security: reject dangerous requirement patterns at line start.
# NOTE: This regex is intentionally kept alongside _INLINE_DANGEROUS_OPTIONS
# because it covers ``@ file://`` via ``.*@\s*file://`` which relies on the
# ``^`` anchor. Both regexes share responsibility for option rejection:
# this one catches line-start patterns; _INLINE_DANGEROUS_OPTIONS catches
# options appearing after a package name.
_DANGEROUS_PATTERNS = re.compile(
r'^(-r\b|--requirement\b|-e\b|--editable\b|-c\b|--constraint\b'
r'|--find-links\b|-f\b|.*@\s*file://)',
re.IGNORECASE,
)
# Security: reject dangerous pip options appearing anywhere in the line
# (supplements the ^-anchored _DANGEROUS_PATTERNS which only catches line-start).
# The ``(?:^|\s)`` prefix prevents false positives on hyphenated package names
# (e.g. ``re-crypto``, ``package[extra-c]``) while still catching concatenated
# short-flag attacks like ``-fhttps://evil.com``.
_INLINE_DANGEROUS_OPTIONS = re.compile(
r'(?:^|\s)(--find-links\b|--constraint\b|--requirement\b|--editable\b'
r'|--trusted-host\b|--global-option\b|--install-option\b'
r'|-f|-r|-e|-c)',
re.IGNORECASE,
)
# Credential redaction in index URLs.
_CREDENTIAL_PATTERN = re.compile(r'://([^@]+)@')
@ -293,12 +310,25 @@ class UnifiedDepResolver:
# 1. Separate --index-url / --extra-index-url handling
# (before path separator check, because URLs contain '/')
# URLs are staged but NOT committed until the line passes
# all validation (prevents URL injection from rejected lines).
pending_urls: list[str] = []
if '--index-url' in line or '--extra-index-url' in line:
pkg_spec, index_urls = self._split_index_url(line)
extra_index_urls.extend(index_urls)
pkg_spec, pending_urls = self._split_index_url(line)
line = pkg_spec
if not line:
# Standalone option line (no package prefix)
# Standalone option line (no package prefix) — safe
extra_index_urls.extend(pending_urls)
continue
# 1b. Reject dangerous pip options appearing after package name
# (--index-url/--extra-index-url already extracted above)
if _INLINE_DANGEROUS_OPTIONS.search(line):
skipped.append((line, f"rejected: inline pip option in {pack_path}"))
logger.warning(
"[UnifiedDepResolver] rejected inline pip option: '%s' from %s",
line, pack_path,
)
continue
# Reject path separators in package name portion
@ -334,6 +364,10 @@ class UnifiedDepResolver:
)
sources[pkg_name].append(pack_path)
# Commit staged index URLs only after all validation passed.
if pending_urls:
extra_index_urls.extend(pending_urls)
return CollectedDeps(
requirements=requirements,
skipped=skipped,

View File

@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "comfyui-manager"
license = { text = "GPL-3.0-only" }
version = "4.1b1"
version = "4.1b2"
requires-python = ">= 3.9"
description = "ComfyUI-Manager provides features to install and manage custom nodes for ComfyUI, as well as various functionalities to assist with ComfyUI."
readme = "README.md"

View File

@ -98,6 +98,7 @@ collect_base_requirements = _udr_module.collect_base_requirements
collect_node_pack_paths = _udr_module.collect_node_pack_paths
_CREDENTIAL_PATTERN = _udr_module._CREDENTIAL_PATTERN
_DANGEROUS_PATTERNS = _udr_module._DANGEROUS_PATTERNS
_INLINE_DANGEROUS_OPTIONS = _udr_module._INLINE_DANGEROUS_OPTIONS
_TMP_PREFIX = _udr_module._TMP_PREFIX
_VERSION_SPEC_PATTERN = _udr_module._VERSION_SPEC_PATTERN
@ -294,6 +295,77 @@ class TestInputSanitization:
deps = r.collect_requirements()
assert len(deps.requirements) == 1
@pytest.mark.parametrize("line", [
"torch --find-links localdir",
"numpy --constraint evil.txt",
"scipy --requirement secret.txt",
"pkg --editable ./local",
"torch -f localdir",
"numpy -c evil.txt",
"pkg -r secret.txt",
"scipy -e ./local",
# Concatenated short flags (no space between flag and value)
"torch -fhttps://evil.com/packages",
"numpy -cevil.txt",
"pkg -rsecret.txt",
"scipy -e./local",
# Case-insensitive
"torch --FIND-LINKS localdir",
"numpy --Constraint evil.txt",
# Additional dangerous options
"torch --trusted-host evil.com",
"numpy --global-option=--no-user-cfg",
"pkg --install-option=--prefix=/tmp",
])
def test_inline_dangerous_options_rejected(self, line, tmp_path):
"""Pip options after package name must be caught (not just at line start)."""
p = _make_node_pack(str(tmp_path), "pack_a", line + "\n")
r = _resolver([p])
deps = r.collect_requirements()
assert deps.requirements == [], f"'{line}' should have been rejected"
assert len(deps.skipped) == 1
assert "rejected" in deps.skipped[0][1]
def test_index_url_not_blocked_by_inline_check(self, tmp_path):
"""--index-url and --extra-index-url are legitimate and extracted before inline check."""
p = _make_node_pack(str(tmp_path), "pack_a",
"torch --extra-index-url https://download.pytorch.org/whl/cu121\n")
r = _resolver([p])
deps = r.collect_requirements()
assert len(deps.requirements) == 1
assert deps.requirements[0].name == "torch"
assert len(deps.extra_index_urls) == 1
def test_combined_index_url_and_dangerous_option(self, tmp_path):
"""A line with both --extra-index-url and --find-links must reject
AND must NOT retain the extracted index URL."""
p = _make_node_pack(str(tmp_path), "pack_a",
"torch --extra-index-url https://evil.com --find-links /local\n")
r = _resolver([p])
deps = r.collect_requirements()
assert deps.requirements == [], "line should have been rejected"
assert deps.extra_index_urls == [], "evil URL should not be retained"
assert len(deps.skipped) == 1
@pytest.mark.parametrize("spec", [
"package[extra-c]>=1.0",
"package[extra-r]",
"my-e-package>=2.0",
"some-f-lib",
"re-crypto>=1.0",
# Real-world packages with hyphens near short flag letters
"opencv-contrib-python-headless",
"scikit-learn>=1.0",
"onnxruntime-gpu",
"face-recognition>=1.3",
])
def test_inline_check_no_false_positive_on_package_names(self, spec, tmp_path):
"""Short flags inside package names or extras must not trigger false positive."""
p = _make_node_pack(str(tmp_path), "pack_a", spec + "\n")
r = _resolver([p])
deps = r.collect_requirements()
assert len(deps.requirements) == 1, f"'{spec}' was incorrectly rejected"
# ===========================================================================
# --index-url separation
@ -1021,3 +1093,143 @@ class TestCollectBaseRequirements:
(tmp_path / "requirements.txt").write_text("torch\n")
result = collect_base_requirements(str(tmp_path))
assert result == ["torch"]
# ===========================================================================
# _parse_conflicts (direct unit tests)
# ===========================================================================
class TestParseConflicts:
def test_extracts_conflict_lines(self):
stderr = (
"Resolved 10 packages\n"
"error: package torch has conflicting requirements\n"
" conflict between numpy>=2.0 and numpy<1.25\n"
"some other info\n"
)
result = UnifiedDepResolver._parse_conflicts(stderr)
assert len(result) == 2
assert "conflicting" in result[0]
assert "conflict" in result[1]
def test_extracts_error_lines(self):
stderr = "ERROR: No matching distribution found for nonexistent-pkg\n"
result = UnifiedDepResolver._parse_conflicts(stderr)
assert len(result) == 1
assert "nonexistent-pkg" in result[0]
def test_empty_stderr(self):
result = UnifiedDepResolver._parse_conflicts("")
assert result == []
def test_whitespace_only_stderr(self):
result = UnifiedDepResolver._parse_conflicts(" \n\n ")
assert result == []
def test_no_conflict_keywords_falls_back_to_full_stderr(self):
stderr = "resolution failed due to incompatible versions"
result = UnifiedDepResolver._parse_conflicts(stderr)
# No 'conflict' or 'error' keyword → falls back to [stderr.strip()]
assert result == [stderr.strip()]
def test_mixed_lines(self):
stderr = (
"info: checking packages\n"
"error: failed to resolve\n"
"debug: trace output\n"
)
result = UnifiedDepResolver._parse_conflicts(stderr)
assert len(result) == 1
assert "failed to resolve" in result[0]
# ===========================================================================
# _parse_install_output (direct unit tests)
# ===========================================================================
class TestParseInstallOutput:
def test_installed_packages(self):
result = subprocess.CompletedProcess(
[], 0,
stdout="Installed numpy-1.24.0\nInstalled requests-2.31.0\n",
stderr="",
)
installed, skipped = UnifiedDepResolver._parse_install_output(result)
assert len(installed) == 2
assert any("numpy" in p for p in installed)
def test_skipped_packages(self):
result = subprocess.CompletedProcess(
[], 0,
stdout="Requirement already satisfied: numpy==1.24.0\n",
stderr="",
)
installed, skipped = UnifiedDepResolver._parse_install_output(result)
assert len(installed) == 0
assert len(skipped) == 1
assert "already" in skipped[0].lower()
def test_mixed_installed_and_skipped(self):
result = subprocess.CompletedProcess(
[], 0,
stdout=(
"Requirement already satisfied: numpy==1.24.0\n"
"Installed requests-2.31.0\n"
"Updated torch-2.1.0\n"
),
stderr="",
)
installed, skipped = UnifiedDepResolver._parse_install_output(result)
assert len(installed) == 2 # "Installed" + "Updated"
assert len(skipped) == 1 # "already satisfied"
def test_empty_output(self):
result = subprocess.CompletedProcess([], 0, stdout="", stderr="")
installed, skipped = UnifiedDepResolver._parse_install_output(result)
assert installed == []
assert skipped == []
def test_unrecognized_lines_ignored(self):
result = subprocess.CompletedProcess(
[], 0,
stdout="Resolving dependencies...\nDownloading numpy-1.24.0.whl\n",
stderr="",
)
installed, skipped = UnifiedDepResolver._parse_install_output(result)
assert installed == []
assert skipped == []
# ===========================================================================
# resolve_and_install: general Exception path
# ===========================================================================
class TestResolveAndInstallExceptionPath:
def test_unexpected_exception_returns_error_result(self, tmp_path):
"""Non-UvNotAvailableError exceptions should be caught and returned."""
p = _make_node_pack(str(tmp_path), "pack_a", "numpy\n")
r = _resolver([p])
with mock.patch.object(
r, "collect_requirements",
side_effect=RuntimeError("unexpected disk failure"),
):
result = r.resolve_and_install()
assert not result.success
assert "unexpected disk failure" in result.error
def test_unexpected_exception_during_compile(self, tmp_path):
"""Exception in compile_lockfile should be caught by resolve_and_install."""
p = _make_node_pack(str(tmp_path), "pack_a", "numpy\n")
r = _resolver([p])
with mock.patch.object(r, "_get_uv_cmd", return_value=["uv"]):
with mock.patch.object(
r, "compile_lockfile",
side_effect=OSError("permission denied"),
):
result = r.resolve_and_install()
assert not result.success
assert "permission denied" in result.error