diff --git a/execution.py b/execution.py index c2186ac98..815788ab2 100644 --- a/execution.py +++ b/execution.py @@ -1130,7 +1130,8 @@ class PromptQueue: messages: List[str] def task_done(self, item_id, history_result, - status: Optional['PromptQueue.ExecutionStatus'], process_item=None): + status: Optional['PromptQueue.ExecutionStatus'], process_item=None, + execution_time: Optional[float] = None): with self.mutex: prompt = self.currently_running.pop(item_id) if len(self.history) > MAXIMUM_HISTORY_SIZE: @@ -1147,6 +1148,7 @@ class PromptQueue: "prompt": prompt, "outputs": {}, 'status': status_dict, + 'execution_time': execution_time, } self.history[prompt[1]].update(history_result) self.server.queue_updated() @@ -1223,6 +1225,198 @@ class PromptQueue: with self.mutex: self.history.pop(id_to_delete, None) + def get_job(self, prompt_id): + """Get a single job by prompt_id from history or queue.""" + with self.mutex: + if prompt_id in self.history: + return self._normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) + + for item in self.currently_running.values(): + if item[1] == prompt_id: + return self._normalize_queue_item(item, 'in_progress') + + for item in self.queue: + if item[1] == prompt_id: + return self._normalize_queue_item(item, 'pending') + + return None + + def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): + """ + Get all jobs (running, pending, completed) with filtering and sorting. + + Args: + status_filter: list of statuses to include ['pending', 'in_progress', 'completed', 'error'] + sort_by: field to sort by ('created_at', 'execution_time') + sort_order: 'asc' or 'desc' + limit: maximum number of items to return + offset: number of items to skip + + Returns: + tuple: (jobs_list, total_count) + """ + with self.mutex: + jobs = [] + + if status_filter is None: + status_filter = ['pending', 'in_progress', 'completed', 'error'] + + if 'in_progress' in status_filter: + for item in self.currently_running.values(): + jobs.append(self._normalize_queue_item(item, 'in_progress')) + + if 'pending' in status_filter: + for item in self.queue: + jobs.append(self._normalize_queue_item(item, 'pending')) + + include_completed = 'completed' in status_filter + include_error = 'error' in status_filter + if include_completed or include_error: + for prompt_id, history_item in self.history.items(): + is_error = history_item.get('status', {}).get('status_str') == 'error' + if (is_error and include_error) or (not is_error and include_completed): + jobs.append(self._normalize_history_item(prompt_id, history_item)) + + jobs = self._apply_sorting(jobs, sort_by, sort_order) + + total_count = len(jobs) + + if offset > 0: + jobs = jobs[offset:] + if limit is not None: + jobs = jobs[:limit] + + return (jobs, total_count) + + def _normalize_queue_item(self, item, status): + """Convert queue item tuple to unified job dict.""" + number, prompt_id, prompt, extra_data, outputs_to_execute = item[:5] + create_time = extra_data.get('create_time') + + return { + 'id': prompt_id, + 'status': status, + 'create_time': create_time, + 'execution_time': None, + 'error_message': None, + 'outputs_count': 0, + 'preview_output': None, + 'workflow_id': None, + } + + def _normalize_history_item(self, prompt_id, history_item, include_outputs=False): + """Convert history item dict to unified job dict.""" + prompt_tuple = history_item['prompt'] + number, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] + create_time = extra_data.get('create_time') + + # Determine status from history status + status_info = history_item.get('status', {}) + if status_info: + status = 'completed' if status_info.get('status_str') == 'success' else 'error' + else: + status = 'completed' + + outputs = history_item.get('outputs', {}) + + outputs_count, preview_output = self._get_outputs_summary(outputs) + + error_message = None + if status == 'error' and status_info: + messages = status_info.get('messages', []) + if messages: + error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) + + execution_time = history_item.get('execution_time') + + job = { + 'id': prompt_id, + 'status': status, + 'create_time': create_time, + 'execution_time': execution_time, + 'error_message': error_message, + 'outputs_count': outputs_count, + 'preview_output': preview_output, + 'workflow_id': None, + } + + if include_outputs: + job['outputs'] = outputs + job['prompt'] = prompt + job['extra_data'] = extra_data + job['outputs_to_execute'] = outputs_to_execute + + return job + + def _get_outputs_summary(self, outputs): + """ + Count outputs and find preview in a single pass. + Returns (outputs_count, preview_output). + + Preview priority (matching frontend): + 1. type="output" with previewable media + 2. Any previewable media + """ + count = 0 + preview_output = None + fallback_preview = None + + for node_id, node_outputs in outputs.items(): + for media_type, items in node_outputs.items(): + if media_type == 'animated' or not isinstance(items, list): + continue + for item in items: + count += 1 + + # Skip if we already have the best preview (type=output) + if preview_output is not None: + continue + + filename = item.get('filename', '').lower() + fmt = item.get('format', '') + + # Check if previewable (image/video/audio/3D) - matching frontend logic + is_previewable = ( + media_type == 'images' or + media_type == 'video' or + media_type == 'audio' or + filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')) or # images + filename.endswith(('.mp4', '.webm', '.mov', '.avi')) or # video + filename.endswith(('.mp3', '.wav', '.ogg', '.flac')) or # audio + filename.endswith(('.obj', '.fbx', '.gltf', '.glb')) or # 3D + (fmt and (fmt.startswith('video/') or fmt.startswith('audio/'))) + ) + + if not is_previewable: + continue + + enriched = { + **item, + 'nodeId': node_id, + 'mediaType': media_type + } + + if item.get('type') == 'output': + preview_output = enriched + elif fallback_preview is None: + fallback_preview = enriched + + return count, preview_output or fallback_preview + + def _apply_sorting(self, jobs, sort_by, sort_order): + """Sort jobs list by specified field and order.""" + reverse = (sort_order == 'desc') + + if sort_by == 'execution_time': + def get_sort_key(job): + return job.get('execution_time') or 0 + else: + # Default to create_time + def get_sort_key(job): + return job.get('create_time') or 0 + + return sorted(jobs, key=get_sort_key, reverse=reverse) + def set_flag(self, name, data): with self.mutex: self.flags[name] = data diff --git a/main.py b/main.py index 0cd815d9e..7605f1f02 100644 --- a/main.py +++ b/main.py @@ -229,19 +229,21 @@ def prompt_worker(q, server_instance): e.execute(item[2], prompt_id, extra_data, item[4]) need_gc = True + current_time = time.perf_counter() + execution_time = current_time - execution_start_time + remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] q.task_done(item_id, e.history_result, status=execution.PromptQueue.ExecutionStatus( status_str='success' if e.success else 'error', completed=e.success, - messages=e.status_messages), process_item=remove_sensitive) + messages=e.status_messages), + process_item=remove_sensitive, + execution_time=execution_time) if server_instance.client_id is not None: server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) - current_time = time.perf_counter() - execution_time = current_time - execution_start_time - # Log Time in a more readable way after 10 minutes if execution_time > 600: execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) diff --git a/server.py b/server.py index ac4f42222..a9466c12d 100644 --- a/server.py +++ b/server.py @@ -694,6 +694,96 @@ class PromptServer(): out[node_class] = node_info(node_class) return web.json_response(out) + @routes.get("/api/jobs") + async def get_jobs(request): + """List all jobs with filtering, sorting, and pagination.""" + query = request.rel_url.query + + status_param = query.get("status", None) + status_filter = None + if status_param: + status_filter = [s.strip() for s in status_param.split(',') if s.strip()] + valid_statuses = {'pending', 'in_progress', 'completed', 'error'} + status_filter = [s for s in status_filter if s in valid_statuses] + if not status_filter: + status_filter = None + + sort_by = query.get('sort', 'created_at') + if sort_by != 'created_at': + return web.json_response( + {"error": "sort must be 'created_at'"}, + status=400 + ) + + sort_order = query.get('order', 'desc') + if sort_order not in {'asc', 'desc'}: + return web.json_response( + {"error": "order must be 'asc' or 'desc'"}, + status=400 + ) + + limit = None + if 'limit' in query: + try: + limit = int(query.get('limit')) + if limit <= 0 or limit > 500: + return web.json_response( + {"error": "limit must be between 1 and 500"}, + status=400 + ) + except (ValueError, TypeError): + return web.json_response( + {"error": "limit must be an integer"}, + status=400 + ) + + offset = 0 + if 'offset' in query: + try: + offset = int(query.get('offset')) + if offset < 0: + offset = 0 + except (ValueError, TypeError): + return web.json_response( + {"error": "offset must be an integer"}, + status=400 + ) + + jobs, total = self.prompt_queue.get_all_jobs( + status_filter=status_filter, + sort_by=sort_by, + sort_order=sort_order, + limit=limit, + offset=offset + ) + + has_more = (offset + len(jobs)) < total + + return web.json_response({ + 'jobs': jobs, + 'total': total, + 'has_more': has_more + }) + + @routes.get("/api/jobs/{job_id}") + async def get_job(request): + """Get a single job by ID.""" + job_id = request.match_info.get("job_id", None) + if not job_id: + return web.json_response( + {"error": "job_id is required"}, + status=400 + ) + + job = self.prompt_queue.get_job(job_id) + if job is None: + return web.json_response( + {"error": "Job not found"}, + status=404 + ) + + return web.json_response(job) + @routes.get("/history") async def get_history(request): max_items = request.rel_url.query.get("max_items", None) diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py index ace0d2279..5316848df 100644 --- a/tests/execution/test_execution.py +++ b/tests/execution/test_execution.py @@ -99,6 +99,37 @@ class ComfyClient: with urllib.request.urlopen(url) as response: return json.loads(response.read()) + def get_jobs(self, status=None, limit=None, offset=None, sort=None, order=None): + url = "http://{}/api/jobs".format(self.server_address) + params = {} + if status is not None: + params["status"] = status + if limit is not None: + params["limit"] = limit + if offset is not None: + params["offset"] = offset + if sort is not None: + params["sort"] = sort + if order is not None: + params["order"] = order + + if params: + url_values = urllib.parse.urlencode(params) + url = "{}?{}".format(url, url_values) + + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + + def get_job(self, job_id): + url = "http://{}/api/jobs/{}".format(self.server_address, job_id) + try: + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + except urllib.error.HTTPError as e: + if e.code == 404: + return None + raise + def set_test_name(self, name): self.test_name = name @@ -877,3 +908,135 @@ class TestExecution: result = client.get_all_history(max_items=5, offset=len(all_history) - 1) assert len(result) <= 1, "Should return at most 1 item when offset is near end" + + # Jobs API tests + def test_jobs_api_returns_completed_jobs( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that /api/jobs returns completed jobs""" + result = self._create_history_item(client, builder) + prompt_id = result.get_prompt_id() + + jobs_response = client.get_jobs(status="completed") + assert "jobs" in jobs_response, "Response should have jobs array" + assert "total" in jobs_response, "Response should have total count" + assert "has_more" in jobs_response, "Response should have has_more flag" + + job_ids = [j["id"] for j in jobs_response["jobs"]] + assert prompt_id in job_ids, "Completed job should appear in jobs list" + + def test_jobs_api_job_structure( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that job objects have required fields""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + assert len(jobs_response["jobs"]) > 0, "Should have at least one job" + + job = jobs_response["jobs"][0] + assert "id" in job, "Job should have id" + assert "status" in job, "Job should have status" + assert "create_time" in job, "Job should have create_time" + assert "outputs_count" in job, "Job should have outputs_count" + assert "preview_output" in job, "Job should have preview_output" + assert "workflow_id" in job, "Job should have workflow_id" + assert "error_message" in job, "Job should have error_message" + + def test_jobs_api_preview_output_structure( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that preview_output has correct structure""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + job = jobs_response["jobs"][0] + + if job["preview_output"] is not None: + preview = job["preview_output"] + assert "filename" in preview, "Preview should have filename" + assert "nodeId" in preview, "Preview should have nodeId" + assert "mediaType" in preview, "Preview should have mediaType" + + def test_jobs_api_pagination( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API pagination""" + for _ in range(5): + self._create_history_item(client, builder) + + first_page = client.get_jobs(limit=2, offset=0) + second_page = client.get_jobs(limit=2, offset=2) + + assert len(first_page["jobs"]) <= 2, "First page should have at most 2 jobs" + assert len(second_page["jobs"]) <= 2, "Second page should have at most 2 jobs" + + first_ids = {j["id"] for j in first_page["jobs"]} + second_ids = {j["id"] for j in second_page["jobs"]} + assert first_ids.isdisjoint(second_ids), "Pages should have different jobs" + + def test_jobs_api_sorting( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API sorting""" + for _ in range(3): + self._create_history_item(client, builder) + + desc_jobs = client.get_jobs(order="desc") + asc_jobs = client.get_jobs(order="asc") + + if len(desc_jobs["jobs"]) >= 2: + desc_times = [j["create_time"] for j in desc_jobs["jobs"] if j["create_time"]] + asc_times = [j["create_time"] for j in asc_jobs["jobs"] if j["create_time"]] + if len(desc_times) >= 2: + assert desc_times == sorted(desc_times, reverse=True), "Desc should be newest first" + if len(asc_times) >= 2: + assert asc_times == sorted(asc_times), "Asc should be oldest first" + + def test_jobs_api_status_filter( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API status filtering""" + self._create_history_item(client, builder) + + completed_jobs = client.get_jobs(status="completed") + assert len(completed_jobs["jobs"]) > 0, "Should have completed jobs from history" + + for job in completed_jobs["jobs"]: + assert job["status"] == "completed", "Should only return completed jobs" + + # Pending jobs are transient - just verify filter doesn't error + pending_jobs = client.get_jobs(status="pending") + for job in pending_jobs["jobs"]: + assert job["status"] == "pending", "Should only return pending jobs" + + def test_get_job_by_id( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test getting a single job by ID""" + result = self._create_history_item(client, builder) + prompt_id = result.get_prompt_id() + + job = client.get_job(prompt_id) + assert job is not None, "Should find the job" + assert job["id"] == prompt_id, "Job ID should match" + assert "outputs" in job, "Single job should include outputs" + + def test_get_job_not_found( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test getting a non-existent job returns 404""" + job = client.get_job("nonexistent-job-id") + assert job is None, "Non-existent job should return None" + + def test_jobs_list_excludes_outputs( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that job list doesn't include full outputs""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + job = jobs_response["jobs"][0] + + assert "outputs" not in job, "List should not include outputs" + assert "prompt" not in job, "List should not include prompt"