From b874f469cea373dfe73b118cc8c3e294d287c9cd Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Tue, 2 Dec 2025 18:45:25 -0800 Subject: [PATCH] create jobs helper file --- comfy_execution/jobs.py | 167 ++++++++++++++++++ execution.py | 162 ++--------------- server.py | 3 +- tests/execution/test_jobs.py | 331 +++++++++++++++++++++++++++++++++++ 4 files changed, 519 insertions(+), 144 deletions(-) create mode 100644 comfy_execution/jobs.py create mode 100644 tests/execution/test_jobs.py diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py new file mode 100644 index 000000000..4f5758e07 --- /dev/null +++ b/comfy_execution/jobs.py @@ -0,0 +1,167 @@ +""" +Job utilities for the /api/jobs endpoint. +Provides normalization and helper functions for job status tracking. +""" + + +class JobStatus: + """Job status constants.""" + PENDING = 'pending' + IN_PROGRESS = 'in_progress' + COMPLETED = 'completed' + ERROR = 'error' + + ALL = [PENDING, IN_PROGRESS, COMPLETED, ERROR] + + +# Media types that can be previewed in the frontend +PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'}) + +# 3D file extensions for preview fallback (no dedicated media_type exists) +THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'}) + + +def is_previewable(media_type, item): + """ + Check if an output item is previewable. + Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts + + Priority: + 1. media_type is 'images', 'video', or 'audio' + 2. format field starts with 'video/' or 'audio/' + 3. filename has a 3D extension (.obj, .fbx, .gltf, .glb) + """ + if media_type in PREVIEWABLE_MEDIA_TYPES: + return True + + # Check format field (MIME type) + fmt = item.get('format', '') + if fmt and (fmt.startswith('video/') or fmt.startswith('audio/')): + return True + + # Check for 3D files by extension + filename = item.get('filename', '').lower() + if any(filename.endswith(ext) for ext in THREE_D_EXTENSIONS): + return True + + return False + + +def normalize_queue_item(item, status): + """Convert queue item tuple to unified job dict.""" + priority, prompt_id, _, extra_data, _ = item[:5] + create_time = extra_data.get('create_time') + + return { + 'id': prompt_id, + 'status': status, + 'priority': priority, + 'create_time': create_time, + 'execution_time': None, + 'error_message': None, + 'outputs_count': 0, + 'preview_output': None, + 'workflow_id': None, + } + + +def normalize_history_item(prompt_id, history_item, include_outputs=False): + """Convert history item dict to unified job dict.""" + prompt_tuple = history_item['prompt'] + priority, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] + create_time = extra_data.get('create_time') + + status_info = history_item.get('status', {}) + status_str = status_info.get('status_str') if status_info else None + if status_str == 'success': + status = JobStatus.COMPLETED + elif status_str == 'error': + status = JobStatus.ERROR + else: + status = JobStatus.COMPLETED + + outputs = history_item.get('outputs', {}) + outputs_count, preview_output = get_outputs_summary(outputs) + + error_message = None + if status == JobStatus.ERROR and status_info: + messages = status_info.get('messages', []) + if messages: + error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) + + execution_time = history_item.get('execution_time') + + job = { + 'id': prompt_id, + 'status': status, + 'priority': priority, + 'create_time': create_time, + 'execution_time': execution_time, + 'error_message': error_message, + 'outputs_count': outputs_count, + 'preview_output': preview_output, + 'workflow_id': None, + } + + if include_outputs: + job['outputs'] = outputs + job['prompt'] = prompt + job['extra_data'] = extra_data + job['outputs_to_execute'] = outputs_to_execute + + return job + + +def get_outputs_summary(outputs): + """ + Count outputs and find preview in a single pass. + Returns (outputs_count, preview_output). + + Preview priority (matching frontend): + 1. type="output" with previewable media + 2. Any previewable media + """ + count = 0 + preview_output = None + fallback_preview = None + + for node_id, node_outputs in outputs.items(): + for media_type, items in node_outputs.items(): + if media_type == 'animated' or not isinstance(items, list): + continue + + for item in items: + count += 1 + + if preview_output is not None: + continue + + if not is_previewable(media_type, item): + continue + + enriched = { + **item, + 'nodeId': node_id, + 'mediaType': media_type + } + + if item.get('type') == 'output': + preview_output = enriched + elif fallback_preview is None: + fallback_preview = enriched + + return count, preview_output or fallback_preview + + +def apply_sorting(jobs, sort_by, sort_order): + """Sort jobs list by specified field and order.""" + reverse = (sort_order == 'desc') + + if sort_by == 'execution_time': + def get_sort_key(job): + return job.get('execution_time') or 0 + else: + def get_sort_key(job): + return job.get('create_time') or 0 + + return sorted(jobs, key=get_sort_key, reverse=reverse) diff --git a/execution.py b/execution.py index 42076e2c1..5fc616dfd 100644 --- a/execution.py +++ b/execution.py @@ -33,6 +33,12 @@ from comfy_execution.graph_utils import GraphBuilder, is_link from comfy_execution.validation import validate_node_input from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, WebUIProgressHandler from comfy_execution.utils import CurrentNodeContext +from comfy_execution.jobs import ( + JobStatus, + normalize_queue_item, + normalize_history_item, + apply_sorting, +) from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func from comfy_api.latest import io, _io @@ -1229,15 +1235,15 @@ class PromptQueue: """Get a single job by prompt_id from history or queue.""" with self.mutex: if prompt_id in self.history: - return self._normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) + return normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) for item in self.currently_running.values(): if item[1] == prompt_id: - return self._normalize_queue_item(item, 'in_progress') + return normalize_queue_item(item, JobStatus.IN_PROGRESS) for item in self.queue: if item[1] == prompt_id: - return self._normalize_queue_item(item, 'pending') + return normalize_queue_item(item, JobStatus.PENDING) return None @@ -1246,7 +1252,7 @@ class PromptQueue: Get all jobs (running, pending, completed) with filtering and sorting. Args: - status_filter: list of statuses to include ['pending', 'in_progress', 'completed', 'error'] + status_filter: list of statuses to include (from JobStatus.ALL) sort_by: field to sort by ('created_at', 'execution_time') sort_order: 'asc' or 'desc' limit: maximum number of items to return @@ -1259,25 +1265,25 @@ class PromptQueue: jobs = [] if status_filter is None: - status_filter = ['pending', 'in_progress', 'completed', 'error'] + status_filter = JobStatus.ALL - if 'in_progress' in status_filter: + if JobStatus.IN_PROGRESS in status_filter: for item in self.currently_running.values(): - jobs.append(self._normalize_queue_item(item, 'in_progress')) + jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) - if 'pending' in status_filter: + if JobStatus.PENDING in status_filter: for item in self.queue: - jobs.append(self._normalize_queue_item(item, 'pending')) + jobs.append(normalize_queue_item(item, JobStatus.PENDING)) - include_completed = 'completed' in status_filter - include_error = 'error' in status_filter + include_completed = JobStatus.COMPLETED in status_filter + include_error = JobStatus.ERROR in status_filter if include_completed or include_error: for prompt_id, history_item in self.history.items(): is_error = history_item.get('status', {}).get('status_str') == 'error' if (is_error and include_error) or (not is_error and include_completed): - jobs.append(self._normalize_history_item(prompt_id, history_item)) + jobs.append(normalize_history_item(prompt_id, history_item)) - jobs = self._apply_sorting(jobs, sort_by, sort_order) + jobs = apply_sorting(jobs, sort_by, sort_order) total_count = len(jobs) @@ -1288,136 +1294,6 @@ class PromptQueue: return (jobs, total_count) - def _normalize_queue_item(self, item, status): - """Convert queue item tuple to unified job dict.""" - priority, prompt_id, _, extra_data, _ = item[:5] - create_time = extra_data.get('create_time') - - return { - 'id': prompt_id, - 'status': status, - 'create_time': create_time, - 'priority': priority, - 'execution_time': None, - 'error_message': None, - 'outputs_count': 0, - 'preview_output': None, - 'workflow_id': None, - } - - def _normalize_history_item(self, prompt_id, history_item, include_outputs=False): - """Convert history item dict to unified job dict.""" - prompt_tuple = history_item['prompt'] - _, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] - create_time = extra_data.get('create_time') - - # Determine status from history status - status_info = history_item.get('status', {}) - if status_info: - status = 'completed' if status_info.get('status_str') == 'success' else 'error' - else: - status = 'completed' - - outputs = history_item.get('outputs', {}) - - outputs_count, preview_output = self._get_outputs_summary(outputs) - - error_message = None - if status == 'error' and status_info: - messages = status_info.get('messages', []) - if messages: - error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) - - execution_time = history_item.get('execution_time') - - job = { - 'id': prompt_id, - 'status': status, - 'create_time': create_time, - 'execution_time': execution_time, - 'error_message': error_message, - 'outputs_count': outputs_count, - 'preview_output': preview_output, - 'workflow_id': None, - } - - if include_outputs: - job['outputs'] = outputs - job['prompt'] = prompt - job['extra_data'] = extra_data - job['outputs_to_execute'] = outputs_to_execute - - return job - - def _get_outputs_summary(self, outputs): - """ - Count outputs and find preview in a single pass. - Returns (outputs_count, preview_output). - - Preview priority (matching frontend): - 1. type="output" with previewable media - 2. Any previewable media - """ - count = 0 - preview_output = None - fallback_preview = None - - for node_id, node_outputs in outputs.items(): - for media_type, items in node_outputs.items(): - if media_type == 'animated' or not isinstance(items, list): - continue - for item in items: - count += 1 - - # Skip if we already have the best preview (type=output) - if preview_output is not None: - continue - - filename = item.get('filename', '').lower() - fmt = item.get('format', '') - - # Check if previewable (image/video/audio/3D) - matching frontend logic - is_previewable = ( - media_type == 'images' or - media_type == 'video' or - media_type == 'audio' or - filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')) or # images - filename.endswith(('.mp4', '.webm', '.mov', '.avi')) or # video - filename.endswith(('.mp3', '.wav', '.ogg', '.flac')) or # audio - filename.endswith(('.obj', '.fbx', '.gltf', '.glb')) or # 3D - (fmt and (fmt.startswith('video/') or fmt.startswith('audio/'))) - ) - - if not is_previewable: - continue - - enriched = { - **item, - 'nodeId': node_id, - 'mediaType': media_type - } - - if item.get('type') == 'output': - preview_output = enriched - elif fallback_preview is None: - fallback_preview = enriched - - return count, preview_output or fallback_preview - - def _apply_sorting(self, jobs, sort_by, sort_order): - """Sort jobs list by specified field and order.""" - reverse = (sort_order == 'desc') - - if sort_by == 'execution_time': - def get_sort_key(job): - return job.get('execution_time') or 0 - else: - # Default to create_time - def get_sort_key(job): - return job.get('create_time') or 0 - - return sorted(jobs, key=get_sort_key, reverse=reverse) - def set_flag(self, name, data): with self.mutex: self.flags[name] = data diff --git a/server.py b/server.py index a9466c12d..0e0cc0daa 100644 --- a/server.py +++ b/server.py @@ -7,6 +7,7 @@ import time import nodes import folder_paths import execution +from comfy_execution.jobs import JobStatus import uuid import urllib import json @@ -703,7 +704,7 @@ class PromptServer(): status_filter = None if status_param: status_filter = [s.strip() for s in status_param.split(',') if s.strip()] - valid_statuses = {'pending', 'in_progress', 'completed', 'error'} + valid_statuses = set(JobStatus.ALL) status_filter = [s for s in status_filter if s in valid_statuses] if not status_filter: status_filter = None diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py new file mode 100644 index 000000000..07f882cac --- /dev/null +++ b/tests/execution/test_jobs.py @@ -0,0 +1,331 @@ +"""Unit tests for comfy_execution/jobs.py""" +import pytest +from pytest import fixture +from unittest.mock import MagicMock + +from comfy_execution.jobs import ( + JobStatus, + PREVIEWABLE_MEDIA_TYPES, + THREE_D_EXTENSIONS, + is_previewable, + normalize_queue_item, + normalize_history_item, + get_outputs_summary, + apply_sorting, +) + + +class TestJobStatus: + """Test JobStatus constants.""" + + def test_status_values(self): + """Status constants should have expected string values.""" + assert JobStatus.PENDING == 'pending' + assert JobStatus.IN_PROGRESS == 'in_progress' + assert JobStatus.COMPLETED == 'completed' + assert JobStatus.ERROR == 'error' + + def test_all_contains_all_statuses(self): + """ALL should contain all status values.""" + assert JobStatus.PENDING in JobStatus.ALL + assert JobStatus.IN_PROGRESS in JobStatus.ALL + assert JobStatus.COMPLETED in JobStatus.ALL + assert JobStatus.ERROR in JobStatus.ALL + assert len(JobStatus.ALL) == 4 + + +class TestIsPreviewable: + """Unit tests for is_previewable()""" + + def test_previewable_media_types(self): + """Images, video, audio media types should be previewable.""" + for media_type in ['images', 'video', 'audio']: + assert is_previewable(media_type, {}) is True + + def test_non_previewable_media_types(self): + """Other media types should not be previewable.""" + for media_type in ['latents', 'text', 'metadata', 'files']: + assert is_previewable(media_type, {}) is False + + def test_3d_extensions_previewable(self): + """3D file extensions should be previewable regardless of media_type.""" + for ext in ['.obj', '.fbx', '.gltf', '.glb']: + item = {'filename': f'model{ext}'} + assert is_previewable('files', item) is True + + def test_3d_extensions_case_insensitive(self): + """3D extension check should be case insensitive.""" + item = {'filename': 'MODEL.GLB'} + assert is_previewable('files', item) is True + + def test_video_format_previewable(self): + """Items with video/ format should be previewable.""" + item = {'format': 'video/mp4'} + assert is_previewable('files', item) is True + + def test_audio_format_previewable(self): + """Items with audio/ format should be previewable.""" + item = {'format': 'audio/wav'} + assert is_previewable('files', item) is True + + def test_other_format_not_previewable(self): + """Items with other format should not be previewable.""" + item = {'format': 'application/json'} + assert is_previewable('files', item) is False + + +class TestGetOutputsSummary: + """Unit tests for get_outputs_summary()""" + + def test_empty_outputs(self): + """Empty outputs should return 0 count and None preview.""" + count, preview = get_outputs_summary({}) + assert count == 0 + assert preview is None + + def test_counts_across_multiple_nodes(self): + """Outputs from multiple nodes should all be counted.""" + outputs = { + 'node1': {'images': [{'filename': 'a.png', 'type': 'output'}]}, + 'node2': {'images': [{'filename': 'b.png', 'type': 'output'}]}, + 'node3': {'images': [ + {'filename': 'c.png', 'type': 'output'}, + {'filename': 'd.png', 'type': 'output'} + ]} + } + count, preview = get_outputs_summary(outputs) + assert count == 4 + + def test_skips_animated_key_and_non_list_values(self): + """The 'animated' key and non-list values should be skipped.""" + outputs = { + 'node1': { + 'images': [{'filename': 'test.png', 'type': 'output'}], + 'animated': [True], # Should skip due to key name + 'metadata': 'string', # Should skip due to non-list + 'count': 42 # Should skip due to non-list + } + } + count, preview = get_outputs_summary(outputs) + assert count == 1 + + def test_preview_prefers_type_output(self): + """Items with type='output' should be preferred for preview.""" + outputs = { + 'node1': { + 'images': [ + {'filename': 'temp.png', 'type': 'temp'}, + {'filename': 'output.png', 'type': 'output'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert count == 2 + assert preview['filename'] == 'output.png' + + def test_preview_fallback_when_no_output_type(self): + """If no type='output', should use first previewable.""" + outputs = { + 'node1': { + 'images': [ + {'filename': 'temp1.png', 'type': 'temp'}, + {'filename': 'temp2.png', 'type': 'temp'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert preview['filename'] == 'temp1.png' + + def test_non_previewable_media_types_counted_but_no_preview(self): + """Non-previewable media types should be counted but not used as preview.""" + outputs = { + 'node1': { + 'latents': [ + {'filename': 'latent1.safetensors'}, + {'filename': 'latent2.safetensors'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert count == 2 + assert preview is None + + def test_previewable_media_types(self): + """Images, video, and audio media types should be previewable.""" + for media_type in ['images', 'video', 'audio']: + outputs = { + 'node1': { + media_type: [{'filename': 'test.file', 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"{media_type} should be previewable" + + def test_3d_files_previewable(self): + """3D file extensions should be previewable.""" + for ext in ['.obj', '.fbx', '.gltf', '.glb']: + outputs = { + 'node1': { + 'files': [{'filename': f'model{ext}', 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"3D file {ext} should be previewable" + + def test_format_mime_type_previewable(self): + """Files with video/ or audio/ format should be previewable.""" + for fmt in ['video/x-custom', 'audio/x-custom']: + outputs = { + 'node1': { + 'files': [{'filename': 'file.custom', 'format': fmt, 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"Format {fmt} should be previewable" + + def test_preview_enriched_with_node_metadata(self): + """Preview should include nodeId, mediaType, and original fields.""" + outputs = { + 'node123': { + 'images': [{'filename': 'test.png', 'type': 'output', 'subfolder': 'outputs'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview['nodeId'] == 'node123' + assert preview['mediaType'] == 'images' + assert preview['subfolder'] == 'outputs' + + +class TestApplySorting: + """Unit tests for apply_sorting()""" + + def test_sort_by_create_time_desc(self): + """Default sort by create_time descending.""" + jobs = [ + {'id': 'a', 'create_time': 100}, + {'id': 'b', 'create_time': 300}, + {'id': 'c', 'create_time': 200}, + ] + result = apply_sorting(jobs, 'created_at', 'desc') + assert [j['id'] for j in result] == ['b', 'c', 'a'] + + def test_sort_by_create_time_asc(self): + """Sort by create_time ascending.""" + jobs = [ + {'id': 'a', 'create_time': 100}, + {'id': 'b', 'create_time': 300}, + {'id': 'c', 'create_time': 200}, + ] + result = apply_sorting(jobs, 'created_at', 'asc') + assert [j['id'] for j in result] == ['a', 'c', 'b'] + + def test_sort_by_execution_time(self): + """Sort by execution_time should order by duration.""" + jobs = [ + {'id': 'a', 'create_time': 100, 'execution_time': 5.0}, + {'id': 'b', 'create_time': 300, 'execution_time': 1.0}, + {'id': 'c', 'create_time': 200, 'execution_time': 3.0}, + ] + result = apply_sorting(jobs, 'execution_time', 'desc') + assert [j['id'] for j in result] == ['a', 'c', 'b'] + + def test_sort_with_none_values(self): + """Jobs with None values should sort as 0.""" + jobs = [ + {'id': 'a', 'create_time': 100, 'execution_time': 5.0}, + {'id': 'b', 'create_time': 300, 'execution_time': None}, + {'id': 'c', 'create_time': 200, 'execution_time': 3.0}, + ] + result = apply_sorting(jobs, 'execution_time', 'asc') + assert result[0]['id'] == 'b' # None treated as 0, comes first + + +class TestNormalizeQueueItem: + """Unit tests for normalize_queue_item()""" + + def test_basic_normalization(self): + """Queue item should be normalized to job dict.""" + item = ( + 10, # priority/number + 'prompt-123', # prompt_id + {'nodes': {}}, # prompt + {'create_time': 1234567890}, # extra_data + ['node1'], # outputs_to_execute + ) + job = normalize_queue_item(item, JobStatus.PENDING) + + assert job['id'] == 'prompt-123' + assert job['status'] == 'pending' + assert job['priority'] == 10 + assert job['create_time'] == 1234567890 + assert job['execution_time'] is None + assert job['error_message'] is None + assert job['outputs_count'] == 0 + + +class TestNormalizeHistoryItem: + """Unit tests for normalize_history_item()""" + + def test_completed_job(self): + """Completed history item should have correct status.""" + history_item = { + 'prompt': ( + 5, # priority + 'prompt-456', + {'nodes': {}}, + {'create_time': 1234567890}, + ['node1'], + ), + 'status': {'status_str': 'success', 'completed': True, 'messages': []}, + 'outputs': {}, + 'execution_time': 2.5, + } + job = normalize_history_item('prompt-456', history_item) + + assert job['id'] == 'prompt-456' + assert job['status'] == 'completed' + assert job['execution_time'] == 2.5 + + def test_error_job(self): + """Error history item should have error status and message.""" + history_item = { + 'prompt': ( + 5, + 'prompt-789', + {'nodes': {}}, + {'create_time': 1234567890}, + ['node1'], + ), + 'status': { + 'status_str': 'error', + 'completed': False, + 'messages': ['Node failed: OutOfMemory'] + }, + 'outputs': {}, + 'execution_time': 1.0, + } + job = normalize_history_item('prompt-789', history_item) + + assert job['status'] == 'error' + assert job['error_message'] == 'Node failed: OutOfMemory' + + def test_include_outputs(self): + """When include_outputs=True, should include full output data.""" + history_item = { + 'prompt': ( + 5, + 'prompt-123', + {'nodes': {'1': {}}}, + {'create_time': 1234567890, 'client_id': 'abc'}, + ['node1'], + ), + 'status': {'status_str': 'success', 'completed': True, 'messages': []}, + 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, + 'execution_time': 2.5, + } + job = normalize_history_item('prompt-123', history_item, include_outputs=True) + + assert 'outputs' in job + assert 'prompt' in job + assert 'extra_data' in job + assert job['outputs'] == {'node1': {'images': [{'filename': 'test.png'}]}}