mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-19 19:00:21 +08:00
Compare commits
2 Commits
1f7c1a954e
...
c8a1d2ea0d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8a1d2ea0d | ||
|
|
ed618992bc |
@ -3,6 +3,8 @@ Job utilities for the /api/jobs endpoint.
|
||||
Provides normalization and helper functions for job status tracking.
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from comfy_api.internal import prune_dict
|
||||
|
||||
|
||||
@ -23,7 +25,19 @@ PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'})
|
||||
THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'})
|
||||
|
||||
|
||||
def is_previewable(media_type, item):
|
||||
def _extract_job_metadata(extra_data: dict) -> tuple[Optional[int], Optional[str]]:
|
||||
"""Extract create_time and workflow_id from extra_data.
|
||||
|
||||
Returns:
|
||||
tuple: (create_time, workflow_id)
|
||||
"""
|
||||
create_time = extra_data.get('create_time')
|
||||
extra_pnginfo = extra_data.get('extra_pnginfo', {})
|
||||
workflow_id = extra_pnginfo.get('workflow', {}).get('id')
|
||||
return create_time, workflow_id
|
||||
|
||||
|
||||
def is_previewable(media_type: str, item: dict) -> bool:
|
||||
"""
|
||||
Check if an output item is previewable.
|
||||
Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts
|
||||
@ -49,12 +63,13 @@ def is_previewable(media_type, item):
|
||||
return False
|
||||
|
||||
|
||||
def normalize_queue_item(item, status):
|
||||
"""Convert queue item tuple to unified job dict."""
|
||||
priority, prompt_id, _, extra_data, _ = item[:5]
|
||||
create_time = extra_data.get('create_time')
|
||||
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
|
||||
workflow_id = extra_pnginfo.get('workflow', {}).get('id')
|
||||
def normalize_queue_item(item: tuple, status: str) -> dict:
|
||||
"""Convert queue item tuple to unified job dict.
|
||||
|
||||
Expects item with sensitive data already removed (5 elements).
|
||||
"""
|
||||
priority, prompt_id, _, extra_data, _ = item
|
||||
create_time, workflow_id = _extract_job_metadata(extra_data)
|
||||
|
||||
return prune_dict({
|
||||
'id': prompt_id,
|
||||
@ -66,13 +81,14 @@ def normalize_queue_item(item, status):
|
||||
})
|
||||
|
||||
|
||||
def normalize_history_item(prompt_id, history_item, include_outputs=False):
|
||||
"""Convert history item dict to unified job dict."""
|
||||
def normalize_history_item(prompt_id: str, history_item: dict, include_outputs: bool = False) -> dict:
|
||||
"""Convert history item dict to unified job dict.
|
||||
|
||||
History items have sensitive data already removed (prompt tuple has 5 elements).
|
||||
"""
|
||||
prompt_tuple = history_item['prompt']
|
||||
priority, _, prompt, extra_data, _ = prompt_tuple[:5]
|
||||
create_time = extra_data.get('create_time')
|
||||
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
|
||||
workflow_id = extra_pnginfo.get('workflow', {}).get('id')
|
||||
priority, _, prompt, extra_data, _ = prompt_tuple
|
||||
create_time, workflow_id = _extract_job_metadata(extra_data)
|
||||
|
||||
status_info = history_item.get('status', {})
|
||||
status_str = status_info.get('status_str') if status_info else None
|
||||
@ -83,7 +99,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False):
|
||||
else:
|
||||
status = JobStatus.COMPLETED
|
||||
|
||||
outputs = history_item.get('outputs') or {}
|
||||
outputs = history_item.get('outputs', {})
|
||||
outputs_count, preview_output = get_outputs_summary(outputs)
|
||||
|
||||
execution_error = None
|
||||
@ -126,7 +142,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False):
|
||||
return job
|
||||
|
||||
|
||||
def get_outputs_summary(outputs):
|
||||
def get_outputs_summary(outputs: dict) -> tuple[int, Optional[dict]]:
|
||||
"""
|
||||
Count outputs and find preview in a single pass.
|
||||
Returns (outputs_count, preview_output).
|
||||
@ -165,23 +181,23 @@ def get_outputs_summary(outputs):
|
||||
return count, preview_output or fallback_preview
|
||||
|
||||
|
||||
def apply_sorting(jobs, sort_by, sort_order):
|
||||
def apply_sorting(jobs: list[dict], sort_by: str, sort_order: str) -> list[dict]:
|
||||
"""Sort jobs list by specified field and order."""
|
||||
reverse = (sort_order == 'desc')
|
||||
|
||||
if sort_by == 'execution_duration':
|
||||
def get_sort_key(job):
|
||||
start = job.get('execution_start_time') or 0
|
||||
end = job.get('execution_end_time') or 0
|
||||
start = job.get('execution_start_time', 0)
|
||||
end = job.get('execution_end_time', 0)
|
||||
return end - start if end and start else 0
|
||||
else:
|
||||
def get_sort_key(job):
|
||||
return job.get('create_time') or 0
|
||||
return job.get('create_time', 0)
|
||||
|
||||
return sorted(jobs, key=get_sort_key, reverse=reverse)
|
||||
|
||||
|
||||
def get_job(prompt_id, running, queued, history):
|
||||
def get_job(prompt_id: str, running: list, queued: list, history: dict) -> Optional[dict]:
|
||||
"""
|
||||
Get a single job by prompt_id from history or queue.
|
||||
|
||||
@ -208,7 +224,17 @@ def get_job(prompt_id, running, queued, history):
|
||||
return None
|
||||
|
||||
|
||||
def get_all_jobs(running, queued, history, status_filter=None, workflow_id=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
|
||||
def get_all_jobs(
|
||||
running: list,
|
||||
queued: list,
|
||||
history: dict,
|
||||
status_filter: Optional[list[str]] = None,
|
||||
workflow_id: Optional[str] = None,
|
||||
sort_by: str = "created_at",
|
||||
sort_order: str = "desc",
|
||||
limit: Optional[int] = None,
|
||||
offset: int = 0
|
||||
) -> tuple[list[dict], int]:
|
||||
"""
|
||||
Get all jobs (running, pending, completed) with filtering and sorting.
|
||||
|
||||
|
||||
43
server.py
43
server.py
@ -48,6 +48,12 @@ from middleware.cache_middleware import cache_control
|
||||
if args.enable_manager:
|
||||
import comfyui_manager
|
||||
|
||||
|
||||
def _remove_sensitive_from_queue(queue: list) -> list:
|
||||
"""Remove sensitive data (index 5) from queue item tuples."""
|
||||
return [item[:5] for item in queue]
|
||||
|
||||
|
||||
async def send_socket_catch_exception(function, message):
|
||||
try:
|
||||
await function(message)
|
||||
@ -697,29 +703,39 @@ class PromptServer():
|
||||
|
||||
@routes.get("/api/jobs")
|
||||
async def get_jobs(request):
|
||||
"""List all jobs with filtering, sorting, and pagination."""
|
||||
"""List all jobs with filtering, sorting, and pagination.
|
||||
|
||||
Query parameters:
|
||||
status: Filter by status (comma-separated): pending, in_progress, completed, failed
|
||||
workflow_id: Filter by workflow ID
|
||||
sort_by: Sort field: created_at (default), execution_duration
|
||||
sort_order: Sort direction: asc, desc (default)
|
||||
limit: Max items to return (positive integer)
|
||||
offset: Items to skip (non-negative integer, default 0)
|
||||
"""
|
||||
query = request.rel_url.query
|
||||
|
||||
status_param = query.get("status", None)
|
||||
status_param = query.get('status')
|
||||
workflow_id = query.get('workflow_id')
|
||||
sort_by = query.get('sort_by', 'created_at').lower()
|
||||
sort_order = query.get('sort_order', 'desc').lower()
|
||||
|
||||
status_filter = None
|
||||
if status_param:
|
||||
status_filter = [s.strip().lower() for s in status_param.split(',') if s.strip()]
|
||||
valid_statuses = set(JobStatus.ALL)
|
||||
invalid_statuses = [s for s in status_filter if s not in valid_statuses]
|
||||
invalid_statuses = [s for s in status_filter if s not in JobStatus.ALL]
|
||||
if invalid_statuses:
|
||||
return web.json_response(
|
||||
{"error": f"Invalid status value(s): {', '.join(invalid_statuses)}. Valid values: {', '.join(JobStatus.ALL)}"},
|
||||
status=400
|
||||
)
|
||||
|
||||
sort_by = query.get('sort_by', 'created_at').lower()
|
||||
if sort_by not in {'created_at', 'execution_duration'}:
|
||||
return web.json_response(
|
||||
{"error": "sort_by must be 'created_at' or 'execution_duration'"},
|
||||
status=400
|
||||
)
|
||||
|
||||
sort_order = query.get('sort_order', 'desc').lower()
|
||||
if sort_order not in {'asc', 'desc'}:
|
||||
return web.json_response(
|
||||
{"error": "sort_order must be 'asc' or 'desc'"},
|
||||
@ -727,6 +743,8 @@ class PromptServer():
|
||||
)
|
||||
|
||||
limit = None
|
||||
|
||||
# If limit is provided, validate that it is a positive integer, else continue without a limit
|
||||
if 'limit' in query:
|
||||
try:
|
||||
limit = int(query.get('limit'))
|
||||
@ -753,11 +771,12 @@ class PromptServer():
|
||||
status=400
|
||||
)
|
||||
|
||||
workflow_id = query.get('workflow_id', None)
|
||||
|
||||
running, queued = self.prompt_queue.get_current_queue_volatile()
|
||||
history = self.prompt_queue.get_history()
|
||||
|
||||
running = _remove_sensitive_from_queue(running)
|
||||
queued = _remove_sensitive_from_queue(queued)
|
||||
|
||||
jobs, total = get_all_jobs(
|
||||
running, queued, history,
|
||||
status_filter=status_filter,
|
||||
@ -793,6 +812,9 @@ class PromptServer():
|
||||
running, queued = self.prompt_queue.get_current_queue_volatile()
|
||||
history = self.prompt_queue.get_history(prompt_id=job_id)
|
||||
|
||||
running = _remove_sensitive_from_queue(running)
|
||||
queued = _remove_sensitive_from_queue(queued)
|
||||
|
||||
job = get_job(job_id, running, queued, history)
|
||||
if job is None:
|
||||
return web.json_response(
|
||||
@ -825,9 +847,8 @@ class PromptServer():
|
||||
async def get_queue(request):
|
||||
queue_info = {}
|
||||
current_queue = self.prompt_queue.get_current_queue_volatile()
|
||||
remove_sensitive = lambda queue: [x[:5] for x in queue]
|
||||
queue_info['queue_running'] = remove_sensitive(current_queue[0])
|
||||
queue_info['queue_pending'] = remove_sensitive(current_queue[1])
|
||||
queue_info['queue_running'] = _remove_sensitive_from_queue(current_queue[0])
|
||||
queue_info['queue_pending'] = _remove_sensitive_from_queue(current_queue[1])
|
||||
return web.json_response(queue_info)
|
||||
|
||||
@routes.post("/prompt")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user