From 43672850ca73d2109f72bbe5610b55e9991e191b Mon Sep 17 00:00:00 2001 From: carladams1299-lab Date: Wed, 6 May 2026 19:29:47 -0400 Subject: [PATCH] fix: unify path-traversal validation across server.py upload/view endpoints --- comfy/security/__init__.py | 0 comfy/security/path_validator.py | 54 +++++++++ server.py | 41 +++---- tests-unit/comfy_test/security/__init__.py | 0 .../security/path_validator_test.py | 109 ++++++++++++++++++ 5 files changed, 178 insertions(+), 26 deletions(-) create mode 100644 comfy/security/__init__.py create mode 100644 comfy/security/path_validator.py create mode 100644 tests-unit/comfy_test/security/__init__.py create mode 100644 tests-unit/comfy_test/security/path_validator_test.py diff --git a/comfy/security/__init__.py b/comfy/security/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/comfy/security/path_validator.py b/comfy/security/path_validator.py new file mode 100644 index 000000000..bf3344879 --- /dev/null +++ b/comfy/security/path_validator.py @@ -0,0 +1,54 @@ +"""Centralized path-traversal validation for HTTP-exposed file paths. + +Replaces ad-hoc checks (`'..' in filename`, `commonpath` comparisons, etc.) +that previously lived inline across server.py upload/view endpoints. +Issue: https://github.com/Comfy-Org/ComfyUI/issues/13742 +""" +from __future__ import annotations + +import os +from pathlib import Path +from typing import Union + +PathLike = Union[str, os.PathLike] + + +def resolve_safe_path(base_dir: PathLike, *user_parts: PathLike) -> Path | None: + """Resolve user-supplied path parts against a trusted base directory. + + Joins ``user_parts`` to ``base_dir``, fully resolves the result (following + symlinks), and returns it only if it stays inside ``base_dir``. Returns + ``None`` for any unsafe input — null bytes, absolute user parts that escape + the base, ``..`` segments that walk above the base, symlinks that point + outside the base, or any OS-level resolution failure. + + Callers must use the returned ``Path`` for filesystem operations and must + not re-join the original user input afterwards, or the validation is moot. + """ + for part in user_parts: + if _has_unsafe_chars(part): + return None + + try: + base = Path(base_dir).resolve(strict=False) + except (OSError, ValueError): + return None + + try: + joined = base.joinpath(*(os.fspath(p) for p in user_parts)) + candidate = joined.resolve(strict=False) + except (OSError, ValueError): + return None + + try: + candidate.relative_to(base) + except ValueError: + return None + return candidate + + +def _has_unsafe_chars(part: PathLike) -> bool: + s = os.fspath(part) + # NUL byte: some platforms truncate paths at \x00, which can defeat + # subsequent containment checks performed on the truncated string. + return "\x00" in s diff --git a/server.py b/server.py index 2f3b438bb..efba9a7eb 100644 --- a/server.py +++ b/server.py @@ -29,6 +29,7 @@ import mimetypes from comfy.cli_args import args import comfy.utils import comfy.model_management +from comfy.security.path_validator import resolve_safe_path from comfy_api import feature_flags import node_helpers from comfyui_version import __version__ @@ -396,11 +397,11 @@ class PromptServer(): return web.Response(status=400) subfolder = post.get("subfolder", "") - full_output_folder = os.path.join(upload_dir, os.path.normpath(subfolder)) - filepath = os.path.abspath(os.path.join(full_output_folder, filename)) - - if os.path.commonpath((upload_dir, filepath)) != upload_dir: + resolved = resolve_safe_path(upload_dir, subfolder, filename) + if resolved is None: return web.Response(status=400) + filepath = str(resolved) + full_output_folder = str(resolved.parent) if not os.path.exists(full_output_folder): os.makedirs(full_output_folder) @@ -464,10 +465,6 @@ class PromptServer(): if not filename: return web.Response(status=400) - # validation for security: prevent accessing arbitrary path - if filename[0] == '/' or '..' in filename: - return web.Response(status=400) - if output_dir is None: type = original_ref.get("type", "output") output_dir = folder_paths.get_directory_by_type(type) @@ -475,13 +472,11 @@ class PromptServer(): if output_dir is None: return web.Response(status=400) - if original_ref.get("subfolder", "") != "": - full_output_dir = os.path.join(output_dir, original_ref["subfolder"]) - if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir: - return web.Response(status=403) - output_dir = full_output_dir - - file = os.path.join(output_dir, filename) + subfolder = original_ref.get("subfolder", "") + resolved = resolve_safe_path(output_dir, subfolder, filename) + if resolved is None: + return web.Response(status=400) + file = str(resolved) if os.path.isfile(file): with Image.open(file) as original_pil: @@ -521,10 +516,6 @@ class PromptServer(): if not filename: return web.Response(status=400) - # validation for security: prevent accessing arbitrary path - if filename[0] == '/' or '..' in filename: - return web.Response(status=400) - if output_dir is None: type = request.rel_url.query.get("type", "output") output_dir = folder_paths.get_directory_by_type(type) @@ -532,14 +523,12 @@ class PromptServer(): if output_dir is None: return web.Response(status=400) - if "subfolder" in request.rel_url.query: - full_output_dir = os.path.join(output_dir, request.rel_url.query["subfolder"]) - if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir: - return web.Response(status=403) - output_dir = full_output_dir - + subfolder = request.rel_url.query.get("subfolder", "") filename = os.path.basename(filename) - file = os.path.join(output_dir, filename) + resolved = resolve_safe_path(output_dir, subfolder, filename) + if resolved is None: + return web.Response(status=400) + file = str(resolved) if os.path.isfile(file): if 'preview' in request.rel_url.query: diff --git a/tests-unit/comfy_test/security/__init__.py b/tests-unit/comfy_test/security/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests-unit/comfy_test/security/path_validator_test.py b/tests-unit/comfy_test/security/path_validator_test.py new file mode 100644 index 000000000..9dbc6b95c --- /dev/null +++ b/tests-unit/comfy_test/security/path_validator_test.py @@ -0,0 +1,109 @@ +import os +import sys +import tempfile +from pathlib import Path + +import pytest + +from comfy.security.path_validator import resolve_safe_path + + +@pytest.fixture +def base(): + with tempfile.TemporaryDirectory() as tmp: + yield Path(tmp).resolve() + + +def test_simple_filename_inside_base(base): + result = resolve_safe_path(base, "foo.png") + assert result == base / "foo.png" + + +def test_subfolder_plus_filename(base): + result = resolve_safe_path(base, "subdir", "foo.png") + assert result == base / "subdir" / "foo.png" + + +def test_empty_subfolder(base): + result = resolve_safe_path(base, "", "foo.png") + assert result == base / "foo.png" + + +def test_dotdot_segment_rejected(base): + assert resolve_safe_path(base, "..", "etc") is None + assert resolve_safe_path(base, "../etc") is None + assert resolve_safe_path(base, "sub/../..", "etc") is None + + +def test_absolute_user_path_rejected(base): + if sys.platform == "win32": + assert resolve_safe_path(base, "C:\\Windows\\System32") is None + else: + assert resolve_safe_path(base, "/etc/passwd") is None + + +def test_null_byte_rejected(base): + assert resolve_safe_path(base, "foo\x00.png") is None + assert resolve_safe_path(base, "sub\x00", "foo.png") is None + + +def test_dotdot_inside_base_is_allowed(base): + (base / "sub").mkdir() + result = resolve_safe_path(base, "sub", "..", "foo.png") + assert result == base / "foo.png" + + +@pytest.mark.skipif(sys.platform == "win32", reason="symlink permissions on Windows CI") +def test_symlink_escape_rejected(base): + outside = Path(tempfile.mkdtemp()) + try: + link = base / "escape" + link.symlink_to(outside) + assert resolve_safe_path(base, "escape", "secret") is None + finally: + # cleanup outside dir + try: + outside.rmdir() + except OSError: + pass + + +@pytest.mark.skipif(sys.platform == "win32", reason="symlink permissions on Windows CI") +def test_symlink_inside_base_is_allowed(base): + target = base / "real" + target.mkdir() + link = base / "alias" + link.symlink_to(target) + result = resolve_safe_path(base, "alias", "foo.png") + assert result == (target / "foo.png").resolve() + + +def test_base_canonicalizes_via_resolve(base): + # Pass an unresolved base (with trailing slash) and verify containment still works. + unresolved = str(base) + os.sep + assert resolve_safe_path(unresolved, "foo.png") == base / "foo.png" + + +def test_dot_segments_are_normalized(base): + assert resolve_safe_path(base, ".", "foo.png") == base / "foo.png" + assert resolve_safe_path(base, "./sub/./foo.png") == base / "sub" / "foo.png" + + +def test_path_objects_accepted(base): + assert resolve_safe_path(base, Path("foo.png")) == base / "foo.png" + + +@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific path separators") +def test_windows_backslash_traversal_rejected(base): + assert resolve_safe_path(base, "..\\etc") is None + + +@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific UNC paths") +def test_windows_unc_path_rejected(base): + assert resolve_safe_path(base, "\\\\server\\share\\file") is None + + +def test_returns_path_instance(base): + result = resolve_safe_path(base, "foo.png") + assert isinstance(result, Path) + assert result.is_absolute()