mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2025-12-16 01:37:04 +08:00
217 lines
8.5 KiB
Python
217 lines
8.5 KiB
Python
import contextlib
|
||
import os
|
||
from aiohttp import web
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Literal, Any
|
||
|
||
import folder_paths
|
||
|
||
|
||
RootType = Literal["models", "input", "output"]
|
||
ALLOWED_ROOTS: tuple[RootType, ...] = ("models", "input", "output")
|
||
|
||
def get_query_dict(request: web.Request) -> dict[str, Any]:
|
||
"""
|
||
Gets a dictionary of query parameters from the request.
|
||
|
||
'request.query' is a MultiMapping[str], needs to be converted to a dictionary to be validated by Pydantic.
|
||
"""
|
||
query_dict = {
|
||
key: request.query.getall(key) if len(request.query.getall(key)) > 1 else request.query.get(key)
|
||
for key in request.query.keys()
|
||
}
|
||
return query_dict
|
||
|
||
def list_tree(base_dir: str) -> list[str]:
|
||
out: list[str] = []
|
||
base_abs = os.path.abspath(base_dir)
|
||
if not os.path.isdir(base_abs):
|
||
return out
|
||
for dirpath, _subdirs, filenames in os.walk(base_abs, topdown=True, followlinks=False):
|
||
for name in filenames:
|
||
out.append(os.path.abspath(os.path.join(dirpath, name)))
|
||
return out
|
||
|
||
def prefixes_for_root(root: RootType) -> list[str]:
|
||
if root == "models":
|
||
bases: list[str] = []
|
||
for _bucket, paths in get_comfy_models_folders():
|
||
bases.extend(paths)
|
||
return [os.path.abspath(p) for p in bases]
|
||
if root == "input":
|
||
return [os.path.abspath(folder_paths.get_input_directory())]
|
||
if root == "output":
|
||
return [os.path.abspath(folder_paths.get_output_directory())]
|
||
return []
|
||
|
||
def escape_like_prefix(s: str, escape: str = "!") -> tuple[str, str]:
|
||
"""Escapes %, _ and the escape char itself in a LIKE prefix.
|
||
Returns (escaped_prefix, escape_char). Caller should append '%' and pass escape=escape_char to .like().
|
||
"""
|
||
s = s.replace(escape, escape + escape) # escape the escape char first
|
||
s = s.replace("%", escape + "%").replace("_", escape + "_") # escape LIKE wildcards
|
||
return s, escape
|
||
|
||
def fast_asset_file_check(
|
||
*,
|
||
mtime_db: int | None,
|
||
size_db: int | None,
|
||
stat_result: os.stat_result,
|
||
) -> bool:
|
||
if mtime_db is None:
|
||
return False
|
||
actual_mtime_ns = getattr(stat_result, "st_mtime_ns", int(stat_result.st_mtime * 1_000_000_000))
|
||
if int(mtime_db) != int(actual_mtime_ns):
|
||
return False
|
||
sz = int(size_db or 0)
|
||
if sz > 0:
|
||
return int(stat_result.st_size) == sz
|
||
return True
|
||
|
||
def utcnow() -> datetime:
|
||
"""Naive UTC timestamp (no tzinfo). We always treat DB datetimes as UTC."""
|
||
return datetime.now(timezone.utc).replace(tzinfo=None)
|
||
|
||
def get_comfy_models_folders() -> list[tuple[str, list[str]]]:
|
||
"""Build a list of (folder_name, base_paths[]) categories that are configured for model locations.
|
||
|
||
We trust `folder_paths.folder_names_and_paths` and include a category if
|
||
*any* of its base paths lies under the Comfy `models_dir`.
|
||
"""
|
||
targets: list[tuple[str, list[str]]] = []
|
||
models_root = os.path.abspath(folder_paths.models_dir)
|
||
for name, (paths, _exts) in folder_paths.folder_names_and_paths.items():
|
||
if any(os.path.abspath(p).startswith(models_root + os.sep) for p in paths):
|
||
targets.append((name, paths))
|
||
return targets
|
||
|
||
def compute_relative_filename(file_path: str) -> str | None:
|
||
"""
|
||
Return the model's path relative to the last well-known folder (the model category),
|
||
using forward slashes, eg:
|
||
/.../models/checkpoints/flux/123/flux.safetensors -> "flux/123/flux.safetensors"
|
||
/.../models/text_encoders/clip_g.safetensors -> "clip_g.safetensors"
|
||
|
||
For non-model paths, returns None.
|
||
NOTE: this is a temporary helper, used only for initializing metadata["filename"] field.
|
||
"""
|
||
try:
|
||
root_category, rel_path = get_relative_to_root_category_path_of_asset(file_path)
|
||
except ValueError:
|
||
return None
|
||
|
||
p = Path(rel_path)
|
||
parts = [seg for seg in p.parts if seg not in (".", "..", p.anchor)]
|
||
if not parts:
|
||
return None
|
||
|
||
if root_category == "models":
|
||
# parts[0] is the category ("checkpoints", "vae", etc) – drop it
|
||
inside = parts[1:] if len(parts) > 1 else [parts[0]]
|
||
return "/".join(inside)
|
||
return "/".join(parts) # input/output: keep all parts
|
||
|
||
|
||
def get_relative_to_root_category_path_of_asset(file_path: str) -> tuple[Literal["input", "output", "models"], str]:
|
||
"""Given an absolute or relative file path, determine which root category the path belongs to:
|
||
- 'input' if the file resides under `folder_paths.get_input_directory()`
|
||
- 'output' if the file resides under `folder_paths.get_output_directory()`
|
||
- 'models' if the file resides under any base path of categories returned by `get_comfy_models_folders()`
|
||
|
||
Returns:
|
||
(root_category, relative_path_inside_that_root)
|
||
For 'models', the relative path is prefixed with the category name:
|
||
e.g. ('models', 'vae/test/sub/ae.safetensors')
|
||
|
||
Raises:
|
||
ValueError: if the path does not belong to input, output, or configured model bases.
|
||
"""
|
||
fp_abs = os.path.abspath(file_path)
|
||
|
||
def _is_within(child: str, parent: str) -> bool:
|
||
try:
|
||
return os.path.commonpath([child, parent]) == parent
|
||
except Exception:
|
||
return False
|
||
|
||
def _rel(child: str, parent: str) -> str:
|
||
return os.path.relpath(os.path.join(os.sep, os.path.relpath(child, parent)), os.sep)
|
||
|
||
# 1) input
|
||
input_base = os.path.abspath(folder_paths.get_input_directory())
|
||
if _is_within(fp_abs, input_base):
|
||
return "input", _rel(fp_abs, input_base)
|
||
|
||
# 2) output
|
||
output_base = os.path.abspath(folder_paths.get_output_directory())
|
||
if _is_within(fp_abs, output_base):
|
||
return "output", _rel(fp_abs, output_base)
|
||
|
||
# 3) models (check deepest matching base to avoid ambiguity)
|
||
best: tuple[int, str, str] | None = None # (base_len, bucket, rel_inside_bucket)
|
||
for bucket, bases in get_comfy_models_folders():
|
||
for b in bases:
|
||
base_abs = os.path.abspath(b)
|
||
if not _is_within(fp_abs, base_abs):
|
||
continue
|
||
cand = (len(base_abs), bucket, _rel(fp_abs, base_abs))
|
||
if best is None or cand[0] > best[0]:
|
||
best = cand
|
||
|
||
if best is not None:
|
||
_, bucket, rel_inside = best
|
||
combined = os.path.join(bucket, rel_inside)
|
||
return "models", os.path.relpath(os.path.join(os.sep, combined), os.sep)
|
||
|
||
raise ValueError(f"Path is not within input, output, or configured model bases: {file_path}")
|
||
|
||
def get_name_and_tags_from_asset_path(file_path: str) -> tuple[str, list[str]]:
|
||
"""Return a tuple (name, tags) derived from a filesystem path.
|
||
|
||
Semantics:
|
||
- Root category is determined by `get_relative_to_root_category_path_of_asset`.
|
||
- The returned `name` is the base filename with extension from the relative path.
|
||
- The returned `tags` are:
|
||
[root_category] + parent folders of the relative path (in order)
|
||
For 'models', this means:
|
||
file '/.../ModelsDir/vae/test_tag/ae.safetensors'
|
||
-> root_category='models', some_path='vae/test_tag/ae.safetensors'
|
||
-> name='ae.safetensors', tags=['models', 'vae', 'test_tag']
|
||
|
||
Raises:
|
||
ValueError: if the path does not belong to input, output, or configured model bases.
|
||
"""
|
||
root_category, some_path = get_relative_to_root_category_path_of_asset(file_path)
|
||
p = Path(some_path)
|
||
parent_parts = [part for part in p.parent.parts if part not in (".", "..", p.anchor)]
|
||
return p.name, list(dict.fromkeys(normalize_tags([root_category, *parent_parts])))
|
||
|
||
def normalize_tags(tags: list[str] | None) -> list[str]:
|
||
"""
|
||
Normalize a list of tags by:
|
||
- Stripping whitespace and converting to lowercase.
|
||
- Removing duplicates.
|
||
"""
|
||
return [t.strip().lower() for t in (tags or []) if (t or "").strip()]
|
||
|
||
def collect_models_files() -> list[str]:
|
||
out: list[str] = []
|
||
for folder_name, bases in get_comfy_models_folders():
|
||
rel_files = folder_paths.get_filename_list(folder_name) or []
|
||
for rel_path in rel_files:
|
||
abs_path = folder_paths.get_full_path(folder_name, rel_path)
|
||
if not abs_path:
|
||
continue
|
||
abs_path = os.path.abspath(abs_path)
|
||
allowed = False
|
||
for b in bases:
|
||
base_abs = os.path.abspath(b)
|
||
with contextlib.suppress(Exception):
|
||
if os.path.commonpath([abs_path, base_abs]) == base_abs:
|
||
allowed = True
|
||
break
|
||
if allowed:
|
||
out.append(abs_path)
|
||
return out
|