mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-15 00:30:55 +08:00
337 lines
12 KiB
Python
337 lines
12 KiB
Python
import os
|
|
import torch
|
|
import hashlib
|
|
import folder_paths
|
|
from pathlib import Path
|
|
import time
|
|
from datetime import datetime
|
|
|
|
CACHE_DIR = os.path.join(folder_paths.output_directory, "cfz_conditioning_cache")
|
|
|
|
class AlwaysEqualProxy(str):
|
|
def __eq__(self, _):
|
|
return True
|
|
|
|
def __ne__(self, _):
|
|
return False
|
|
|
|
any_type = AlwaysEqualProxy("*")
|
|
|
|
def compare_revision(target_revision):
|
|
"""Simple version check - defaults to supporting lazy loading"""
|
|
try:
|
|
import comfy
|
|
if hasattr(comfy, 'model_management') and hasattr(comfy.model_management, 'get_torch_device'):
|
|
return True
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
lazy_options = {"lazy": True} if compare_revision(2543) else {}
|
|
|
|
# Global timer storage - shared across all marker instances
|
|
TIMER_STORAGE = {}
|
|
|
|
class save_conditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
return {
|
|
"required": {
|
|
"conditioning": ("CONDITIONING",),
|
|
"cache_name": ("STRING", {"default": "my_conditioning"}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
RETURN_NAMES = ("conditioning",)
|
|
FUNCTION = "save_conditioning"
|
|
CATEGORY = "CFZ Save-Load Conditioning"
|
|
|
|
def save_conditioning(self, conditioning, cache_name):
|
|
"""Save conditioning to cache with custom name, supporting subdirectories"""
|
|
|
|
if not cache_name.strip():
|
|
raise ValueError("Cache name cannot be empty")
|
|
|
|
# Normalize path separators (handle both \ and /)
|
|
cache_name = cache_name.replace('\\', os.sep).replace('/', os.sep)
|
|
|
|
# Split into directory and filename
|
|
path_parts = cache_name.split(os.sep)
|
|
sanitized_parts = [self.sanitize_filename(part) for part in path_parts]
|
|
|
|
# Reconstruct the relative path
|
|
relative_path = os.path.join(*sanitized_parts)
|
|
|
|
# Create full file path
|
|
full_path = os.path.join(CACHE_DIR, relative_path)
|
|
|
|
# Extract directory and ensure it exists
|
|
directory = os.path.dirname(full_path)
|
|
if directory:
|
|
os.makedirs(directory, exist_ok=True)
|
|
print(f"[CFZ Save] Created/verified directory: {directory}")
|
|
else:
|
|
os.makedirs(CACHE_DIR, exist_ok=True)
|
|
|
|
# Add .pt extension if not present
|
|
if not full_path.endswith('.pt'):
|
|
full_path += '.pt'
|
|
|
|
file_path = self._resolve_path(full_path)
|
|
|
|
print(f"[CFZ Save] Processed info:")
|
|
print(f" - original name: '{cache_name}'")
|
|
print(f" - sanitized path: '{relative_path}'")
|
|
print(f" - full file path: {file_path}")
|
|
|
|
# Check if conditioning is provided
|
|
if conditioning is None:
|
|
print("[CFZ Save] ❌ ERROR: Conditioning is None!")
|
|
print("[CFZ Save] This suggests the CLIP Text Encode node is not connected properly")
|
|
print("[CFZ Save] Or there's an issue with the node execution order")
|
|
raise ValueError(f"No conditioning input provided for cache '{cache_name}'. Please connect conditioning input.")
|
|
|
|
try:
|
|
print(f"[CFZ Save] Attempting to save conditioning...")
|
|
torch.save(conditioning, file_path)
|
|
print(f"[CFZ Save] ✅ Successfully saved: {relative_path}.pt")
|
|
except Exception as e:
|
|
print(f"[CFZ Save] ❌ Error saving: {e}")
|
|
raise ValueError(f"Failed to save conditioning '{cache_name}': {str(e)}")
|
|
|
|
return (conditioning,)
|
|
|
|
def sanitize_filename(self, filename):
|
|
"""Remove invalid characters from filename/directory name"""
|
|
invalid_chars = '<>:"|?*' # Removed / and \ to allow path separators
|
|
for char in invalid_chars:
|
|
filename = filename.replace(char, '_')
|
|
|
|
filename = filename.strip(' .')
|
|
|
|
if not filename:
|
|
filename = "unnamed"
|
|
|
|
return filename
|
|
|
|
def _resolve_path(self, path_str):
|
|
"""Resolve file path using ComfyUI's path system"""
|
|
try:
|
|
return Path(folder_paths.get_annotated_filepath(str(path_str)))
|
|
except:
|
|
return Path(path_str)
|
|
|
|
@classmethod
|
|
def IS_CHANGED(cls, conditioning, cache_name):
|
|
return hashlib.sha256(f"{cache_name}".encode('utf-8')).hexdigest()
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(cls, conditioning, cache_name):
|
|
"""Validate inputs before execution"""
|
|
if not cache_name.strip():
|
|
return "Cache name cannot be empty"
|
|
return True
|
|
|
|
|
|
class load_conditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
cached_files = cls.get_cached_files()
|
|
|
|
return {
|
|
"required": {
|
|
"cache_name": (cached_files, {"default": cached_files[0] if cached_files else ""}),
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
RETURN_NAMES = ("conditioning",)
|
|
FUNCTION = "load_conditioning"
|
|
CATEGORY = "CFZ Save-Load Conditioning"
|
|
|
|
@classmethod
|
|
def get_cached_files(cls):
|
|
"""Get list of available cached conditioning files, including subdirectories"""
|
|
try:
|
|
os.makedirs(CACHE_DIR, exist_ok=True)
|
|
cache_files = []
|
|
|
|
if not os.path.exists(CACHE_DIR):
|
|
print(f"[CFZ Load] Cache directory doesn't exist: {CACHE_DIR}")
|
|
return ["no_cache_directory"]
|
|
|
|
# Walk through all subdirectories
|
|
for root, dirs, files in os.walk(CACHE_DIR):
|
|
for filename in files:
|
|
if filename.endswith('.pt'):
|
|
# Get relative path from CACHE_DIR
|
|
full_path = os.path.join(root, filename)
|
|
relative_path = os.path.relpath(full_path, CACHE_DIR)
|
|
|
|
# Remove .pt extension
|
|
cache_name = relative_path[:-3]
|
|
|
|
# Normalize path separators for display
|
|
cache_name = cache_name.replace(os.sep, '/')
|
|
|
|
cache_files.append(cache_name)
|
|
|
|
cache_files.sort()
|
|
|
|
if cache_files:
|
|
# print(f"[CFZ Load] Found {len(cache_files)} cached files")
|
|
return cache_files
|
|
else:
|
|
print("[CFZ Load] No cache files found")
|
|
return ["no_cache_files_found"]
|
|
|
|
except Exception as e:
|
|
print(f"[CFZ Load] Error reading cache directory: {e}")
|
|
return ["error_reading_cache"]
|
|
|
|
def load_conditioning(self, cache_name):
|
|
"""Load conditioning from selected cached file, supporting subdirectories"""
|
|
|
|
if cache_name in ["no_cache_files_found", "error_reading_cache", "no_cache_directory", ""]:
|
|
raise ValueError("No valid cached conditioning file selected")
|
|
|
|
# Normalize path separators
|
|
cache_name = cache_name.replace('/', os.sep).replace('\\', os.sep)
|
|
|
|
# Build full path
|
|
file_path = os.path.join(CACHE_DIR, cache_name)
|
|
|
|
# Add .pt extension if not present
|
|
if not file_path.endswith('.pt'):
|
|
file_path += '.pt'
|
|
|
|
file_path = self._resolve_path(file_path)
|
|
|
|
# print(f"[CFZ Load] Loading conditioning from: {cache_name}")
|
|
|
|
if not os.path.exists(file_path):
|
|
raise ValueError(f"Cached conditioning not found: {cache_name}.pt")
|
|
|
|
try:
|
|
cached_tensor = torch.load(file_path, map_location='cpu')
|
|
print(f"[CFZ Load Cached Conditioning] ✅ Successfully loaded: {cache_name}.pt")
|
|
return (cached_tensor,)
|
|
except Exception as e:
|
|
print(f"[CFZ Load] ❌ Error loading: {e}")
|
|
raise ValueError(f"Error loading cached conditioning '{cache_name}': {str(e)}")
|
|
|
|
def _resolve_path(self, path_str):
|
|
"""Resolve file path using ComfyUI's path system"""
|
|
try:
|
|
return Path(folder_paths.get_annotated_filepath(str(path_str)))
|
|
except:
|
|
return Path(path_str)
|
|
|
|
@classmethod
|
|
def IS_CHANGED(cls, cache_name):
|
|
return cache_name
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(cls, cache_name):
|
|
"""Validate inputs before execution"""
|
|
if cache_name in ["no_cache_files_found", "error_reading_cache", "no_cache_directory", ""]:
|
|
return "No cached conditioning files available"
|
|
|
|
# Normalize path
|
|
cache_name_normalized = cache_name.replace('/', os.sep).replace('\\', os.sep)
|
|
cache_path = os.path.join(CACHE_DIR, cache_name_normalized)
|
|
|
|
if not cache_path.endswith('.pt'):
|
|
cache_path += '.pt'
|
|
|
|
if not os.path.exists(cache_path):
|
|
return f"Selected cache file does not exist: {cache_name}.pt"
|
|
|
|
return True
|
|
|
|
class CFZ_PrintMarker:
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
return {
|
|
"required": {
|
|
"message": ("STRING", {"default": "Reached this step!", "multiline": True}),
|
|
"timer_name": ("STRING", {"default": "workflow_timer"}),
|
|
"is_start_point": ("BOOLEAN", {"default": False}),
|
|
"is_end_point": ("BOOLEAN", {"default": False}),
|
|
"show_current_time": ("BOOLEAN", {"default": True}),
|
|
"clear_terminal": ("BOOLEAN", {"default": False}),
|
|
},
|
|
"optional": {
|
|
"trigger": (any_type, {}),
|
|
},
|
|
"hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO"}
|
|
}
|
|
|
|
RETURN_TYPES = (any_type,)
|
|
RETURN_NAMES = ("output",)
|
|
OUTPUT_NODE = True
|
|
FUNCTION = "run"
|
|
CATEGORY = "CFZ Utils/Debug"
|
|
|
|
def run(self, message, timer_name="workflow_timer", is_start_point=False, is_end_point=False,
|
|
show_current_time=True, clear_terminal=False, trigger=None, unique_id=None, extra_pnginfo=None):
|
|
|
|
# Clear terminal if requested (before any output)
|
|
if clear_terminal:
|
|
os.system('cls' if os.name == 'nt' else 'clear')
|
|
|
|
current_time = time.time()
|
|
current_timestamp = datetime.fromtimestamp(current_time).strftime("%H:%M:%S.%f")[:-3]
|
|
|
|
# Handle timer logic
|
|
timer_info = ""
|
|
|
|
if is_start_point:
|
|
TIMER_STORAGE[timer_name] = current_time
|
|
timer_info = f" | TIMER START: '{timer_name}'"
|
|
|
|
if is_end_point:
|
|
if timer_name in TIMER_STORAGE:
|
|
start_time = TIMER_STORAGE[timer_name]
|
|
elapsed = current_time - start_time
|
|
|
|
# Format elapsed time nicely
|
|
if elapsed < 1:
|
|
elapsed_str = f"{elapsed*1000:.1f}ms"
|
|
elif elapsed < 60:
|
|
elapsed_str = f"{elapsed:.2f}s"
|
|
else:
|
|
minutes = int(elapsed // 60)
|
|
seconds = elapsed % 60
|
|
elapsed_str = f"{minutes}m {seconds:.2f}s"
|
|
|
|
timer_info = f" | TIMER END: '{timer_name}' - Elapsed: {elapsed_str}"
|
|
|
|
# Clean up the timer
|
|
del TIMER_STORAGE[timer_name]
|
|
else:
|
|
timer_info = f" | TIMER ERROR: No start point found for '{timer_name}'"
|
|
|
|
# Build the output message
|
|
output_parts = []
|
|
|
|
if show_current_time:
|
|
output_parts.append(f"[{current_timestamp}]")
|
|
|
|
output_parts.append(f"CFZ Marker")
|
|
output_parts.append(message)
|
|
|
|
if timer_info:
|
|
output_parts.append(timer_info)
|
|
|
|
final_message = " ".join(output_parts)
|
|
print(f"\n{final_message}\n")
|
|
|
|
return (trigger,)
|
|
|
|
@classmethod
|
|
def IS_CHANGED(cls, **kwargs):
|
|
# Always execute to ensure timing is accurate
|
|
return float("NaN")
|