mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-11 14:50:49 +08:00
- --panics-when=torch.cuda.OutOfMemory will now correctly panic and exit the worker, giving it time to reply that the execution failed and better dealing with irrecoverable out-of-memory errors - --executor-factory=ProcessPoolExecutor will use a process instead of a thread to execute comfyui workflows when using the worker. When this process panics and exits, it will be correctly replaced, making a more robust worker
87 lines
2.9 KiB
Python
87 lines
2.9 KiB
Python
from __future__ import annotations
|
|
|
|
from contextlib import contextmanager
|
|
from contextvars import ContextVar
|
|
from dataclasses import dataclass, replace
|
|
from typing import Optional, Final
|
|
|
|
from .component_model import cvpickle
|
|
from .component_model.executor_types import ExecutorToClientProgress
|
|
from .component_model.folder_path_types import FolderNames
|
|
from .distributed.server_stub import ServerStub
|
|
from .nodes.package_typing import ExportedNodes, exported_nodes_view
|
|
|
|
comfyui_execution_context: Final[ContextVar] = ContextVar("comfyui_execution_context")
|
|
# enables context var propagation across process boundaries for process pool executors
|
|
cvpickle.register_contextvar(comfyui_execution_context, __name__)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ExecutionContext:
|
|
server: ExecutorToClientProgress
|
|
folder_names_and_paths: FolderNames
|
|
custom_nodes: ExportedNodes
|
|
node_id: Optional[str] = None
|
|
task_id: Optional[str] = None
|
|
inference_mode: bool = True
|
|
|
|
|
|
comfyui_execution_context.set(ExecutionContext(server=ServerStub(), folder_names_and_paths=FolderNames(is_root=True), custom_nodes=ExportedNodes()))
|
|
|
|
|
|
def current_execution_context() -> ExecutionContext:
|
|
return comfyui_execution_context.get()
|
|
|
|
|
|
@contextmanager
|
|
def _new_execution_context(ctx: ExecutionContext):
|
|
token = comfyui_execution_context.set(ctx)
|
|
try:
|
|
yield ctx
|
|
finally:
|
|
comfyui_execution_context.reset(token)
|
|
|
|
|
|
@contextmanager
|
|
def context_folder_names_and_paths(folder_names_and_paths: FolderNames):
|
|
current_ctx = current_execution_context()
|
|
new_ctx = replace(current_ctx, folder_names_and_paths=folder_names_and_paths)
|
|
with _new_execution_context(new_ctx):
|
|
yield new_ctx
|
|
|
|
|
|
@contextmanager
|
|
def context_execute_prompt(server: ExecutorToClientProgress, prompt_id: str, inference_mode: bool = True):
|
|
current_ctx = current_execution_context()
|
|
new_ctx = replace(current_ctx, server=server, task_id=prompt_id, inference_mode=inference_mode)
|
|
with _new_execution_context(new_ctx):
|
|
yield new_ctx
|
|
|
|
|
|
@contextmanager
|
|
def context_execute_node(node_id: str):
|
|
current_ctx = current_execution_context()
|
|
new_ctx = replace(current_ctx, node_id=node_id)
|
|
with _new_execution_context(new_ctx):
|
|
yield new_ctx
|
|
|
|
|
|
@contextmanager
|
|
def context_add_custom_nodes(exported_nodes: ExportedNodes):
|
|
"""
|
|
Adds custom nodes to the execution context
|
|
:param exported_nodes: an object that represents a gathering of custom node export symbols
|
|
:return: a context
|
|
"""
|
|
current_ctx = current_execution_context()
|
|
if len(exported_nodes) == 0:
|
|
yield current_ctx
|
|
|
|
if len(current_ctx.custom_nodes) == 0:
|
|
merged_custom_nodes = exported_nodes
|
|
else:
|
|
merged_custom_nodes = exported_nodes_view(current_ctx.custom_nodes, exported_nodes)
|
|
|
|
new_ctx = replace(current_ctx, custom_nodes=merged_custom_nodes)
|
|
with _new_execution_context(new_ctx):
|
|
yield new_ctx |