This commit is contained in:
carladams1299-lab 2026-05-08 15:11:53 -07:00 committed by GitHub
commit 24b9de3bb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 178 additions and 26 deletions

View File

View File

@ -0,0 +1,54 @@
"""Centralized path-traversal validation for HTTP-exposed file paths.
Replaces ad-hoc checks (`'..' in filename`, `commonpath` comparisons, etc.)
that previously lived inline across server.py upload/view endpoints.
Issue: https://github.com/Comfy-Org/ComfyUI/issues/13742
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Union
PathLike = Union[str, os.PathLike]
def resolve_safe_path(base_dir: PathLike, *user_parts: PathLike) -> Path | None:
"""Resolve user-supplied path parts against a trusted base directory.
Joins ``user_parts`` to ``base_dir``, fully resolves the result (following
symlinks), and returns it only if it stays inside ``base_dir``. Returns
``None`` for any unsafe input null bytes, absolute user parts that escape
the base, ``..`` segments that walk above the base, symlinks that point
outside the base, or any OS-level resolution failure.
Callers must use the returned ``Path`` for filesystem operations and must
not re-join the original user input afterwards, or the validation is moot.
"""
for part in user_parts:
if _has_unsafe_chars(part):
return None
try:
base = Path(base_dir).resolve(strict=False)
except (OSError, ValueError):
return None
try:
joined = base.joinpath(*(os.fspath(p) for p in user_parts))
candidate = joined.resolve(strict=False)
except (OSError, ValueError):
return None
try:
candidate.relative_to(base)
except ValueError:
return None
return candidate
def _has_unsafe_chars(part: PathLike) -> bool:
s = os.fspath(part)
# NUL byte: some platforms truncate paths at \x00, which can defeat
# subsequent containment checks performed on the truncated string.
return "\x00" in s

View File

@ -29,6 +29,7 @@ import mimetypes
from comfy.cli_args import args
import comfy.utils
import comfy.model_management
from comfy.security.path_validator import resolve_safe_path
from comfy_api import feature_flags
import node_helpers
from comfyui_version import __version__
@ -396,11 +397,11 @@ class PromptServer():
return web.Response(status=400)
subfolder = post.get("subfolder", "")
full_output_folder = os.path.join(upload_dir, os.path.normpath(subfolder))
filepath = os.path.abspath(os.path.join(full_output_folder, filename))
if os.path.commonpath((upload_dir, filepath)) != upload_dir:
resolved = resolve_safe_path(upload_dir, subfolder, filename)
if resolved is None:
return web.Response(status=400)
filepath = str(resolved)
full_output_folder = str(resolved.parent)
if not os.path.exists(full_output_folder):
os.makedirs(full_output_folder)
@ -464,10 +465,6 @@ class PromptServer():
if not filename:
return web.Response(status=400)
# validation for security: prevent accessing arbitrary path
if filename[0] == '/' or '..' in filename:
return web.Response(status=400)
if output_dir is None:
type = original_ref.get("type", "output")
output_dir = folder_paths.get_directory_by_type(type)
@ -475,13 +472,11 @@ class PromptServer():
if output_dir is None:
return web.Response(status=400)
if original_ref.get("subfolder", "") != "":
full_output_dir = os.path.join(output_dir, original_ref["subfolder"])
if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir:
return web.Response(status=403)
output_dir = full_output_dir
file = os.path.join(output_dir, filename)
subfolder = original_ref.get("subfolder", "")
resolved = resolve_safe_path(output_dir, subfolder, filename)
if resolved is None:
return web.Response(status=400)
file = str(resolved)
if os.path.isfile(file):
with Image.open(file) as original_pil:
@ -521,10 +516,6 @@ class PromptServer():
if not filename:
return web.Response(status=400)
# validation for security: prevent accessing arbitrary path
if filename[0] == '/' or '..' in filename:
return web.Response(status=400)
if output_dir is None:
type = request.rel_url.query.get("type", "output")
output_dir = folder_paths.get_directory_by_type(type)
@ -532,14 +523,12 @@ class PromptServer():
if output_dir is None:
return web.Response(status=400)
if "subfolder" in request.rel_url.query:
full_output_dir = os.path.join(output_dir, request.rel_url.query["subfolder"])
if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir:
return web.Response(status=403)
output_dir = full_output_dir
subfolder = request.rel_url.query.get("subfolder", "")
filename = os.path.basename(filename)
file = os.path.join(output_dir, filename)
resolved = resolve_safe_path(output_dir, subfolder, filename)
if resolved is None:
return web.Response(status=400)
file = str(resolved)
if os.path.isfile(file):
if 'preview' in request.rel_url.query:

View File

@ -0,0 +1,109 @@
import os
import sys
import tempfile
from pathlib import Path
import pytest
from comfy.security.path_validator import resolve_safe_path
@pytest.fixture
def base():
with tempfile.TemporaryDirectory() as tmp:
yield Path(tmp).resolve()
def test_simple_filename_inside_base(base):
result = resolve_safe_path(base, "foo.png")
assert result == base / "foo.png"
def test_subfolder_plus_filename(base):
result = resolve_safe_path(base, "subdir", "foo.png")
assert result == base / "subdir" / "foo.png"
def test_empty_subfolder(base):
result = resolve_safe_path(base, "", "foo.png")
assert result == base / "foo.png"
def test_dotdot_segment_rejected(base):
assert resolve_safe_path(base, "..", "etc") is None
assert resolve_safe_path(base, "../etc") is None
assert resolve_safe_path(base, "sub/../..", "etc") is None
def test_absolute_user_path_rejected(base):
if sys.platform == "win32":
assert resolve_safe_path(base, "C:\\Windows\\System32") is None
else:
assert resolve_safe_path(base, "/etc/passwd") is None
def test_null_byte_rejected(base):
assert resolve_safe_path(base, "foo\x00.png") is None
assert resolve_safe_path(base, "sub\x00", "foo.png") is None
def test_dotdot_inside_base_is_allowed(base):
(base / "sub").mkdir()
result = resolve_safe_path(base, "sub", "..", "foo.png")
assert result == base / "foo.png"
@pytest.mark.skipif(sys.platform == "win32", reason="symlink permissions on Windows CI")
def test_symlink_escape_rejected(base):
outside = Path(tempfile.mkdtemp())
try:
link = base / "escape"
link.symlink_to(outside)
assert resolve_safe_path(base, "escape", "secret") is None
finally:
# cleanup outside dir
try:
outside.rmdir()
except OSError:
pass
@pytest.mark.skipif(sys.platform == "win32", reason="symlink permissions on Windows CI")
def test_symlink_inside_base_is_allowed(base):
target = base / "real"
target.mkdir()
link = base / "alias"
link.symlink_to(target)
result = resolve_safe_path(base, "alias", "foo.png")
assert result == (target / "foo.png").resolve()
def test_base_canonicalizes_via_resolve(base):
# Pass an unresolved base (with trailing slash) and verify containment still works.
unresolved = str(base) + os.sep
assert resolve_safe_path(unresolved, "foo.png") == base / "foo.png"
def test_dot_segments_are_normalized(base):
assert resolve_safe_path(base, ".", "foo.png") == base / "foo.png"
assert resolve_safe_path(base, "./sub/./foo.png") == base / "sub" / "foo.png"
def test_path_objects_accepted(base):
assert resolve_safe_path(base, Path("foo.png")) == base / "foo.png"
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific path separators")
def test_windows_backslash_traversal_rejected(base):
assert resolve_safe_path(base, "..\\etc") is None
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific UNC paths")
def test_windows_unc_path_rejected(base):
assert resolve_safe_path(base, "\\\\server\\share\\file") is None
def test_returns_path_instance(base):
result = resolve_safe_path(base, "foo.png")
assert isinstance(result, Path)
assert result.is_absolute()