From 03aaf1b7b708bfbda30d95c0ca5ad156f0f792ca Mon Sep 17 00:00:00 2001 From: Deepanjan Roy Date: Fri, 30 May 2025 12:00:47 -0400 Subject: [PATCH] Add sandbox behind --enable-sandbox flag --- .gitignore | 1 + comfy/cli_args.py | 7 + folder_paths.py | 9 +- main.py | 31 ++++ requirements.txt | 3 + sandbox/setup_sandbox_permissions.bat | 29 ++++ sandbox/windows_sandbox.py | 163 ++++++++++++++++++++++ tests-unit/comfy_test/folder_path_test.py | 2 +- tests-unit/comfy_test/sandbox_test.py | 36 +++++ 9 files changed, 279 insertions(+), 2 deletions(-) create mode 100644 sandbox/setup_sandbox_permissions.bat create mode 100644 sandbox/windows_sandbox.py create mode 100644 tests-unit/comfy_test/sandbox_test.py diff --git a/.gitignore b/.gitignore index 4e8cea71e..dcb2a56a3 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ web_custom_versions/ openapi.yaml filtered-openapi.yaml uv.lock +/write-permitted/ diff --git a/comfy/cli_args.py b/comfy/cli_args.py index 741ecac3f..a3187ac90 100644 --- a/comfy/cli_args.py +++ b/comfy/cli_args.py @@ -208,6 +208,13 @@ database_default_path = os.path.abspath( ) parser.add_argument("--database-url", type=str, default=f"sqlite:///{database_default_path}", help="Specify the database URL, e.g. for an in-memory database you can use 'sqlite:///:memory:'.") +parser.add_argument( + "--enable-sandbox", + default=False, + action="store_true", + help="Enable sandbox mode. (default: False)", +) + if comfy.options.args_parsing: args = parser.parse_args() else: diff --git a/folder_paths.py b/folder_paths.py index f0b3fd103..10d145d3f 100644 --- a/folder_paths.py +++ b/folder_paths.py @@ -47,10 +47,15 @@ folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")] folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""}) output_directory = os.path.join(base_path, "output") -temp_directory = os.path.join(base_path, "temp") input_directory = os.path.join(base_path, "input") user_directory = os.path.join(base_path, "user") +write_permitted_base_dir = os.path.join(base_path, "write-permitted") +# Temp is a subdirectory of write-permitted so the entire directory can be +# deleted and recreated as needed. +temp_directory = os.path.join(write_permitted_base_dir, "temp") + + filename_list_cache: dict[str, tuple[list[str], dict[str, float], float]] = {} class CacheHelper: @@ -130,6 +135,8 @@ def set_user_directory(user_dir: str) -> None: global user_directory user_directory = user_dir +def get_write_permitted_base_directory() -> str: + return write_permitted_base_dir #NOTE: used in http server so don't put folders that should not be accessed remotely def get_directory_by_type(type_name: str) -> str | None: diff --git a/main.py b/main.py index c8c4194d4..0b8a69354 100644 --- a/main.py +++ b/main.py @@ -10,12 +10,15 @@ from app.logger import setup_logger import itertools import utils.extra_config import logging +from sandbox import windows_sandbox import sys +import subprocess if __name__ == "__main__": #NOTE: These do not do anything on core ComfyUI, they are for custom nodes. os.environ['HF_HUB_DISABLE_TELEMETRY'] = '1' os.environ['DO_NOT_TRACK'] = '1' + setup_logger(log_level=args.verbose, use_stdout=args.log_stdout) @@ -54,6 +57,31 @@ def apply_custom_paths(): folder_paths.set_user_directory(user_dir) +def try_enable_sandbox(): + if not args.enable_sandbox: return + + # Sandbox is only supported on Windows + if os.name != 'nt': return + + if any([ + args.output_directory, + args.user_directory, + args.base_directory, + args.temp_directory + ]): + # Note: If we ever support custom directories, we should warn users if + # the directories are in a senstive location (e.g. a high level + # directory like C:\ or the user's home directory). + raise Exception("Sandbox mode is not supported when using --output-directory, " + "--user-directory, --base-directory, or --temp-directory.") + + success = windows_sandbox.try_enable_sandbox() + + if not success: + raise Exception("Unable to run ComfyUI with sandbox enabled. " + "You can rerun without --enable-sandbox.") + + def execute_prestartup_script(): def execute_script(script_path): module_name = os.path.splitext(script_path)[0] @@ -95,6 +123,7 @@ def execute_prestartup_script(): logging.info("") apply_custom_paths() +try_enable_sandbox() # Must run before executing custom node prestartup scripts execute_prestartup_script() @@ -257,6 +286,8 @@ def start_comfyui(asyncio_loop=None): folder_paths.set_temp_directory(temp_dir) cleanup_temp() + os.makedirs(folder_paths.get_temp_directory(), exist_ok=True) + if args.windows_standalone_build: try: import new_updater diff --git a/requirements.txt b/requirements.txt index 94dafea58..c3ea016c9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,3 +28,6 @@ soundfile av>=14.2.0 pydantic~=2.0 pydantic-settings~=2.0 + +# Windows only dependencies: +pywin32 ; sys_platform == "win32" diff --git a/sandbox/setup_sandbox_permissions.bat b/sandbox/setup_sandbox_permissions.bat new file mode 100644 index 000000000..c301a6a30 --- /dev/null +++ b/sandbox/setup_sandbox_permissions.bat @@ -0,0 +1,29 @@ +@echo off + +rem Check if any arguments were provided +if "%~1"=="" ( + echo No folders specified. Please provide folder names as arguments. + echo Usage: %~nx0 folder1 folder2 folder3 ... + exit /b 1 +) + +rem Process each argument as a folder +:process_folders +if "%~1"=="" goto :done + if not exist "%~1" ( + echo Creating directory: %~1 + mkdir "%~1" + ) + echo icacls "%~1" /setintegritylevel "(OI)(CI)Low" + icacls "%~1" /setintegritylevel "(OI)(CI)Low" || goto :errorexit + shift + goto :process_folders + +:done +echo Permissions set up successfully +exit /b 0 + +:errorexit +echo Sandbox permission setup script failed +pause +exit /b 1 \ No newline at end of file diff --git a/sandbox/windows_sandbox.py b/sandbox/windows_sandbox.py new file mode 100644 index 000000000..3fe643b67 --- /dev/null +++ b/sandbox/windows_sandbox.py @@ -0,0 +1,163 @@ +import logging +import win32con +import win32process +import win32security +import subprocess +import os +from win32com.shell import shellcon, shell +import win32api +import win32event +import folder_paths + + +LOW_INTEGRITY_SID_STRING = "S-1-16-4096" + +# Use absolute path to prevent command injection +ICACLS_PATH = r"C:\Windows\System32\icacls.exe" + + +def set_process_integrity_level_to_low(): + # Get current process handle + current_process = win32process.GetCurrentProcess() + + # Open process token + token = win32security.OpenProcessToken( + current_process, + win32con.TOKEN_ALL_ACCESS, + ) + + low_integrity_sid = win32security.ConvertStringSidToSid(LOW_INTEGRITY_SID_STRING) + + # Set the integrity level of the token + win32security.SetTokenInformation( + token, win32security.TokenIntegrityLevel, (low_integrity_sid, 0) + ) + + logging.info("Sandbox enabled: Process now running with low integrity token") + + +def permits_low_integrity_write(icacls_output): + permissions = [l.strip() for l in icacls_output.split("\n")] + LOW_INTEGRITY_LABEL = r"Mandatory Label\Low Mandatory Level" + + for p in permissions: + if LOW_INTEGRITY_LABEL not in p: + continue + + # Check the Low integrity label line - it should be something like + # Mandatory Label\Low Mandatory Level:(OI)(CI)(NW) or + # Mandatory Label\Low Mandatory Level:(I)(OI)(CI)(NW) + + # Note: This is a bit of a crude check with icacls - it is possible for + # a low integrity process to have write access to a directory without + # having these exact ACLs reported by icacls. Implement a more robust + # check if this situation ever occurs. + return all( + [ + # OI: Object Inheritance - all files in the directory with have low + # integrity + "(OI)" in p, + # CI: Container Inheritance - all subdirectories will have low + # integrity + "(CI)" in p, + # NW: No Writeup - processes with lower integrity cannot write to + # this directory + "(NW)" in p, + ] + ) + + +def path_is_low_integrity_writable(path): + """Check if the directory has a writable ACL by low integrity process""" + result = subprocess.run([ICACLS_PATH, path], capture_output=True, text=True) + + if result.returncode != 0: + # icacls command failed. Can happen because path doesn't exist + # or we're not allowed to access acl information of the path. + return False + + return permits_low_integrity_write(result.stdout) + + +def ensure_directories_exist(dirs): + for dir in dirs: + os.makedirs(dir, exist_ok=True) + + +def check_directory_acls(dirs): + acls_correct = True + for dir in dirs: + if not path_is_low_integrity_writable(dir): + logging.info( + f'Directory "{dir}" must be writable by low integrity ' + "processes for sandbox mode." + ) + acls_correct = False + + return acls_correct + + +def setup_permissions(dirs): + script_dir = os.path.dirname(os.path.abspath(__file__)) + bat_path = os.path.join(script_dir, "setup_sandbox_permissions.bat") + + execute_info = { + "lpVerb": "runas", # Run as administrator + "lpFile": bat_path, + "lpParameters": " ".join(dirs), + "nShow": win32con.SW_SHOWNORMAL, + # This flag is necessary to wait for the process to finish. + "fMask": shellcon.SEE_MASK_NOCLOSEPROCESS, + } + + # This is equivalent to right-clicking the bat file and selecting "Run as + # administrator" + proc_info = shell.ShellExecuteEx(**execute_info) + hProcess = proc_info["hProcess"] + + # Setup script should take a few milliseconds. Time out at 10 seconds. + win32event.WaitForSingleObject(hProcess, 10 * 1000) + exit_code = win32process.GetExitCodeProcess(hProcess) + + try: + if exit_code == win32con.STATUS_PENDING: + raise Exception("Sandbox permission script timed out") + if exit_code != 0: + raise Exception( + "Sandbox permission setup script failed. " f"Exit code: {exit_code}" + ) + finally: + win32api.CloseHandle(hProcess) + + +def try_enable_sandbox(): + write_permitted_dirs = [ + folder_paths.get_write_permitted_base_directory(), + folder_paths.get_output_directory(), + folder_paths.get_user_directory(), + ] + + ensure_directories_exist(write_permitted_dirs) + + if check_directory_acls(write_permitted_dirs): + set_process_integrity_level_to_low() + return True + + # Directory permissions are not set up correctly. Try to fix. + logging.critical( + "Some directories do not have the correct permissions for sandbox mode " + "to work. Would you like ComfyUI to fix these permissions? You will " + "receive a UAC elevation prompt. [y/n]" + ) + if input() != "y": + return False + + setup_permissions(write_permitted_dirs) + + # Check directory permissions again before enabling sandbox. + if check_directory_acls(write_permitted_dirs): + set_process_integrity_level_to_low() + return True + + # Directory permissions are still not set up correctly. Give up. + return False diff --git a/tests-unit/comfy_test/folder_path_test.py b/tests-unit/comfy_test/folder_path_test.py index 775e15c36..b93c4093a 100644 --- a/tests-unit/comfy_test/folder_path_test.py +++ b/tests-unit/comfy_test/folder_path_test.py @@ -124,7 +124,7 @@ def test_base_path_changes(set_base_dir): assert folder_paths.models_dir == os.path.join(test_dir, "models") assert folder_paths.input_directory == os.path.join(test_dir, "input") assert folder_paths.output_directory == os.path.join(test_dir, "output") - assert folder_paths.temp_directory == os.path.join(test_dir, "temp") + assert folder_paths.temp_directory == os.path.join(test_dir, "write-permitted", "temp") assert folder_paths.user_directory == os.path.join(test_dir, "user") assert os.path.join(test_dir, "custom_nodes") in folder_paths.get_folder_paths("custom_nodes") diff --git a/tests-unit/comfy_test/sandbox_test.py b/tests-unit/comfy_test/sandbox_test.py new file mode 100644 index 000000000..eb6dd9bc1 --- /dev/null +++ b/tests-unit/comfy_test/sandbox_test.py @@ -0,0 +1,36 @@ +import pytest +from sandbox import windows_sandbox + +def test_icacl_no_low_integrity_label(): + icacl_output = r""" + foo NT AUTHORITY\SYSTEM:(OI)(CI)(F) + """ + assert not windows_sandbox.permits_low_integrity_write(icacl_output) + +def test_icacl_missing_inherit_flags(): + icacl_output = r""" + foo Mandatory Label\Low Mandatory Level:(NW) + """ + assert not windows_sandbox.permits_low_integrity_write(icacl_output) + + icacl_output = r""" + foo Mandatory Label\Low Mandatory Level:(OI)(NW) + """ + assert not windows_sandbox.permits_low_integrity_write(icacl_output) + + icacl_output = r""" + foo Mandatory Label\Low Mandatory Level:(CI)(NW) + """ + assert not windows_sandbox.permits_low_integrity_write(icacl_output) + +def test_icacl_correct_acls(): + icacl_output = r""" + foo Mandatory Label\Low Mandatory Level:(I)(OI)(CI)(NW) + """ + assert windows_sandbox.permits_low_integrity_write(icacl_output) + + icacl_output = r""" + foo Mandatory Label\Low Mandatory Level:(OI)(CI)(NW) + """ + assert windows_sandbox.permits_low_integrity_write(icacl_output) + \ No newline at end of file