[fix] Restore proper thread-based TaskQueue worker management

- Fix async/sync mismatch in TaskQueue worker implementation
- Use threading.Thread with asyncio.run() as originally designed
- Remove incorrect async task approach that caused blocking issues
- TaskQueue now properly manages its own thread lifecycle
- Resolves WebSocket message delivery and task processing issues
This commit is contained in:
bymyself 2025-06-13 20:27:41 -07:00
parent 7a73f5db73
commit b089db79c5

View File

@ -216,26 +216,18 @@ class TaskQueue:
"""Check if the queue is currently processing tasks""" """Check if the queue is currently processing tasks"""
return ( return (
self._worker_task is not None self._worker_task is not None
and not self._worker_task.done() and self._worker_task.is_alive()
and (len(self.running_tasks) > 0 or len(self.pending_tasks) > 0)
) )
async def start_worker(self) -> bool: def start_worker(self) -> bool:
"""Start the task worker if not already running. Returns True if started, False if already running.""" """Start the task worker if not already running. Returns True if started, False if already running."""
if self._worker_task is not None and not self._worker_task.done(): if self._worker_task is not None and self._worker_task.is_alive():
return False # Already running return False # Already running
self._worker_task = asyncio.create_task(self._worker()) self._worker_task = threading.Thread(target=lambda: asyncio.run(task_worker()))
self._worker_task.start()
return True return True
async def _worker(self):
"""Internal worker that processes the task queue"""
try:
await task_worker()
finally:
# Clean up worker reference when done
self._worker_task = None
def get_current_state(self) -> TaskStateMessage: def get_current_state(self) -> TaskStateMessage:
return TaskStateMessage( return TaskStateMessage(
history=self.get_history(), history=self.get_history(),
@ -1435,7 +1427,7 @@ async def queue_count(request):
@routes.get("/v2/manager/queue/start") @routes.get("/v2/manager/queue/start")
async def queue_start(request): async def queue_start(request):
# finalize_temp_queue_batch() # finalize_temp_queue_batch()
started = await task_queue.start_worker() started = task_queue.start_worker()
if started: if started:
return web.Response(status=200) # Started successfully return web.Response(status=200) # Started successfully