From b089db79c5361f811770a05fe1c593d000bbbf37 Mon Sep 17 00:00:00 2001 From: bymyself Date: Fri, 13 Jun 2025 20:27:41 -0700 Subject: [PATCH] [fix] Restore proper thread-based TaskQueue worker management - Fix async/sync mismatch in TaskQueue worker implementation - Use threading.Thread with asyncio.run() as originally designed - Remove incorrect async task approach that caused blocking issues - TaskQueue now properly manages its own thread lifecycle - Resolves WebSocket message delivery and task processing issues --- comfyui_manager/glob/manager_server.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/comfyui_manager/glob/manager_server.py b/comfyui_manager/glob/manager_server.py index 41e2d0ac..aa6ce743 100644 --- a/comfyui_manager/glob/manager_server.py +++ b/comfyui_manager/glob/manager_server.py @@ -216,25 +216,17 @@ class TaskQueue: """Check if the queue is currently processing tasks""" return ( self._worker_task is not None - and not self._worker_task.done() - and (len(self.running_tasks) > 0 or len(self.pending_tasks) > 0) + and self._worker_task.is_alive() ) - async def start_worker(self) -> bool: + def start_worker(self) -> bool: """Start the task worker if not already running. Returns True if started, False if already running.""" - if self._worker_task is not None and not self._worker_task.done(): + if self._worker_task is not None and self._worker_task.is_alive(): return False # Already running - self._worker_task = asyncio.create_task(self._worker()) + self._worker_task = threading.Thread(target=lambda: asyncio.run(task_worker())) + self._worker_task.start() return True - - async def _worker(self): - """Internal worker that processes the task queue""" - try: - await task_worker() - finally: - # Clean up worker reference when done - self._worker_task = None def get_current_state(self) -> TaskStateMessage: return TaskStateMessage( @@ -1435,7 +1427,7 @@ async def queue_count(request): @routes.get("/v2/manager/queue/start") async def queue_start(request): # finalize_temp_queue_batch() - started = await task_queue.start_worker() + started = task_queue.start_worker() if started: return web.Response(status=200) # Started successfully