This commit is contained in:
Svein Ove Aas 2025-12-14 11:02:46 +01:00 committed by GitHub
commit ec0eed43ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 343 additions and 0 deletions

View File

@ -47,6 +47,9 @@ parser.add_argument("--extra-model-paths-config", type=str, default=None, metava
parser.add_argument("--output-directory", type=str, default=None, help="Set the ComfyUI output directory. Overrides --base-directory.")
parser.add_argument("--temp-directory", type=str, default=None, help="Set the ComfyUI temp directory (default is in the ComfyUI directory). Overrides --base-directory.")
parser.add_argument("--input-directory", type=str, default=None, help="Set the ComfyUI input directory. Overrides --base-directory.")
parser.add_argument("--enable-landlock", action="store_true", help="Use the Linux Landlock LSM to restrict filesystem writes to known ComfyUI and cache directories.")
parser.add_argument("--landlock-allow-writable", action="append", default=[], metavar="PATH", help="Extra directories that remain writable when --enable-landlock is set. Can be provided multiple times.")
parser.add_argument("--landlock-allow-readable", action="append", default=[], metavar="PATH", help="Extra directories to allow read access when --enable-landlock is set. Can be provided multiple times.")
parser.add_argument("--auto-launch", action="store_true", help="Automatically launch ComfyUI in the default browser.")
parser.add_argument("--disable-auto-launch", action="store_true", help="Disable auto launching the browser.")
parser.add_argument("--cuda-device", type=int, default=None, metavar="DEVICE_ID", help="Set the id of the cuda device this instance will use. All other devices will not be visible.")

View File

@ -9,6 +9,7 @@ from comfy.cli_args import args
from app.logger import setup_logger
import itertools
import utils.extra_config
import utils.landlock
import logging
import sys
from comfy_execution.progress import get_progress_state
@ -341,6 +342,13 @@ def start_comfyui(asyncio_loop=None):
folder_paths.set_temp_directory(temp_dir)
cleanup_temp()
if args.enable_landlock:
logging.info("Enabling Landlock")
landlock_ok = utils.landlock.enable_landlock(args, logging.getLogger("landlock"))
if not landlock_ok:
logging.critical("Requested Landlock sandbox but it could not be enabled. Exiting.")
sys.exit(1)
if args.windows_standalone_build:
try:
import new_updater

332
utils/landlock.py Normal file
View File

@ -0,0 +1,332 @@
import ctypes
import errno
import logging
import os
import sys
import tempfile
from dataclasses import dataclass
# Landlock constants copied from linux/landlock.h
PR_SET_NO_NEW_PRIVS = 38
LANDLOCK_RULE_PATH_BENEATH = 1
LANDLOCK_CREATE_RULESET_VERSION = 1
LANDLOCK_ACCESS_FS_EXECUTE = 1 << 0
LANDLOCK_ACCESS_FS_WRITE_FILE = 1 << 1
LANDLOCK_ACCESS_FS_READ_FILE = 1 << 2
LANDLOCK_ACCESS_FS_READ_DIR = 1 << 3
LANDLOCK_ACCESS_FS_REMOVE_DIR = 1 << 4
LANDLOCK_ACCESS_FS_REMOVE_FILE = 1 << 5
LANDLOCK_ACCESS_FS_MAKE_CHAR = 1 << 6
LANDLOCK_ACCESS_FS_MAKE_DIR = 1 << 7
LANDLOCK_ACCESS_FS_MAKE_REG = 1 << 8
LANDLOCK_ACCESS_FS_MAKE_SOCK = 1 << 9
LANDLOCK_ACCESS_FS_MAKE_FIFO = 1 << 10
LANDLOCK_ACCESS_FS_MAKE_BLOCK = 1 << 11
LANDLOCK_ACCESS_FS_MAKE_SYM = 1 << 12
LANDLOCK_ACCESS_FS_REFER = 1 << 13
LANDLOCK_ACCESS_FS_TRUNCATE = 1 << 14
LANDLOCK_ACCESS_FS_IOCTL_DEV = 1 << 15 # ABI v5+
# Pre-computed access masks
FS_READ_ACCESS = (
LANDLOCK_ACCESS_FS_READ_FILE
| LANDLOCK_ACCESS_FS_READ_DIR
| LANDLOCK_ACCESS_FS_EXECUTE
)
FS_WRITE_ACCESS = (
FS_READ_ACCESS
| LANDLOCK_ACCESS_FS_WRITE_FILE
| LANDLOCK_ACCESS_FS_MAKE_DIR
| LANDLOCK_ACCESS_FS_MAKE_REG
| LANDLOCK_ACCESS_FS_MAKE_SOCK
| LANDLOCK_ACCESS_FS_MAKE_FIFO
| LANDLOCK_ACCESS_FS_MAKE_BLOCK
| LANDLOCK_ACCESS_FS_MAKE_CHAR
| LANDLOCK_ACCESS_FS_MAKE_SYM
| LANDLOCK_ACCESS_FS_REMOVE_DIR
| LANDLOCK_ACCESS_FS_REMOVE_FILE
)
# Syscall numbers are ABI-stable across all 64-bit Linux architectures
SYS_LANDLOCK_CREATE_RULESET = 444
SYS_LANDLOCK_ADD_RULE = 445
SYS_LANDLOCK_RESTRICT_SELF = 446
class _RulesetAttr(ctypes.Structure):
_fields_ = [("handled_access_fs", ctypes.c_uint64)]
class _PathBeneathAttr(ctypes.Structure):
_fields_ = [
("allowed_access", ctypes.c_uint64),
("parent_fd", ctypes.c_int32),
("reserved", ctypes.c_uint32),
]
@dataclass(frozen=True)
class LandlockRules:
read_paths: set[str]
write_paths: set[str]
ioctl_paths: set[str]
def _normalize_paths(paths: set[str]) -> set[str]:
normalized = set()
for path in paths:
if not path:
continue
normalized.add(os.path.realpath(path))
return normalized
class LandlockEnforcer:
def __init__(self, logger: logging.Logger | None = None):
self.log = logger or logging.getLogger(__name__)
self.libc = ctypes.CDLL(None, use_errno=True)
self.libc.syscall.restype = ctypes.c_long
self.libc.prctl.restype = ctypes.c_int
def _syscall(self, syscall_nr, *args) -> tuple[int | None, int]:
ctypes.set_errno(0)
res = self.libc.syscall(ctypes.c_long(syscall_nr), *args)
if res == -1:
return None, ctypes.get_errno()
return res, 0
def _abi_version(self) -> int:
res, err = self._syscall(
SYS_LANDLOCK_CREATE_RULESET,
ctypes.c_void_p(0),
ctypes.c_size_t(0),
ctypes.c_uint(LANDLOCK_CREATE_RULESET_VERSION),
)
if res is None:
if err in (errno.ENOSYS, errno.EOPNOTSUPP):
return 0
return -err
return res
def _create_ruleset(self, handled_access: int) -> tuple[int | None, int]:
ruleset = _RulesetAttr(ctypes.c_uint64(handled_access))
return self._syscall(
SYS_LANDLOCK_CREATE_RULESET,
ctypes.byref(ruleset),
ctypes.c_size_t(ctypes.sizeof(ruleset)),
ctypes.c_uint(0),
)
def _add_rule(self, ruleset_fd: int, path: str, access_mask: int, allow_ioctl: bool) -> bool:
if allow_ioctl:
access_mask |= LANDLOCK_ACCESS_FS_IOCTL_DEV
try:
dir_fd = os.open(path, os.O_PATH | os.O_CLOEXEC)
except OSError as exc:
self.log.warning("Landlock: skipping %s (%s)", path, exc)
return False
try:
rule = _PathBeneathAttr(
ctypes.c_uint64(access_mask), ctypes.c_int32(dir_fd), ctypes.c_uint32(0)
)
res, err = self._syscall(
SYS_LANDLOCK_ADD_RULE,
ctypes.c_int(ruleset_fd),
ctypes.c_int(LANDLOCK_RULE_PATH_BENEATH),
ctypes.byref(rule),
ctypes.c_uint(0),
)
if res is None:
self.log.warning("Landlock: failed to add %s (errno=%s)", path, err)
return False
return True
finally:
os.close(dir_fd)
def _restrict_self(self, ruleset_fd: int) -> bool:
if self.libc.prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0:
self.log.warning(
"Landlock: prctl(PR_SET_NO_NEW_PRIVS) failed (errno=%s)",
ctypes.get_errno(),
)
return False
res, err = self._syscall(SYS_LANDLOCK_RESTRICT_SELF, ctypes.c_int(ruleset_fd), ctypes.c_uint(0))
if res is None:
self.log.warning("Landlock: restrict_self failed (errno=%s)", err)
return False
return True
def apply(self, rules: LandlockRules) -> bool:
if not sys.platform.startswith("linux"):
self.log.info("Landlock: not a Linux platform, skipping.")
return False
abi_version = self._abi_version()
if abi_version <= 0:
self.log.info("Landlock: not available on this kernel (abi=%s).", abi_version)
return False
read_access = FS_READ_ACCESS
handled_write_access = FS_WRITE_ACCESS
allowed_write_access = (
read_access
| LANDLOCK_ACCESS_FS_WRITE_FILE
| LANDLOCK_ACCESS_FS_MAKE_DIR
| LANDLOCK_ACCESS_FS_MAKE_REG
| LANDLOCK_ACCESS_FS_REMOVE_DIR
| LANDLOCK_ACCESS_FS_REMOVE_FILE
) # leave other handled rights (symlinks, device nodes, sockets) denied
if abi_version >= 2:
handled_write_access |= LANDLOCK_ACCESS_FS_TRUNCATE
allowed_write_access |= LANDLOCK_ACCESS_FS_TRUNCATE
if abi_version >= 3:
handled_write_access |= LANDLOCK_ACCESS_FS_REFER
allowed_write_access |= LANDLOCK_ACCESS_FS_REFER
handled_access = handled_write_access | read_access
ioctl_supported = abi_version >= 5
if ioctl_supported:
handled_access |= LANDLOCK_ACCESS_FS_IOCTL_DEV
write_paths = _normalize_paths(rules.write_paths)
read_paths = _normalize_paths(rules.read_paths) - write_paths
# In theory these could require write or read access. Though in practice it's just /dev.
ioctl_paths = _normalize_paths(rules.ioctl_paths)
if ioctl_paths and not ioctl_supported:
self.log.info(
"Landlock: ioctl access requested but ABI %s has no support; continuing without ioctl.",
abi_version,
)
ruleset_fd = None
ruleset_fd, err = self._create_ruleset(handled_access)
if ruleset_fd is None:
self.log.warning("Landlock: failed to create ruleset (errno=%s)", err)
return False
try:
for path in write_paths:
if path != os.path.sep:
try:
os.makedirs(path, exist_ok=True)
except Exception as exc:
self.log.warning("Landlock: unable to prepare %s (%s)", path, exc)
return False
if not self._add_rule(
ruleset_fd, path, allowed_write_access, ioctl_supported and path in ioctl_paths
):
return False
for path in read_paths:
self._add_rule(ruleset_fd, path, read_access, ioctl_supported and path in ioctl_paths)
if not self._restrict_self(ruleset_fd):
return False
finally:
if ruleset_fd is not None:
os.close(ruleset_fd)
if write_paths:
self.log.info(
"Landlock enabled (ABI %s). Writable roots: %s",
abi_version,
", ".join(sorted(write_paths)),
)
else:
self.log.info("Landlock enabled (ABI %s). No writable roots configured.", abi_version)
return True
_landlock_applied = False
def build_default_rules(args) -> LandlockRules:
import folder_paths
from urllib.parse import urlparse
write_paths: set[str] = {
folder_paths.get_output_directory(),
folder_paths.get_input_directory(),
folder_paths.get_temp_directory(),
folder_paths.get_user_directory(),
}
ioctl_paths: set[str] = set()
# Torch and some backends use system temp and /dev/shm
write_paths.add(tempfile.gettempdir())
if args.temp_directory:
write_paths.add(os.path.join(os.path.abspath(args.temp_directory), "temp"))
db_url = getattr(args, "database_url", None)
if db_url and db_url.startswith("sqlite"):
parsed = urlparse(db_url)
if parsed.scheme == "sqlite" and parsed.path:
write_paths.add(os.path.abspath(os.path.dirname(parsed.path)))
for path in args.landlock_allow_writable or []:
if path:
write_paths.add(path)
# Build read paths - only what's actually needed
read_paths: set[str] = set()
# ComfyUI codebase
read_paths.add(folder_paths.base_path)
# All configured model directories (includes extra_model_paths.yaml)
for folder_name in folder_paths.folder_names_and_paths:
for path in folder_paths.folder_names_and_paths[folder_name][0]:
read_paths.add(path)
# Python installation and site-packages
read_paths.add(sys.prefix)
if sys.base_prefix != sys.prefix:
read_paths.add(sys.base_prefix)
for path in sys.path:
if path and os.path.isdir(path):
read_paths.add(path)
# System libraries (required for shared libs, CUDA, etc.)
for system_path in ["/usr", "/lib", "/lib64", "/opt", "/etc", "/proc", "/sys"]:
if os.path.exists(system_path):
read_paths.add(system_path)
# NixOS: /nix/store contains the entire system
if os.path.exists("/nix"):
read_paths.add("/nix")
# /dev needs write + ioctl for CUDA/GPU access
write_paths.add("/dev")
ioctl_paths.add("/dev")
# User-specified additional read paths
for path in getattr(args, "landlock_allow_readable", None) or []:
if path:
read_paths.add(path)
return LandlockRules(read_paths=_normalize_paths(read_paths), write_paths=_normalize_paths(write_paths), ioctl_paths=_normalize_paths(ioctl_paths))
def enable_landlock(args, logger: logging.Logger | None = None) -> bool:
global _landlock_applied
if _landlock_applied:
return True
if not getattr(args, "enable_landlock", False):
return False
enforcer = LandlockEnforcer(logger)
try:
_landlock_applied = enforcer.apply(build_default_rules(args))
except Exception:
enforcer.log.exception("Landlock: unexpected failure while applying ruleset.")
_landlock_applied = False
return _landlock_applied