From 5ed90a1583cc95d612ed261c9ed85d396061400d Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 4 Dec 2025 13:36:18 -0800 Subject: [PATCH] refactor: move all /jobs business logic to jobs.py --- comfy_execution/jobs.py | 81 ++++++++++++++++++++++++++++++++++++++++- execution.py | 74 ------------------------------------- server.py | 23 ++++++++---- 3 files changed, 94 insertions(+), 84 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index ca7401b1c..9362c7e99 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -52,7 +52,7 @@ def normalize_queue_item(item, status): """Convert queue item tuple to unified job dict.""" priority, prompt_id, _, extra_data, _ = item[:5] create_time = extra_data.get('create_time') - extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} + extra_pnginfo = extra_data.get('extra_pnginfo') or {} workflow_id = extra_pnginfo.get('workflow', {}).get('id') return { @@ -74,7 +74,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): prompt_tuple = history_item['prompt'] priority, _, prompt, extra_data, _ = prompt_tuple[:5] create_time = extra_data.get('create_time') - extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} + extra_pnginfo = extra_data.get('extra_pnginfo') or {} workflow_id = extra_pnginfo.get('workflow', {}).get('id') status_info = history_item.get('status', {}) @@ -178,3 +178,80 @@ def apply_sorting(jobs, sort_by, sort_order): return job.get('create_time') or 0 return sorted(jobs, key=get_sort_key, reverse=reverse) + + +def get_job(prompt_id, running, queued, history): + """ + Get a single job by prompt_id from history or queue. + + Args: + prompt_id: The prompt ID to look up + running: List of currently running queue items + queued: List of pending queue items + history: Dict of history items keyed by prompt_id + + Returns: + Job dict with full details, or None if not found + """ + if prompt_id in history: + return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) + + for item in running: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.IN_PROGRESS) + + for item in queued: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.PENDING) + + return None + + +def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): + """ + Get all jobs (running, pending, completed) with filtering and sorting. + + Args: + running: List of currently running queue items + queued: List of pending queue items + history: Dict of history items keyed by prompt_id + status_filter: List of statuses to include (from JobStatus.ALL) + sort_by: Field to sort by ('created_at', 'execution_duration') + sort_order: 'asc' or 'desc' + limit: Maximum number of items to return + offset: Number of items to skip + + Returns: + tuple: (jobs_list, total_count) + """ + jobs = [] + + if status_filter is None: + status_filter = JobStatus.ALL + + if JobStatus.IN_PROGRESS in status_filter: + for item in running: + jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) + + if JobStatus.PENDING in status_filter: + for item in queued: + jobs.append(normalize_queue_item(item, JobStatus.PENDING)) + + include_completed = JobStatus.COMPLETED in status_filter + include_failed = JobStatus.FAILED in status_filter + if include_completed or include_failed: + for prompt_id, history_item in history.items(): + is_failed = history_item.get('status', {}).get('status_str') == 'error' + if (is_failed and include_failed) or (not is_failed and include_completed): + jobs.append(normalize_history_item(prompt_id, history_item)) + + jobs = apply_sorting(jobs, sort_by, sort_order) + + total_count = len(jobs) + + if offset > 0: + jobs = jobs[offset:] + if limit is not None: + jobs = jobs[:limit] + + return (jobs, total_count) diff --git a/execution.py b/execution.py index c552214d4..c2186ac98 100644 --- a/execution.py +++ b/execution.py @@ -33,12 +33,6 @@ from comfy_execution.graph_utils import GraphBuilder, is_link from comfy_execution.validation import validate_node_input from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, WebUIProgressHandler from comfy_execution.utils import CurrentNodeContext -from comfy_execution.jobs import ( - JobStatus, - normalize_queue_item, - normalize_history_item, - apply_sorting, -) from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func from comfy_api.latest import io, _io @@ -1229,74 +1223,6 @@ class PromptQueue: with self.mutex: self.history.pop(id_to_delete, None) - def get_job(self, prompt_id): - """Get a single job by prompt_id from history or queue.""" - history = self.get_history(prompt_id=prompt_id) - - if prompt_id in history: - return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) - - running, queued = self.get_current_queue_volatile() - - for item in running: - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.IN_PROGRESS) - - for item in queued: - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.PENDING) - - return None - - def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): - """ - Get all jobs (running, pending, completed) with filtering and sorting. - - Args: - status_filter: list of statuses to include (from JobStatus.ALL) - sort_by: field to sort by ('created_at', 'execution_duration') - sort_order: 'asc' or 'desc' - limit: maximum number of items to return - offset: number of items to skip - - Returns: - tuple: (jobs_list, total_count) - """ - running, queued = self.get_current_queue_volatile() - history = self.get_history() - - jobs = [] - - if status_filter is None: - status_filter = JobStatus.ALL - - if JobStatus.IN_PROGRESS in status_filter: - for item in running: - jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) - - if JobStatus.PENDING in status_filter: - for item in queued: - jobs.append(normalize_queue_item(item, JobStatus.PENDING)) - - include_completed = JobStatus.COMPLETED in status_filter - include_failed = JobStatus.FAILED in status_filter - if include_completed or include_failed: - for prompt_id, history_item in history.items(): - is_failed = history_item.get('status', {}).get('status_str') == 'error' - if (is_failed and include_failed) or (not is_failed and include_completed): - jobs.append(normalize_history_item(prompt_id, history_item)) - - jobs = apply_sorting(jobs, sort_by, sort_order) - - total_count = len(jobs) - - if offset > 0: - jobs = jobs[offset:] - if limit is not None: - jobs = jobs[:limit] - - return (jobs, total_count) - def set_flag(self, name, data): with self.mutex: self.flags[name] = data diff --git a/server.py b/server.py index 6b34d74c3..65a25fd16 100644 --- a/server.py +++ b/server.py @@ -7,7 +7,7 @@ import time import nodes import folder_paths import execution -from comfy_execution.jobs import JobStatus +from comfy_execution.jobs import JobStatus, get_job, get_all_jobs import uuid import urllib import json @@ -709,17 +709,17 @@ class PromptServer(): if not status_filter: status_filter = None - sort_by = query.get('sort', 'created_at') + sort_by = query.get('sort_by', 'created_at') if sort_by not in {'created_at', 'execution_duration'}: return web.json_response( - {"error": "sort must be 'created_at' or 'execution_duration'"}, + {"error": "sort_by must be 'created_at' or 'execution_duration'"}, status=400 ) - sort_order = query.get('order', 'desc') + sort_order = query.get('sort_order', 'desc') if sort_order not in {'asc', 'desc'}: return web.json_response( - {"error": "order must be 'asc' or 'desc'"}, + {"error": "sort_order must be 'asc' or 'desc'"}, status=400 ) @@ -750,7 +750,11 @@ class PromptServer(): status=400 ) - jobs, total = self.prompt_queue.get_all_jobs( + running, queued = self.prompt_queue.get_current_queue_volatile() + history = self.prompt_queue.get_history() + + jobs, total = get_all_jobs( + running, queued, history, status_filter=status_filter, sort_by=sort_by, sort_order=sort_order, @@ -771,7 +775,7 @@ class PromptServer(): }) @routes.get("/api/jobs/{job_id}") - async def get_job(request): + async def get_job_by_id(request): """Get a single job by ID.""" job_id = request.match_info.get("job_id", None) if not job_id: @@ -780,7 +784,10 @@ class PromptServer(): status=400 ) - job = self.prompt_queue.get_job(job_id) + running, queued = self.prompt_queue.get_current_queue_volatile() + history = self.prompt_queue.get_history(prompt_id=job_id) + + job = get_job(job_id, running, queued, history) if job is None: return web.json_response( {"error": "Job not found"},