Surface node scheduling errors instead of crashing the worker

A node whose FUNCTION points at a method that does not exist (e.g. a typo
in a custom node) raised an AttributeError inside the scheduling heuristic
(ux_friendly_pick_node -> is_async). That exception escaped
stage_node_execution() and the prompt worker's error handling, silently
killing the worker thread with nothing reported to the client.

- is_async() now treats a node whose FUNCTION does not resolve to a method
  as non-async, so scheduling proceeds and the missing-method error is
  raised and reported through the normal execution path.
- stage_node_execution() wraps node picking so any unexpected scheduling
  error is returned as an execution error (attributed to an available
  node) rather than propagating and killing the worker thread.

Add regression tests covering both paths.
This commit is contained in:
Wei Hai 2026-06-26 15:26:41 -07:00
parent 7cb784e0f4
commit 91f3c0c4d9
2 changed files with 99 additions and 2 deletions

View File

@ -3,6 +3,7 @@ from typing import Type, Literal
import nodes
import asyncio
import inspect
import traceback
from comfy_execution.graph_utils import is_link, ExecutionBlocker
from comfy.comfy_types.node_typing import ComfyNodeABC, InputTypeDict, InputTypeOptions
@ -263,7 +264,25 @@ class ExecutionList(TopologicalSort):
}
return None, error_details, ex
self.staged_node_id = self.ux_friendly_pick_node(available)
try:
self.staged_node_id = self.ux_friendly_pick_node(available)
except Exception as ex:
# Picking a node is a scheduling heuristic that inspects node
# definitions; a malformed custom node must not crash the prompt
# worker thread silently. Blame an available node and surface the
# error to the frontend like any other execution error.
blamed_node = self.dynprompt.get_display_node_id(available[0])
exception_type = type(ex).__qualname__
if type(ex).__module__ != "builtins":
exception_type = type(ex).__module__ + "." + exception_type
error_details = {
"node_id": blamed_node,
"exception_message": str(ex),
"exception_type": exception_type,
"traceback": traceback.format_tb(ex.__traceback__),
"current_inputs": []
}
return None, error_details, ex
return self.staged_node_id, None, None
def ux_friendly_pick_node(self, node_list):
@ -283,7 +302,17 @@ class ExecutionList(TopologicalSort):
def is_async(node_id):
class_type = self.dynprompt.get_node(node_id)["class_type"]
class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
return inspect.iscoroutinefunction(getattr(class_def, class_def.FUNCTION))
# A malformed node (e.g. FUNCTION pointing at a method that does not
# exist because of a typo) must not crash scheduling here. Treat it as
# non-async so it proceeds to normal execution, where the missing
# method raises an error that is caught and reported to the frontend.
function_name = getattr(class_def, "FUNCTION", None)
if function_name is None:
return False
func = getattr(class_def, function_name, None)
if func is None:
return False
return inspect.iscoroutinefunction(func)
for node_id in node_list:
if is_output(node_id) or is_async(node_id):

View File

@ -0,0 +1,68 @@
"""Regression tests for scheduler resilience to malformed nodes.
A node whose FUNCTION points at a method that does not exist (e.g. a typo in a
custom node) used to raise inside the scheduling heuristic, escaping the prompt
worker's error handling and silently killing the worker thread. Scheduling must
instead either proceed (so the error surfaces through normal execution) or report
the failure as an execution error.
"""
import asyncio
import nodes
from comfy_execution.graph import DynamicPrompt, ExecutionList
class _MalformedV1Node:
@classmethod
def INPUT_TYPES(cls):
return {"required": {}}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "invert" # the actual method below is misspelled
OUTPUT_NODE = True
CATEGORY = "Test"
def invvert(self):
return (None,)
class _FakeOutputCache:
def all_node_ids(self):
return set()
async def get(self, node_id):
return None
def _make_execution_list(class_type, class_def):
nodes.NODE_CLASS_MAPPINGS[class_type] = class_def
prompt = {"1": {"class_type": class_type, "inputs": {}}}
execution_list = ExecutionList(DynamicPrompt(prompt), _FakeOutputCache())
execution_list.add_node("1")
return execution_list
def test_malformed_function_does_not_crash_scheduler():
"""A FUNCTION-typo node schedules without raising; the error surfaces later."""
execution_list = _make_execution_list("MalformedV1Node", _MalformedV1Node)
node_id, error, ex = asyncio.run(execution_list.stage_node_execution())
assert ex is None
assert error is None
assert node_id == "1"
def test_pick_node_failure_is_reported_not_raised():
"""An unexpected scheduling error is returned as an error, not raised."""
execution_list = _make_execution_list("MalformedV1Node", _MalformedV1Node)
def raise_on_pick(_available):
raise RuntimeError("boom")
execution_list.ux_friendly_pick_node = raise_on_pick
node_id, error, ex = asyncio.run(execution_list.stage_node_execution())
assert node_id is None
assert isinstance(ex, RuntimeError)
assert error["node_id"] == "1"
assert error["exception_type"] == "RuntimeError"
assert error["exception_message"] == "boom"
assert error["traceback"]