""" Job utilities for the /api/jobs endpoint. Provides normalization and helper functions for job status tracking. """ class JobStatus: """Job status constants.""" PENDING = 'pending' IN_PROGRESS = 'in_progress' COMPLETED = 'completed' FAILED = 'failed' CANCELLED = 'cancelled' ALL = [PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED] # Media types that can be previewed in the frontend PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'}) # 3D file extensions for preview fallback (no dedicated media_type exists) THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'}) def is_previewable(media_type, item): """ Check if an output item is previewable. Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts Priority: 1. media_type is 'images', 'video', or 'audio' 2. format field starts with 'video/' or 'audio/' 3. filename has a 3D extension (.obj, .fbx, .gltf, .glb) """ if media_type in PREVIEWABLE_MEDIA_TYPES: return True # Check format field (MIME type) fmt = item.get('format', '') if fmt and (fmt.startswith('video/') or fmt.startswith('audio/')): return True # Check for 3D files by extension filename = item.get('filename', '').lower() if any(filename.endswith(ext) for ext in THREE_D_EXTENSIONS): return True return False def normalize_queue_item(item, status): """Convert queue item tuple to unified job dict.""" priority, prompt_id, _, extra_data, _ = item[:5] create_time = extra_data.get('create_time') extra_pnginfo = extra_data.get('extra_pnginfo') or {} workflow_id = extra_pnginfo.get('workflow', {}).get('id') return { 'id': prompt_id, 'status': status, 'priority': priority, 'create_time': create_time, 'execution_error': None, 'execution_start_time': None, 'execution_end_time': None, 'outputs_count': 0, 'preview_output': None, 'workflow_id': workflow_id, } def normalize_history_item(prompt_id, history_item, include_outputs=False): """Convert history item dict to unified job dict.""" prompt_tuple = history_item['prompt'] priority, _, prompt, extra_data, _ = prompt_tuple[:5] create_time = extra_data.get('create_time') extra_pnginfo = extra_data.get('extra_pnginfo') or {} workflow_id = extra_pnginfo.get('workflow', {}).get('id') status_info = history_item.get('status', {}) status_str = status_info.get('status_str') if status_info else None if status_str == 'success': status = JobStatus.COMPLETED elif status_str == 'error': status = JobStatus.FAILED else: status = JobStatus.COMPLETED outputs = history_item.get('outputs', {}) outputs_count, preview_output = get_outputs_summary(outputs) execution_error = None execution_start_time = None execution_end_time = None if status_info: messages = status_info.get('messages', []) for entry in messages: if isinstance(entry, (list, tuple)) and len(entry) >= 2: event_name, event_data = entry[0], entry[1] if isinstance(event_data, dict): if event_name == 'execution_start': execution_start_time = event_data.get('timestamp') elif event_name in ('execution_success', 'execution_error', 'execution_interrupted'): execution_end_time = event_data.get('timestamp') if event_name == 'execution_error': execution_error = event_data job = { 'id': prompt_id, 'status': status, 'priority': priority, 'create_time': create_time, 'execution_error': execution_error, 'execution_start_time': execution_start_time, 'execution_end_time': execution_end_time, 'outputs_count': outputs_count, 'preview_output': preview_output, 'workflow_id': workflow_id, } if include_outputs: job['outputs'] = outputs job['execution_status'] = status_info job['workflow'] = { 'prompt': prompt, 'extra_data': extra_data, } return job def get_outputs_summary(outputs): """ Count outputs and find preview in a single pass. Returns (outputs_count, preview_output). Preview priority (matching frontend): 1. type="output" with previewable media 2. Any previewable media """ count = 0 preview_output = None fallback_preview = None for node_id, node_outputs in outputs.items(): for media_type, items in node_outputs.items(): if media_type == 'animated' or not isinstance(items, list): continue for item in items: count += 1 if preview_output is None and is_previewable(media_type, item): enriched = { **item, 'nodeId': node_id, 'mediaType': media_type } if item.get('type') == 'output': preview_output = enriched elif fallback_preview is None: fallback_preview = enriched return count, preview_output or fallback_preview def apply_sorting(jobs, sort_by, sort_order): """Sort jobs list by specified field and order.""" reverse = (sort_order == 'desc') if sort_by == 'execution_duration': def get_sort_key(job): start = job.get('execution_start_time') or 0 end = job.get('execution_end_time') or 0 return end - start if end and start else 0 else: def get_sort_key(job): return job.get('create_time') or 0 return sorted(jobs, key=get_sort_key, reverse=reverse) def get_job(prompt_id, running, queued, history): """ Get a single job by prompt_id from history or queue. Args: prompt_id: The prompt ID to look up running: List of currently running queue items queued: List of pending queue items history: Dict of history items keyed by prompt_id Returns: Job dict with full details, or None if not found """ if prompt_id in history: return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) for item in running: if item[1] == prompt_id: return normalize_queue_item(item, JobStatus.IN_PROGRESS) for item in queued: if item[1] == prompt_id: return normalize_queue_item(item, JobStatus.PENDING) return None def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): """ Get all jobs (running, pending, completed) with filtering and sorting. Args: running: List of currently running queue items queued: List of pending queue items history: Dict of history items keyed by prompt_id status_filter: List of statuses to include (from JobStatus.ALL) sort_by: Field to sort by ('created_at', 'execution_duration') sort_order: 'asc' or 'desc' limit: Maximum number of items to return offset: Number of items to skip Returns: tuple: (jobs_list, total_count) """ jobs = [] if status_filter is None: status_filter = JobStatus.ALL if JobStatus.IN_PROGRESS in status_filter: for item in running: jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) if JobStatus.PENDING in status_filter: for item in queued: jobs.append(normalize_queue_item(item, JobStatus.PENDING)) include_completed = JobStatus.COMPLETED in status_filter include_failed = JobStatus.FAILED in status_filter if include_completed or include_failed: for prompt_id, history_item in history.items(): is_failed = history_item.get('status', {}).get('status_str') == 'error' if (is_failed and include_failed) or (not is_failed and include_completed): jobs.append(normalize_history_item(prompt_id, history_item)) jobs = apply_sorting(jobs, sort_by, sort_order) total_count = len(jobs) if offset > 0: jobs = jobs[offset:] if limit is not None: jobs = jobs[:limit] return (jobs, total_count)