import os from pathlib import Path, PureWindowsPath from typing import Literal import folder_paths _NON_MODEL_FOLDER_NAMES = frozenset({"configs", "custom_nodes"}) def get_comfy_models_folders() -> list[tuple[str, list[str]]]: """Build list of (folder_name, base_paths[]) for all model locations. Includes every category registered in folder_names_and_paths, regardless of whether its paths are under the main models_dir, but excludes non-model entries like configs and custom_nodes. """ targets: list[tuple[str, list[str]]] = [] for name, values in folder_paths.folder_names_and_paths.items(): if name in _NON_MODEL_FOLDER_NAMES: continue paths, _exts = values[0], values[1] if paths: targets.append((name, paths)) return targets def _validate_subfolder(subfolder: str | None) -> list[str]: if not subfolder: return [] if "\\" in subfolder: raise ValueError("invalid subfolder path") windows_path = PureWindowsPath(subfolder) if windows_path.drive or windows_path.root: raise ValueError("invalid subfolder path") parts = Path(subfolder).parts invalid = {"", ".", ".."} if Path(subfolder).is_absolute() or any(part in invalid for part in parts): raise ValueError("invalid subfolder path") if any("/" in part or "\\" in part for part in parts): raise ValueError("invalid subfolder path") return list(parts) def resolve_destination_from_tags( tags: list[str], subfolder: str | None = None ) -> tuple[str, list[str]]: """Validates and maps upload routing tags -> (base_dir, subdirs_for_fs). The request tags are only used to choose the write destination. Extra tags remain labels; they do not become path components or trusted classification. Explicit subfolder is the only request field that can add path components. """ destination_roles = [t for t in tags if t in {"input", "models", "output"}] if len(destination_roles) != 1: raise ValueError("uploads require exactly one destination role: input, models, or output") root = destination_roles[0] if root == "models": model_type_tags = [t for t in tags if t.startswith("model_type:")] if len(model_type_tags) != 1: raise ValueError("models uploads require exactly one model_type: tag") folder_name = model_type_tags[0].split(":", 1)[1] if not folder_name: raise ValueError("models uploads require exactly one model_type: tag") model_folder_paths = dict(get_comfy_models_folders()) try: bases = model_folder_paths[folder_name] except KeyError: raise ValueError(f"unknown model category '{folder_name}'") if not bases: raise ValueError(f"no base path configured for category '{folder_name}'") base_dir = os.path.abspath(bases[0]) elif root == "input": base_dir = os.path.abspath(folder_paths.get_input_directory()) else: base_dir = os.path.abspath(folder_paths.get_output_directory()) return base_dir, _validate_subfolder(subfolder) def validate_path_within_base(candidate: str, base: str) -> None: cand_abs = Path(os.path.abspath(candidate)) base_abs = Path(os.path.abspath(base)) if not cand_abs.is_relative_to(base_abs): raise ValueError("destination escapes base directory") def compute_relative_filename(file_path: str) -> str | None: """ Return the model's path relative to the last well-known folder (the model category), using forward slashes, eg: /.../models/checkpoints/flux/123/flux.safetensors -> "flux/123/flux.safetensors" /.../models/text_encoders/clip_g.safetensors -> "clip_g.safetensors" For non-model paths, returns None. """ try: root_category, rel_path = get_asset_category_and_relative_path(file_path) except ValueError: return None p = Path(rel_path) parts = [seg for seg in p.parts if seg not in (".", "..", p.anchor)] if not parts: return None if root_category == "models": # parts[0] is the category ("checkpoints", "vae", etc) – drop it inside = parts[1:] if len(parts) > 1 else [parts[0]] return "/".join(inside) return "/".join(parts) # input/output: keep all parts def get_asset_category_and_relative_path( file_path: str, ) -> tuple[Literal["input", "output", "temp", "models"], str]: """Determine which root category a file path belongs to. Categories: - 'input': under folder_paths.get_input_directory() - 'output': under folder_paths.get_output_directory() - 'temp': under folder_paths.get_temp_directory() - 'models': under any base path from get_comfy_models_folders() Returns: (root_category, relative_path_inside_that_root) Raises: ValueError: path does not belong to any known root. """ fp_abs = os.path.abspath(file_path) def _check_is_within(child: str, parent: str) -> bool: return Path(child).is_relative_to(parent) def _compute_relative(child: str, parent: str) -> str: # Normalize relative path, stripping any leading ".." components # by anchoring to root (os.sep) then computing relpath back from it. return os.path.relpath( os.path.join(os.sep, os.path.relpath(child, parent)), os.sep ) # 1) input input_base = os.path.abspath(folder_paths.get_input_directory()) if _check_is_within(fp_abs, input_base): return "input", _compute_relative(fp_abs, input_base) # 2) output output_base = os.path.abspath(folder_paths.get_output_directory()) if _check_is_within(fp_abs, output_base): return "output", _compute_relative(fp_abs, output_base) # 3) temp temp_base = os.path.abspath(folder_paths.get_temp_directory()) if _check_is_within(fp_abs, temp_base): return "temp", _compute_relative(fp_abs, temp_base) # 4) models (check deepest matching base to avoid ambiguity) best: tuple[int, str, str] | None = None # (base_len, bucket, rel_inside_bucket) for bucket, bases in get_comfy_models_folders(): for b in bases: base_abs = os.path.abspath(b) if not _check_is_within(fp_abs, base_abs): continue cand = (len(base_abs), bucket, _compute_relative(fp_abs, base_abs)) if best is None or cand[0] > best[0]: best = cand if best is not None: _, bucket, rel_inside = best combined = os.path.join(bucket, rel_inside) return "models", os.path.relpath(os.path.join(os.sep, combined), os.sep) raise ValueError( f"Path is not within input, output, temp, or configured model bases: {file_path}" ) def get_backend_system_tags_from_path(path: str) -> list[str]: """Return trusted backend tags derived from current filesystem facts. The returned tags are only the backend-generated system tags: ``models``, ``model_type:``, ``input``, ``output``, and ``temp``. Model type tags are based on registered folder names, not path components. """ fp_abs = os.path.abspath(path) fp_path = Path(fp_abs) tags: list[str] = [] def _add(tag: str) -> None: if tag not in tags: tags.append(tag) for role, base in ( ("input", folder_paths.get_input_directory()), ("output", folder_paths.get_output_directory()), ("temp", folder_paths.get_temp_directory()), ): if fp_path.is_relative_to(os.path.abspath(base)): _add(role) model_types: list[str] = [] for folder_name, bases in get_comfy_models_folders(): for base in bases: if fp_path.is_relative_to(os.path.abspath(base)): model_types.append(folder_name) break if model_types: _add("models") for folder_name in model_types: _add(f"model_type:{folder_name}") if not tags: raise ValueError( f"Path is not within input, output, temp, or configured model bases: {path}" ) return tags def get_name_and_tags_from_asset_path(file_path: str) -> tuple[str, list[str]]: """Return (name, tags) derived from a filesystem path. - name: base filename with extension - tags: trusted backend classification tags derived from the path Raises: ValueError: path does not belong to any known root. """ return Path(file_path).name, get_backend_system_tags_from_path(file_path)