From 5274b91f257b82cd465bd33ea8a4070a44db6504 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 4 Dec 2025 12:51:24 -0800 Subject: [PATCH] use ws messages to get start and end times --- comfy_execution/jobs.py | 25 ++++++++++---------- execution.py | 4 +--- main.py | 13 +++++----- tests/execution/test_jobs.py | 46 ++++++++++++++++++++---------------- 4 files changed, 45 insertions(+), 43 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index d9e23ceed..ca7401b1c 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -90,21 +90,20 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): outputs_count, preview_output = get_outputs_summary(outputs) execution_error = None - if status == JobStatus.FAILED and status_info: - messages = status_info.get('messages', []) - for entry in messages: - if isinstance(entry, (list, tuple)) and len(entry) >= 2 and entry[0] == 'execution_error': - detail = entry[1] - if isinstance(detail, dict): - execution_error = detail - break - - execution_duration = history_item.get('execution_duration') execution_start_time = None execution_end_time = None - if execution_duration is not None and create_time is not None: - execution_end_time = create_time + int(execution_duration * 1000) - execution_start_time = create_time + if status_info: + messages = status_info.get('messages', []) + for entry in messages: + if isinstance(entry, (list, tuple)) and len(entry) >= 2: + event_name, event_data = entry[0], entry[1] + if isinstance(event_data, dict): + if event_name == 'execution_start': + execution_start_time = event_data.get('timestamp') + elif event_name in ('execution_success', 'execution_error', 'execution_interrupted'): + execution_end_time = event_data.get('timestamp') + if event_name == 'execution_error': + execution_error = event_data job = { 'id': prompt_id, diff --git a/execution.py b/execution.py index 99c07bfec..c552214d4 100644 --- a/execution.py +++ b/execution.py @@ -1136,8 +1136,7 @@ class PromptQueue: messages: List[str] def task_done(self, item_id, history_result, - status: Optional['PromptQueue.ExecutionStatus'], process_item=None, - execution_duration: Optional[float] = None): + status: Optional['PromptQueue.ExecutionStatus'], process_item=None): with self.mutex: prompt = self.currently_running.pop(item_id) if len(self.history) > MAXIMUM_HISTORY_SIZE: @@ -1154,7 +1153,6 @@ class PromptQueue: "prompt": prompt, "outputs": {}, 'status': status_dict, - 'execution_duration': execution_duration, } self.history[prompt[1]].update(history_result) self.server.queue_updated() diff --git a/main.py b/main.py index d2e3682fa..1eb4423b8 100644 --- a/main.py +++ b/main.py @@ -230,7 +230,7 @@ def prompt_worker(q, server_instance): need_gc = True current_time = time.perf_counter() - execution_duration = current_time - execution_start_time + execution_time = current_time - execution_start_time remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] q.task_done(item_id, @@ -239,17 +239,16 @@ def prompt_worker(q, server_instance): status_str='success' if e.success else 'error', completed=e.success, messages=e.status_messages), - process_item=remove_sensitive, - execution_duration=execution_duration) + process_item=remove_sensitive) if server_instance.client_id is not None: server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) # Log Time in a more readable way after 10 minutes - if execution_duration > 600: - execution_duration_str = time.strftime("%H:%M:%S", time.gmtime(execution_duration)) - logging.info(f"Prompt executed in {execution_duration_str}") + if execution_time > 600: + execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) + logging.info(f"Prompt executed in {execution_time}") else: - logging.info("Prompt executed in {:.2f} seconds".format(execution_duration)) + logging.info("Prompt executed in {:.2f} seconds".format(execution_time)) flags = q.get_flags() free_memory = flags.get("free_memory", False) diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 2b63e1e4e..b01d26ecc 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -269,7 +269,7 @@ class TestNormalizeHistoryItem: """Unit tests for normalize_history_item()""" def test_completed_job(self): - """Completed history item should have correct status.""" + """Completed history item should have correct status and times from messages.""" history_item = { 'prompt': ( 5, # priority @@ -278,54 +278,61 @@ class TestNormalizeHistoryItem: { 'create_time': 1234567890000, 'extra_pnginfo': {'workflow': {'id': 'workflow-xyz'}} - }, # milliseconds + }, ['node1'], ), - 'status': {'status_str': 'success', 'completed': True, 'messages': []}, + 'status': { + 'status_str': 'success', + 'completed': True, + 'messages': [ + ('execution_start', {'prompt_id': 'prompt-456', 'timestamp': 1234567890500}), + ('execution_success', {'prompt_id': 'prompt-456', 'timestamp': 1234567893000}), + ] + }, 'outputs': {}, - 'execution_duration': 2.5, } job = normalize_history_item('prompt-456', history_item) assert job['id'] == 'prompt-456' assert job['status'] == 'completed' assert job['priority'] == 5 - assert job['execution_start_time'] == 1234567890000 - assert job['execution_end_time'] == 1234567890000 + 2500 # +2.5 seconds in ms + assert job['execution_start_time'] == 1234567890500 + assert job['execution_end_time'] == 1234567893000 assert job['workflow_id'] == 'workflow-xyz' def test_failed_job(self): - """Failed history item should have failed status and message.""" - error_detail = { - 'node_id': '5', - 'node_type': 'KSampler', - 'exception_message': 'CUDA out of memory', - 'exception_type': 'RuntimeError', - 'traceback': ['Traceback...', 'RuntimeError: CUDA out of memory'], - } + """Failed history item should have failed status and error from messages.""" history_item = { 'prompt': ( 5, 'prompt-789', {'nodes': {}}, - {'create_time': 1234567890}, + {'create_time': 1234567890000}, ['node1'], ), 'status': { 'status_str': 'error', 'completed': False, 'messages': [ - ('execution_error', error_detail) + ('execution_start', {'prompt_id': 'prompt-789', 'timestamp': 1234567890500}), + ('execution_error', { + 'prompt_id': 'prompt-789', + 'node_id': '5', + 'node_type': 'KSampler', + 'exception_message': 'CUDA out of memory', + 'exception_type': 'RuntimeError', + 'traceback': ['Traceback...', 'RuntimeError: CUDA out of memory'], + 'timestamp': 1234567891000, + }) ] }, 'outputs': {}, - 'execution_duration': 1.0, } - # List view - includes execution_error job = normalize_history_item('prompt-789', history_item) assert job['status'] == 'failed' - assert job['execution_error'] == error_detail + assert job['execution_start_time'] == 1234567890500 + assert job['execution_end_time'] == 1234567891000 assert job['execution_error']['node_id'] == '5' assert job['execution_error']['node_type'] == 'KSampler' assert job['execution_error']['exception_message'] == 'CUDA out of memory' @@ -342,7 +349,6 @@ class TestNormalizeHistoryItem: ), 'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, - 'execution_duration': 2.5, } job = normalize_history_item('prompt-123', history_item, include_outputs=True)