mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-02-06 19:42:34 +08:00
Rename functions across app/assets/ to follow verb-based naming: - is_scalar → check_is_scalar - project_kv → expand_metadata_to_rows - _visible_owner_clause → _build_visible_owner_clause - _chunk_rows → _iter_row_chunks - _at_least_one → _validate_at_least_one_field - _tags_norm → _normalize_tags_field - _ser_dt → _serialize_datetime - _ser_updated → _serialize_updated_at - _error_response → _build_error_response - _validation_error_response → _build_validation_error_response - file_sender → stream_file_chunks - seed_assets_endpoint → seed_assets - utcnow → get_utc_now - _safe_sort_field → _validate_sort_field - _safe_filename → _sanitize_filename - fast_asset_file_check → check_asset_file_fast - prefixes_for_root → get_prefixes_for_root - blake3_hash → compute_blake3_hash - blake3_hash_async → compute_blake3_hash_async - _is_within → _check_is_within - _rel → _compute_relative Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
47 lines
1.6 KiB
Python
47 lines
1.6 KiB
Python
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Literal, Sequence
|
|
|
|
|
|
def pick_best_live_path(states: Sequence) -> str:
    """
    Choose which on-disk path to use from a sequence of cache states.

    Only states with a truthy ``file_path`` that is a regular file on disk
    are considered. Among those:
      1) the first state whose ``needs_verify`` is falsy (or missing) wins;
      2) failing that, the first existing state is used;
      3) if no state points at an existing file, "" is returned.
    """
    existing = []
    for state in states:
        path = getattr(state, "file_path", None)
        if path and os.path.isfile(path):
            existing.append(state)

    # A missing ``needs_verify`` attribute counts as "already verified".
    verified = next(
        (st for st in existing if not getattr(st, "needs_verify", False)),
        None,
    )
    if verified is not None:
        return verified.file_path

    return existing[0].file_path if existing else ""
|
|
|
|
|
|
# Closed set of root names; the annotation restricts members to exactly these
# three string literals.
# NOTE(review): presumably consulted by callers to validate a requested root —
# confirm against the call sites.
ALLOWED_ROOTS: tuple[Literal["models", "input", "output"], ...] = ("models", "input", "output")
|
|
|
|
|
|
def escape_like_prefix(s: str, escape: str = "!") -> tuple[str, str]:
    """Escape a string so it can be used literally as a LIKE prefix.

    Every occurrence of the escape character, ``%`` and ``_`` in ``s`` is
    prefixed with ``escape``.

    Returns ``(escaped_prefix, escape_char)``. The caller is expected to append
    the trailing ``'%'`` wildcard and pass ``escape=escape_char`` to ``.like()``.
    """
    escaped = s
    # Order matters: the escape char must be doubled first, otherwise the
    # escapes inserted for the wildcards would be escaped a second time.
    for special in (escape, "%", "_"):
        escaped = escaped.replace(special, escape + special)
    return escaped, escape
|
|
|
|
|
|
def get_utc_now() -> datetime:
    """Return the current UTC time as a naive datetime (tzinfo removed).

    DB datetimes are always treated as UTC, so the timezone marker is dropped.
    """
    aware_now = datetime.now(tz=timezone.utc)
    return aware_now.replace(tzinfo=None)
|
|
|
|
|
|
def normalize_tags(tags: list[str] | None) -> list[str]:
|
|
"""
|
|
Normalize a list of tags by:
|
|
- Stripping whitespace and converting to lowercase.
|
|
- Removing duplicates.
|
|
"""
|
|
return [t.strip().lower() for t in (tags or []) if (t or "").strip()]
|
|
|