From c8a1d2ea0ddabe94cec3814ce7f07bb2f2bcaf15 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 11 Dec 2025 19:36:24 -0800 Subject: [PATCH] refactor query params to top get_job(s) doc, add remove_sensitive_from_queue --- comfy_execution/jobs.py | 14 ++++++++++---- server.py | 43 ++++++++++++++++++++++++++++++----------- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index 43602d0de..3e94660d0 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -64,8 +64,11 @@ def is_previewable(media_type: str, item: dict) -> bool: def normalize_queue_item(item: tuple, status: str) -> dict: - """Convert queue item tuple to unified job dict.""" - priority, prompt_id, _, extra_data, _ = item[:5] + """Convert queue item tuple to unified job dict. + + Expects item with sensitive data already removed (5 elements). + """ + priority, prompt_id, _, extra_data, _ = item create_time, workflow_id = _extract_job_metadata(extra_data) return prune_dict({ @@ -79,9 +82,12 @@ def normalize_queue_item(item: tuple, status: str) -> dict: def normalize_history_item(prompt_id: str, history_item: dict, include_outputs: bool = False) -> dict: - """Convert history item dict to unified job dict.""" + """Convert history item dict to unified job dict. + + History items have sensitive data already removed (prompt tuple has 5 elements). + """ prompt_tuple = history_item['prompt'] - priority, _, prompt, extra_data, _ = prompt_tuple[:5] + priority, _, prompt, extra_data, _ = prompt_tuple create_time, workflow_id = _extract_job_metadata(extra_data) status_info = history_item.get('status', {}) diff --git a/server.py b/server.py index e4fe2e091..c27f8be7d 100644 --- a/server.py +++ b/server.py @@ -48,6 +48,12 @@ from middleware.cache_middleware import cache_control if args.enable_manager: import comfyui_manager + +def _remove_sensitive_from_queue(queue: list) -> list: + """Remove sensitive data (index 5) from queue item tuples.""" + return [item[:5] for item in queue] + + async def send_socket_catch_exception(function, message): try: await function(message) @@ -697,29 +703,39 @@ class PromptServer(): @routes.get("/api/jobs") async def get_jobs(request): - """List all jobs with filtering, sorting, and pagination.""" + """List all jobs with filtering, sorting, and pagination. + + Query parameters: + status: Filter by status (comma-separated): pending, in_progress, completed, failed + workflow_id: Filter by workflow ID + sort_by: Sort field: created_at (default), execution_duration + sort_order: Sort direction: asc, desc (default) + limit: Max items to return (positive integer) + offset: Items to skip (non-negative integer, default 0) + """ query = request.rel_url.query - status_param = query.get("status", None) + status_param = query.get('status') + workflow_id = query.get('workflow_id') + sort_by = query.get('sort_by', 'created_at').lower() + sort_order = query.get('sort_order', 'desc').lower() + status_filter = None if status_param: status_filter = [s.strip().lower() for s in status_param.split(',') if s.strip()] - valid_statuses = set(JobStatus.ALL) - invalid_statuses = [s for s in status_filter if s not in valid_statuses] + invalid_statuses = [s for s in status_filter if s not in JobStatus.ALL] if invalid_statuses: return web.json_response( {"error": f"Invalid status value(s): {', '.join(invalid_statuses)}. Valid values: {', '.join(JobStatus.ALL)}"}, status=400 ) - sort_by = query.get('sort_by', 'created_at').lower() if sort_by not in {'created_at', 'execution_duration'}: return web.json_response( {"error": "sort_by must be 'created_at' or 'execution_duration'"}, status=400 ) - sort_order = query.get('sort_order', 'desc').lower() if sort_order not in {'asc', 'desc'}: return web.json_response( {"error": "sort_order must be 'asc' or 'desc'"}, @@ -727,6 +743,8 @@ class PromptServer(): ) limit = None + + # If limit is provided, validate that it is a positive integer, else continue without a limit if 'limit' in query: try: limit = int(query.get('limit')) @@ -753,11 +771,12 @@ class PromptServer(): status=400 ) - workflow_id = query.get('workflow_id', None) - running, queued = self.prompt_queue.get_current_queue_volatile() history = self.prompt_queue.get_history() + running = _remove_sensitive_from_queue(running) + queued = _remove_sensitive_from_queue(queued) + jobs, total = get_all_jobs( running, queued, history, status_filter=status_filter, @@ -793,6 +812,9 @@ class PromptServer(): running, queued = self.prompt_queue.get_current_queue_volatile() history = self.prompt_queue.get_history(prompt_id=job_id) + running = _remove_sensitive_from_queue(running) + queued = _remove_sensitive_from_queue(queued) + job = get_job(job_id, running, queued, history) if job is None: return web.json_response( @@ -825,9 +847,8 @@ class PromptServer(): async def get_queue(request): queue_info = {} current_queue = self.prompt_queue.get_current_queue_volatile() - remove_sensitive = lambda queue: [x[:5] for x in queue] - queue_info['queue_running'] = remove_sensitive(current_queue[0]) - queue_info['queue_pending'] = remove_sensitive(current_queue[1]) + queue_info['queue_running'] = _remove_sensitive_from_queue(current_queue[0]) + queue_info['queue_pending'] = _remove_sensitive_from_queue(current_queue[1]) return web.json_response(queue_info) @routes.post("/prompt")