From 90fb5cc1e86eadc6cbb2f0295d351acb628902a5 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Wed, 3 Dec 2025 20:03:38 -0800 Subject: [PATCH] re-use queue and history functions --- comfy_execution/jobs.py | 8 ++-- execution.py | 79 +++++++++++++++++++----------------- main.py | 12 +++--- server.py | 4 +- tests/execution/test_jobs.py | 14 +++---- 5 files changed, 61 insertions(+), 56 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index 95bfb1469..01da70517 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -97,11 +97,11 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): execution_error = detail break - execution_time_duration = history_item.get('execution_time') + execution_duration = history_item.get('execution_duration') execution_start_time = None execution_end_time = None - if execution_time_duration is not None and create_time is not None: - execution_end_time = create_time + int(execution_time_duration * 1000) + if execution_duration is not None and create_time is not None: + execution_end_time = create_time + int(execution_duration * 1000) execution_start_time = create_time job = { @@ -167,7 +167,7 @@ def apply_sorting(jobs, sort_by, sort_order): """Sort jobs list by specified field and order.""" reverse = (sort_order == 'desc') - if sort_by == 'execution_time': + if sort_by == 'execution_duration': def get_sort_key(job): start = job.get('execution_start_time') or 0 end = job.get('execution_end_time') or 0 diff --git a/execution.py b/execution.py index 558fb8b5e..99c07bfec 100644 --- a/execution.py +++ b/execution.py @@ -1137,7 +1137,7 @@ class PromptQueue: def task_done(self, item_id, history_result, status: Optional['PromptQueue.ExecutionStatus'], process_item=None, - execution_time: Optional[float] = None): + execution_duration: Optional[float] = None): with self.mutex: prompt = self.currently_running.pop(item_id) if len(self.history) > MAXIMUM_HISTORY_SIZE: @@ -1154,7 +1154,7 @@ class PromptQueue: "prompt": prompt, "outputs": {}, 'status': status_dict, - 'execution_time': execution_time, + 'execution_duration': execution_duration, } self.history[prompt[1]].update(history_result) self.server.queue_updated() @@ -1233,19 +1233,22 @@ class PromptQueue: def get_job(self, prompt_id): """Get a single job by prompt_id from history or queue.""" - with self.mutex: - if prompt_id in self.history: - return normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) + history = self.get_history(prompt_id=prompt_id) - for item in self.currently_running.values(): - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.IN_PROGRESS) + if prompt_id in history: + return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) - for item in self.queue: - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.PENDING) + running, queued = self.get_current_queue_volatile() - return None + for item in running: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.IN_PROGRESS) + + for item in queued: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.PENDING) + + return None def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): """ @@ -1253,7 +1256,7 @@ class PromptQueue: Args: status_filter: list of statuses to include (from JobStatus.ALL) - sort_by: field to sort by ('created_at', 'execution_time') + sort_by: field to sort by ('created_at', 'execution_duration') sort_order: 'asc' or 'desc' limit: maximum number of items to return offset: number of items to skip @@ -1261,38 +1264,40 @@ class PromptQueue: Returns: tuple: (jobs_list, total_count) """ - with self.mutex: - jobs = [] + running, queued = self.get_current_queue_volatile() + history = self.get_history() - if status_filter is None: - status_filter = JobStatus.ALL + jobs = [] - if JobStatus.IN_PROGRESS in status_filter: - for item in self.currently_running.values(): - jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) + if status_filter is None: + status_filter = JobStatus.ALL - if JobStatus.PENDING in status_filter: - for item in self.queue: - jobs.append(normalize_queue_item(item, JobStatus.PENDING)) + if JobStatus.IN_PROGRESS in status_filter: + for item in running: + jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) - include_completed = JobStatus.COMPLETED in status_filter - include_failed = JobStatus.FAILED in status_filter - if include_completed or include_failed: - for prompt_id, history_item in self.history.items(): - is_failed = history_item.get('status', {}).get('status_str') == 'error' - if (is_failed and include_failed) or (not is_failed and include_completed): - jobs.append(normalize_history_item(prompt_id, history_item)) + if JobStatus.PENDING in status_filter: + for item in queued: + jobs.append(normalize_queue_item(item, JobStatus.PENDING)) - jobs = apply_sorting(jobs, sort_by, sort_order) + include_completed = JobStatus.COMPLETED in status_filter + include_failed = JobStatus.FAILED in status_filter + if include_completed or include_failed: + for prompt_id, history_item in history.items(): + is_failed = history_item.get('status', {}).get('status_str') == 'error' + if (is_failed and include_failed) or (not is_failed and include_completed): + jobs.append(normalize_history_item(prompt_id, history_item)) - total_count = len(jobs) + jobs = apply_sorting(jobs, sort_by, sort_order) - if offset > 0: - jobs = jobs[offset:] - if limit is not None: - jobs = jobs[:limit] + total_count = len(jobs) - return (jobs, total_count) + if offset > 0: + jobs = jobs[offset:] + if limit is not None: + jobs = jobs[:limit] + + return (jobs, total_count) def set_flag(self, name, data): with self.mutex: diff --git a/main.py b/main.py index 7605f1f02..d2e3682fa 100644 --- a/main.py +++ b/main.py @@ -230,7 +230,7 @@ def prompt_worker(q, server_instance): need_gc = True current_time = time.perf_counter() - execution_time = current_time - execution_start_time + execution_duration = current_time - execution_start_time remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] q.task_done(item_id, @@ -240,16 +240,16 @@ def prompt_worker(q, server_instance): completed=e.success, messages=e.status_messages), process_item=remove_sensitive, - execution_time=execution_time) + execution_duration=execution_duration) if server_instance.client_id is not None: server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) # Log Time in a more readable way after 10 minutes - if execution_time > 600: - execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) - logging.info(f"Prompt executed in {execution_time}") + if execution_duration > 600: + execution_duration_str = time.strftime("%H:%M:%S", time.gmtime(execution_duration)) + logging.info(f"Prompt executed in {execution_duration_str}") else: - logging.info("Prompt executed in {:.2f} seconds".format(execution_time)) + logging.info("Prompt executed in {:.2f} seconds".format(execution_duration)) flags = q.get_flags() free_memory = flags.get("free_memory", False) diff --git a/server.py b/server.py index 6e0b7f31a..f72142a80 100644 --- a/server.py +++ b/server.py @@ -727,9 +727,9 @@ class PromptServer(): if 'limit' in query: try: limit = int(query.get('limit')) - if limit <= 0 or limit > 500: + if limit <= 0: return web.json_response( - {"error": "limit must be between 1 and 500"}, + {"error": "limit must be a positive integer"}, status=400 ) except (ValueError, TypeError): diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 479169643..c08e07b2b 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -216,14 +216,14 @@ class TestApplySorting: result = apply_sorting(jobs, 'created_at', 'asc') assert [j['id'] for j in result] == ['a', 'c', 'b'] - def test_sort_by_execution_time(self): - """Sort by execution_time should order by duration.""" + def test_sort_by_execution_duration(self): + """Sort by execution_duration should order by duration.""" jobs = [ {'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, # 5s {'id': 'b', 'create_time': 300, 'execution_start_time': 300, 'execution_end_time': 1300}, # 1s {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, # 3s ] - result = apply_sorting(jobs, 'execution_time', 'desc') + result = apply_sorting(jobs, 'execution_duration', 'desc') assert [j['id'] for j in result] == ['a', 'c', 'b'] def test_sort_with_none_values(self): @@ -233,7 +233,7 @@ class TestApplySorting: {'id': 'b', 'create_time': 300, 'execution_start_time': None, 'execution_end_time': None}, {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, ] - result = apply_sorting(jobs, 'execution_time', 'asc') + result = apply_sorting(jobs, 'execution_duration', 'asc') assert result[0]['id'] == 'b' # None treated as 0, comes first @@ -275,7 +275,7 @@ class TestNormalizeHistoryItem: ), 'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'outputs': {}, - 'execution_time': 2.5, + 'execution_duration': 2.5, } job = normalize_history_item('prompt-456', history_item) @@ -309,7 +309,7 @@ class TestNormalizeHistoryItem: ] }, 'outputs': {}, - 'execution_time': 1.0, + 'execution_duration': 1.0, } # List view - includes execution_error @@ -332,7 +332,7 @@ class TestNormalizeHistoryItem: ), 'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, - 'execution_time': 2.5, + 'execution_duration': 2.5, } job = normalize_history_item('prompt-123', history_item, include_outputs=True)