mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-05-10 09:12:31 +08:00
fix: unify path-traversal validation across server.py upload/view endpoints
This commit is contained in:
parent
6bcd8b96ab
commit
43672850ca
0
comfy/security/__init__.py
Normal file
0
comfy/security/__init__.py
Normal file
54
comfy/security/path_validator.py
Normal file
54
comfy/security/path_validator.py
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
"""Centralized path-traversal validation for HTTP-exposed file paths.
|
||||||
|
|
||||||
|
Replaces ad-hoc checks (`'..' in filename`, `commonpath` comparisons, etc.)
|
||||||
|
that previously lived inline across server.py upload/view endpoints.
|
||||||
|
Issue: https://github.com/Comfy-Org/ComfyUI/issues/13742
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
PathLike = Union[str, os.PathLike]
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_safe_path(base_dir: PathLike, *user_parts: PathLike) -> Path | None:
|
||||||
|
"""Resolve user-supplied path parts against a trusted base directory.
|
||||||
|
|
||||||
|
Joins ``user_parts`` to ``base_dir``, fully resolves the result (following
|
||||||
|
symlinks), and returns it only if it stays inside ``base_dir``. Returns
|
||||||
|
``None`` for any unsafe input — null bytes, absolute user parts that escape
|
||||||
|
the base, ``..`` segments that walk above the base, symlinks that point
|
||||||
|
outside the base, or any OS-level resolution failure.
|
||||||
|
|
||||||
|
Callers must use the returned ``Path`` for filesystem operations and must
|
||||||
|
not re-join the original user input afterwards, or the validation is moot.
|
||||||
|
"""
|
||||||
|
for part in user_parts:
|
||||||
|
if _has_unsafe_chars(part):
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
base = Path(base_dir).resolve(strict=False)
|
||||||
|
except (OSError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
joined = base.joinpath(*(os.fspath(p) for p in user_parts))
|
||||||
|
candidate = joined.resolve(strict=False)
|
||||||
|
except (OSError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
candidate.relative_to(base)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
return candidate
|
||||||
|
|
||||||
|
|
||||||
|
def _has_unsafe_chars(part: PathLike) -> bool:
|
||||||
|
s = os.fspath(part)
|
||||||
|
# NUL byte: some platforms truncate paths at \x00, which can defeat
|
||||||
|
# subsequent containment checks performed on the truncated string.
|
||||||
|
return "\x00" in s
|
||||||
41
server.py
41
server.py
@ -29,6 +29,7 @@ import mimetypes
|
|||||||
from comfy.cli_args import args
|
from comfy.cli_args import args
|
||||||
import comfy.utils
|
import comfy.utils
|
||||||
import comfy.model_management
|
import comfy.model_management
|
||||||
|
from comfy.security.path_validator import resolve_safe_path
|
||||||
from comfy_api import feature_flags
|
from comfy_api import feature_flags
|
||||||
import node_helpers
|
import node_helpers
|
||||||
from comfyui_version import __version__
|
from comfyui_version import __version__
|
||||||
@ -396,11 +397,11 @@ class PromptServer():
|
|||||||
return web.Response(status=400)
|
return web.Response(status=400)
|
||||||
|
|
||||||
subfolder = post.get("subfolder", "")
|
subfolder = post.get("subfolder", "")
|
||||||
full_output_folder = os.path.join(upload_dir, os.path.normpath(subfolder))
|
resolved = resolve_safe_path(upload_dir, subfolder, filename)
|
||||||
filepath = os.path.abspath(os.path.join(full_output_folder, filename))
|
if resolved is None:
|
||||||
|
|
||||||
if os.path.commonpath((upload_dir, filepath)) != upload_dir:
|
|
||||||
return web.Response(status=400)
|
return web.Response(status=400)
|
||||||
|
filepath = str(resolved)
|
||||||
|
full_output_folder = str(resolved.parent)
|
||||||
|
|
||||||
if not os.path.exists(full_output_folder):
|
if not os.path.exists(full_output_folder):
|
||||||
os.makedirs(full_output_folder)
|
os.makedirs(full_output_folder)
|
||||||
@ -464,10 +465,6 @@ class PromptServer():
|
|||||||
if not filename:
|
if not filename:
|
||||||
return web.Response(status=400)
|
return web.Response(status=400)
|
||||||
|
|
||||||
# validation for security: prevent accessing arbitrary path
|
|
||||||
if filename[0] == '/' or '..' in filename:
|
|
||||||
return web.Response(status=400)
|
|
||||||
|
|
||||||
if output_dir is None:
|
if output_dir is None:
|
||||||
type = original_ref.get("type", "output")
|
type = original_ref.get("type", "output")
|
||||||
output_dir = folder_paths.get_directory_by_type(type)
|
output_dir = folder_paths.get_directory_by_type(type)
|
||||||
@ -475,13 +472,11 @@ class PromptServer():
|
|||||||
if output_dir is None:
|
if output_dir is None:
|
||||||
return web.Response(status=400)
|
return web.Response(status=400)
|
||||||
|
|
||||||
if original_ref.get("subfolder", "") != "":
|
subfolder = original_ref.get("subfolder", "")
|
||||||
full_output_dir = os.path.join(output_dir, original_ref["subfolder"])
|
resolved = resolve_safe_path(output_dir, subfolder, filename)
|
||||||
if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir:
|
if resolved is None:
|
||||||
return web.Response(status=403)
|
return web.Response(status=400)
|
||||||
output_dir = full_output_dir
|
file = str(resolved)
|
||||||
|
|
||||||
file = os.path.join(output_dir, filename)
|
|
||||||
|
|
||||||
if os.path.isfile(file):
|
if os.path.isfile(file):
|
||||||
with Image.open(file) as original_pil:
|
with Image.open(file) as original_pil:
|
||||||
@ -521,10 +516,6 @@ class PromptServer():
|
|||||||
if not filename:
|
if not filename:
|
||||||
return web.Response(status=400)
|
return web.Response(status=400)
|
||||||
|
|
||||||
# validation for security: prevent accessing arbitrary path
|
|
||||||
if filename[0] == '/' or '..' in filename:
|
|
||||||
return web.Response(status=400)
|
|
||||||
|
|
||||||
if output_dir is None:
|
if output_dir is None:
|
||||||
type = request.rel_url.query.get("type", "output")
|
type = request.rel_url.query.get("type", "output")
|
||||||
output_dir = folder_paths.get_directory_by_type(type)
|
output_dir = folder_paths.get_directory_by_type(type)
|
||||||
@ -532,14 +523,12 @@ class PromptServer():
|
|||||||
if output_dir is None:
|
if output_dir is None:
|
||||||
return web.Response(status=400)
|
return web.Response(status=400)
|
||||||
|
|
||||||
if "subfolder" in request.rel_url.query:
|
subfolder = request.rel_url.query.get("subfolder", "")
|
||||||
full_output_dir = os.path.join(output_dir, request.rel_url.query["subfolder"])
|
|
||||||
if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir:
|
|
||||||
return web.Response(status=403)
|
|
||||||
output_dir = full_output_dir
|
|
||||||
|
|
||||||
filename = os.path.basename(filename)
|
filename = os.path.basename(filename)
|
||||||
file = os.path.join(output_dir, filename)
|
resolved = resolve_safe_path(output_dir, subfolder, filename)
|
||||||
|
if resolved is None:
|
||||||
|
return web.Response(status=400)
|
||||||
|
file = str(resolved)
|
||||||
|
|
||||||
if os.path.isfile(file):
|
if os.path.isfile(file):
|
||||||
if 'preview' in request.rel_url.query:
|
if 'preview' in request.rel_url.query:
|
||||||
|
|||||||
0
tests-unit/comfy_test/security/__init__.py
Normal file
0
tests-unit/comfy_test/security/__init__.py
Normal file
109
tests-unit/comfy_test/security/path_validator_test.py
Normal file
109
tests-unit/comfy_test/security/path_validator_test.py
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from comfy.security.path_validator import resolve_safe_path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def base():
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
yield Path(tmp).resolve()
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_filename_inside_base(base):
|
||||||
|
result = resolve_safe_path(base, "foo.png")
|
||||||
|
assert result == base / "foo.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_subfolder_plus_filename(base):
|
||||||
|
result = resolve_safe_path(base, "subdir", "foo.png")
|
||||||
|
assert result == base / "subdir" / "foo.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_subfolder(base):
|
||||||
|
result = resolve_safe_path(base, "", "foo.png")
|
||||||
|
assert result == base / "foo.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_dotdot_segment_rejected(base):
|
||||||
|
assert resolve_safe_path(base, "..", "etc") is None
|
||||||
|
assert resolve_safe_path(base, "../etc") is None
|
||||||
|
assert resolve_safe_path(base, "sub/../..", "etc") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_absolute_user_path_rejected(base):
|
||||||
|
if sys.platform == "win32":
|
||||||
|
assert resolve_safe_path(base, "C:\\Windows\\System32") is None
|
||||||
|
else:
|
||||||
|
assert resolve_safe_path(base, "/etc/passwd") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_null_byte_rejected(base):
|
||||||
|
assert resolve_safe_path(base, "foo\x00.png") is None
|
||||||
|
assert resolve_safe_path(base, "sub\x00", "foo.png") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_dotdot_inside_base_is_allowed(base):
|
||||||
|
(base / "sub").mkdir()
|
||||||
|
result = resolve_safe_path(base, "sub", "..", "foo.png")
|
||||||
|
assert result == base / "foo.png"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(sys.platform == "win32", reason="symlink permissions on Windows CI")
|
||||||
|
def test_symlink_escape_rejected(base):
|
||||||
|
outside = Path(tempfile.mkdtemp())
|
||||||
|
try:
|
||||||
|
link = base / "escape"
|
||||||
|
link.symlink_to(outside)
|
||||||
|
assert resolve_safe_path(base, "escape", "secret") is None
|
||||||
|
finally:
|
||||||
|
# cleanup outside dir
|
||||||
|
try:
|
||||||
|
outside.rmdir()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(sys.platform == "win32", reason="symlink permissions on Windows CI")
|
||||||
|
def test_symlink_inside_base_is_allowed(base):
|
||||||
|
target = base / "real"
|
||||||
|
target.mkdir()
|
||||||
|
link = base / "alias"
|
||||||
|
link.symlink_to(target)
|
||||||
|
result = resolve_safe_path(base, "alias", "foo.png")
|
||||||
|
assert result == (target / "foo.png").resolve()
|
||||||
|
|
||||||
|
|
||||||
|
def test_base_canonicalizes_via_resolve(base):
|
||||||
|
# Pass an unresolved base (with trailing slash) and verify containment still works.
|
||||||
|
unresolved = str(base) + os.sep
|
||||||
|
assert resolve_safe_path(unresolved, "foo.png") == base / "foo.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_dot_segments_are_normalized(base):
|
||||||
|
assert resolve_safe_path(base, ".", "foo.png") == base / "foo.png"
|
||||||
|
assert resolve_safe_path(base, "./sub/./foo.png") == base / "sub" / "foo.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_path_objects_accepted(base):
|
||||||
|
assert resolve_safe_path(base, Path("foo.png")) == base / "foo.png"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific path separators")
|
||||||
|
def test_windows_backslash_traversal_rejected(base):
|
||||||
|
assert resolve_safe_path(base, "..\\etc") is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific UNC paths")
|
||||||
|
def test_windows_unc_path_rejected(base):
|
||||||
|
assert resolve_safe_path(base, "\\\\server\\share\\file") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_returns_path_instance(base):
|
||||||
|
result = resolve_safe_path(base, "foo.png")
|
||||||
|
assert isinstance(result, Path)
|
||||||
|
assert result.is_absolute()
|
||||||
Loading…
Reference in New Issue
Block a user