ComfyUI/comfy/cmd/folder_paths.py
2025-12-09 16:13:43 -08:00

601 lines
24 KiB
Python

from __future__ import annotations
import time
import collections.abc
import logging
import mimetypes
import os
from contextlib import nullcontext
from functools import reduce
from pathlib import Path, PurePosixPath
from typing import Optional, List, Literal
from ..cli_args_types import Configuration
from ..component_model.deprecation import _deprecate_method
from ..component_model.files import get_package_as_path
from ..component_model.folder_path_types import FolderNames, SaveImagePathTuple, ModelPaths
from ..component_model.folder_path_types import supported_pt_extensions, extension_mimetypes_cache
from ..component_model.module_property import create_module_properties
from ..component_model.platform_path import construct_path
from ..execution_context import current_execution_context
_module_properties = create_module_properties()
logger = logging.getLogger(__name__)
# todo: investigate what this is actually trying to do
# System User Protection - Protects system directories from HTTP endpoint access
# System Users are internal-only users that cannot be accessed via HTTP endpoints.
# They use the '__' prefix convention (similar to Python's private member convention).
SYSTEM_USER_PREFIX = "__"
@_module_properties.getter
def _supported_pt_extensions() -> set[str]:
return set(supported_pt_extensions)
@_module_properties.getter
def _extension_mimetypes_cache() -> dict[str, str]:
return extension_mimetypes_cache
# todo: this needs to be wrapped in a context and configurable
@_module_properties.getter
def _base_path():
return _folder_names_and_paths().base_paths[0]
def _resolve_path_with_compatibility(path: Path | str) -> PurePosixPath | Path:
"""
Absolute posix style paths (aka, paths starting with `/`) are always returned as-is, otherwise this is resolved.
:param path: a path or string to a path
:return: the resolved path
"""
if isinstance(path, PurePosixPath) and path.is_absolute():
return path
if not path.is_absolute():
base_path_to_path = _base_path() / path
if base_path_to_path.is_absolute():
return base_path_to_path
else:
return Path.resolve(_base_path() / path)
return Path(path).resolve()
def get_system_user_directory(name: str = "system") -> str:
"""
Get the path to a System User directory.
System User directories (prefixed with '__') are only accessible via internal API,
not through HTTP endpoints. Use this for storing system-internal data that
should not be exposed to users.
Args:
name: System user name (e.g., "system", "cache"). Must be alphanumeric
with underscores allowed, but cannot start with underscore.
Returns:
Absolute path to the system user directory.
Raises:
ValueError: If name is empty, invalid, or starts with underscore.
Example:
>>> get_system_user_directory("cache")
'/path/to/user/__cache'
"""
if not name or not isinstance(name, str):
raise ValueError("System user name cannot be empty")
if not name.replace("_", "").isalnum():
raise ValueError(f"Invalid system user name: '{name}'")
if name.startswith("_"):
raise ValueError("System user name should not start with underscore")
return os.path.join(get_user_directory(), f"{SYSTEM_USER_PREFIX}{name}")
def get_public_user_directory(user_id: str) -> str | None:
"""
Get the path to a Public User directory for HTTP endpoint access.
This function provides structural security by returning None for any
System User (prefixed with '__'). All HTTP endpoints should use this
function instead of directly constructing user paths.
Args:
user_id: User identifier from HTTP request.
Returns:
Absolute path to the user directory, or None if user_id is invalid
or refers to a System User.
Example:
>>> get_public_user_directory("default")
'/path/to/user/default'
>>> get_public_user_directory("__system")
None
"""
if not user_id or not isinstance(user_id, str):
return None
if user_id.startswith(SYSTEM_USER_PREFIX):
return None
return os.path.join(get_user_directory(), user_id)
def init_default_paths(folder_names_and_paths: FolderNames, configuration: Optional[Configuration] = None, create_all_directories=False, replace_existing=True, base_paths_from_configuration=True):
"""
Populates the folder names and paths object with the default, upstream model directories and custom_nodes directory.
:param folder_names_and_paths: the object to populate with paths
:param configuration: a configuration whose base_paths and other path settings will be used to set the values on this object
:param create_all_directories: create all the possible directories by calling create_directories() after the object is populated
:param replace_existing: when true, removes existing model paths objects for the built-in folder names; and, replaces the base paths
:param base_paths_from_configuration: when true (default), populates folder_names_and_paths using the configuration's base paths, otherwise does not alter base paths as passed from folder_names_and_paths.base_paths
:return:
"""
from ..cmd.main_pre import args
configuration = configuration or args
if base_paths_from_configuration:
base_paths = [Path(configuration.cwd) if configuration.cwd is not None else None] + [Path(configuration.base_directory) if configuration.base_directory is not None else None] + (configuration.base_paths or [])
base_paths = [Path(path) for path in base_paths if path is not None]
if len(base_paths) == 0:
base_paths = [Path(os.getcwd())]
base_paths = reduce(lambda uniq_list, item: uniq_list.append(item) or uniq_list if item not in uniq_list else uniq_list, base_paths, [])
if replace_existing:
folder_names_and_paths.base_paths.clear()
for base_path in base_paths:
folder_names_and_paths.add_base_path(base_path)
hf_cache_paths = ModelPaths(["huggingface_cache"], supported_extensions=set())
# TODO: explore if there is a better way to do this
if "HF_HUB_CACHE" in os.environ:
hf_cache_paths.additional_absolute_directory_paths.append(os.environ.get("HF_HUB_CACHE"))
hf_xet = ModelPaths(["xet"], supported_extensions=set())
if "HF_XET_CACHE" in os.environ:
hf_xet.additional_absolute_directory_paths.append(os.environ.get("HF_XET_CACHE"))
model_paths_to_add = [
ModelPaths(["checkpoints"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["configs"], additional_absolute_directory_paths=[get_package_as_path("comfy.configs")], supported_extensions={".yaml"}),
ModelPaths(["vae"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(folder_names=["text_encoders", "clip"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["loras"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(folder_names=["diffusion_models", "unet"], supported_extensions=set(supported_pt_extensions), folder_names_are_relative_directory_paths_too=True),
ModelPaths(["clip_vision"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["style_models"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["embeddings"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["diffusers"], supported_extensions=set()),
ModelPaths(["vae_approx"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(folder_names=["controlnet", "t2i_adapter", "diff_controlnet"], supported_extensions=set(supported_pt_extensions), folder_names_are_relative_directory_paths_too=True),
ModelPaths(["gligen"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["upscale_models"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["custom_nodes"], folder_name_base_path_subdir=construct_path(""), supported_extensions=set()),
ModelPaths(["hypernetworks"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["photomaker"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["classifiers"], supported_extensions=set()),
ModelPaths(["huggingface"], supported_extensions=set()),
ModelPaths(["model_patches"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["audio_encoders"], supported_extensions=set(supported_pt_extensions)),
ModelPaths(["latent_upscale_models"], supported_extensions=set(supported_pt_extensions)),
hf_cache_paths,
hf_xet,
]
for model_paths in model_paths_to_add:
if replace_existing:
for folder_name in model_paths.folder_names:
del folder_names_and_paths[folder_name]
folder_names_and_paths.add(model_paths)
if create_all_directories:
create_directories(folder_names_and_paths)
if configuration.output_directory is not None:
folder_names_and_paths.application_paths.output_directory = Path(configuration.output_directory)
if configuration.input_directory is not None:
folder_names_and_paths.application_paths.input_directory = Path(configuration.input_directory)
if configuration.temp_directory is not None:
folder_names_and_paths.application_paths.temp_directory = Path(configuration.temp_directory)
if configuration.user_directory is not None:
folder_names_and_paths.application_paths.user_directory = Path(configuration.user_directory)
@_module_properties.getter
def _folder_names_and_paths():
return current_execution_context().folder_names_and_paths
@_module_properties.getter
def _models_dir():
return str(Path(current_execution_context().folder_names_and_paths.base_paths[0]) / construct_path("models"))
@_module_properties.getter
def _user_directory() -> str:
return str(_resolve_path_with_compatibility(current_execution_context().folder_names_and_paths.application_paths.user_directory))
@_module_properties.getter
def _temp_directory() -> str:
return str(_resolve_path_with_compatibility(current_execution_context().folder_names_and_paths.application_paths.temp_directory))
@_module_properties.getter
def _input_directory() -> str:
return str(_resolve_path_with_compatibility(current_execution_context().folder_names_and_paths.application_paths.input_directory))
@_module_properties.getter
def _output_directory() -> str:
return str(_resolve_path_with_compatibility(current_execution_context().folder_names_and_paths.application_paths.output_directory))
@_deprecate_method(version="0.2.3", message="Mapping of previous folder names is already done by other mechanisms.")
def map_legacy(folder_name: str) -> str:
legacy = {"unet": "diffusion_models"}
return legacy.get(folder_name, folder_name)
def set_output_directory(output_dir: str | Path):
_folder_names_and_paths().application_paths.output_directory = construct_path(output_dir)
def set_temp_directory(temp_dir: str | Path):
_folder_names_and_paths().application_paths.temp_directory = construct_path(temp_dir)
def set_input_directory(input_dir: str | Path):
_folder_names_and_paths().application_paths.input_directory = construct_path(input_dir)
def get_output_directory() -> str:
return str(_resolve_path_with_compatibility(_folder_names_and_paths().application_paths.output_directory))
def get_temp_directory() -> str:
return str(_resolve_path_with_compatibility(_folder_names_and_paths().application_paths.temp_directory))
def get_input_directory(mkdirs=True) -> str:
res = str(_resolve_path_with_compatibility(_folder_names_and_paths().application_paths.input_directory))
if mkdirs:
try:
os.makedirs(res, exist_ok=True)
except Exception as exc_info:
logger.warning(f"could not create directory {res} when trying to access input directory", exc_info)
return res
def get_user_directory() -> str:
return str(_resolve_path_with_compatibility(_folder_names_and_paths().application_paths.user_directory))
def set_user_directory(user_dir: str | Path) -> None:
_folder_names_and_paths().application_paths.user_directory = construct_path(user_dir)
# NOTE: used in http server so don't put folders that should not be accessed remotely
def get_directory_by_type(type_name) -> str | None:
if type_name == "output":
return get_output_directory()
if type_name == "temp":
return get_temp_directory()
if type_name == "input":
return get_input_directory()
return None
# determine base_dir rely on annotation if name is 'filename.ext [annotation]' format
# otherwise use default_path as base_dir
def annotated_filepath(name: str) -> tuple[str, str | None]:
if name.endswith("[output]"):
base_dir = get_output_directory()
name = name[:-9]
elif name.endswith("[input]"):
base_dir = get_input_directory()
name = name[:-8]
elif name.endswith("[temp]"):
base_dir = get_temp_directory()
name = name[:-7]
else:
return name, None
return name, base_dir
def get_annotated_filepath(name, default_dir=None) -> str:
name, base_dir = annotated_filepath(name)
if base_dir is None:
if default_dir is not None:
base_dir = default_dir
else:
base_dir = get_input_directory() # fallback path
return os.path.join(base_dir, name)
def exists_annotated_filepath(name):
name, base_dir = annotated_filepath(name)
if base_dir is None:
base_dir = get_input_directory() # fallback path
filepath = os.path.join(base_dir, name)
return os.path.exists(filepath)
def add_model_folder_path(folder_name, full_folder_path: Optional[str] = None, extensions: Optional[set[str] | frozenset[str]] = None, is_default: bool = False, folder_names_and_paths: Optional[FolderNames] = None) -> str:
"""
Registers a model path for the given canonical name.
:param folder_name: the folder name
:param full_folder_path: When none, defaults to os.path.join(models_dir, folder_name) aka the folder as a subpath to the default models directory
:param extensions: supported file extensions
:return: the folder path
"""
folder_names_and_paths = folder_names_and_paths or _folder_names_and_paths()
if full_folder_path is None:
if folder_name not in folder_names_and_paths:
folder_names_and_paths.add(ModelPaths(folder_names=[folder_name], supported_extensions=set(extensions) if extensions is not None else _supported_pt_extensions()))
return [p for p in folder_names_and_paths.directory_paths(folder_name)][0]
else:
# todo: this should use the subdir pattern
full_folder_path = construct_path(folder_names_and_paths.base_paths[0]) / "models" / folder_name
folder_path = folder_names_and_paths[folder_name]
if full_folder_path not in folder_path.paths:
if is_default:
folder_path.paths.insert(0, full_folder_path)
else:
folder_path.paths.append(full_folder_path)
else:
try:
current_default = folder_path.paths.index(full_folder_path) == 0
except ValueError:
current_default = False
if current_default != is_default:
folder_path.paths.remove(full_folder_path)
if is_default:
folder_path.paths.insert(0, full_folder_path)
else:
folder_path.paths.append(full_folder_path)
if extensions is not None:
folder_path.supported_extensions |= extensions
return full_folder_path
def get_folder_paths(folder_name) -> List[str]:
return [path for path in _folder_names_and_paths()[folder_name].paths]
@_deprecate_method(version="1.0.0", message="Use os.scandir instead.")
def recursive_search(directory, excluded_dir_names=None) -> tuple[list[str], dict[str, float]]:
if not os.path.isdir(directory):
return [], {}
if excluded_dir_names is None:
excluded_dir_names = []
result = []
dirs = {}
# Attempt to add the initial directory to dirs with error handling
try:
dirs[directory] = os.path.getmtime(directory)
except FileNotFoundError:
logger.warning(f"Warning: Unable to access {directory}. Skipping this path.")
logger.debug("recursive file list on directory {}".format(directory))
dirpath: str
subdirs: list[str]
filenames: list[str]
for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True):
subdirs[:] = [d for d in subdirs if d not in excluded_dir_names]
for file_name in filenames:
try:
relative_path = os.path.relpath(os.path.join(dirpath, file_name), directory)
result.append(relative_path)
except:
logger.warning(f"Warning: Unable to access {file_name}. Skipping this file.")
continue
for d in subdirs:
path: str = os.path.join(dirpath, d)
try:
dirs[path] = os.path.getmtime(path)
except FileNotFoundError:
logger.warning(f"Warning: Unable to access {path}. Skipping this path.")
continue
logger.debug("found {} files".format(len(result)))
return result, dirs
def filter_files_extensions(files: collections.abc.Collection[str], extensions: collections.abc.Collection[str]):
return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files)))
def get_full_path(folder_name, filename) -> Optional[str | bytes | os.PathLike]:
"""
Gets the path to a filename inside a folder.
:param folder_name:
:param filename:
:return:
"""
path = _folder_names_and_paths().first_existing_or_none(folder_name, construct_path(filename))
return str(path) if path is not None else None
def get_full_path_or_raise(folder_name: str, filename: str) -> str:
full_path = get_full_path(folder_name, filename)
if full_path is None:
# todo: probably shouldn't say model
raise FileNotFoundError(f"Model in folder '{folder_name}' with filename '{filename}' not found.")
return full_path
def get_filename_list(folder_name: str) -> list[str]:
return [str(path) for path in _folder_names_and_paths().file_paths(folder_name=folder_name, relative=True)]
def get_save_image_path(filename_prefix, output_dir, image_width=0, image_height=0) -> SaveImagePathTuple:
def map_filename(filename: str) -> tuple[int, str]:
prefix_len = len(os.path.basename(filename_prefix))
prefix = filename[:prefix_len + 1]
try:
digits = int(filename[prefix_len + 1:].split('_')[0])
except:
digits = 0
return digits, prefix
def compute_vars(input: str, image_width: int, image_height: int) -> str:
input = input.replace("%width%", str(image_width))
input = input.replace("%height%", str(image_height))
now = time.localtime()
input = input.replace("%year%", str(now.tm_year))
input = input.replace("%month%", str(now.tm_mon).zfill(2))
input = input.replace("%day%", str(now.tm_mday).zfill(2))
input = input.replace("%hour%", str(now.tm_hour).zfill(2))
input = input.replace("%minute%", str(now.tm_min).zfill(2))
input = input.replace("%second%", str(now.tm_sec).zfill(2))
return input
filename_prefix = compute_vars(filename_prefix, image_width, image_height)
subfolder = os.path.dirname(os.path.normpath(filename_prefix))
filename = os.path.basename(os.path.normpath(filename_prefix))
full_output_folder = str(os.path.join(output_dir, subfolder))
try:
counter = max(filter(lambda a: a[1][:-1] == filename and a[1][-1] == "_", map(map_filename, os.listdir(full_output_folder))))[0] + 1
except ValueError:
counter = 1
except FileNotFoundError:
os.makedirs(full_output_folder, exist_ok=True)
counter = 1
return SaveImagePathTuple(full_output_folder, filename, counter, subfolder, filename_prefix)
def create_directories(paths: FolderNames | None = None):
# all configured paths should be created
paths = paths or _folder_names_and_paths()
for folder_path_spec in paths.values():
for path in folder_path_spec.paths:
# only create resolved paths
if not Path(path).is_absolute():
continue
os.makedirs(path, exist_ok=True)
for path in paths.application_paths:
path.mkdir(exist_ok=True, parents=True)
@_deprecate_method(version="0.2.3", message="Caching has been removed.")
def invalidate_cache(folder_name):
pass
def filter_files_content_types(files: list[str], content_types: list[Literal["image", "video", "audio", "model"]]) -> list[str]:
"""
Example:
files = os.listdir(folder_paths.get_input_directory())
filter_files_content_types(files, ["image", "audio", "video"])
"""
result = []
for file in files:
extension = file.split('.')[-1]
if extension not in extension_mimetypes_cache:
mime_type, _ = mimetypes.guess_type(file, strict=False)
if not mime_type:
continue
content_type = mime_type.split('/')[0]
extension_mimetypes_cache[extension] = content_type
else:
content_type = extension_mimetypes_cache[extension]
if content_type in content_types:
result.append(file)
return result
def get_input_subfolders() -> list[str]:
"""Returns a list of all subfolder paths in the input directory, recursively.
Returns:
List of folder paths relative to the input directory, excluding the root directory
"""
input_dir = get_input_directory()
folders = []
try:
if not os.path.exists(input_dir):
return []
for root, dirs, _ in os.walk(input_dir):
rel_path = os.path.relpath(root, input_dir)
if rel_path != ".": # Only include non-root directories
# Normalize path separators to forward slashes
folders.append(rel_path.replace(os.sep, '/'))
return sorted(folders)
except FileNotFoundError:
return []
@_module_properties.getter
def _cache_helper():
return nullcontext()
# todo: can this be done side effect free?
init_default_paths(_folder_names_and_paths())
__all__ = [
"supported_pt_extensions",
"extension_mimetypes_cache",
"base_path", # pylint: disable=undefined-all-variable
"folder_names_and_paths", # pylint: disable=undefined-all-variable
"models_dir", # pylint: disable=undefined-all-variable
"user_directory", # pylint: disable=undefined-all-variable
"output_directory", # pylint: disable=undefined-all-variable
"temp_directory", # pylint: disable=undefined-all-variable
"input_directory", # pylint: disable=undefined-all-variable
# Public functions
"init_default_paths",
"map_legacy",
"set_output_directory",
"set_temp_directory",
"set_input_directory",
"get_output_directory",
"get_temp_directory",
"get_input_directory",
"get_user_directory",
"set_user_directory",
"get_directory_by_type",
"annotated_filepath",
"get_annotated_filepath",
"exists_annotated_filepath",
"add_model_folder_path",
"get_folder_paths",
"recursive_search",
"filter_files_extensions",
"get_full_path",
"get_full_path_or_raise",
"get_filename_list",
"get_save_image_path",
"create_directories",
"invalidate_cache",
"filter_files_content_types",
"get_input_subfolders",
"get_system_user_directory",
"get_public_user_directory",
# todo: why? what is the purpose?
"SYSTEM_USER_PREFIX",
]