ComfyUI/comfy/execution_context.py
doctorpangloss 3ddec8ae90 Better support for process pool executors
- --panics-when=torch.cuda.OutOfMemory will now correctly panic and
   exit the worker, giving it time to reply that the execution failed
   and better dealing with irrecoverable out-of-memory errors
 - --executor-factory=ProcessPoolExecutor will use a process instead of
   a thread to execute comfyui workflows when using the worker. When
   this process panics and exits, it will be correctly replaced, making
   a more robust worker
2025-02-18 14:37:20 -08:00

87 lines
2.9 KiB
Python

from __future__ import annotations
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, replace
from typing import Optional, Final
from .component_model import cvpickle
from .component_model.executor_types import ExecutorToClientProgress
from .component_model.folder_path_types import FolderNames
from .distributed.server_stub import ServerStub
from .nodes.package_typing import ExportedNodes, exported_nodes_view
comfyui_execution_context: Final[ContextVar] = ContextVar("comfyui_execution_context")
# enables context var propagation across process boundaries for process pool executors
cvpickle.register_contextvar(comfyui_execution_context, __name__)
@dataclass(frozen=True)
class ExecutionContext:
server: ExecutorToClientProgress
folder_names_and_paths: FolderNames
custom_nodes: ExportedNodes
node_id: Optional[str] = None
task_id: Optional[str] = None
inference_mode: bool = True
comfyui_execution_context.set(ExecutionContext(server=ServerStub(), folder_names_and_paths=FolderNames(is_root=True), custom_nodes=ExportedNodes()))
def current_execution_context() -> ExecutionContext:
return comfyui_execution_context.get()
@contextmanager
def _new_execution_context(ctx: ExecutionContext):
token = comfyui_execution_context.set(ctx)
try:
yield ctx
finally:
comfyui_execution_context.reset(token)
@contextmanager
def context_folder_names_and_paths(folder_names_and_paths: FolderNames):
current_ctx = current_execution_context()
new_ctx = replace(current_ctx, folder_names_and_paths=folder_names_and_paths)
with _new_execution_context(new_ctx):
yield new_ctx
@contextmanager
def context_execute_prompt(server: ExecutorToClientProgress, prompt_id: str, inference_mode: bool = True):
current_ctx = current_execution_context()
new_ctx = replace(current_ctx, server=server, task_id=prompt_id, inference_mode=inference_mode)
with _new_execution_context(new_ctx):
yield new_ctx
@contextmanager
def context_execute_node(node_id: str):
current_ctx = current_execution_context()
new_ctx = replace(current_ctx, node_id=node_id)
with _new_execution_context(new_ctx):
yield new_ctx
@contextmanager
def context_add_custom_nodes(exported_nodes: ExportedNodes):
"""
Adds custom nodes to the execution context
:param exported_nodes: an object that represents a gathering of custom node export symbols
:return: a context
"""
current_ctx = current_execution_context()
if len(exported_nodes) == 0:
yield current_ctx
if len(current_ctx.custom_nodes) == 0:
merged_custom_nodes = exported_nodes
else:
merged_custom_nodes = exported_nodes_view(current_ctx.custom_nodes, exported_nodes)
new_ctx = replace(current_ctx, custom_nodes=merged_custom_nodes)
with _new_execution_context(new_ctx):
yield new_ctx