add clearer typing - remove get("..") or ..

This commit is contained in:
Richard Yu 2025-12-11 16:23:46 -08:00
parent 1f7c1a954e
commit ed618992bc

View File

@ -3,6 +3,8 @@ Job utilities for the /api/jobs endpoint.
Provides normalization and helper functions for job status tracking.
"""
from typing import Optional
from comfy_api.internal import prune_dict
@ -23,7 +25,19 @@ PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'})
THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'})
def is_previewable(media_type, item):
def _extract_job_metadata(extra_data: dict) -> tuple[Optional[int], Optional[str]]:
"""Extract create_time and workflow_id from extra_data.
Returns:
tuple: (create_time, workflow_id)
"""
create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo', {})
workflow_id = extra_pnginfo.get('workflow', {}).get('id')
return create_time, workflow_id
def is_previewable(media_type: str, item: dict) -> bool:
"""
Check if an output item is previewable.
Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts
@ -49,12 +63,10 @@ def is_previewable(media_type, item):
return False
def normalize_queue_item(item, status):
def normalize_queue_item(item: tuple, status: str) -> dict:
"""Convert queue item tuple to unified job dict."""
priority, prompt_id, _, extra_data, _ = item[:5]
create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
workflow_id = extra_pnginfo.get('workflow', {}).get('id')
create_time, workflow_id = _extract_job_metadata(extra_data)
return prune_dict({
'id': prompt_id,
@ -66,13 +78,11 @@ def normalize_queue_item(item, status):
})
def normalize_history_item(prompt_id, history_item, include_outputs=False):
def normalize_history_item(prompt_id: str, history_item: dict, include_outputs: bool = False) -> dict:
"""Convert history item dict to unified job dict."""
prompt_tuple = history_item['prompt']
priority, _, prompt, extra_data, _ = prompt_tuple[:5]
create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
workflow_id = extra_pnginfo.get('workflow', {}).get('id')
create_time, workflow_id = _extract_job_metadata(extra_data)
status_info = history_item.get('status', {})
status_str = status_info.get('status_str') if status_info else None
@ -83,7 +93,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False):
else:
status = JobStatus.COMPLETED
outputs = history_item.get('outputs') or {}
outputs = history_item.get('outputs', {})
outputs_count, preview_output = get_outputs_summary(outputs)
execution_error = None
@ -126,7 +136,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False):
return job
def get_outputs_summary(outputs):
def get_outputs_summary(outputs: dict) -> tuple[int, Optional[dict]]:
"""
Count outputs and find preview in a single pass.
Returns (outputs_count, preview_output).
@ -165,23 +175,23 @@ def get_outputs_summary(outputs):
return count, preview_output or fallback_preview
def apply_sorting(jobs, sort_by, sort_order):
def apply_sorting(jobs: list[dict], sort_by: str, sort_order: str) -> list[dict]:
"""Sort jobs list by specified field and order."""
reverse = (sort_order == 'desc')
if sort_by == 'execution_duration':
def get_sort_key(job):
start = job.get('execution_start_time') or 0
end = job.get('execution_end_time') or 0
start = job.get('execution_start_time', 0)
end = job.get('execution_end_time', 0)
return end - start if end and start else 0
else:
def get_sort_key(job):
return job.get('create_time') or 0
return job.get('create_time', 0)
return sorted(jobs, key=get_sort_key, reverse=reverse)
def get_job(prompt_id, running, queued, history):
def get_job(prompt_id: str, running: list, queued: list, history: dict) -> Optional[dict]:
"""
Get a single job by prompt_id from history or queue.
@ -208,7 +218,17 @@ def get_job(prompt_id, running, queued, history):
return None
def get_all_jobs(running, queued, history, status_filter=None, workflow_id=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
def get_all_jobs(
running: list,
queued: list,
history: dict,
status_filter: Optional[list[str]] = None,
workflow_id: Optional[str] = None,
sort_by: str = "created_at",
sort_order: str = "desc",
limit: Optional[int] = None,
offset: int = 0
) -> tuple[list[dict], int]:
"""
Get all jobs (running, pending, completed) with filtering and sorting.