Add sandbox behind --enable-sandbox flag

This commit is contained in:
Deepanjan Roy 2025-05-30 12:00:47 -04:00
parent c69af655aa
commit 03aaf1b7b7
9 changed files with 279 additions and 2 deletions

1
.gitignore vendored
View File

@ -24,3 +24,4 @@ web_custom_versions/
openapi.yaml
filtered-openapi.yaml
uv.lock
/write-permitted/

View File

@ -208,6 +208,13 @@ database_default_path = os.path.abspath(
)
parser.add_argument("--database-url", type=str, default=f"sqlite:///{database_default_path}", help="Specify the database URL, e.g. for an in-memory database you can use 'sqlite:///:memory:'.")
parser.add_argument(
"--enable-sandbox",
default=False,
action="store_true",
help="Enable sandbox mode. (default: False)",
)
if comfy.options.args_parsing:
args = parser.parse_args()
else:

View File

@ -47,10 +47,15 @@ folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")]
folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""})
output_directory = os.path.join(base_path, "output")
temp_directory = os.path.join(base_path, "temp")
input_directory = os.path.join(base_path, "input")
user_directory = os.path.join(base_path, "user")
write_permitted_base_dir = os.path.join(base_path, "write-permitted")
# Temp is a subdirectory of write-permitted so the entire directory can be
# deleted and recreated as needed.
temp_directory = os.path.join(write_permitted_base_dir, "temp")
filename_list_cache: dict[str, tuple[list[str], dict[str, float], float]] = {}
class CacheHelper:
@ -130,6 +135,8 @@ def set_user_directory(user_dir: str) -> None:
global user_directory
user_directory = user_dir
def get_write_permitted_base_directory() -> str:
return write_permitted_base_dir
#NOTE: used in http server so don't put folders that should not be accessed remotely
def get_directory_by_type(type_name: str) -> str | None:

31
main.py
View File

@ -10,12 +10,15 @@ from app.logger import setup_logger
import itertools
import utils.extra_config
import logging
from sandbox import windows_sandbox
import sys
import subprocess
if __name__ == "__main__":
#NOTE: These do not do anything on core ComfyUI, they are for custom nodes.
os.environ['HF_HUB_DISABLE_TELEMETRY'] = '1'
os.environ['DO_NOT_TRACK'] = '1'
setup_logger(log_level=args.verbose, use_stdout=args.log_stdout)
@ -54,6 +57,31 @@ def apply_custom_paths():
folder_paths.set_user_directory(user_dir)
def try_enable_sandbox():
if not args.enable_sandbox: return
# Sandbox is only supported on Windows
if os.name != 'nt': return
if any([
args.output_directory,
args.user_directory,
args.base_directory,
args.temp_directory
]):
# Note: If we ever support custom directories, we should warn users if
# the directories are in a senstive location (e.g. a high level
# directory like C:\ or the user's home directory).
raise Exception("Sandbox mode is not supported when using --output-directory, "
"--user-directory, --base-directory, or --temp-directory.")
success = windows_sandbox.try_enable_sandbox()
if not success:
raise Exception("Unable to run ComfyUI with sandbox enabled. "
"You can rerun without --enable-sandbox.")
def execute_prestartup_script():
def execute_script(script_path):
module_name = os.path.splitext(script_path)[0]
@ -95,6 +123,7 @@ def execute_prestartup_script():
logging.info("")
apply_custom_paths()
try_enable_sandbox() # Must run before executing custom node prestartup scripts
execute_prestartup_script()
@ -257,6 +286,8 @@ def start_comfyui(asyncio_loop=None):
folder_paths.set_temp_directory(temp_dir)
cleanup_temp()
os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
if args.windows_standalone_build:
try:
import new_updater

View File

@ -28,3 +28,6 @@ soundfile
av>=14.2.0
pydantic~=2.0
pydantic-settings~=2.0
# Windows only dependencies:
pywin32 ; sys_platform == "win32"

View File

@ -0,0 +1,29 @@
@echo off
rem Check if any arguments were provided
if "%~1"=="" (
echo No folders specified. Please provide folder names as arguments.
echo Usage: %~nx0 folder1 folder2 folder3 ...
exit /b 1
)
rem Process each argument as a folder
:process_folders
if "%~1"=="" goto :done
if not exist "%~1" (
echo Creating directory: %~1
mkdir "%~1"
)
echo icacls "%~1" /setintegritylevel "(OI)(CI)Low"
icacls "%~1" /setintegritylevel "(OI)(CI)Low" || goto :errorexit
shift
goto :process_folders
:done
echo Permissions set up successfully
exit /b 0
:errorexit
echo Sandbox permission setup script failed
pause
exit /b 1

163
sandbox/windows_sandbox.py Normal file
View File

@ -0,0 +1,163 @@
import logging
import win32con
import win32process
import win32security
import subprocess
import os
from win32com.shell import shellcon, shell
import win32api
import win32event
import folder_paths
LOW_INTEGRITY_SID_STRING = "S-1-16-4096"
# Use absolute path to prevent command injection
ICACLS_PATH = r"C:\Windows\System32\icacls.exe"
def set_process_integrity_level_to_low():
# Get current process handle
current_process = win32process.GetCurrentProcess()
# Open process token
token = win32security.OpenProcessToken(
current_process,
win32con.TOKEN_ALL_ACCESS,
)
low_integrity_sid = win32security.ConvertStringSidToSid(LOW_INTEGRITY_SID_STRING)
# Set the integrity level of the token
win32security.SetTokenInformation(
token, win32security.TokenIntegrityLevel, (low_integrity_sid, 0)
)
logging.info("Sandbox enabled: Process now running with low integrity token")
def permits_low_integrity_write(icacls_output):
permissions = [l.strip() for l in icacls_output.split("\n")]
LOW_INTEGRITY_LABEL = r"Mandatory Label\Low Mandatory Level"
for p in permissions:
if LOW_INTEGRITY_LABEL not in p:
continue
# Check the Low integrity label line - it should be something like
# Mandatory Label\Low Mandatory Level:(OI)(CI)(NW) or
# Mandatory Label\Low Mandatory Level:(I)(OI)(CI)(NW)
# Note: This is a bit of a crude check with icacls - it is possible for
# a low integrity process to have write access to a directory without
# having these exact ACLs reported by icacls. Implement a more robust
# check if this situation ever occurs.
return all(
[
# OI: Object Inheritance - all files in the directory with have low
# integrity
"(OI)" in p,
# CI: Container Inheritance - all subdirectories will have low
# integrity
"(CI)" in p,
# NW: No Writeup - processes with lower integrity cannot write to
# this directory
"(NW)" in p,
]
)
def path_is_low_integrity_writable(path):
"""Check if the directory has a writable ACL by low integrity process"""
result = subprocess.run([ICACLS_PATH, path], capture_output=True, text=True)
if result.returncode != 0:
# icacls command failed. Can happen because path doesn't exist
# or we're not allowed to access acl information of the path.
return False
return permits_low_integrity_write(result.stdout)
def ensure_directories_exist(dirs):
for dir in dirs:
os.makedirs(dir, exist_ok=True)
def check_directory_acls(dirs):
acls_correct = True
for dir in dirs:
if not path_is_low_integrity_writable(dir):
logging.info(
f'Directory "{dir}" must be writable by low integrity '
"processes for sandbox mode."
)
acls_correct = False
return acls_correct
def setup_permissions(dirs):
script_dir = os.path.dirname(os.path.abspath(__file__))
bat_path = os.path.join(script_dir, "setup_sandbox_permissions.bat")
execute_info = {
"lpVerb": "runas", # Run as administrator
"lpFile": bat_path,
"lpParameters": " ".join(dirs),
"nShow": win32con.SW_SHOWNORMAL,
# This flag is necessary to wait for the process to finish.
"fMask": shellcon.SEE_MASK_NOCLOSEPROCESS,
}
# This is equivalent to right-clicking the bat file and selecting "Run as
# administrator"
proc_info = shell.ShellExecuteEx(**execute_info)
hProcess = proc_info["hProcess"]
# Setup script should take a few milliseconds. Time out at 10 seconds.
win32event.WaitForSingleObject(hProcess, 10 * 1000)
exit_code = win32process.GetExitCodeProcess(hProcess)
try:
if exit_code == win32con.STATUS_PENDING:
raise Exception("Sandbox permission script timed out")
if exit_code != 0:
raise Exception(
"Sandbox permission setup script failed. " f"Exit code: {exit_code}"
)
finally:
win32api.CloseHandle(hProcess)
def try_enable_sandbox():
write_permitted_dirs = [
folder_paths.get_write_permitted_base_directory(),
folder_paths.get_output_directory(),
folder_paths.get_user_directory(),
]
ensure_directories_exist(write_permitted_dirs)
if check_directory_acls(write_permitted_dirs):
set_process_integrity_level_to_low()
return True
# Directory permissions are not set up correctly. Try to fix.
logging.critical(
"Some directories do not have the correct permissions for sandbox mode "
"to work. Would you like ComfyUI to fix these permissions? You will "
"receive a UAC elevation prompt. [y/n]"
)
if input() != "y":
return False
setup_permissions(write_permitted_dirs)
# Check directory permissions again before enabling sandbox.
if check_directory_acls(write_permitted_dirs):
set_process_integrity_level_to_low()
return True
# Directory permissions are still not set up correctly. Give up.
return False

View File

@ -124,7 +124,7 @@ def test_base_path_changes(set_base_dir):
assert folder_paths.models_dir == os.path.join(test_dir, "models")
assert folder_paths.input_directory == os.path.join(test_dir, "input")
assert folder_paths.output_directory == os.path.join(test_dir, "output")
assert folder_paths.temp_directory == os.path.join(test_dir, "temp")
assert folder_paths.temp_directory == os.path.join(test_dir, "write-permitted", "temp")
assert folder_paths.user_directory == os.path.join(test_dir, "user")
assert os.path.join(test_dir, "custom_nodes") in folder_paths.get_folder_paths("custom_nodes")

View File

@ -0,0 +1,36 @@
import pytest
from sandbox import windows_sandbox
def test_icacl_no_low_integrity_label():
icacl_output = r"""
foo NT AUTHORITY\SYSTEM:(OI)(CI)(F)
"""
assert not windows_sandbox.permits_low_integrity_write(icacl_output)
def test_icacl_missing_inherit_flags():
icacl_output = r"""
foo Mandatory Label\Low Mandatory Level:(NW)
"""
assert not windows_sandbox.permits_low_integrity_write(icacl_output)
icacl_output = r"""
foo Mandatory Label\Low Mandatory Level:(OI)(NW)
"""
assert not windows_sandbox.permits_low_integrity_write(icacl_output)
icacl_output = r"""
foo Mandatory Label\Low Mandatory Level:(CI)(NW)
"""
assert not windows_sandbox.permits_low_integrity_write(icacl_output)
def test_icacl_correct_acls():
icacl_output = r"""
foo Mandatory Label\Low Mandatory Level:(I)(OI)(CI)(NW)
"""
assert windows_sandbox.permits_low_integrity_write(icacl_output)
icacl_output = r"""
foo Mandatory Label\Low Mandatory Level:(OI)(CI)(NW)
"""
assert windows_sandbox.permits_low_integrity_write(icacl_output)