re-use queue and history functions

This commit is contained in:
Richard Yu 2025-12-03 20:03:38 -08:00
parent e4c713633a
commit 90fb5cc1e8
5 changed files with 61 additions and 56 deletions

View File

@ -97,11 +97,11 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False):
execution_error = detail execution_error = detail
break break
execution_time_duration = history_item.get('execution_time') execution_duration = history_item.get('execution_duration')
execution_start_time = None execution_start_time = None
execution_end_time = None execution_end_time = None
if execution_time_duration is not None and create_time is not None: if execution_duration is not None and create_time is not None:
execution_end_time = create_time + int(execution_time_duration * 1000) execution_end_time = create_time + int(execution_duration * 1000)
execution_start_time = create_time execution_start_time = create_time
job = { job = {
@ -167,7 +167,7 @@ def apply_sorting(jobs, sort_by, sort_order):
"""Sort jobs list by specified field and order.""" """Sort jobs list by specified field and order."""
reverse = (sort_order == 'desc') reverse = (sort_order == 'desc')
if sort_by == 'execution_time': if sort_by == 'execution_duration':
def get_sort_key(job): def get_sort_key(job):
start = job.get('execution_start_time') or 0 start = job.get('execution_start_time') or 0
end = job.get('execution_end_time') or 0 end = job.get('execution_end_time') or 0

View File

@ -1137,7 +1137,7 @@ class PromptQueue:
def task_done(self, item_id, history_result, def task_done(self, item_id, history_result,
status: Optional['PromptQueue.ExecutionStatus'], process_item=None, status: Optional['PromptQueue.ExecutionStatus'], process_item=None,
execution_time: Optional[float] = None): execution_duration: Optional[float] = None):
with self.mutex: with self.mutex:
prompt = self.currently_running.pop(item_id) prompt = self.currently_running.pop(item_id)
if len(self.history) > MAXIMUM_HISTORY_SIZE: if len(self.history) > MAXIMUM_HISTORY_SIZE:
@ -1154,7 +1154,7 @@ class PromptQueue:
"prompt": prompt, "prompt": prompt,
"outputs": {}, "outputs": {},
'status': status_dict, 'status': status_dict,
'execution_time': execution_time, 'execution_duration': execution_duration,
} }
self.history[prompt[1]].update(history_result) self.history[prompt[1]].update(history_result)
self.server.queue_updated() self.server.queue_updated()
@ -1233,15 +1233,18 @@ class PromptQueue:
def get_job(self, prompt_id): def get_job(self, prompt_id):
"""Get a single job by prompt_id from history or queue.""" """Get a single job by prompt_id from history or queue."""
with self.mutex: history = self.get_history(prompt_id=prompt_id)
if prompt_id in self.history:
return normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True)
for item in self.currently_running.values(): if prompt_id in history:
return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True)
running, queued = self.get_current_queue_volatile()
for item in running:
if item[1] == prompt_id: if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.IN_PROGRESS) return normalize_queue_item(item, JobStatus.IN_PROGRESS)
for item in self.queue: for item in queued:
if item[1] == prompt_id: if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.PENDING) return normalize_queue_item(item, JobStatus.PENDING)
@ -1253,7 +1256,7 @@ class PromptQueue:
Args: Args:
status_filter: list of statuses to include (from JobStatus.ALL) status_filter: list of statuses to include (from JobStatus.ALL)
sort_by: field to sort by ('created_at', 'execution_time') sort_by: field to sort by ('created_at', 'execution_duration')
sort_order: 'asc' or 'desc' sort_order: 'asc' or 'desc'
limit: maximum number of items to return limit: maximum number of items to return
offset: number of items to skip offset: number of items to skip
@ -1261,24 +1264,26 @@ class PromptQueue:
Returns: Returns:
tuple: (jobs_list, total_count) tuple: (jobs_list, total_count)
""" """
with self.mutex: running, queued = self.get_current_queue_volatile()
history = self.get_history()
jobs = [] jobs = []
if status_filter is None: if status_filter is None:
status_filter = JobStatus.ALL status_filter = JobStatus.ALL
if JobStatus.IN_PROGRESS in status_filter: if JobStatus.IN_PROGRESS in status_filter:
for item in self.currently_running.values(): for item in running:
jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS))
if JobStatus.PENDING in status_filter: if JobStatus.PENDING in status_filter:
for item in self.queue: for item in queued:
jobs.append(normalize_queue_item(item, JobStatus.PENDING)) jobs.append(normalize_queue_item(item, JobStatus.PENDING))
include_completed = JobStatus.COMPLETED in status_filter include_completed = JobStatus.COMPLETED in status_filter
include_failed = JobStatus.FAILED in status_filter include_failed = JobStatus.FAILED in status_filter
if include_completed or include_failed: if include_completed or include_failed:
for prompt_id, history_item in self.history.items(): for prompt_id, history_item in history.items():
is_failed = history_item.get('status', {}).get('status_str') == 'error' is_failed = history_item.get('status', {}).get('status_str') == 'error'
if (is_failed and include_failed) or (not is_failed and include_completed): if (is_failed and include_failed) or (not is_failed and include_completed):
jobs.append(normalize_history_item(prompt_id, history_item)) jobs.append(normalize_history_item(prompt_id, history_item))

12
main.py
View File

@ -230,7 +230,7 @@ def prompt_worker(q, server_instance):
need_gc = True need_gc = True
current_time = time.perf_counter() current_time = time.perf_counter()
execution_time = current_time - execution_start_time execution_duration = current_time - execution_start_time
remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] remove_sensitive = lambda prompt: prompt[:5] + prompt[6:]
q.task_done(item_id, q.task_done(item_id,
@ -240,16 +240,16 @@ def prompt_worker(q, server_instance):
completed=e.success, completed=e.success,
messages=e.status_messages), messages=e.status_messages),
process_item=remove_sensitive, process_item=remove_sensitive,
execution_time=execution_time) execution_duration=execution_duration)
if server_instance.client_id is not None: if server_instance.client_id is not None:
server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id)
# Log Time in a more readable way after 10 minutes # Log Time in a more readable way after 10 minutes
if execution_time > 600: if execution_duration > 600:
execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) execution_duration_str = time.strftime("%H:%M:%S", time.gmtime(execution_duration))
logging.info(f"Prompt executed in {execution_time}") logging.info(f"Prompt executed in {execution_duration_str}")
else: else:
logging.info("Prompt executed in {:.2f} seconds".format(execution_time)) logging.info("Prompt executed in {:.2f} seconds".format(execution_duration))
flags = q.get_flags() flags = q.get_flags()
free_memory = flags.get("free_memory", False) free_memory = flags.get("free_memory", False)

View File

@ -727,9 +727,9 @@ class PromptServer():
if 'limit' in query: if 'limit' in query:
try: try:
limit = int(query.get('limit')) limit = int(query.get('limit'))
if limit <= 0 or limit > 500: if limit <= 0:
return web.json_response( return web.json_response(
{"error": "limit must be between 1 and 500"}, {"error": "limit must be a positive integer"},
status=400 status=400
) )
except (ValueError, TypeError): except (ValueError, TypeError):

View File

@ -216,14 +216,14 @@ class TestApplySorting:
result = apply_sorting(jobs, 'created_at', 'asc') result = apply_sorting(jobs, 'created_at', 'asc')
assert [j['id'] for j in result] == ['a', 'c', 'b'] assert [j['id'] for j in result] == ['a', 'c', 'b']
def test_sort_by_execution_time(self): def test_sort_by_execution_duration(self):
"""Sort by execution_time should order by duration.""" """Sort by execution_duration should order by duration."""
jobs = [ jobs = [
{'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, # 5s {'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, # 5s
{'id': 'b', 'create_time': 300, 'execution_start_time': 300, 'execution_end_time': 1300}, # 1s {'id': 'b', 'create_time': 300, 'execution_start_time': 300, 'execution_end_time': 1300}, # 1s
{'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, # 3s {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, # 3s
] ]
result = apply_sorting(jobs, 'execution_time', 'desc') result = apply_sorting(jobs, 'execution_duration', 'desc')
assert [j['id'] for j in result] == ['a', 'c', 'b'] assert [j['id'] for j in result] == ['a', 'c', 'b']
def test_sort_with_none_values(self): def test_sort_with_none_values(self):
@ -233,7 +233,7 @@ class TestApplySorting:
{'id': 'b', 'create_time': 300, 'execution_start_time': None, 'execution_end_time': None}, {'id': 'b', 'create_time': 300, 'execution_start_time': None, 'execution_end_time': None},
{'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200},
] ]
result = apply_sorting(jobs, 'execution_time', 'asc') result = apply_sorting(jobs, 'execution_duration', 'asc')
assert result[0]['id'] == 'b' # None treated as 0, comes first assert result[0]['id'] == 'b' # None treated as 0, comes first
@ -275,7 +275,7 @@ class TestNormalizeHistoryItem:
), ),
'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'status': {'status_str': 'success', 'completed': True, 'messages': []},
'outputs': {}, 'outputs': {},
'execution_time': 2.5, 'execution_duration': 2.5,
} }
job = normalize_history_item('prompt-456', history_item) job = normalize_history_item('prompt-456', history_item)
@ -309,7 +309,7 @@ class TestNormalizeHistoryItem:
] ]
}, },
'outputs': {}, 'outputs': {},
'execution_time': 1.0, 'execution_duration': 1.0,
} }
# List view - includes execution_error # List view - includes execution_error
@ -332,7 +332,7 @@ class TestNormalizeHistoryItem:
), ),
'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'status': {'status_str': 'success', 'completed': True, 'messages': []},
'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}},
'execution_time': 2.5, 'execution_duration': 2.5,
} }
job = normalize_history_item('prompt-123', history_item, include_outputs=True) job = normalize_history_item('prompt-123', history_item, include_outputs=True)