From ef8703a3d7ab4e6ecda8f96e0c5816c23d1cb262 Mon Sep 17 00:00:00 2001 From: "Dr.Lt.Data" Date: Thu, 8 Jan 2026 18:35:03 +0900 Subject: [PATCH] security(api): add path traversal and CRLF injection protection - Add is_safe_path_target() and get_safe_file_path() utilities - Validate history id and snapshot target parameters in API endpoints - Sanitize config string values to prevent CRLF injection --- comfyui_manager/common/manager_security.py | 34 ++++++++++++++++++++ comfyui_manager/glob/manager_core.py | 7 +++- comfyui_manager/glob/manager_server.py | 24 +++++++++++--- comfyui_manager/glob/utils/security_utils.py | 14 ++++---- comfyui_manager/legacy/manager_core.py | 7 +++- comfyui_manager/legacy/manager_server.py | 23 ++++++++++--- pyproject.toml | 2 +- 7 files changed, 92 insertions(+), 19 deletions(-) diff --git a/comfyui_manager/common/manager_security.py b/comfyui_manager/common/manager_security.py index 6f0775aa..835ff9d8 100644 --- a/comfyui_manager/common/manager_security.py +++ b/comfyui_manager/common/manager_security.py @@ -1,4 +1,6 @@ +import os from enum import Enum +from typing import Optional is_personal_cloud_mode = False handler_policy = {} @@ -34,3 +36,35 @@ def add_handler_policy(x, policy): multiple_remote_alert = do_nothing + + +def is_safe_path_target(target: str) -> bool: + """ + Check if target string is safe from path traversal attacks. + + Args: + target: User-provided filename or identifier + + Returns: + True if safe, False if contains path traversal characters + """ + if '/' in target or '\\' in target or '..' in target or '\x00' in target: + return False + return True + + +def get_safe_file_path(target: str, base_dir: str, extension: str = ".json") -> Optional[str]: + """ + Safely construct a file path, preventing path traversal attacks. + + Args: + target: User-provided filename (without extension) + base_dir: Base directory path + extension: File extension to append (default: ".json") + + Returns: + Safe file path or None if input contains path traversal attempts + """ + if not is_safe_path_target(target): + return None + return os.path.join(base_dir, f"{target}{extension}") diff --git a/comfyui_manager/glob/manager_core.py b/comfyui_manager/glob/manager_core.py index aa29ccfe..f97cda7a 100644 --- a/comfyui_manager/glob/manager_core.py +++ b/comfyui_manager/glob/manager_core.py @@ -41,7 +41,7 @@ from ..common.enums import NetworkMode, SecurityLevel, DBMode from ..common import context -version_code = [4, 0, 4] +version_code = [4, 0, 5] version_str = f"V{version_code[0]}.{version_code[1]}" + (f'.{version_code[2]}' if len(version_code) > 2 else '') @@ -1619,6 +1619,11 @@ def write_config(): 'verbose': get_config()['verbose'], } + # Sanitize all string values to prevent CRLF injection attacks + for key, value in config['default'].items(): + if isinstance(value, str): + config['default'][key] = value.replace('\r', '').replace('\n', '').replace('\x00', '') + directory = os.path.dirname(context.manager_config_path) if not os.path.exists(directory): os.makedirs(directory) diff --git a/comfyui_manager/glob/manager_server.py b/comfyui_manager/glob/manager_server.py index 8ff83ec7..407aac91 100644 --- a/comfyui_manager/glob/manager_server.py +++ b/comfyui_manager/glob/manager_server.py @@ -1294,11 +1294,17 @@ async def get_history(request): try: # Handle file-based batch history if "id" in request.rel_url.query: - json_name = request.rel_url.query["id"] + ".json" - batch_path = os.path.join(context.manager_batch_history_path, json_name) + history_id = request.rel_url.query["id"] + + # Prevent path traversal attacks + batch_path = security_utils.get_safe_file_path(history_id, context.manager_batch_history_path) + if batch_path is None: + logging.warning(f"[Security] Invalid history id rejected: {history_id}") + return web.Response(text="Invalid history id", status=400) + logging.debug( "[ComfyUI-Manager] Fetching batch history: id=%s", - request.rel_url.query["id"], + history_id, ) with open(batch_path, "r", encoding="utf-8") as file: @@ -1520,7 +1526,11 @@ async def remove_snapshot(request): try: target = request.rel_url.query["target"] - path = os.path.join(context.manager_snapshot_path, f"{target}.json") + path = security_utils.get_safe_file_path(target, context.manager_snapshot_path) + if path is None: + logging.warning(f"[Security] Invalid snapshot target rejected: {target}") + return web.Response(text="Invalid target", status=400) + if os.path.exists(path): os.remove(path) @@ -1538,7 +1548,11 @@ async def restore_snapshot(request): try: target = request.rel_url.query["target"] - path = os.path.join(context.manager_snapshot_path, f"{target}.json") + path = security_utils.get_safe_file_path(target, context.manager_snapshot_path) + if path is None: + logging.warning(f"[Security] Invalid snapshot target rejected: {target}") + return web.Response(text="Invalid target", status=400) + if os.path.exists(path): if not os.path.exists(context.manager_startup_script_path): os.makedirs(context.manager_startup_script_path) diff --git a/comfyui_manager/glob/utils/security_utils.py b/comfyui_manager/glob/utils/security_utils.py index 1f2ac581..d6c332e9 100644 --- a/comfyui_manager/glob/utils/security_utils.py +++ b/comfyui_manager/glob/utils/security_utils.py @@ -1,14 +1,14 @@ from comfyui_manager.glob import manager_core as core from comfy.cli_args import args from comfyui_manager.data_models import SecurityLevel, RiskLevel, ManagerDatabaseSource +from comfyui_manager.common.manager_security import ( + is_loopback, + is_safe_path_target, + get_safe_file_path, +) - -def is_loopback(address): - import ipaddress - try: - return ipaddress.ip_address(address).is_loopback - except ValueError: - return False +# Re-export for backward compatibility +__all__ = ['is_loopback', 'is_safe_path_target', 'get_safe_file_path', 'is_allowed_security_level', 'get_risky_level'] def is_allowed_security_level(level): diff --git a/comfyui_manager/legacy/manager_core.py b/comfyui_manager/legacy/manager_core.py index bbf35f01..eaf5b8d8 100644 --- a/comfyui_manager/legacy/manager_core.py +++ b/comfyui_manager/legacy/manager_core.py @@ -42,7 +42,7 @@ from ..common.enums import NetworkMode, SecurityLevel, DBMode from ..common import context -version_code = [4, 0, 4] +version_code = [4, 0, 5] version_str = f"V{version_code[0]}.{version_code[1]}" + (f'.{version_code[2]}' if len(version_code) > 2 else '') @@ -1612,6 +1612,11 @@ def write_config(): 'db_mode': get_config()['db_mode'], } + # Sanitize all string values to prevent CRLF injection attacks + for key, value in config['default'].items(): + if isinstance(value, str): + config['default'][key] = value.replace('\r', '').replace('\n', '').replace('\x00', '') + directory = os.path.dirname(context.manager_config_path) if not os.path.exists(directory): os.makedirs(directory) diff --git a/comfyui_manager/legacy/manager_server.py b/comfyui_manager/legacy/manager_server.py index e463d4ef..02668171 100644 --- a/comfyui_manager/legacy/manager_server.py +++ b/comfyui_manager/legacy/manager_server.py @@ -814,8 +814,13 @@ async def get_history_list(request): @routes.get("/v2/manager/queue/history") async def get_history(request): try: - json_name = request.rel_url.query["id"]+'.json' - batch_path = os.path.join(context.manager_batch_history_path, json_name) + history_id = request.rel_url.query["id"] + + # Prevent path traversal attacks + batch_path = manager_security.get_safe_file_path(history_id, context.manager_batch_history_path) + if batch_path is None: + logging.warning(f"[Security] Invalid history id rejected: {history_id}") + return web.Response(text="Invalid history id", status=400) with open(batch_path, 'r', encoding='utf-8') as file: json_str = file.read() @@ -1159,7 +1164,12 @@ async def remove_snapshot(request): try: target = request.rel_url.query["target"] - path = os.path.join(context.manager_snapshot_path, f"{target}.json") + # Prevent path traversal attacks + path = manager_security.get_safe_file_path(target, context.manager_snapshot_path) + if path is None: + logging.warning(f"[Security] Invalid snapshot target rejected: {target}") + return web.Response(text="Invalid target", status=400) + if os.path.exists(path): os.remove(path) @@ -1177,7 +1187,12 @@ async def restore_snapshot(request): try: target = request.rel_url.query["target"] - path = os.path.join(context.manager_snapshot_path, f"{target}.json") + # Prevent path traversal attacks + path = manager_security.get_safe_file_path(target, context.manager_snapshot_path) + if path is None: + logging.warning(f"[Security] Invalid snapshot target rejected: {target}") + return web.Response(text="Invalid target", status=400) + if os.path.exists(path): if not os.path.exists(context.manager_startup_script_path): os.makedirs(context.manager_startup_script_path) diff --git a/pyproject.toml b/pyproject.toml index f19a58fc..16a77cc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "comfyui-manager" license = { text = "GPL-3.0-only" } -version = "4.0.4" +version = "4.0.5" requires-python = ">= 3.9" description = "ComfyUI-Manager provides features to install and manage custom nodes for ComfyUI, as well as various functionalities to assist with ComfyUI." readme = "README.md"