ComfyUI-Manager/comfyui_manager/glob/utils/security_utils.py
Dr.Lt.Data 4410ebc6a6
Some checks are pending
Publish to PyPI / build-and-publish (push) Waiting to run
Python Linting / Run Ruff (push) Waiting to run
fix(security): harden CSRF with Content-Type gate and expand E2E coverage (#2818)
Defense-in-depth over GET→POST alone: reject the three CORS-safelisted
simple-form Content-Types (x-www-form-urlencoded, multipart/form-data,
text/plain) on 16 no-body POST handlers (glob + legacy) to block
<form method=POST> CSRF that bypasses method-only gating. Move
comfyui_switch_version to a JSON body so the preflight requirement applies.
Split db_mode/policy/update/channel_url_list into GET(read) + POST(write).
Tighten do_fix (high → high+) and gate three previously-ungated config
setters at middle. Resynchronize openapi.yaml (27 paths, 30 operations,
ComfyUISwitchVersionParams as a shared $ref component). Add E2E harness
variants, Playwright config, CSRF/secgate suites, 39-endpoint coverage,
and a CHANGELOG.

Breaking: legacy per-op POST routes (install/uninstall/fix/disable/update/
reinstall/abort_current) are removed; callers already use queue/batch.
Legacy /manager/notice (v1) is removed; /v2/manager/notice is retained.

Reported-by: XlabAI Team of Tencent Xuanwu Lab
CVSS: 8.1 (AV:N/AC:L/PR:N/UI:R/S:U/C:N/I:H/A:H)
2026-04-22 05:04:30 +09:00

76 lines
2.7 KiB
Python

from comfyui_manager.glob import manager_core as core
from comfy.cli_args import args
from comfyui_manager.data_models import SecurityLevel, RiskLevel, ManagerDatabaseSource
from comfyui_manager.common.manager_security import (
is_loopback,
is_safe_path_target,
get_safe_file_path,
reject_simple_form_post,
)
# Re-export for backward compatibility.
# The first four names are imported above from
# comfyui_manager.common.manager_security and are listed here so that they stay
# importable from this module; the last two are defined in this module itself.
__all__ = [
'is_loopback',
'is_safe_path_target',
'get_safe_file_path',
'reject_simple_form_post',
'is_allowed_security_level',
'get_risky_level',
]
def is_allowed_security_level(level):
    """Decide whether an operation with the given risk level may proceed.

    The verdict combines three inputs: whether ``args.listen`` is a loopback
    address (per ``is_loopback``), whether the configured ``network_mode`` is
    ``personal_cloud`` (case-insensitive), and the configured
    ``security_level``.

    Args:
        level: Risk level to evaluate; it is compared against the ``.value``
            of the ``RiskLevel`` members.

    Returns:
        bool: ``True`` when the operation is permitted.

        * ``block``   -- never permitted.
        * ``high_``   -- loopback: ``weak`` or ``normal_``;
          personal cloud: ``weak`` only; otherwise never.
        * ``high``    -- loopback: ``weak`` or ``normal_``;
          otherwise ``weak`` only.
        * ``middle_`` -- loopback or personal cloud: ``weak``, ``normal`` or
          ``normal_``; otherwise never.
        * ``middle``  -- ``weak``, ``normal`` or ``normal_``.
        * any other value -- always permitted.
    """
    on_loopback = is_loopback(args.listen)
    personal_cloud = core.get_config()['network_mode'].lower() == 'personal_cloud'

    def configured_level_in(allowed):
        # Looked up lazily: the security level is only read on the branches
        # that actually depend on it.
        return core.get_config()['security_level'] in allowed

    if level == RiskLevel.block.value:
        return False

    if level == RiskLevel.high_.value:
        if on_loopback:
            return configured_level_in(
                (SecurityLevel.weak.value, SecurityLevel.normal_.value)
            )
        if personal_cloud:
            return configured_level_in((SecurityLevel.weak.value,))
        return False

    if level == RiskLevel.high.value:
        if on_loopback:
            return configured_level_in(
                (SecurityLevel.weak.value, SecurityLevel.normal_.value)
            )
        return configured_level_in((SecurityLevel.weak.value,))

    if level == RiskLevel.middle_.value:
        if not (on_loopback or personal_cloud):
            return False
        return configured_level_in(
            (
                SecurityLevel.weak.value,
                SecurityLevel.normal.value,
                SecurityLevel.normal_.value,
            )
        )

    if level == RiskLevel.middle.value:
        return configured_level_in(
            (
                SecurityLevel.weak.value,
                SecurityLevel.normal.value,
                SecurityLevel.normal_.value,
            )
        )

    # Risk levels not listed above are not gated.
    return True
async def get_risky_level(files, pip_packages):
    """Classify an install request by comparing it with the known node lists.

    Two copies of ``custom-node-list.json`` are loaded through
    ``core.get_data_by_mode``: the local one and the cached one for the
    hard-coded ltdrdata/ComfyUI-Manager channel URL. Their ``custom_nodes``
    entries are combined into a single catalogue.

    Args:
        files: Iterable of file URLs requested for installation.
        pip_packages: Iterable of pip package specs requested for installation.

    Returns:
        ``RiskLevel.high_.value`` if any entry of ``files`` is not listed under
        a catalogue entry's ``files``; otherwise ``RiskLevel.block.value`` if
        any entry of ``pip_packages`` is not listed under a catalogue entry's
        ``pip``; otherwise ``RiskLevel.middle_.value``.
    """
    local_list = await core.get_data_by_mode(ManagerDatabaseSource.local.value, "custom-node-list.json")
    cached_list = await core.get_data_by_mode(
        ManagerDatabaseSource.cache.value,
        "custom-node-list.json",
        channel_url="https://raw.githubusercontent.com/ltdrdata/ComfyUI-Manager/main",
    )
    catalogue = local_list["custom_nodes"] + cached_list["custom_nodes"]

    # The file check comes first: an unknown file URL decides the result
    # before the pip packages are looked at.
    known_urls = {url for node in catalogue for url in node.get("files", [])}
    if any(url not in known_urls for url in files):
        return RiskLevel.high_.value

    known_pip = {pkg for node in catalogue for pkg in node.get("pip", [])}
    if any(pkg not in known_pip for pkg in pip_packages):
        return RiskLevel.block.value

    return RiskLevel.middle_.value