ComfyUI/app/assets/services/path_utils.py
Simon Pinfold bf898cc552 fix(assets): move model asset on model_type: edit to stay loader-coherent
Under supports_model_type_tags, a model_type: edit on a filesystem-backed
model asset now relocates and re-registers the file to the folder matching
the new model_type:<folder_name>, so the file location stays coherent with
the label instead of a label-only relabel that left the file loader-orphaned.

Triggered off the POST /tags add (apply_tags): on a registered folder change
the file moves (preserving subfolder + filename), path-derived system tags
are re-derived, and filename metadata refreshed, all in one transaction with
file rollback on failure. Collisions are rejected (never clobber); an
unregistered model_type: stays a permissive label; non-model and hash-only
references are untouched. Shared on-disk folders keep their plural twins.
2026-06-20 14:40:08 +12:00

326 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from pathlib import Path, PureWindowsPath
from typing import Literal
import folder_paths
# Registered folder names that are not model categories;
# get_comfy_models_folders() skips these when listing model locations.
_NON_MODEL_FOLDER_NAMES = frozenset({"configs", "custom_nodes"})
def get_comfy_models_folders() -> list[tuple[str, list[str]]]:
    """Collect ``(folder_name, base_paths)`` pairs for every model category.

    Walks ``folder_paths.folder_names_and_paths`` in its own iteration order
    and keeps each category that has at least one base path, wherever those
    paths live on disk. Names listed in ``_NON_MODEL_FOLDER_NAMES`` are
    skipped because they are not model folders.
    """
    model_folders: list[tuple[str, list[str]]] = []
    for folder_name, registration in folder_paths.folder_names_and_paths.items():
        if folder_name not in _NON_MODEL_FOLDER_NAMES:
            # Each registration is indexed as (base paths, extensions).
            base_paths, _extensions = registration[0], registration[1]
            if base_paths:
                model_folders.append((folder_name, base_paths))
    return model_folders
def get_model_base_for_folder(folder_name: str) -> str:
    """Resolve a registered model ``folder_name`` to its canonical base path.

    This is the single-valued reverse lookup (model_type -> on-disk base) the
    upload destination and the edit-type move (BE-1641) share: resolve the one
    named folder to its first configured base. Never fans out.

    Only categories reported by ``get_comfy_models_folders()`` are accepted,
    so the non-model names it excludes are rejected as unknown.

    Args:
        folder_name: registered model category name.

    Returns:
        Absolute path of the first base configured for ``folder_name``.

    Raises:
        ValueError: the folder_name is not registered, or has no base path.
    """
    model_folder_paths = dict(get_comfy_models_folders())
    try:
        bases = model_folder_paths[folder_name]
    except KeyError:
        # The KeyError is an internal lookup detail; suppress it so callers
        # see one clean ValueError instead of a misleading
        # "During handling of the above exception..." chain.
        raise ValueError(f"unknown model category '{folder_name}'") from None
    if not bases:
        # Defensive: get_comfy_models_folders() already drops categories
        # with an empty path list.
        raise ValueError(f"no base path configured for category '{folder_name}'")
    return os.path.abspath(bases[0])
def model_folders_for_path(path: str) -> list[str]:
    """List the model folder_names that have a registered base containing ``path``.

    The result is set-valued (spec-drift §1): when two or more registered
    folders share a base (e.g. ``diffusion_models`` and ``unet_gguf`` both
    registering ``models/unet``), every one of them is returned. Each
    folder_name appears at most once, in registration order.

    The check is purely lexical, so the file does not need to exist. The list
    is empty when ``path`` is under no model base (e.g. an input/output file).
    """
    target = Path(os.path.abspath(path))
    return [
        folder_name
        for folder_name, bases in get_comfy_models_folders()
        if any(target.is_relative_to(os.path.abspath(base)) for base in bases)
    ]
def _validate_subfolder(subfolder: str | None) -> list[str]:
if not subfolder:
return []
if "\\" in subfolder:
raise ValueError("invalid subfolder path")
windows_path = PureWindowsPath(subfolder)
if windows_path.drive or windows_path.root:
raise ValueError("invalid subfolder path")
parts = Path(subfolder).parts
invalid = {"", ".", ".."}
if Path(subfolder).is_absolute() or any(part in invalid for part in parts):
raise ValueError("invalid subfolder path")
if any("/" in part or "\\" in part for part in parts):
raise ValueError("invalid subfolder path")
return list(parts)
def resolve_destination_from_tags(
    tags: list[str], subfolder: str | None = None
) -> tuple[str, list[str]]:
    """Map upload routing tags to ``(base_dir, subdirs_for_fs)``.

    Tags are consulted only to pick the write destination: exactly one of
    ``input``, ``models`` or ``output`` must be present, and a ``models``
    upload additionally needs exactly one ``model_type:<folder_name>`` tag.
    Any other tags stay plain labels and never become path components; the
    explicit ``subfolder`` is the only request field that can add them.

    Raises:
        ValueError: the role or model_type tags are missing or ambiguous, the
            model category cannot be resolved, or ``subfolder`` is invalid.
    """
    known_roles = {"input", "models", "output"}
    roles = [tag for tag in tags if tag in known_roles]
    if len(roles) != 1:
        raise ValueError("uploads require exactly one destination role: input, models, or output")
    (role,) = roles

    if role == "input":
        base_dir = os.path.abspath(folder_paths.get_input_directory())
    elif role == "output":
        base_dir = os.path.abspath(folder_paths.get_output_directory())
    else:
        # role == "models": the folder is named by a single model_type tag.
        type_tags = [tag for tag in tags if tag.startswith("model_type:")]
        if len(type_tags) != 1:
            raise ValueError("models uploads require exactly one model_type:<folder_name> tag")
        folder_name = type_tags[0].partition(":")[2]
        if not folder_name:
            raise ValueError("models uploads require exactly one model_type:<folder_name> tag")
        base_dir = get_model_base_for_folder(folder_name)

    # The subfolder is validated only after the destination has resolved.
    return base_dir, _validate_subfolder(subfolder)
def validate_path_within_base(candidate: str, base: str) -> None:
    """Ensure ``candidate`` resolves lexically to ``base`` or a path beneath it.

    Both paths are made absolute and normalised with ``os.path.abspath``
    before the component-wise comparison, so ``..`` segments cannot be used
    to step outside ``base``.

    Raises:
        ValueError: ``candidate`` is not located under ``base``.
    """
    resolved_candidate = Path(os.path.abspath(candidate))
    resolved_base = Path(os.path.abspath(base))
    if resolved_candidate.is_relative_to(resolved_base):
        return
    raise ValueError("destination escapes base directory")
def _compute_relative_path(child: str, parent: str) -> str:
rel = os.path.relpath(os.path.abspath(child), os.path.abspath(parent))
if rel == ".":
return ""
return rel.replace(os.sep, "/")
def _is_relative_to(child: str, parent: str) -> bool:
return Path(os.path.abspath(child)).is_relative_to(os.path.abspath(parent))
def compute_asset_response_paths(file_path: str) -> tuple[str, str | None] | None:
    """Build the public ``(file_path, display_name)`` response fields for a path.

    These fields are storage locators, not model-loader namespaces: registered
    model-folder membership is conveyed by backend tags such as
    ``model_type:<folder_name>``, while response paths only use the known
    storage roots (input, output, temp, and ``folder_paths.models_dir`` when
    that attribute is set).

    Returns:
        ``(public_path, display_name)`` where ``public_path`` is
        ``"<namespace>/<relative>"`` (or just the namespace when the path is
        the root itself) and ``display_name`` is the relative part or None.
        None when the path is under no known storage root.
    """
    target = os.path.abspath(file_path)
    storage_roots = (
        ("input", folder_paths.get_input_directory()),
        ("output", folder_paths.get_output_directory()),
        ("temp", folder_paths.get_temp_directory()),
        ("models", getattr(folder_paths, "models_dir", "")),
    )

    # Keep the longest containing root; the strict ">" means that on a tie
    # the root listed first wins.
    winner: tuple[str, str] | None = None
    for namespace, root in storage_roots:
        if not root:
            continue
        root_abs = os.path.abspath(root)
        if not _is_relative_to(target, root_abs):
            continue
        if winner is None or len(root_abs) > len(winner[1]):
            winner = (namespace, root_abs)

    if winner is None:
        return None

    namespace, root_abs = winner
    relative = _compute_relative_path(target, root_abs)
    if relative:
        return f"{namespace}/{relative}", relative
    return namespace, None
def compute_display_name(file_path: str) -> str | None:
    """Return the asset's `display_name`, or None when the path is unknown.

    This is the second element of ``compute_asset_response_paths()``.
    """
    response_paths = compute_asset_response_paths(file_path)
    if not response_paths:
        return None
    return response_paths[1]
def compute_file_path(file_path: str) -> str | None:
    """Return the asset's logical storage `file_path`, or None when unknown.

    This is the first element of ``compute_asset_response_paths()``.
    """
    response_paths = compute_asset_response_paths(file_path)
    if not response_paths:
        return None
    return response_paths[0]
def compute_relative_filename(file_path: str) -> str | None:
    """Return the path relative to its last well-known folder, with "/" separators.

    For model files the well-known folder is the model category, e.g.:
        /.../models/checkpoints/flux/123/flux.safetensors -> "flux/123/flux.safetensors"
        /.../models/text_encoders/clip_g.safetensors      -> "clip_g.safetensors"

    This is legacy metadata/view filename logic, not the public Asset response
    `display_name`; response fields should use compute_asset_response_paths().

    Returns None when the path belongs to no known root or has no usable
    components.
    """
    try:
        root_category, rel_path = get_asset_category_and_relative_path(file_path)
    except ValueError:
        return None

    relative = Path(rel_path)
    ignored = (".", "..", relative.anchor)
    segments = [segment for segment in relative.parts if segment not in ignored]
    if not segments:
        return None

    # For models the first segment is the category ("checkpoints", "vae", ...);
    # drop it unless it is the only segment available.
    if root_category == "models" and len(segments) > 1:
        segments = segments[1:]
    return "/".join(segments)
def get_asset_category_and_relative_path(
    file_path: str,
) -> tuple[Literal["input", "output", "temp", "models"], str]:
    """Classify ``file_path`` under a root category and return its relative path.

    Roots are tried in this order, first match wins:
        - 'input':  folder_paths.get_input_directory()
        - 'output': folder_paths.get_output_directory()
        - 'temp':   folder_paths.get_temp_directory()
        - 'models': any base path from get_comfy_models_folders(); the longest
          matching base is used and the relative path is prefixed with that
          base's folder name.

    Returns:
        (root_category, relative_path_inside_that_root), using "/" separators.

    Raises:
        ValueError: path does not belong to any known root.
    """
    target = os.path.abspath(file_path)

    def _relative_inside(root: str) -> str | None:
        """Relative path of ``target`` under ``root``, or None when outside it."""
        if not Path(target).is_relative_to(root):
            return None
        # Anchor the relative path at os.sep and take it back relative to
        # os.sep, which normalises it and strips any leading ".." parts.
        anchored = os.path.join(os.sep, os.path.relpath(target, root))
        normalized = os.path.relpath(anchored, os.sep)
        return "" if normalized == "." else normalized.replace(os.sep, "/")

    # Storage roots are resolved one at a time, in priority order.
    inside_input = _relative_inside(os.path.abspath(folder_paths.get_input_directory()))
    if inside_input is not None:
        return "input", inside_input

    inside_output = _relative_inside(os.path.abspath(folder_paths.get_output_directory()))
    if inside_output is not None:
        return "output", inside_output

    inside_temp = _relative_inside(os.path.abspath(folder_paths.get_temp_directory()))
    if inside_temp is not None:
        return "temp", inside_temp

    # Models: prefer the deepest (longest) matching base; on equal length the
    # base encountered first is kept.
    deepest_len = -1
    deepest: tuple[str, str] | None = None  # (folder_name, relative path inside base)
    for folder_name, bases in get_comfy_models_folders():
        for raw_base in bases:
            base_root = os.path.abspath(raw_base)
            inside_base = _relative_inside(base_root)
            if inside_base is None:
                continue
            if len(base_root) > deepest_len:
                deepest_len = len(base_root)
                deepest = (folder_name, inside_base)

    if deepest is None:
        raise ValueError(
            f"Path is not within input, output, temp, or configured model bases: {file_path}"
        )

    folder_name, inside_base = deepest
    combined = os.path.join(folder_name, inside_base)
    normalized = os.path.relpath(os.path.join(os.sep, combined), os.sep)
    return "models", normalized.replace(os.sep, "/")
def get_backend_system_tags_from_path(path: str) -> list[str]:
    """Derive the trusted backend system tags for ``path`` from where it lives.

    Only backend-generated system tags are produced: ``input``, ``output``,
    ``temp``, ``models`` and ``model_type:<folder_name>``. Model type tags
    come from the registered folder names whose bases contain the path, not
    from the path's own components. Tags are unique and ordered: storage
    roles first, then ``models``, then one ``model_type:`` tag per folder.

    Raises:
        ValueError: the path is under no storage root and no model base.
    """
    target = Path(os.path.abspath(path))

    storage_roots = (
        ("input", folder_paths.get_input_directory()),
        ("output", folder_paths.get_output_directory()),
        ("temp", folder_paths.get_temp_directory()),
    )
    ordered = [
        role
        for role, base in storage_roots
        if target.is_relative_to(os.path.abspath(base))
    ]

    model_types = [
        folder_name
        for folder_name, bases in get_comfy_models_folders()
        if any(target.is_relative_to(os.path.abspath(base)) for base in bases)
    ]
    if model_types:
        ordered.append("models")
        ordered.extend(f"model_type:{folder_name}" for folder_name in model_types)

    # dict.fromkeys drops repeats while keeping first-seen order.
    tags = list(dict.fromkeys(ordered))
    if not tags:
        raise ValueError(
            f"Path is not within input, output, temp, or configured model bases: {path}"
        )
    return tags
def get_name_and_tags_from_asset_path(file_path: str) -> tuple[str, list[str]]:
    """Derive ``(name, tags)`` for an asset from its filesystem path.

    - name: the final path component (base filename including extension)
    - tags: trusted backend classification tags, as produced by
      get_backend_system_tags_from_path()

    Raises:
        ValueError: path does not belong to any known root.
    """
    name = Path(file_path).name
    tags = get_backend_system_tags_from_path(file_path)
    return name, tags