Capture workflow_id before execute() to fix terminal 'executing' reset

The previous commit cleared server.last_workflow_id in PromptExecutor's
finally block to prevent racing progress callbacks from leaking stale
ids. That fix introduced a new bug: prompt_worker in main.py reads
server.last_workflow_id *after* e.execute() returns to populate the
terminal 'executing' (node=None) message that tells the client a prompt
is done. Because the executor has already cleared the field by then,
this message always emitted workflow_id=None.

Capture the workflow id from extra_data into a local variable before
calling e.execute(), and use that local in the send_sync() call so the
per-prompt id is preserved through the executor's finally cleanup.

Adds an AST-based regression test asserting that prompt_worker extracts
workflow_id locally and that the terminal 'executing' send_sync never
reads server.last_workflow_id. Verified the test fails on the buggy
revision and passes after the fix.
This commit is contained in:
Glary-Bot 2026-05-12 08:24:47 +00:00
parent 1f0b13705d
commit 7ca4457fd1
2 changed files with 74 additions and 1 deletions

View File

@@ -21,6 +21,7 @@ import logging
import sys
from comfy_execution.progress import get_progress_state
from comfy_execution.utils import get_executing_context
from comfy_execution.jobs import extract_workflow_id
from comfy_api import feature_flags
from app.database.db import init_db, dependencies_available
@@ -309,6 +310,12 @@ def prompt_worker(q, server_instance):
for k in sensitive:
extra_data[k] = sensitive[k]
# Capture the workflow id for this prompt before execution: the
# executor clears server.last_workflow_id in its finally block, so
# reading it after e.execute() returns would emit workflow_id=None
# on the terminal "executing" reset below.
workflow_id = extract_workflow_id(extra_data)
asset_seeder.pause()
e.execute(item[2], prompt_id, extra_data, item[4])
@@ -322,7 +329,7 @@ def prompt_worker(q, server_instance):
completed=e.success,
messages=e.status_messages), process_item=remove_sensitive)
if server_instance.client_id is not None:
server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id, "workflow_id": getattr(server_instance, 'last_workflow_id', None)}, server_instance.client_id)
server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id, "workflow_id": workflow_id}, server_instance.client_id)
current_time = time.perf_counter()
execution_time = current_time - execution_start_time

View File

@@ -219,3 +219,69 @@ class TestPreviewImageMetadataPayload:
_, metadata = payload
assert metadata["prompt_id"] == "p1"
assert metadata["workflow_id"] == "wf-1"
class TestTerminalExecutingResetInMainPy:
"""Regression test for the main.py prompt_worker terminal 'executing' reset.
The executor clears server.last_workflow_id in its finally block, so
main.py must capture the workflow id *before* calling e.execute() and use
that local value, not read server.last_workflow_id afterwards.
Rather than importing main.py (which triggers torch CUDA init in this
environment), we statically assert the contract via AST: somewhere
between the `extra_data = item[3].copy()` line and the
`e.execute(item[2], ...)` call, the function must extract workflow_id
from extra_data into a local, and the subsequent send_sync("executing",
...) must reference that local rather than server.last_workflow_id.
"""
def test_terminal_executing_uses_locally_captured_workflow_id(self):
import ast
from pathlib import Path
source = Path("main.py").read_text()
tree = ast.parse(source)
worker = next(
(
n
for n in ast.walk(tree)
if isinstance(n, ast.FunctionDef) and n.name == "prompt_worker"
),
None,
)
assert worker is not None, "prompt_worker function not found in main.py"
worker_src = ast.get_source_segment(source, worker) or ""
assert "extract_workflow_id(extra_data)" in worker_src, (
"main.py:prompt_worker must capture workflow_id locally from extra_data "
"before calling e.execute() (the executor clears server.last_workflow_id "
"in finally)."
)
for node in ast.walk(worker):
if not isinstance(node, ast.Call):
continue
func = node.func
if not (
isinstance(func, ast.Attribute)
and func.attr == "send_sync"
and node.args
and isinstance(node.args[0], ast.Constant)
and node.args[0].value == "executing"
and len(node.args) >= 2
and isinstance(node.args[1], ast.Dict)
):
continue
payload = node.args[1]
for key, value in zip(payload.keys, payload.values):
if isinstance(key, ast.Constant) and key.value == "workflow_id":
rendered = ast.unparse(value)
assert "last_workflow_id" not in rendered, (
"main.py terminal 'executing' must not read "
"server.last_workflow_id; the executor clears it in its "
"finally block. Use a locally captured workflow_id instead."
)