security(api): add path traversal and CRLF injection protection
Some checks failed
Publish to PyPI / build-and-publish (push) Has been cancelled
Python Linting / Run Ruff (push) Has been cancelled

- Add is_safe_path_target() and get_safe_file_path() utilities
- Validate history id and snapshot target parameters in API endpoints
- Sanitize config string values to prevent CRLF injection
This commit is contained in:
Dr.Lt.Data 2026-01-08 18:35:03 +09:00
parent a4138a89ee
commit ef8703a3d7
7 changed files with 92 additions and 19 deletions

View File

@ -1,4 +1,6 @@
import os
from enum import Enum
from typing import Optional
is_personal_cloud_mode = False
handler_policy = {}
@ -34,3 +36,35 @@ def add_handler_policy(x, policy):
multiple_remote_alert = do_nothing
def is_safe_path_target(target: str) -> bool:
"""
Check if target string is safe from path traversal attacks.
Args:
target: User-provided filename or identifier
Returns:
True if safe, False if contains path traversal characters
"""
if '/' in target or '\\' in target or '..' in target or '\x00' in target:
return False
return True
def get_safe_file_path(target: str, base_dir: str, extension: str = ".json") -> Optional[str]:
"""
Safely construct a file path, preventing path traversal attacks.
Args:
target: User-provided filename (without extension)
base_dir: Base directory path
extension: File extension to append (default: ".json")
Returns:
Safe file path or None if input contains path traversal attempts
"""
if not is_safe_path_target(target):
return None
return os.path.join(base_dir, f"{target}{extension}")

View File

@ -41,7 +41,7 @@ from ..common.enums import NetworkMode, SecurityLevel, DBMode
from ..common import context
version_code = [4, 0, 4]
version_code = [4, 0, 5]
version_str = f"V{version_code[0]}.{version_code[1]}" + (f'.{version_code[2]}' if len(version_code) > 2 else '')
@ -1619,6 +1619,11 @@ def write_config():
'verbose': get_config()['verbose'],
}
# Sanitize all string values to prevent CRLF injection attacks
for key, value in config['default'].items():
if isinstance(value, str):
config['default'][key] = value.replace('\r', '').replace('\n', '').replace('\x00', '')
directory = os.path.dirname(context.manager_config_path)
if not os.path.exists(directory):
os.makedirs(directory)

View File

@ -1294,11 +1294,17 @@ async def get_history(request):
try:
# Handle file-based batch history
if "id" in request.rel_url.query:
json_name = request.rel_url.query["id"] + ".json"
batch_path = os.path.join(context.manager_batch_history_path, json_name)
history_id = request.rel_url.query["id"]
# Prevent path traversal attacks
batch_path = security_utils.get_safe_file_path(history_id, context.manager_batch_history_path)
if batch_path is None:
logging.warning(f"[Security] Invalid history id rejected: {history_id}")
return web.Response(text="Invalid history id", status=400)
logging.debug(
"[ComfyUI-Manager] Fetching batch history: id=%s",
request.rel_url.query["id"],
history_id,
)
with open(batch_path, "r", encoding="utf-8") as file:
@ -1520,7 +1526,11 @@ async def remove_snapshot(request):
try:
target = request.rel_url.query["target"]
path = os.path.join(context.manager_snapshot_path, f"{target}.json")
path = security_utils.get_safe_file_path(target, context.manager_snapshot_path)
if path is None:
logging.warning(f"[Security] Invalid snapshot target rejected: {target}")
return web.Response(text="Invalid target", status=400)
if os.path.exists(path):
os.remove(path)
@ -1538,7 +1548,11 @@ async def restore_snapshot(request):
try:
target = request.rel_url.query["target"]
path = os.path.join(context.manager_snapshot_path, f"{target}.json")
path = security_utils.get_safe_file_path(target, context.manager_snapshot_path)
if path is None:
logging.warning(f"[Security] Invalid snapshot target rejected: {target}")
return web.Response(text="Invalid target", status=400)
if os.path.exists(path):
if not os.path.exists(context.manager_startup_script_path):
os.makedirs(context.manager_startup_script_path)

View File

@ -1,14 +1,14 @@
from comfyui_manager.glob import manager_core as core
from comfy.cli_args import args
from comfyui_manager.data_models import SecurityLevel, RiskLevel, ManagerDatabaseSource
from comfyui_manager.common.manager_security import (
is_loopback,
is_safe_path_target,
get_safe_file_path,
)
def is_loopback(address):
import ipaddress
try:
return ipaddress.ip_address(address).is_loopback
except ValueError:
return False
# Re-export for backward compatibility
__all__ = ['is_loopback', 'is_safe_path_target', 'get_safe_file_path', 'is_allowed_security_level', 'get_risky_level']
def is_allowed_security_level(level):

View File

@ -42,7 +42,7 @@ from ..common.enums import NetworkMode, SecurityLevel, DBMode
from ..common import context
version_code = [4, 0, 4]
version_code = [4, 0, 5]
version_str = f"V{version_code[0]}.{version_code[1]}" + (f'.{version_code[2]}' if len(version_code) > 2 else '')
@ -1612,6 +1612,11 @@ def write_config():
'db_mode': get_config()['db_mode'],
}
# Sanitize all string values to prevent CRLF injection attacks
for key, value in config['default'].items():
if isinstance(value, str):
config['default'][key] = value.replace('\r', '').replace('\n', '').replace('\x00', '')
directory = os.path.dirname(context.manager_config_path)
if not os.path.exists(directory):
os.makedirs(directory)

View File

@ -814,8 +814,13 @@ async def get_history_list(request):
@routes.get("/v2/manager/queue/history")
async def get_history(request):
try:
json_name = request.rel_url.query["id"]+'.json'
batch_path = os.path.join(context.manager_batch_history_path, json_name)
history_id = request.rel_url.query["id"]
# Prevent path traversal attacks
batch_path = manager_security.get_safe_file_path(history_id, context.manager_batch_history_path)
if batch_path is None:
logging.warning(f"[Security] Invalid history id rejected: {history_id}")
return web.Response(text="Invalid history id", status=400)
with open(batch_path, 'r', encoding='utf-8') as file:
json_str = file.read()
@ -1159,7 +1164,12 @@ async def remove_snapshot(request):
try:
target = request.rel_url.query["target"]
path = os.path.join(context.manager_snapshot_path, f"{target}.json")
# Prevent path traversal attacks
path = manager_security.get_safe_file_path(target, context.manager_snapshot_path)
if path is None:
logging.warning(f"[Security] Invalid snapshot target rejected: {target}")
return web.Response(text="Invalid target", status=400)
if os.path.exists(path):
os.remove(path)
@ -1177,7 +1187,12 @@ async def restore_snapshot(request):
try:
target = request.rel_url.query["target"]
path = os.path.join(context.manager_snapshot_path, f"{target}.json")
# Prevent path traversal attacks
path = manager_security.get_safe_file_path(target, context.manager_snapshot_path)
if path is None:
logging.warning(f"[Security] Invalid snapshot target rejected: {target}")
return web.Response(text="Invalid target", status=400)
if os.path.exists(path):
if not os.path.exists(context.manager_startup_script_path):
os.makedirs(context.manager_startup_script_path)

View File

@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "comfyui-manager"
license = { text = "GPL-3.0-only" }
version = "4.0.4"
version = "4.0.5"
requires-python = ">= 3.9"
description = "ComfyUI-Manager provides features to install and manage custom nodes for ComfyUI, as well as various functionalities to assist with ComfyUI."
readme = "README.md"