mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-03-15 06:07:33 +08:00
fix: address review feedback on cache provider API
- Hold references to pending store tasks to prevent "Task was destroyed but it is still pending" warnings (bigcat88) - Parallel cache lookups with asyncio.gather instead of sequential awaits for better performance (bigcat88) - Delegate Caching.register/unregister_provider to existing functions in cache_provider.py instead of reimplementing (bigcat88) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
0e912e54fc
commit
3891064840
@ -107,25 +107,13 @@ class ComfyAPI_latest(ComfyAPIBase):
|
|||||||
|
|
||||||
async def register_provider(self, provider: "ComfyAPI_latest.Caching.CacheProvider") -> None:
|
async def register_provider(self, provider: "ComfyAPI_latest.Caching.CacheProvider") -> None:
|
||||||
"""Register an external cache provider. Providers are called in registration order."""
|
"""Register an external cache provider. Providers are called in registration order."""
|
||||||
from comfy_execution import cache_provider as _cp
|
from comfy_execution.cache_provider import register_cache_provider
|
||||||
with _cp._providers_lock:
|
register_cache_provider(provider)
|
||||||
if provider in _cp._providers:
|
|
||||||
_cp._logger.warning(f"Provider {provider.__class__.__name__} already registered")
|
|
||||||
return
|
|
||||||
_cp._providers.append(provider)
|
|
||||||
_cp._providers_snapshot = tuple(_cp._providers)
|
|
||||||
_cp._logger.debug(f"Registered cache provider: {provider.__class__.__name__}")
|
|
||||||
|
|
||||||
async def unregister_provider(self, provider: "ComfyAPI_latest.Caching.CacheProvider") -> None:
|
async def unregister_provider(self, provider: "ComfyAPI_latest.Caching.CacheProvider") -> None:
|
||||||
"""Unregister a previously registered cache provider."""
|
"""Unregister a previously registered cache provider."""
|
||||||
from comfy_execution import cache_provider as _cp
|
from comfy_execution.cache_provider import unregister_cache_provider
|
||||||
with _cp._providers_lock:
|
unregister_cache_provider(provider)
|
||||||
try:
|
|
||||||
_cp._providers.remove(provider)
|
|
||||||
_cp._providers_snapshot = tuple(_cp._providers)
|
|
||||||
_cp._logger.debug(f"Unregistered cache provider: {provider.__class__.__name__}")
|
|
||||||
except ValueError:
|
|
||||||
_cp._logger.warning(f"Provider {provider.__class__.__name__} was not registered")
|
|
||||||
|
|
||||||
class ComfyExtension(ABC):
|
class ComfyExtension(ABC):
|
||||||
async def on_load(self) -> None:
|
async def on_load(self) -> None:
|
||||||
|
|||||||
@ -155,6 +155,7 @@ class BasicCache:
|
|||||||
self.cache_key_set: CacheKeySet
|
self.cache_key_set: CacheKeySet
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.subcaches = {}
|
self.subcaches = {}
|
||||||
|
self._pending_store_tasks: set = set()
|
||||||
|
|
||||||
async def set_prompt(self, dynprompt, node_ids, is_changed_cache):
|
async def set_prompt(self, dynprompt, node_ids, is_changed_cache):
|
||||||
self.dynprompt = dynprompt
|
self.dynprompt = dynprompt
|
||||||
@ -253,7 +254,9 @@ class BasicCache:
|
|||||||
for provider in _get_cache_providers():
|
for provider in _get_cache_providers():
|
||||||
try:
|
try:
|
||||||
if provider.should_cache(context, cache_value):
|
if provider.should_cache(context, cache_value):
|
||||||
asyncio.create_task(self._safe_provider_store(provider, context, cache_value))
|
task = asyncio.create_task(self._safe_provider_store(provider, context, cache_value))
|
||||||
|
self._pending_store_tasks.add(task)
|
||||||
|
task.add_done_callback(self._pending_store_tasks.discard)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
_logger.warning(f"Cache provider {provider.__class__.__name__} error on store: {e}")
|
_logger.warning(f"Cache provider {provider.__class__.__name__} error on store: {e}")
|
||||||
|
|
||||||
|
|||||||
12
execution.py
12
execution.py
@ -726,10 +726,14 @@ class PromptExecutor:
|
|||||||
await cache.set_prompt(dynamic_prompt, prompt.keys(), is_changed_cache)
|
await cache.set_prompt(dynamic_prompt, prompt.keys(), is_changed_cache)
|
||||||
cache.clean_unused()
|
cache.clean_unused()
|
||||||
|
|
||||||
cached_nodes = []
|
node_ids = list(prompt.keys())
|
||||||
for node_id in prompt:
|
cache_results = await asyncio.gather(
|
||||||
if await self.caches.outputs.get(node_id) is not None:
|
*(self.caches.outputs.get(node_id) for node_id in node_ids)
|
||||||
cached_nodes.append(node_id)
|
)
|
||||||
|
cached_nodes = [
|
||||||
|
node_id for node_id, result in zip(node_ids, cache_results)
|
||||||
|
if result is not None
|
||||||
|
]
|
||||||
|
|
||||||
comfy.model_management.cleanup_models_gc()
|
comfy.model_management.cleanup_models_gc()
|
||||||
self.add_message("execution_cached",
|
self.add_message("execution_cached",
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user