refactor: move all /jobs business logic to jobs.py

This commit is contained in:
Richard Yu 2025-12-04 13:36:18 -08:00
parent c860cc612b
commit 5ed90a1583
3 changed files with 94 additions and 84 deletions

View File

@ -52,7 +52,7 @@ def normalize_queue_item(item, status):
"""Convert queue item tuple to unified job dict.""" """Convert queue item tuple to unified job dict."""
priority, prompt_id, _, extra_data, _ = item[:5] priority, prompt_id, _, extra_data, _ = item[:5]
create_time = extra_data.get('create_time') create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} extra_pnginfo = extra_data.get('extra_pnginfo') or {}
workflow_id = extra_pnginfo.get('workflow', {}).get('id') workflow_id = extra_pnginfo.get('workflow', {}).get('id')
return { return {
@ -74,7 +74,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False):
prompt_tuple = history_item['prompt'] prompt_tuple = history_item['prompt']
priority, _, prompt, extra_data, _ = prompt_tuple[:5] priority, _, prompt, extra_data, _ = prompt_tuple[:5]
create_time = extra_data.get('create_time') create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} extra_pnginfo = extra_data.get('extra_pnginfo') or {}
workflow_id = extra_pnginfo.get('workflow', {}).get('id') workflow_id = extra_pnginfo.get('workflow', {}).get('id')
status_info = history_item.get('status', {}) status_info = history_item.get('status', {})
@ -178,3 +178,80 @@ def apply_sorting(jobs, sort_by, sort_order):
return job.get('create_time') or 0 return job.get('create_time') or 0
return sorted(jobs, key=get_sort_key, reverse=reverse) return sorted(jobs, key=get_sort_key, reverse=reverse)
def get_job(prompt_id, running, queued, history):
"""
Get a single job by prompt_id from history or queue.
Args:
prompt_id: The prompt ID to look up
running: List of currently running queue items
queued: List of pending queue items
history: Dict of history items keyed by prompt_id
Returns:
Job dict with full details, or None if not found
"""
if prompt_id in history:
return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True)
for item in running:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.IN_PROGRESS)
for item in queued:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.PENDING)
return None
def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
"""
Get all jobs (running, pending, completed) with filtering and sorting.
Args:
running: List of currently running queue items
queued: List of pending queue items
history: Dict of history items keyed by prompt_id
status_filter: List of statuses to include (from JobStatus.ALL)
sort_by: Field to sort by ('created_at', 'execution_duration')
sort_order: 'asc' or 'desc'
limit: Maximum number of items to return
offset: Number of items to skip
Returns:
tuple: (jobs_list, total_count)
"""
jobs = []
if status_filter is None:
status_filter = JobStatus.ALL
if JobStatus.IN_PROGRESS in status_filter:
for item in running:
jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS))
if JobStatus.PENDING in status_filter:
for item in queued:
jobs.append(normalize_queue_item(item, JobStatus.PENDING))
include_completed = JobStatus.COMPLETED in status_filter
include_failed = JobStatus.FAILED in status_filter
if include_completed or include_failed:
for prompt_id, history_item in history.items():
is_failed = history_item.get('status', {}).get('status_str') == 'error'
if (is_failed and include_failed) or (not is_failed and include_completed):
jobs.append(normalize_history_item(prompt_id, history_item))
jobs = apply_sorting(jobs, sort_by, sort_order)
total_count = len(jobs)
if offset > 0:
jobs = jobs[offset:]
if limit is not None:
jobs = jobs[:limit]
return (jobs, total_count)

View File

@ -33,12 +33,6 @@ from comfy_execution.graph_utils import GraphBuilder, is_link
from comfy_execution.validation import validate_node_input from comfy_execution.validation import validate_node_input
from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, WebUIProgressHandler from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, WebUIProgressHandler
from comfy_execution.utils import CurrentNodeContext from comfy_execution.utils import CurrentNodeContext
from comfy_execution.jobs import (
JobStatus,
normalize_queue_item,
normalize_history_item,
apply_sorting,
)
from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func
from comfy_api.latest import io, _io from comfy_api.latest import io, _io
@ -1229,74 +1223,6 @@ class PromptQueue:
with self.mutex: with self.mutex:
self.history.pop(id_to_delete, None) self.history.pop(id_to_delete, None)
def get_job(self, prompt_id):
"""Get a single job by prompt_id from history or queue."""
history = self.get_history(prompt_id=prompt_id)
if prompt_id in history:
return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True)
running, queued = self.get_current_queue_volatile()
for item in running:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.IN_PROGRESS)
for item in queued:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.PENDING)
return None
def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
"""
Get all jobs (running, pending, completed) with filtering and sorting.
Args:
status_filter: list of statuses to include (from JobStatus.ALL)
sort_by: field to sort by ('created_at', 'execution_duration')
sort_order: 'asc' or 'desc'
limit: maximum number of items to return
offset: number of items to skip
Returns:
tuple: (jobs_list, total_count)
"""
running, queued = self.get_current_queue_volatile()
history = self.get_history()
jobs = []
if status_filter is None:
status_filter = JobStatus.ALL
if JobStatus.IN_PROGRESS in status_filter:
for item in running:
jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS))
if JobStatus.PENDING in status_filter:
for item in queued:
jobs.append(normalize_queue_item(item, JobStatus.PENDING))
include_completed = JobStatus.COMPLETED in status_filter
include_failed = JobStatus.FAILED in status_filter
if include_completed or include_failed:
for prompt_id, history_item in history.items():
is_failed = history_item.get('status', {}).get('status_str') == 'error'
if (is_failed and include_failed) or (not is_failed and include_completed):
jobs.append(normalize_history_item(prompt_id, history_item))
jobs = apply_sorting(jobs, sort_by, sort_order)
total_count = len(jobs)
if offset > 0:
jobs = jobs[offset:]
if limit is not None:
jobs = jobs[:limit]
return (jobs, total_count)
def set_flag(self, name, data): def set_flag(self, name, data):
with self.mutex: with self.mutex:
self.flags[name] = data self.flags[name] = data

View File

@ -7,7 +7,7 @@ import time
import nodes import nodes
import folder_paths import folder_paths
import execution import execution
from comfy_execution.jobs import JobStatus from comfy_execution.jobs import JobStatus, get_job, get_all_jobs
import uuid import uuid
import urllib import urllib
import json import json
@ -709,17 +709,17 @@ class PromptServer():
if not status_filter: if not status_filter:
status_filter = None status_filter = None
sort_by = query.get('sort', 'created_at') sort_by = query.get('sort_by', 'created_at')
if sort_by not in {'created_at', 'execution_duration'}: if sort_by not in {'created_at', 'execution_duration'}:
return web.json_response( return web.json_response(
{"error": "sort must be 'created_at' or 'execution_duration'"}, {"error": "sort_by must be 'created_at' or 'execution_duration'"},
status=400 status=400
) )
sort_order = query.get('order', 'desc') sort_order = query.get('sort_order', 'desc')
if sort_order not in {'asc', 'desc'}: if sort_order not in {'asc', 'desc'}:
return web.json_response( return web.json_response(
{"error": "order must be 'asc' or 'desc'"}, {"error": "sort_order must be 'asc' or 'desc'"},
status=400 status=400
) )
@ -750,7 +750,11 @@ class PromptServer():
status=400 status=400
) )
jobs, total = self.prompt_queue.get_all_jobs( running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history()
jobs, total = get_all_jobs(
running, queued, history,
status_filter=status_filter, status_filter=status_filter,
sort_by=sort_by, sort_by=sort_by,
sort_order=sort_order, sort_order=sort_order,
@ -771,7 +775,7 @@ class PromptServer():
}) })
@routes.get("/api/jobs/{job_id}") @routes.get("/api/jobs/{job_id}")
async def get_job(request): async def get_job_by_id(request):
"""Get a single job by ID.""" """Get a single job by ID."""
job_id = request.match_info.get("job_id", None) job_id = request.match_info.get("job_id", None)
if not job_id: if not job_id:
@ -780,7 +784,10 @@ class PromptServer():
status=400 status=400
) )
job = self.prompt_queue.get_job(job_id) running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history(prompt_id=job_id)
job = get_job(job_id, running, queued, history)
if job is None: if job is None:
return web.json_response( return web.json_response(
{"error": "Job not found"}, {"error": "Job not found"},