Compare commits

...

18 Commits

Author SHA1 Message Date
Deep Mehta
01cd571aee
Merge 0141af0786 into ba5bf3f1a8 2026-02-02 19:23:20 -08:00
Alexander Piskun
ba5bf3f1a8
[API Nodes] HitPaw API nodes (#12117)
* feat(api-nodes): add HitPaw API nodes

* remove face_soft_2x model as not working

---------

Co-authored-by: Robin Huang <robin.j.huang@gmail.com>
2026-02-02 19:17:59 -08:00
comfyanonymous
c05a08ae66
Add back function. (#12234)
Some checks are pending
Python Linting / Run Ruff (push) Waiting to run
Python Linting / Run Pylint (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Waiting to run
Execution Tests / test (macos-latest) (push) Waiting to run
Execution Tests / test (ubuntu-latest) (push) Waiting to run
Execution Tests / test (windows-latest) (push) Waiting to run
Test server launches without errors / test (push) Waiting to run
Unit Tests / test (macos-latest) (push) Waiting to run
Unit Tests / test (ubuntu-latest) (push) Waiting to run
Unit Tests / test (windows-2022) (push) Waiting to run
2026-02-02 19:52:07 -05:00
rattus
de9ada6a41
Dynamic VRAM unloading fix (#12227)
* mp: fix full dynamic unloading

This was not unloading dynamic models when requesting a full unload via
the unpatch() code path.

This was ok, i your workflow was all dynamic models but fails with big
VRAM leaks if you need to fully unload something for a regular ModelPatcher

It also fices the "unload models" button.

* mm: load models outside of Aimdo Mempool

In dynamic_vram mode, escape the Aimdo mempool and load into the regular
mempool. Use a dummy thread to do it.
2026-02-02 17:35:20 -05:00
rattus
37f711d4a1
mm: Fix cast buffers with intel offloading (#12229)
Intel has offloading support but there were some nvidia calls in the
new cast buffer stuff.
2026-02-02 17:34:46 -05:00
comfyanonymous
dd86b15521
Enable embeddings for some qwen 3 models. (#12218)
Some checks are pending
Python Linting / Run Ruff (push) Waiting to run
Python Linting / Run Pylint (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Waiting to run
Execution Tests / test (macos-latest) (push) Waiting to run
Execution Tests / test (ubuntu-latest) (push) Waiting to run
Execution Tests / test (windows-latest) (push) Waiting to run
Test server launches without errors / test (push) Waiting to run
Unit Tests / test (macos-latest) (push) Waiting to run
Unit Tests / test (ubuntu-latest) (push) Waiting to run
Unit Tests / test (windows-2022) (push) Waiting to run
2026-02-02 03:51:09 -05:00
Deep Mehta
0141af0786 refactor: rename _is_cacheable_value to _is_external_cacheable_value
Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Clearer name since objects are also cached locally - this specifically
checks for external caching eligibility.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-30 00:46:10 +05:30
Deep Mehta
0440ebcf6e feat: add optional ui field to CacheValue
Some checks are pending
Python Linting / Run Ruff (push) Waiting to run
Python Linting / Run Pylint (push) Waiting to run
- Add ui field to CacheValue dataclass (default None)
- Pass ui when creating CacheValue for external providers
- Use result.ui (or default {}) when returning from external cache lookup

This allows external cache implementations to store/retrieve UI data
if desired, while remaining optional for implementations that skip it.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-29 20:22:04 +05:30
Deep Mehta
4afa80dc07 docs: make should_cache docstring implementation-agnostic
Remove prescriptive filtering suggestions - let implementations
decide their own caching logic based on their use case.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-29 19:52:06 +05:30
Deep Mehta
d755f7ca19 docs: clarify should_cache filtering criteria
Change docstring from "Skip large values" to "Skip if download time > compute time"
which better captures the cost/benefit tradeoff for external caching.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-29 19:50:12 +05:30
Deep Mehta
2049066cff refactor: expose CacheProvider API via comfy_api.latest.Caching
- Add Caching class to comfy_api/latest/__init__.py that re-exports
  from comfy_execution.cache_provider (source of truth)
- Fix docstring: "Skip large values" instead of "Skip small values"
  (small compute-heavy values are good cache targets)
- Maintain backward compatibility: comfy_execution.cache_provider
  imports still work

Usage:
    from comfy_api.latest import Caching

    class MyProvider(Caching.CacheProvider):
        def on_lookup(self, context): ...
        def on_store(self, context, value): ...

    Caching.register_provider(MyProvider())

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-29 19:42:34 +05:30
Deep Mehta
9b0ca8b95c Merge remote-tracking branch 'origin/master' into feat/cache-provider-api
Some checks failed
Build package / Build Test (3.10) (push) Has been cancelled
Build package / Build Test (3.11) (push) Has been cancelled
Build package / Build Test (3.12) (push) Has been cancelled
Build package / Build Test (3.13) (push) Has been cancelled
Build package / Build Test (3.14) (push) Has been cancelled
Python Linting / Run Ruff (push) Waiting to run
Python Linting / Run Pylint (push) Waiting to run
2026-01-28 13:30:03 +05:30
Deep Mehta
dcf686857c fix: use hashable types in frozenset test and add dict test
Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Frozensets can only contain hashable types, so use nested frozensets
instead of dicts. Added separate test for dict handling via serialize_cache_key.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-24 15:47:53 +05:30
Deep Mehta
17eed38750 fix: move _torch_available before usage and use importlib.util.find_spec
Fixes ruff F821 (undefined name) and F401 (unused import) errors.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-24 15:34:29 +05:30
Deep Mehta
f4623c0e1b style: remove unused imports in test_cache_provider.py
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-24 14:25:00 +05:30
Deep Mehta
5e4bbca1ad test: add unit tests for CacheProvider API
- Add comprehensive tests for _canonicalize deterministic ordering
- Add tests for serialize_cache_key hash consistency
- Add tests for contains_nan utility
- Add tests for estimate_value_size
- Add tests for provider registry (register, unregister, clear)
- Move json import to top-level (fix inline import)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-24 14:20:49 +05:30
Deep Mehta
e17571d9be fix: use deterministic hash for cache keys instead of pickle
Pickle serialization is NOT deterministic across Python sessions due
to hash randomization affecting frozenset iteration order. This causes
distributed caching to fail because different pods compute different
hashes for identical cache keys.

Fix: Use _canonicalize() + JSON serialization which ensures deterministic
ordering regardless of Python's hash randomization.

This is critical for cross-pod cache key consistency in Kubernetes
deployments.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-24 14:07:46 +05:30
Deep Mehta
6540aa0400 feat: Add CacheProvider API for external distributed caching
Introduces a public API for external cache providers, enabling distributed
caching across multiple ComfyUI instances (e.g., Kubernetes pods).

New files:
- comfy_execution/cache_provider.py: CacheProvider ABC, CacheContext/CacheValue
  dataclasses, thread-safe provider registry, serialization utilities

Modified files:
- comfy_execution/caching.py: Add provider hooks to BasicCache (_notify_providers_store,
  _check_providers_lookup), subcache exclusion, prompt ID propagation
- execution.py: Add prompt lifecycle hooks (on_prompt_start/on_prompt_end) to
  PromptExecutor, set _current_prompt_id on caches

Key features:
- Local-first caching (check local before external for performance)
- NaN detection to prevent incorrect external cache hits
- Subcache exclusion (ephemeral subgraph results not cached externally)
- Thread-safe provider snapshot caching
- Graceful error handling (provider errors logged, never break execution)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-19 16:43:13 +05:30
13 changed files with 1349 additions and 74 deletions

View File

@ -19,7 +19,8 @@
import psutil
import logging
from enum import Enum
from comfy.cli_args import args, PerformanceFeature
from comfy.cli_args import args, PerformanceFeature, enables_dynamic_vram
import threading
import torch
import sys
import platform
@ -650,7 +651,7 @@ def free_memory(memory_required, device, keep_loaded=[], for_dynamic=False, ram_
soft_empty_cache()
return unloaded_models
def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimum_memory_required=None, force_full_load=False):
def load_models_gpu_orig(models, memory_required=0, force_patch_weights=False, minimum_memory_required=None, force_full_load=False):
cleanup_models_gc()
global vram_state
@ -746,6 +747,26 @@ def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimu
current_loaded_models.insert(0, loaded_model)
return
def load_models_gpu_thread(models, memory_required, force_patch_weights, minimum_memory_required, force_full_load):
with torch.inference_mode():
load_models_gpu_orig(models, memory_required, force_patch_weights, minimum_memory_required, force_full_load)
soft_empty_cache()
def load_models_gpu(models, memory_required=0, force_patch_weights=False, minimum_memory_required=None, force_full_load=False):
#Deliberately load models outside of the Aimdo mempool so they can be retained accross
#nodes. Use a dummy thread to do it as pytorch documents that mempool contexts are
#thread local. So exploit that to escape context
if enables_dynamic_vram():
t = threading.Thread(
target=load_models_gpu_thread,
args=(models, memory_required, force_patch_weights, minimum_memory_required, force_full_load)
)
t.start()
t.join()
else:
load_models_gpu_orig(models, memory_required=memory_required, force_patch_weights=force_patch_weights,
minimum_memory_required=minimum_memory_required, force_full_load=force_full_load)
def load_model_gpu(model):
return load_models_gpu([model])
@ -1112,11 +1133,11 @@ def get_cast_buffer(offload_stream, device, size, ref):
return None
if cast_buffer is not None and cast_buffer.numel() > 50 * (1024 ** 2):
#I want my wrongly sized 50MB+ of VRAM back from the caching allocator right now
torch.cuda.synchronize()
synchronize()
del STREAM_CAST_BUFFERS[offload_stream]
del cast_buffer
#FIXME: This doesn't work in Aimdo because mempool cant clear cache
torch.cuda.empty_cache()
soft_empty_cache()
with wf_context:
cast_buffer = torch.empty((size), dtype=torch.int8, device=device)
STREAM_CAST_BUFFERS[offload_stream] = cast_buffer
@ -1132,9 +1153,7 @@ def reset_cast_buffers():
for offload_stream in STREAM_CAST_BUFFERS:
offload_stream.synchronize()
STREAM_CAST_BUFFERS.clear()
if comfy.memory_management.aimdo_allocator is None:
#Pytorch 2.7 and earlier crashes if you try and empty_cache when mempools exist
torch.cuda.empty_cache()
soft_empty_cache()
def get_offload_stream(device):
stream_counter = stream_counters.get(device, 0)
@ -1284,7 +1303,7 @@ def discard_cuda_async_error():
a = torch.tensor([1], dtype=torch.uint8, device=get_torch_device())
b = torch.tensor([1], dtype=torch.uint8, device=get_torch_device())
_ = a + b
torch.cuda.synchronize()
synchronize()
except torch.AcceleratorError:
#Dump it! We already know about it from the synchronous return
pass
@ -1688,6 +1707,12 @@ def lora_compute_dtype(device):
LORA_COMPUTE_DTYPES[device] = dtype
return dtype
def synchronize():
if is_intel_xpu():
torch.xpu.synchronize()
elif torch.cuda.is_available():
torch.cuda.synchronize()
def soft_empty_cache(force=False):
global cpu_state
if cpu_state == CPUState.MPS:
@ -1713,9 +1738,6 @@ def debug_memory_summary():
return torch.cuda.memory.memory_summary()
return ""
#TODO: might be cleaner to put this somewhere else
import threading
class InterruptProcessingException(Exception):
pass

View File

@ -1597,7 +1597,7 @@ class ModelPatcherDynamic(ModelPatcher):
if unpatch_weights:
self.partially_unload_ram(1e32)
self.partially_unload(None)
self.partially_unload(None, 1e32)
def partially_load(self, device_to, extra_memory=0, force_patch_weights=False):
assert not force_patch_weights #See above

View File

@ -8,7 +8,7 @@ import torch
class Qwen3Tokenizer(sd1_clip.SDTokenizer):
def __init__(self, embedding_directory=None, tokenizer_data={}):
tokenizer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "qwen25_tokenizer")
super().__init__(tokenizer_path, pad_with_end=False, embedding_size=1024, embedding_key='qwen3_06b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=1, pad_token=151643, tokenizer_data=tokenizer_data)
super().__init__(tokenizer_path, pad_with_end=False, embedding_directory=embedding_directory, embedding_size=1024, embedding_key='qwen3_06b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=1, pad_token=151643, tokenizer_data=tokenizer_data)
class T5XXLTokenizer(sd1_clip.SDTokenizer):
def __init__(self, embedding_directory=None, tokenizer_data={}):

View File

@ -118,7 +118,7 @@ class MistralTokenizerClass:
class Mistral3Tokenizer(sd1_clip.SDTokenizer):
def __init__(self, embedding_directory=None, tokenizer_data={}):
self.tekken_data = tokenizer_data.get("tekken_model", None)
super().__init__("", pad_with_end=False, embedding_size=5120, embedding_key='mistral3_24b', tokenizer_class=MistralTokenizerClass, has_end_token=False, pad_to_max_length=False, pad_token=11, start_token=1, max_length=99999999, min_length=1, pad_left=True, tokenizer_args=load_mistral_tokenizer(self.tekken_data), tokenizer_data=tokenizer_data)
super().__init__("", pad_with_end=False, embedding_directory=embedding_directory, embedding_size=5120, embedding_key='mistral3_24b', tokenizer_class=MistralTokenizerClass, has_end_token=False, pad_to_max_length=False, pad_token=11, start_token=1, max_length=99999999, min_length=1, pad_left=True, tokenizer_args=load_mistral_tokenizer(self.tekken_data), tokenizer_data=tokenizer_data)
def state_dict(self):
return {"tekken_model": self.tekken_data}
@ -176,12 +176,12 @@ def flux2_te(dtype_llama=None, llama_quantization_metadata=None, pruned=False):
class Qwen3Tokenizer(sd1_clip.SDTokenizer):
def __init__(self, embedding_directory=None, tokenizer_data={}):
tokenizer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "qwen25_tokenizer")
super().__init__(tokenizer_path, pad_with_end=False, embedding_size=2560, embedding_key='qwen3_4b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=512, pad_token=151643, tokenizer_data=tokenizer_data)
super().__init__(tokenizer_path, pad_with_end=False, embedding_directory=embedding_directory, embedding_size=2560, embedding_key='qwen3_4b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=512, pad_token=151643, tokenizer_data=tokenizer_data)
class Qwen3Tokenizer8B(sd1_clip.SDTokenizer):
def __init__(self, embedding_directory=None, tokenizer_data={}):
tokenizer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "qwen25_tokenizer")
super().__init__(tokenizer_path, pad_with_end=False, embedding_size=4096, embedding_key='qwen3_8b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=512, pad_token=151643, tokenizer_data=tokenizer_data)
super().__init__(tokenizer_path, pad_with_end=False, embedding_directory=embedding_directory, embedding_size=4096, embedding_key='qwen3_8b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=512, pad_token=151643, tokenizer_data=tokenizer_data)
class KleinTokenizer(sd1_clip.SD1Tokenizer):
def __init__(self, embedding_directory=None, tokenizer_data={}, name="qwen3_4b"):

View File

@ -6,7 +6,7 @@ import os
class Qwen3Tokenizer(sd1_clip.SDTokenizer):
def __init__(self, embedding_directory=None, tokenizer_data={}):
tokenizer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "qwen25_tokenizer")
super().__init__(tokenizer_path, pad_with_end=False, embedding_size=2560, embedding_key='qwen3_4b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=1, pad_token=151643, tokenizer_data=tokenizer_data)
super().__init__(tokenizer_path, pad_with_end=False, embedding_directory=embedding_directory, embedding_size=2560, embedding_key='qwen3_4b', tokenizer_class=Qwen2Tokenizer, has_start_token=False, has_end_token=False, pad_to_max_length=False, max_length=99999999, min_length=1, pad_token=151643, tokenizer_data=tokenizer_data)
class ZImageTokenizer(sd1_clip.SD1Tokenizer):

View File

@ -106,6 +106,42 @@ class Types:
MESH = MESH
VOXEL = VOXEL
class Caching:
"""
External cache provider API for distributed caching.
Enables sharing cached results across multiple ComfyUI instances
(e.g., Kubernetes pods) without monkey-patching internal methods.
Example usage:
from comfy_api.latest import Caching
class MyRedisProvider(Caching.CacheProvider):
def on_lookup(self, context):
# Check Redis for cached result
...
def on_store(self, context, value):
# Store to Redis (can be async internally)
...
Caching.register_provider(MyRedisProvider())
"""
# Import from comfy_execution.cache_provider (source of truth)
from comfy_execution.cache_provider import (
CacheProvider,
CacheContext,
CacheValue,
register_cache_provider as register_provider,
unregister_cache_provider as unregister_provider,
get_cache_providers as get_providers,
has_cache_providers as has_providers,
clear_cache_providers as clear_providers,
estimate_value_size,
)
ComfyAPI = ComfyAPI_latest
# Create a synchronous version of the API
@ -125,6 +161,7 @@ __all__ = [
"Input",
"InputImpl",
"Types",
"Caching",
"ComfyExtension",
"io",
"IO",

View File

@ -0,0 +1,51 @@
from typing import TypedDict
from pydantic import BaseModel, Field
class InputVideoModel(TypedDict):
model: str
resolution: str
class ImageEnhanceTaskCreateRequest(BaseModel):
model_name: str = Field(...)
img_url: str = Field(...)
extension: str = Field(".png")
exif: bool = Field(False)
DPI: int | None = Field(None)
class VideoEnhanceTaskCreateRequest(BaseModel):
video_url: str = Field(...)
extension: str = Field(".mp4")
model_name: str | None = Field(...)
resolution: list[int] = Field(..., description="Target resolution [width, height]")
original_resolution: list[int] = Field(..., description="Original video resolution [width, height]")
class TaskCreateDataResponse(BaseModel):
job_id: str = Field(...)
consume_coins: int | None = Field(None)
class TaskStatusPollRequest(BaseModel):
job_id: str = Field(...)
class TaskCreateResponse(BaseModel):
code: int = Field(...)
message: str = Field(...)
data: TaskCreateDataResponse | None = Field(None)
class TaskStatusDataResponse(BaseModel):
job_id: str = Field(...)
status: str = Field(...)
res_url: str = Field("")
class TaskStatusResponse(BaseModel):
code: int = Field(...)
message: str = Field(...)
data: TaskStatusDataResponse = Field(...)

View File

@ -0,0 +1,342 @@
import math
from typing_extensions import override
from comfy_api.latest import IO, ComfyExtension, Input
from comfy_api_nodes.apis.hitpaw import (
ImageEnhanceTaskCreateRequest,
InputVideoModel,
TaskCreateDataResponse,
TaskCreateResponse,
TaskStatusPollRequest,
TaskStatusResponse,
VideoEnhanceTaskCreateRequest,
)
from comfy_api_nodes.util import (
ApiEndpoint,
download_url_to_image_tensor,
download_url_to_video_output,
downscale_image_tensor,
get_image_dimensions,
poll_op,
sync_op,
upload_image_to_comfyapi,
upload_video_to_comfyapi,
validate_video_duration,
)
VIDEO_MODELS_MODELS_MAP = {
"Portrait Restore Model (1x)": "portrait_restore_1x",
"Portrait Restore Model (2x)": "portrait_restore_2x",
"General Restore Model (1x)": "general_restore_1x",
"General Restore Model (2x)": "general_restore_2x",
"General Restore Model (4x)": "general_restore_4x",
"Ultra HD Model (2x)": "ultrahd_restore_2x",
"Generative Model (1x)": "generative_1x",
}
# Resolution name to target dimension (shorter side) in pixels
RESOLUTION_TARGET_MAP = {
"720p": 720,
"1080p": 1080,
"2K/QHD": 1440,
"4K/UHD": 2160,
"8K": 4320,
}
# Square (1:1) resolutions use standard square dimensions
RESOLUTION_SQUARE_MAP = {
"720p": 720,
"1080p": 1080,
"2K/QHD": 1440,
"4K/UHD": 2048, # DCI 4K square
"8K": 4096, # DCI 8K square
}
# Models with limited resolution support (no 8K)
LIMITED_RESOLUTION_MODELS = {"Generative Model (1x)"}
# Resolution options for different model types
RESOLUTIONS_LIMITED = ["original", "720p", "1080p", "2K/QHD", "4K/UHD"]
RESOLUTIONS_FULL = ["original", "720p", "1080p", "2K/QHD", "4K/UHD", "8K"]
# Maximum output resolution in pixels
MAX_PIXELS_GENERATIVE = 32_000_000
MAX_MP_GENERATIVE = MAX_PIXELS_GENERATIVE // 1_000_000
class HitPawGeneralImageEnhance(IO.ComfyNode):
@classmethod
def define_schema(cls):
return IO.Schema(
node_id="HitPawGeneralImageEnhance",
display_name="HitPaw General Image Enhance",
category="api node/image/HitPaw",
description="Upscale low-resolution images to super-resolution, eliminate artifacts and noise. "
f"Maximum output: {MAX_MP_GENERATIVE} megapixels.",
inputs=[
IO.Combo.Input("model", options=["generative_portrait", "generative"]),
IO.Image.Input("image"),
IO.Combo.Input("upscale_factor", options=[1, 2, 4]),
IO.Boolean.Input(
"auto_downscale",
default=False,
tooltip="Automatically downscale input image if output would exceed the limit.",
),
],
outputs=[
IO.Image.Output(),
],
hidden=[
IO.Hidden.auth_token_comfy_org,
IO.Hidden.api_key_comfy_org,
IO.Hidden.unique_id,
],
is_api_node=True,
price_badge=IO.PriceBadge(
depends_on=IO.PriceBadgeDepends(widgets=["model"]),
expr="""
(
$prices := {
"generative_portrait": {"min": 0.02, "max": 0.06},
"generative": {"min": 0.05, "max": 0.15}
};
$price := $lookup($prices, widgets.model);
{
"type": "range_usd",
"min_usd": $price.min,
"max_usd": $price.max
}
)
""",
),
)
@classmethod
async def execute(
cls,
model: str,
image: Input.Image,
upscale_factor: int,
auto_downscale: bool,
) -> IO.NodeOutput:
height, width = get_image_dimensions(image)
requested_scale = upscale_factor
output_pixels = height * width * requested_scale * requested_scale
if output_pixels > MAX_PIXELS_GENERATIVE:
if auto_downscale:
input_pixels = width * height
scale = 1
max_input_pixels = MAX_PIXELS_GENERATIVE
for candidate in [4, 2, 1]:
if candidate > requested_scale:
continue
scale_output_pixels = input_pixels * candidate * candidate
if scale_output_pixels <= MAX_PIXELS_GENERATIVE:
scale = candidate
max_input_pixels = None
break
# Check if we can downscale input by at most 2x to fit
downscale_ratio = math.sqrt(scale_output_pixels / MAX_PIXELS_GENERATIVE)
if downscale_ratio <= 2.0:
scale = candidate
max_input_pixels = MAX_PIXELS_GENERATIVE // (candidate * candidate)
break
if max_input_pixels is not None:
image = downscale_image_tensor(image, total_pixels=max_input_pixels)
upscale_factor = scale
else:
output_width = width * requested_scale
output_height = height * requested_scale
raise ValueError(
f"Output size ({output_width}x{output_height} = {output_pixels:,} pixels) "
f"exceeds maximum allowed size of {MAX_PIXELS_GENERATIVE:,} pixels ({MAX_MP_GENERATIVE}MP). "
f"Enable auto_downscale or use a smaller input image or a lower upscale factor."
)
initial_res = await sync_op(
cls,
ApiEndpoint(path="/proxy/hitpaw/api/photo-enhancer", method="POST"),
response_model=TaskCreateResponse,
data=ImageEnhanceTaskCreateRequest(
model_name=f"{model}_{upscale_factor}x",
img_url=await upload_image_to_comfyapi(cls, image, total_pixels=None),
),
wait_label="Creating task",
final_label_on_success="Task created",
)
if initial_res.code != 200:
raise ValueError(f"Task creation failed with code {initial_res.code}: {initial_res.message}")
request_price = initial_res.data.consume_coins / 1000
final_response = await poll_op(
cls,
ApiEndpoint(path="/proxy/hitpaw/api/task-status", method="POST"),
data=TaskCreateDataResponse(job_id=initial_res.data.job_id),
response_model=TaskStatusResponse,
status_extractor=lambda x: x.data.status,
price_extractor=lambda x: request_price,
poll_interval=10.0,
max_poll_attempts=480,
)
return IO.NodeOutput(await download_url_to_image_tensor(final_response.data.res_url))
class HitPawVideoEnhance(IO.ComfyNode):
@classmethod
def define_schema(cls):
model_options = []
for model_name in VIDEO_MODELS_MODELS_MAP:
if model_name in LIMITED_RESOLUTION_MODELS:
resolutions = RESOLUTIONS_LIMITED
else:
resolutions = RESOLUTIONS_FULL
model_options.append(
IO.DynamicCombo.Option(
model_name,
[IO.Combo.Input("resolution", options=resolutions)],
)
)
return IO.Schema(
node_id="HitPawVideoEnhance",
display_name="HitPaw Video Enhance",
category="api node/video/HitPaw",
description="Upscale low-resolution videos to high resolution, eliminate artifacts and noise. "
"Prices shown are per second of video.",
inputs=[
IO.DynamicCombo.Input("model", options=model_options),
IO.Video.Input("video"),
],
outputs=[
IO.Video.Output(),
],
hidden=[
IO.Hidden.auth_token_comfy_org,
IO.Hidden.api_key_comfy_org,
IO.Hidden.unique_id,
],
is_api_node=True,
price_badge=IO.PriceBadge(
depends_on=IO.PriceBadgeDepends(widgets=["model", "model.resolution"]),
expr="""
(
$m := $lookup(widgets, "model");
$res := $lookup(widgets, "model.resolution");
$standard_model_prices := {
"original": {"min": 0.01, "max": 0.198},
"720p": {"min": 0.01, "max": 0.06},
"1080p": {"min": 0.015, "max": 0.09},
"2k/qhd": {"min": 0.02, "max": 0.117},
"4k/uhd": {"min": 0.025, "max": 0.152},
"8k": {"min": 0.033, "max": 0.198}
};
$ultra_hd_model_prices := {
"original": {"min": 0.015, "max": 0.264},
"720p": {"min": 0.015, "max": 0.092},
"1080p": {"min": 0.02, "max": 0.12},
"2k/qhd": {"min": 0.026, "max": 0.156},
"4k/uhd": {"min": 0.034, "max": 0.203},
"8k": {"min": 0.044, "max": 0.264}
};
$generative_model_prices := {
"original": {"min": 0.015, "max": 0.338},
"720p": {"min": 0.008, "max": 0.090},
"1080p": {"min": 0.05, "max": 0.15},
"2k/qhd": {"min": 0.038, "max": 0.225},
"4k/uhd": {"min": 0.056, "max": 0.338}
};
$prices := $contains($m, "ultra hd") ? $ultra_hd_model_prices :
$contains($m, "generative") ? $generative_model_prices :
$standard_model_prices;
$price := $lookup($prices, $res);
{
"type": "range_usd",
"min_usd": $price.min,
"max_usd": $price.max,
"format": {"approximate": true, "suffix": "/second"}
}
)
""",
),
)
@classmethod
async def execute(
cls,
model: InputVideoModel,
video: Input.Video,
) -> IO.NodeOutput:
validate_video_duration(video, min_duration=0.5, max_duration=60 * 60)
resolution = model["resolution"]
src_width, src_height = video.get_dimensions()
if resolution == "original":
output_width = src_width
output_height = src_height
else:
if src_width == src_height:
target_size = RESOLUTION_SQUARE_MAP[resolution]
if target_size < src_width:
raise ValueError(
f"Selected resolution {resolution} ({target_size}x{target_size}) is smaller than "
f"the input video ({src_width}x{src_height}). Please select a higher resolution or 'original'."
)
output_width = target_size
output_height = target_size
else:
min_dimension = min(src_width, src_height)
target_size = RESOLUTION_TARGET_MAP[resolution]
if target_size < min_dimension:
raise ValueError(
f"Selected resolution {resolution} ({target_size}p) is smaller than "
f"the input video's shorter dimension ({min_dimension}p). "
f"Please select a higher resolution or 'original'."
)
if src_width > src_height:
output_height = target_size
output_width = int(target_size * (src_width / src_height))
else:
output_width = target_size
output_height = int(target_size * (src_height / src_width))
initial_res = await sync_op(
cls,
ApiEndpoint(path="/proxy/hitpaw/api/video-enhancer", method="POST"),
response_model=TaskCreateResponse,
data=VideoEnhanceTaskCreateRequest(
video_url=await upload_video_to_comfyapi(cls, video),
resolution=[output_width, output_height],
original_resolution=[src_width, src_height],
model_name=VIDEO_MODELS_MODELS_MAP[model["model"]],
),
wait_label="Creating task",
final_label_on_success="Task created",
)
request_price = initial_res.data.consume_coins / 1000
if initial_res.code != 200:
raise ValueError(f"Task creation failed with code {initial_res.code}: {initial_res.message}")
final_response = await poll_op(
cls,
ApiEndpoint(path="/proxy/hitpaw/api/task-status", method="POST"),
data=TaskStatusPollRequest(job_id=initial_res.data.job_id),
response_model=TaskStatusResponse,
status_extractor=lambda x: x.data.status,
price_extractor=lambda x: request_price,
poll_interval=10.0,
max_poll_attempts=320,
)
return IO.NodeOutput(await download_url_to_video_output(final_response.data.res_url))
class HitPawExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[IO.ComfyNode]]:
return [
HitPawGeneralImageEnhance,
HitPawVideoEnhance,
]
async def comfy_entrypoint() -> HitPawExtension:
return HitPawExtension()

View File

@ -94,7 +94,7 @@ async def upload_image_to_comfyapi(
*,
mime_type: str | None = None,
wait_label: str | None = "Uploading",
total_pixels: int = 2048 * 2048,
total_pixels: int | None = 2048 * 2048,
) -> str:
"""Uploads a single image to ComfyUI API and returns its download URL."""
return (

View File

@ -0,0 +1,319 @@
"""
External Cache Provider API for distributed caching.
This module provides a public API for external cache providers, enabling
distributed caching across multiple ComfyUI instances (e.g., Kubernetes pods).
Public API is also available via:
from comfy_api.latest import Caching
Example usage:
from comfy_execution.cache_provider import (
CacheProvider, CacheContext, CacheValue, register_cache_provider
)
class MyRedisProvider(CacheProvider):
def on_lookup(self, context: CacheContext) -> Optional[CacheValue]:
# Check Redis/GCS for cached result
...
def on_store(self, context: CacheContext, value: CacheValue) -> None:
# Store to Redis/GCS (can be async internally)
...
register_cache_provider(MyRedisProvider())
"""
from abc import ABC, abstractmethod
from typing import Any, Optional, Tuple, List
from dataclasses import dataclass
import hashlib
import json
import logging
import math
import pickle
import threading
logger = logging.getLogger(__name__)
# ============================================================
# Data Classes
# ============================================================
@dataclass
class CacheContext:
"""Context passed to provider methods."""
prompt_id: str # Current prompt execution ID
node_id: str # Node being cached
class_type: str # Node class type (e.g., "KSampler")
cache_key: Any # Raw cache key (frozenset structure)
cache_key_bytes: bytes # SHA256 hash for external storage key
@dataclass
class CacheValue:
"""
Value stored/retrieved from external cache.
The ui field is optional - implementations may choose to skip it
(e.g., if it contains non-portable data like local file paths).
"""
outputs: list # The tensor/value outputs
ui: dict = None # Optional UI data (may be skipped by implementations)
# ============================================================
# Provider Interface
# ============================================================
class CacheProvider(ABC):
"""
Abstract base class for external cache providers.
Thread Safety:
Providers may be called from multiple threads. Implementations
must be thread-safe.
Error Handling:
All methods are wrapped in try/except by the caller. Exceptions
are logged but never propagate to break execution.
Performance Guidelines:
- on_lookup: Should complete in <500ms (including network)
- on_store: Can be async internally (fire-and-forget)
- should_cache: Should be fast (<1ms), called frequently
"""
@abstractmethod
def on_lookup(self, context: CacheContext) -> Optional[CacheValue]:
"""
Check external storage for cached result.
Called AFTER local cache miss (local-first for performance).
Returns:
CacheValue if found externally, None otherwise.
Important:
- Return None on any error (don't raise)
- Validate data integrity before returning
"""
pass
@abstractmethod
def on_store(self, context: CacheContext, value: CacheValue) -> None:
"""
Store value to external cache.
Called AFTER value is stored in local cache.
Important:
- Can be fire-and-forget (async internally)
- Should never block execution
- Handle serialization failures gracefully
"""
pass
def should_cache(self, context: CacheContext, value: Optional[CacheValue] = None) -> bool:
"""
Filter which nodes should be externally cached.
Called before on_lookup (value=None) and on_store (value provided).
Return False to skip external caching for this node.
Implementations can filter based on context.class_type, value size,
or any custom logic. Use estimate_value_size() to get value size.
Default: Returns True (cache everything).
"""
return True
def on_prompt_start(self, prompt_id: str) -> None:
"""Called when prompt execution begins. Optional."""
pass
def on_prompt_end(self, prompt_id: str) -> None:
"""Called when prompt execution ends. Optional."""
pass
# ============================================================
# Provider Registry
# ============================================================
_providers: List[CacheProvider] = []
_providers_lock = threading.Lock()
_providers_snapshot: Optional[Tuple[CacheProvider, ...]] = None
def register_cache_provider(provider: CacheProvider) -> None:
"""
Register an external cache provider.
Providers are called in registration order. First provider to return
a result from on_lookup wins.
"""
global _providers_snapshot
with _providers_lock:
if provider in _providers:
logger.warning(f"Provider {provider.__class__.__name__} already registered")
return
_providers.append(provider)
_providers_snapshot = None # Invalidate cache
logger.info(f"Registered cache provider: {provider.__class__.__name__}")
def unregister_cache_provider(provider: CacheProvider) -> None:
"""Remove a previously registered provider."""
global _providers_snapshot
with _providers_lock:
try:
_providers.remove(provider)
_providers_snapshot = None
logger.info(f"Unregistered cache provider: {provider.__class__.__name__}")
except ValueError:
logger.warning(f"Provider {provider.__class__.__name__} was not registered")
def get_cache_providers() -> Tuple[CacheProvider, ...]:
"""Get registered providers (cached for performance)."""
global _providers_snapshot
snapshot = _providers_snapshot
if snapshot is not None:
return snapshot
with _providers_lock:
if _providers_snapshot is not None:
return _providers_snapshot
_providers_snapshot = tuple(_providers)
return _providers_snapshot
def has_cache_providers() -> bool:
"""Fast check if any providers registered (no lock)."""
return bool(_providers)
def clear_cache_providers() -> None:
"""Remove all providers. Useful for testing."""
global _providers_snapshot
with _providers_lock:
_providers.clear()
_providers_snapshot = None
# ============================================================
# Utilities
# ============================================================
def _canonicalize(obj: Any) -> Any:
"""
Convert an object to a canonical, JSON-serializable form.
This ensures deterministic ordering regardless of Python's hash randomization,
which is critical for cross-pod cache key consistency. Frozensets in particular
have non-deterministic iteration order between Python sessions.
"""
if isinstance(obj, frozenset):
# Sort frozenset items for deterministic ordering
return ("__frozenset__", sorted(
[_canonicalize(item) for item in obj],
key=lambda x: json.dumps(x, sort_keys=True)
))
elif isinstance(obj, set):
return ("__set__", sorted(
[_canonicalize(item) for item in obj],
key=lambda x: json.dumps(x, sort_keys=True)
))
elif isinstance(obj, tuple):
return ("__tuple__", [_canonicalize(item) for item in obj])
elif isinstance(obj, list):
return [_canonicalize(item) for item in obj]
elif isinstance(obj, dict):
return {str(k): _canonicalize(v) for k, v in sorted(obj.items())}
elif isinstance(obj, (int, float, str, bool, type(None))):
return obj
elif isinstance(obj, bytes):
return ("__bytes__", obj.hex())
elif hasattr(obj, 'value'):
# Handle Unhashable class from ComfyUI
return ("__unhashable__", _canonicalize(getattr(obj, 'value', None)))
else:
# For other types, use repr as fallback
return ("__repr__", repr(obj))
def serialize_cache_key(cache_key: Any) -> bytes:
"""
Serialize cache key to bytes for external storage.
Returns SHA256 hash suitable for Redis/database keys.
Note: Uses canonicalize + JSON serialization instead of pickle because
pickle is NOT deterministic across Python sessions due to hash randomization
affecting frozenset iteration order. This is critical for distributed caching
where different pods need to compute the same hash for identical inputs.
"""
try:
canonical = _canonicalize(cache_key)
json_str = json.dumps(canonical, sort_keys=True, separators=(',', ':'))
return hashlib.sha256(json_str.encode('utf-8')).digest()
except Exception as e:
logger.warning(f"Failed to serialize cache key: {e}")
# Fallback to pickle (non-deterministic but better than nothing)
try:
serialized = pickle.dumps(cache_key, protocol=4)
return hashlib.sha256(serialized).digest()
except Exception:
return hashlib.sha256(str(id(cache_key)).encode()).digest()
def contains_nan(obj: Any) -> bool:
"""
Check if cache key contains NaN (indicates uncacheable node).
NaN != NaN in Python, so local cache never hits. But serialized
NaN would match, causing incorrect external hits. Must skip these.
"""
if isinstance(obj, float):
try:
return math.isnan(obj)
except (TypeError, ValueError):
return False
if hasattr(obj, 'value'): # Unhashable class
val = getattr(obj, 'value', None)
if isinstance(val, float):
try:
return math.isnan(val)
except (TypeError, ValueError):
return False
if isinstance(obj, (frozenset, tuple, list, set)):
return any(contains_nan(item) for item in obj)
if isinstance(obj, dict):
return any(contains_nan(k) or contains_nan(v) for k, v in obj.items())
return False
def estimate_value_size(value: CacheValue) -> int:
"""Estimate serialized size in bytes. Useful for size-based filtering."""
try:
import torch
except ImportError:
return 0
total = 0
def estimate(obj):
nonlocal total
if isinstance(obj, torch.Tensor):
total += obj.numel() * obj.element_size()
elif isinstance(obj, dict):
for v in obj.values():
estimate(v)
elif isinstance(obj, (list, tuple)):
for item in obj:
estimate(item)
for output in value.outputs:
estimate(output)
return total

View File

@ -155,6 +155,10 @@ class BasicCache:
self.cache = {}
self.subcaches = {}
# External cache provider support
self._is_subcache = False
self._current_prompt_id = ''
async def set_prompt(self, dynprompt, node_ids, is_changed_cache):
self.dynprompt = dynprompt
self.cache_key_set = self.key_class(dynprompt, node_ids, is_changed_cache)
@ -201,20 +205,123 @@ class BasicCache:
cache_key = self.cache_key_set.get_data_key(node_id)
self.cache[cache_key] = value
# Notify external providers
self._notify_providers_store(node_id, cache_key, value)
def _get_immediate(self, node_id):
if not self.initialized:
return None
cache_key = self.cache_key_set.get_data_key(node_id)
# Check local cache first (fast path)
if cache_key in self.cache:
return self.cache[cache_key]
else:
# Check external providers on local miss
external_result = self._check_providers_lookup(node_id, cache_key)
if external_result is not None:
self.cache[cache_key] = external_result # Warm local cache
return external_result
return None
def _notify_providers_store(self, node_id, cache_key, value):
"""Notify external providers of cache store."""
from comfy_execution.cache_provider import (
has_cache_providers, get_cache_providers,
CacheContext, CacheValue,
serialize_cache_key, contains_nan, logger
)
# Fast exit conditions
if self._is_subcache:
return
if not has_cache_providers():
return
if not self._is_external_cacheable_value(value):
return
if contains_nan(cache_key):
return
context = CacheContext(
prompt_id=self._current_prompt_id,
node_id=node_id,
class_type=self._get_class_type(node_id),
cache_key=cache_key,
cache_key_bytes=serialize_cache_key(cache_key)
)
cache_value = CacheValue(outputs=value.outputs, ui=value.ui)
for provider in get_cache_providers():
try:
if provider.should_cache(context, cache_value):
provider.on_store(context, cache_value)
except Exception as e:
logger.warning(f"Cache provider {provider.__class__.__name__} error on store: {e}")
def _check_providers_lookup(self, node_id, cache_key):
"""Check external providers for cached result."""
from comfy_execution.cache_provider import (
has_cache_providers, get_cache_providers,
CacheContext, CacheValue,
serialize_cache_key, contains_nan, logger
)
if self._is_subcache:
return None
if not has_cache_providers():
return None
if contains_nan(cache_key):
return None
context = CacheContext(
prompt_id=self._current_prompt_id,
node_id=node_id,
class_type=self._get_class_type(node_id),
cache_key=cache_key,
cache_key_bytes=serialize_cache_key(cache_key)
)
for provider in get_cache_providers():
try:
if not provider.should_cache(context):
continue
result = provider.on_lookup(context)
if result is not None:
if not isinstance(result, CacheValue):
logger.warning(f"Provider {provider.__class__.__name__} returned invalid type")
continue
if not isinstance(result.outputs, (list, tuple)):
logger.warning(f"Provider {provider.__class__.__name__} returned invalid outputs")
continue
# Import CacheEntry here to avoid circular import at module level
from execution import CacheEntry
return CacheEntry(ui=result.ui or {}, outputs=list(result.outputs))
except Exception as e:
logger.warning(f"Cache provider {provider.__class__.__name__} error on lookup: {e}")
return None
def _is_external_cacheable_value(self, value):
"""Check if value is a CacheEntry suitable for external caching (not objects cache)."""
return hasattr(value, 'outputs') and hasattr(value, 'ui')
def _get_class_type(self, node_id):
"""Get class_type for a node."""
if not self.initialized or not self.dynprompt:
return ''
try:
return self.dynprompt.get_node(node_id).get('class_type', '')
except Exception:
return ''
async def _ensure_subcache(self, node_id, children_ids):
subcache_key = self.cache_key_set.get_subcache_key(node_id)
subcache = self.subcaches.get(subcache_key, None)
if subcache is None:
subcache = BasicCache(self.key_class)
subcache._is_subcache = True # Mark as subcache - excludes from external caching
subcache._current_prompt_id = self._current_prompt_id # Propagate prompt ID
self.subcaches[subcache_key] = subcache
await subcache.set_prompt(self.dynprompt, children_ids, self.is_changed_cache)
return subcache

View File

@ -683,6 +683,22 @@ class PromptExecutor:
}
self.add_message("execution_error", mes, broadcast=False)
def _notify_prompt_lifecycle(self, event: str, prompt_id: str):
"""Notify external cache providers of prompt lifecycle events."""
from comfy_execution.cache_provider import has_cache_providers, get_cache_providers, logger
if not has_cache_providers():
return
for provider in get_cache_providers():
try:
if event == "start":
provider.on_prompt_start(prompt_id)
elif event == "end":
provider.on_prompt_end(prompt_id)
except Exception as e:
logger.warning(f"Cache provider {provider.__class__.__name__} error on {event}: {e}")
def execute(self, prompt, prompt_id, extra_data={}, execute_outputs=[]):
asyncio.run(self.execute_async(prompt, prompt_id, extra_data, execute_outputs))
@ -699,66 +715,77 @@ class PromptExecutor:
self.status_messages = []
self.add_message("execution_start", { "prompt_id": prompt_id}, broadcast=False)
with torch.inference_mode():
dynamic_prompt = DynamicPrompt(prompt)
reset_progress_state(prompt_id, dynamic_prompt)
add_progress_handler(WebUIProgressHandler(self.server))
is_changed_cache = IsChangedCache(prompt_id, dynamic_prompt, self.caches.outputs)
for cache in self.caches.all:
await cache.set_prompt(dynamic_prompt, prompt.keys(), is_changed_cache)
cache.clean_unused()
# Set prompt ID on caches for external provider integration
for cache in self.caches.all:
cache._current_prompt_id = prompt_id
cached_nodes = []
for node_id in prompt:
if self.caches.outputs.get(node_id) is not None:
cached_nodes.append(node_id)
# Notify external cache providers of prompt start
self._notify_prompt_lifecycle("start", prompt_id)
comfy.model_management.cleanup_models_gc()
self.add_message("execution_cached",
{ "nodes": cached_nodes, "prompt_id": prompt_id},
broadcast=False)
pending_subgraph_results = {}
pending_async_nodes = {} # TODO - Unify this with pending_subgraph_results
ui_node_outputs = {}
executed = set()
execution_list = ExecutionList(dynamic_prompt, self.caches.outputs)
current_outputs = self.caches.outputs.all_node_ids()
for node_id in list(execute_outputs):
execution_list.add_node(node_id)
try:
with torch.inference_mode():
dynamic_prompt = DynamicPrompt(prompt)
reset_progress_state(prompt_id, dynamic_prompt)
add_progress_handler(WebUIProgressHandler(self.server))
is_changed_cache = IsChangedCache(prompt_id, dynamic_prompt, self.caches.outputs)
for cache in self.caches.all:
await cache.set_prompt(dynamic_prompt, prompt.keys(), is_changed_cache)
cache.clean_unused()
while not execution_list.is_empty():
node_id, error, ex = await execution_list.stage_node_execution()
if error is not None:
self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex)
break
cached_nodes = []
for node_id in prompt:
if self.caches.outputs.get(node_id) is not None:
cached_nodes.append(node_id)
assert node_id is not None, "Node ID should not be None at this point"
result, error, ex = await execute(self.server, dynamic_prompt, self.caches, node_id, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_node_outputs)
self.success = result != ExecutionResult.FAILURE
if result == ExecutionResult.FAILURE:
self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex)
break
elif result == ExecutionResult.PENDING:
execution_list.unstage_node_execution()
else: # result == ExecutionResult.SUCCESS:
execution_list.complete_node_execution()
self.caches.outputs.poll(ram_headroom=self.cache_args["ram"])
else:
# Only execute when the while-loop ends without break
self.add_message("execution_success", { "prompt_id": prompt_id }, broadcast=False)
comfy.model_management.cleanup_models_gc()
self.add_message("execution_cached",
{ "nodes": cached_nodes, "prompt_id": prompt_id},
broadcast=False)
pending_subgraph_results = {}
pending_async_nodes = {} # TODO - Unify this with pending_subgraph_results
ui_node_outputs = {}
executed = set()
execution_list = ExecutionList(dynamic_prompt, self.caches.outputs)
current_outputs = self.caches.outputs.all_node_ids()
for node_id in list(execute_outputs):
execution_list.add_node(node_id)
ui_outputs = {}
meta_outputs = {}
for node_id, ui_info in ui_node_outputs.items():
ui_outputs[node_id] = ui_info["output"]
meta_outputs[node_id] = ui_info["meta"]
self.history_result = {
"outputs": ui_outputs,
"meta": meta_outputs,
}
self.server.last_node_id = None
if comfy.model_management.DISABLE_SMART_MEMORY:
comfy.model_management.unload_all_models()
while not execution_list.is_empty():
node_id, error, ex = await execution_list.stage_node_execution()
if error is not None:
self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex)
break
assert node_id is not None, "Node ID should not be None at this point"
result, error, ex = await execute(self.server, dynamic_prompt, self.caches, node_id, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_node_outputs)
self.success = result != ExecutionResult.FAILURE
if result == ExecutionResult.FAILURE:
self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex)
break
elif result == ExecutionResult.PENDING:
execution_list.unstage_node_execution()
else: # result == ExecutionResult.SUCCESS:
execution_list.complete_node_execution()
self.caches.outputs.poll(ram_headroom=self.cache_args["ram"])
else:
# Only execute when the while-loop ends without break
self.add_message("execution_success", { "prompt_id": prompt_id }, broadcast=False)
ui_outputs = {}
meta_outputs = {}
for node_id, ui_info in ui_node_outputs.items():
ui_outputs[node_id] = ui_info["output"]
meta_outputs[node_id] = ui_info["meta"]
self.history_result = {
"outputs": ui_outputs,
"meta": meta_outputs,
}
self.server.last_node_id = None
if comfy.model_management.DISABLE_SMART_MEMORY:
comfy.model_management.unload_all_models()
finally:
# Notify external cache providers of prompt end
self._notify_prompt_lifecycle("end", prompt_id)
async def validate_inputs(prompt_id, prompt, item, validated):

View File

@ -0,0 +1,370 @@
"""Tests for external cache provider API."""
import importlib.util
import pytest
from typing import Optional
def _torch_available() -> bool:
"""Check if PyTorch is available."""
return importlib.util.find_spec("torch") is not None
from comfy_execution.cache_provider import (
CacheProvider,
CacheContext,
CacheValue,
register_cache_provider,
unregister_cache_provider,
get_cache_providers,
has_cache_providers,
clear_cache_providers,
serialize_cache_key,
contains_nan,
estimate_value_size,
_canonicalize,
)
class TestCanonicalize:
"""Test _canonicalize function for deterministic ordering."""
def test_frozenset_ordering_is_deterministic(self):
"""Frozensets should produce consistent canonical form regardless of iteration order."""
# Create two frozensets with same content
fs1 = frozenset([("a", 1), ("b", 2), ("c", 3)])
fs2 = frozenset([("c", 3), ("a", 1), ("b", 2)])
result1 = _canonicalize(fs1)
result2 = _canonicalize(fs2)
assert result1 == result2
def test_nested_frozenset_ordering(self):
"""Nested frozensets should also be deterministically ordered."""
inner1 = frozenset([1, 2, 3])
inner2 = frozenset([3, 2, 1])
fs1 = frozenset([("key", inner1)])
fs2 = frozenset([("key", inner2)])
result1 = _canonicalize(fs1)
result2 = _canonicalize(fs2)
assert result1 == result2
def test_dict_ordering(self):
"""Dicts should be sorted by key."""
d1 = {"z": 1, "a": 2, "m": 3}
d2 = {"a": 2, "m": 3, "z": 1}
result1 = _canonicalize(d1)
result2 = _canonicalize(d2)
assert result1 == result2
def test_tuple_preserved(self):
"""Tuples should be marked and preserved."""
t = (1, 2, 3)
result = _canonicalize(t)
assert result[0] == "__tuple__"
assert result[1] == [1, 2, 3]
def test_list_preserved(self):
"""Lists should be recursively canonicalized."""
lst = [{"b": 2, "a": 1}, frozenset([3, 2, 1])]
result = _canonicalize(lst)
# First element should be dict with sorted keys
assert result[0] == {"a": 1, "b": 2}
# Second element should be canonicalized frozenset
assert result[1][0] == "__frozenset__"
def test_primitives_unchanged(self):
"""Primitive types should pass through unchanged."""
assert _canonicalize(42) == 42
assert _canonicalize(3.14) == 3.14
assert _canonicalize("hello") == "hello"
assert _canonicalize(True) is True
assert _canonicalize(None) is None
def test_bytes_converted(self):
"""Bytes should be converted to hex string."""
b = b"\x00\xff"
result = _canonicalize(b)
assert result[0] == "__bytes__"
assert result[1] == "00ff"
def test_set_ordering(self):
"""Sets should be sorted like frozensets."""
s1 = {3, 1, 2}
s2 = {1, 2, 3}
result1 = _canonicalize(s1)
result2 = _canonicalize(s2)
assert result1 == result2
assert result1[0] == "__set__"
class TestSerializeCacheKey:
"""Test serialize_cache_key for deterministic hashing."""
def test_same_content_same_hash(self):
"""Same content should produce same hash."""
key1 = frozenset([("node_1", frozenset([("input", "value")]))])
key2 = frozenset([("node_1", frozenset([("input", "value")]))])
hash1 = serialize_cache_key(key1)
hash2 = serialize_cache_key(key2)
assert hash1 == hash2
def test_different_content_different_hash(self):
"""Different content should produce different hash."""
key1 = frozenset([("node_1", "value_a")])
key2 = frozenset([("node_1", "value_b")])
hash1 = serialize_cache_key(key1)
hash2 = serialize_cache_key(key2)
assert hash1 != hash2
def test_returns_bytes(self):
"""Should return bytes (SHA256 digest)."""
key = frozenset([("test", 123)])
result = serialize_cache_key(key)
assert isinstance(result, bytes)
assert len(result) == 32 # SHA256 produces 32 bytes
def test_complex_nested_structure(self):
"""Complex nested structures should hash deterministically."""
# Note: frozensets can only contain hashable types, so we use
# nested frozensets of tuples to represent dict-like structures
key = frozenset([
("node_1", frozenset([
("input_a", ("tuple", "value")),
("input_b", frozenset([("nested", "dict")])),
])),
("node_2", frozenset([
("param", 42),
])),
])
# Hash twice to verify determinism
hash1 = serialize_cache_key(key)
hash2 = serialize_cache_key(key)
assert hash1 == hash2
def test_dict_in_cache_key(self):
"""Dicts passed directly to serialize_cache_key should work."""
# This tests the _canonicalize function's ability to handle dicts
key = {"node_1": {"input": "value"}, "node_2": 42}
hash1 = serialize_cache_key(key)
hash2 = serialize_cache_key(key)
assert hash1 == hash2
assert isinstance(hash1, bytes)
assert len(hash1) == 32
class TestContainsNan:
"""Test contains_nan utility function."""
def test_nan_float_detected(self):
"""NaN floats should be detected."""
assert contains_nan(float('nan')) is True
def test_regular_float_not_nan(self):
"""Regular floats should not be detected as NaN."""
assert contains_nan(3.14) is False
assert contains_nan(0.0) is False
assert contains_nan(-1.5) is False
def test_infinity_not_nan(self):
"""Infinity is not NaN."""
assert contains_nan(float('inf')) is False
assert contains_nan(float('-inf')) is False
def test_nan_in_list(self):
"""NaN in list should be detected."""
assert contains_nan([1, 2, float('nan'), 4]) is True
assert contains_nan([1, 2, 3, 4]) is False
def test_nan_in_tuple(self):
"""NaN in tuple should be detected."""
assert contains_nan((1, float('nan'))) is True
assert contains_nan((1, 2, 3)) is False
def test_nan_in_frozenset(self):
"""NaN in frozenset should be detected."""
assert contains_nan(frozenset([1, float('nan')])) is True
assert contains_nan(frozenset([1, 2, 3])) is False
def test_nan_in_dict_value(self):
"""NaN in dict value should be detected."""
assert contains_nan({"key": float('nan')}) is True
assert contains_nan({"key": 42}) is False
def test_nan_in_nested_structure(self):
"""NaN in deeply nested structure should be detected."""
nested = {"level1": [{"level2": (1, 2, float('nan'))}]}
assert contains_nan(nested) is True
def test_non_numeric_types(self):
"""Non-numeric types should not be NaN."""
assert contains_nan("string") is False
assert contains_nan(None) is False
assert contains_nan(True) is False
class TestEstimateValueSize:
"""Test estimate_value_size utility function."""
def test_empty_outputs(self):
"""Empty outputs should have zero size."""
value = CacheValue(outputs=[])
assert estimate_value_size(value) == 0
@pytest.mark.skipif(
not _torch_available(),
reason="PyTorch not available"
)
def test_tensor_size_estimation(self):
"""Tensor size should be estimated correctly."""
import torch
# 1000 float32 elements = 4000 bytes
tensor = torch.zeros(1000, dtype=torch.float32)
value = CacheValue(outputs=[[tensor]])
size = estimate_value_size(value)
assert size == 4000
@pytest.mark.skipif(
not _torch_available(),
reason="PyTorch not available"
)
def test_nested_tensor_in_dict(self):
"""Tensors nested in dicts should be counted."""
import torch
tensor = torch.zeros(100, dtype=torch.float32) # 400 bytes
value = CacheValue(outputs=[[{"samples": tensor}]])
size = estimate_value_size(value)
assert size == 400
class TestProviderRegistry:
"""Test cache provider registration and retrieval."""
def setup_method(self):
"""Clear providers before each test."""
clear_cache_providers()
def teardown_method(self):
"""Clear providers after each test."""
clear_cache_providers()
def test_register_provider(self):
"""Provider should be registered successfully."""
provider = MockCacheProvider()
register_cache_provider(provider)
assert has_cache_providers() is True
providers = get_cache_providers()
assert len(providers) == 1
assert providers[0] is provider
def test_unregister_provider(self):
"""Provider should be unregistered successfully."""
provider = MockCacheProvider()
register_cache_provider(provider)
unregister_cache_provider(provider)
assert has_cache_providers() is False
def test_multiple_providers(self):
"""Multiple providers can be registered."""
provider1 = MockCacheProvider()
provider2 = MockCacheProvider()
register_cache_provider(provider1)
register_cache_provider(provider2)
providers = get_cache_providers()
assert len(providers) == 2
def test_duplicate_registration_ignored(self):
"""Registering same provider twice should be ignored."""
provider = MockCacheProvider()
register_cache_provider(provider)
register_cache_provider(provider) # Should be ignored
providers = get_cache_providers()
assert len(providers) == 1
def test_clear_providers(self):
"""clear_cache_providers should remove all providers."""
provider1 = MockCacheProvider()
provider2 = MockCacheProvider()
register_cache_provider(provider1)
register_cache_provider(provider2)
clear_cache_providers()
assert has_cache_providers() is False
assert len(get_cache_providers()) == 0
class TestCacheContext:
"""Test CacheContext dataclass."""
def test_context_creation(self):
"""CacheContext should be created with all fields."""
context = CacheContext(
prompt_id="prompt-123",
node_id="node-456",
class_type="KSampler",
cache_key=frozenset([("test", "value")]),
cache_key_bytes=b"hash_bytes",
)
assert context.prompt_id == "prompt-123"
assert context.node_id == "node-456"
assert context.class_type == "KSampler"
assert context.cache_key == frozenset([("test", "value")])
assert context.cache_key_bytes == b"hash_bytes"
class TestCacheValue:
"""Test CacheValue dataclass."""
def test_value_creation(self):
"""CacheValue should be created with outputs."""
outputs = [[{"samples": "tensor_data"}]]
value = CacheValue(outputs=outputs)
assert value.outputs == outputs
class MockCacheProvider(CacheProvider):
"""Mock cache provider for testing."""
def __init__(self):
self.lookups = []
self.stores = []
def on_lookup(self, context: CacheContext) -> Optional[CacheValue]:
self.lookups.append(context)
return None
def on_store(self, context: CacheContext, value: CacheValue) -> None:
self.stores.append((context, value))