Update cfz_caching_condition.py

This commit is contained in:
patientx 2025-09-05 17:14:59 +03:00 committed by GitHub
parent 73774a5fac
commit 79f734309b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -4,36 +4,29 @@ import hashlib
import folder_paths import folder_paths
from pathlib import Path from pathlib import Path
# Standalone configuration - no external dependencies
CACHE_DIR = os.path.join(folder_paths.output_directory, "cfz_conditioning_cache") CACHE_DIR = os.path.join(folder_paths.output_directory, "cfz_conditioning_cache")
# Simple proxy class for any type matching class AlwaysEqualProxy(str):
class AlwaysEqualProxy: def __eq__(self, _):
def __init__(self, name):
self.name = name
def __eq__(self, other):
return True return True
def __ne__(self, other): def __ne__(self, _):
return False return False
any_type = AlwaysEqualProxy("*") any_type = AlwaysEqualProxy("*")
# Simple version comparison function
def compare_revision(target_revision): def compare_revision(target_revision):
"""Simple version check - defaults to supporting lazy loading""" """Simple version check - defaults to supporting lazy loading"""
try: try:
import comfy import comfy
if hasattr(comfy, 'model_management') and hasattr(comfy.model_management, 'get_torch_device'): if hasattr(comfy, 'model_management') and hasattr(comfy.model_management, 'get_torch_device'):
return True # Assume modern ComfyUI supports lazy loading return True
except: except:
pass pass
return False return False
lazy_options = {"lazy": True} if compare_revision(2543) else {} lazy_options = {"lazy": True} if compare_revision(2543) else {}
class save_conditioning: class save_conditioning:
@classmethod @classmethod
def INPUT_TYPES(cls): def INPUT_TYPES(cls):
@ -52,13 +45,11 @@ class save_conditioning:
def save_conditioning(self, conditioning, cache_name): def save_conditioning(self, conditioning, cache_name):
"""Save conditioning to cache with custom name""" """Save conditioning to cache with custom name"""
# Ensure cache directory exists
os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(CACHE_DIR, exist_ok=True)
if not cache_name.strip(): if not cache_name.strip():
raise ValueError("Cache name cannot be empty") raise ValueError("Cache name cannot be empty")
# Sanitize cache name (remove invalid characters)
sanitized_name = self.sanitize_filename(cache_name) sanitized_name = self.sanitize_filename(cache_name)
file_path = os.path.join(CACHE_DIR, f"{sanitized_name}.pt") file_path = os.path.join(CACHE_DIR, f"{sanitized_name}.pt")
file_path = self._resolve_path(file_path) file_path = self._resolve_path(file_path)
@ -74,7 +65,6 @@ class save_conditioning:
print("[CFZ Save] Or there's an issue with the node execution order") print("[CFZ Save] Or there's an issue with the node execution order")
raise ValueError(f"No conditioning input provided for cache '{sanitized_name}'. Please connect conditioning input.") raise ValueError(f"No conditioning input provided for cache '{sanitized_name}'. Please connect conditioning input.")
# Save conditioning (will overwrite if exists)
try: try:
print(f"[CFZ Save] Attempting to save conditioning...") print(f"[CFZ Save] Attempting to save conditioning...")
torch.save(conditioning, file_path) torch.save(conditioning, file_path)
@ -87,15 +77,12 @@ class save_conditioning:
def sanitize_filename(self, filename): def sanitize_filename(self, filename):
"""Remove invalid characters from filename""" """Remove invalid characters from filename"""
# Remove or replace invalid characters for filenames
invalid_chars = '<>:"/\\|?*' invalid_chars = '<>:"/\\|?*'
for char in invalid_chars: for char in invalid_chars:
filename = filename.replace(char, '_') filename = filename.replace(char, '_')
# Remove leading/trailing spaces and dots
filename = filename.strip(' .') filename = filename.strip(' .')
# Ensure filename is not empty after sanitization
if not filename: if not filename:
filename = "unnamed_conditioning" filename = "unnamed_conditioning"
@ -106,12 +93,10 @@ class save_conditioning:
try: try:
return Path(folder_paths.get_annotated_filepath(str(path_str))) return Path(folder_paths.get_annotated_filepath(str(path_str)))
except: except:
# Fallback to simple path if annotation fails
return Path(path_str) return Path(path_str)
@classmethod @classmethod
def IS_CHANGED(cls, conditioning, cache_name): def IS_CHANGED(cls, conditioning, cache_name):
# Change detection based on cache name
return hashlib.sha256(f"{cache_name}".encode('utf-8')).hexdigest() return hashlib.sha256(f"{cache_name}".encode('utf-8')).hexdigest()
@classmethod @classmethod
@ -125,7 +110,6 @@ class save_conditioning:
class load_conditioning: class load_conditioning:
@classmethod @classmethod
def INPUT_TYPES(cls): def INPUT_TYPES(cls):
# Get list of cached files for dropdown
cached_files = cls.get_cached_files() cached_files = cls.get_cached_files()
return { return {
@ -146,23 +130,19 @@ class load_conditioning:
os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(CACHE_DIR, exist_ok=True)
cache_files = [] cache_files = []
# Check if directory exists and is readable
if not os.path.exists(CACHE_DIR): if not os.path.exists(CACHE_DIR):
print(f"[CFZ Load] Cache directory doesn't exist: {CACHE_DIR}") print(f"[CFZ Load] Cache directory doesn't exist: {CACHE_DIR}")
return ["no_cache_directory"] return ["no_cache_directory"]
for filename in os.listdir(CACHE_DIR): for filename in os.listdir(CACHE_DIR):
if filename.endswith('.pt'): if filename.endswith('.pt'):
# Remove .pt extension for display
cache_name = filename[:-3] cache_name = filename[:-3]
cache_files.append(cache_name) cache_files.append(cache_name)
# Sort alphabetically
cache_files.sort() cache_files.sort()
# Return list with at least one option
if cache_files: if cache_files:
print(f"[CFZ Load] Found {len(cache_files)} cached files") # print(f"[CFZ Load] Found {len(cache_files)} cached files")
return cache_files return cache_files
else: else:
print("[CFZ Load] No cache files found") print("[CFZ Load] No cache files found")
@ -174,8 +154,8 @@ class load_conditioning:
def load_conditioning(self, cache_name): def load_conditioning(self, cache_name):
"""Load conditioning from selected cached file""" """Load conditioning from selected cached file"""
print(f"[CFZ Load] Loading conditioning:") # print(f"[CFZ Load] Loading conditioning:")
print(f" - cache_name: '{cache_name}'") # print(f" - cache_name: '{cache_name}'")
if cache_name in ["no_cache_files_found", "error_reading_cache", "no_cache_directory", ""]: if cache_name in ["no_cache_files_found", "error_reading_cache", "no_cache_directory", ""]:
raise ValueError("No valid cached conditioning file selected") raise ValueError("No valid cached conditioning file selected")
@ -183,16 +163,15 @@ class load_conditioning:
file_path = os.path.join(CACHE_DIR, f"{cache_name}.pt") file_path = os.path.join(CACHE_DIR, f"{cache_name}.pt")
file_path = self._resolve_path(file_path) file_path = self._resolve_path(file_path)
print(f" - file path: {file_path}") # print(f" - file path: {file_path}")
print(f" - file exists: {os.path.exists(file_path)}") # print(f" - file exists: {os.path.exists(file_path)}")
if not os.path.exists(file_path): if not os.path.exists(file_path):
raise ValueError(f"Cached conditioning not found: {cache_name}.pt") raise ValueError(f"Cached conditioning not found: {cache_name}.pt")
try: try:
cached_tensor = torch.load(file_path, map_location='cpu') cached_tensor = torch.load(file_path, map_location='cpu')
print(f"[CFZ Load] ✓ Successfully loaded: {cache_name}.pt") print(f"[CFZ Load Cached Conditioning] ✓ Successfully loaded: {cache_name}.pt")
print(f" - tensor type: {type(cached_tensor)}")
return (cached_tensor,) return (cached_tensor,)
except Exception as e: except Exception as e:
print(f"[CFZ Load] ✗ Error loading: {e}") print(f"[CFZ Load] ✗ Error loading: {e}")
@ -203,12 +182,10 @@ class load_conditioning:
try: try:
return Path(folder_paths.get_annotated_filepath(str(path_str))) return Path(folder_paths.get_annotated_filepath(str(path_str)))
except: except:
# Fallback to simple path if annotation fails
return Path(path_str) return Path(path_str)
@classmethod @classmethod
def IS_CHANGED(cls, cache_name): def IS_CHANGED(cls, cache_name):
# Change detection based on selected cache file
return cache_name return cache_name
@classmethod @classmethod
@ -221,4 +198,27 @@ class load_conditioning:
if not os.path.exists(cache_path): if not os.path.exists(cache_path):
return f"Selected cache file does not exist: {cache_name}.pt" return f"Selected cache file does not exist: {cache_name}.pt"
return True return True
class CFZ_PrintMarker:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"message": ("STRING", {"default": "Reached this step!", "multiline": True}),
},
"optional": {
"trigger": (any_type, {}),
},
"hidden": {"unique_id": "UNIQUE_ID", "extra_pnginfo": "EXTRA_PNGINFO"}
}
RETURN_TYPES = (any_type,) # Pass through whatever was received
RETURN_NAMES = ("output",)
OUTPUT_NODE = True
FUNCTION = "run"
CATEGORY = "CFZ Utils/Debug"
def run(self, message, trigger=None, unique_id=None, extra_pnginfo=None):
print(f"\n[🔔 CFZ Marker] {message}\n")
return (trigger,)