From 33109e0a04dd870bd8875eae0f9f490fa77c6dac Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Tue, 9 Jun 2026 21:28:25 -0700 Subject: [PATCH] feat(jobs): cursor-based pagination on GET /api/jobs (BE-943) (#14363) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor(pagination): hoist cursor codec to utils/ for cross-domain reuse The keyset cursor codec was asset-namespaced (app/assets/services/cursor.py) but the wire format and encode/decode logic are domain-agnostic. Move it to utils/cursor.py so the jobs endpoint can share one codec instead of importing across domains or duplicating it. * feat(jobs): cursor-based pagination on GET /api/jobs (BE-943) Mirror the cloud jobs cursor (BE-885) on the OSS Python server so the frontend sees one contract across runtimes. - apply_sorting now appends the job id as a tiebreaker, making (create_time, id) a stable keyset; without it, ties could reorder between pages. - get_all_jobs accepts an opaque 'after' cursor (honored only for created_at sort, like cloud), keyset-filters the sorted in-memory list, and returns has_more + a next_cursor. Minted in offset mode too so a client can bootstrap into keyset pagination. - server.py /api/jobs parses 'after', returns next_cursor in the pagination object, and maps a malformed cursor to 400 INVALID_CURSOR. - Reuses the shared utils.cursor codec (base64url JSON {s,v,id,o}) so the wire format matches cloud and assets exactly. Tests: asc/desc multi-page round-trip, same-create_time tiebreaker, last-page no-cursor, offset-mode bootstrap, execution_duration ignores cursor, malformed cursor raises. * refactor(jobs): return NamedTuple page, early-out on empty job set Review feedback on the jobs cursor pagination: - get_all_jobs now returns JobsPage, a NamedTuple, instead of a bare 4-tuple (callers unpack positionally either way). - Early-out when the filtered job set is empty so paging code never has to reason about indexing into an empty list. A malformed 'after' cursor is still decoded first and rejected with INVALID_CURSOR. - Document that job ids are server-assigned UUIDs, always present and unique — the empty-string fallback in _job_id_key only shields sorted() from a malformed dict, it is not part of the keyset contract. --- app/assets/api/routes.py | 2 +- app/assets/services/asset_management.py | 2 +- comfy_execution/jobs.py | 100 ++++++++++++++++-- server.py | 45 +++++--- .../assets_test/services/test_cursor.py | 4 +- tests/execution/test_jobs.py | 94 ++++++++++++++++ {app/assets/services => utils}/cursor.py | 2 +- 7 files changed, 218 insertions(+), 31 deletions(-) rename {app/assets/services => utils}/cursor.py (99%) diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index 544a614f2..389ca1462 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -39,7 +39,7 @@ from app.assets.services import ( update_asset_metadata, upload_from_temp_path, ) -from app.assets.services.cursor import InvalidCursorError +from utils.cursor import InvalidCursorError from app.assets.services.tagging import list_tag_histogram ROUTES = web.RouteTableDef() diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py index 1072c95fa..5df630156 100644 --- a/app/assets/services/asset_management.py +++ b/app/assets/services/asset_management.py @@ -4,7 +4,7 @@ import os from datetime import timezone from typing import Sequence -from app.assets.services.cursor import ( +from utils.cursor import ( CursorPayload, InvalidCursorError, decode_cursor, diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index fcd7ef735..c951af017 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -3,9 +3,27 @@ Job utilities for the /api/jobs endpoint. Provides normalization and helper functions for job status tracking. """ -from typing import Optional +from typing import NamedTuple, Optional from comfy_api.internal import prune_dict +from utils.cursor import ( + decode_cursor, + decode_cursor_int, + encode_cursor, +) + +# Cursor pagination is defined only for the created_at timeline. execution_duration +# is a derived value with no stable keyset, so it stays offset-only (matching the +# cloud jobs implementation). +CURSOR_SORT_FIELD = 'created_at' + + +class JobsPage(NamedTuple): + """One page of the jobs listing, as returned by get_all_jobs.""" + jobs: list[dict] + total_count: int + has_more: bool + next_cursor: Optional[str] class JobStatus: @@ -282,18 +300,34 @@ def get_outputs_summary(outputs: dict) -> tuple[int, Optional[dict]]: return count, preview_output or fallback_preview +def _job_id_key(job: dict) -> str: + # Job ids are server-assigned prompt UUIDs and are always present and + # unique, so the (sort_value, id) pair below is a valid keyset. The + # fallback is not part of that contract — it only keeps a malformed job + # dict from raising TypeError inside sorted() (None is unorderable + # against str). + return job.get('id') or '' + + def apply_sorting(jobs: list[dict], sort_by: str, sort_order: str) -> list[dict]: - """Sort jobs list by specified field and order.""" + """Sort jobs list by specified field and order. + + The job ``id`` is appended as a tiebreaker so rows sharing a sort value have + a stable, deterministic order. This makes the (sort_value, id) pair a valid + keyset for cursor pagination — without it, ties could reorder between pages + and a cursor would skip or repeat rows. + """ reverse = (sort_order == 'desc') if sort_by == 'execution_duration': def get_sort_key(job): start = job.get('execution_start_time', 0) end = job.get('execution_end_time', 0) - return end - start if end and start else 0 + duration = end - start if end and start else 0 + return (duration, _job_id_key(job)) else: def get_sort_key(job): - return job.get('create_time', 0) + return (job.get('create_time') or 0, _job_id_key(job)) return sorted(jobs, key=get_sort_key, reverse=reverse) @@ -334,8 +368,9 @@ def get_all_jobs( sort_by: str = "created_at", sort_order: str = "desc", limit: Optional[int] = None, - offset: int = 0 -) -> tuple[list[dict], int]: + offset: int = 0, + after: Optional[str] = None +) -> JobsPage: """ Get all jobs (running, pending, completed) with filtering and sorting. @@ -348,10 +383,14 @@ def get_all_jobs( sort_by: Field to sort by ('created_at', 'execution_duration') sort_order: 'asc' or 'desc' limit: Maximum number of items to return - offset: Number of items to skip + offset: Number of items to skip (ignored when a cursor is supplied) + after: Opaque keyset cursor from a prior next_cursor. Honored only for + created_at sort; takes precedence over offset. Raises + InvalidCursorError on a malformed cursor. Returns: - tuple: (jobs_list, total_count) + JobsPage: (jobs, total_count, has_more, next_cursor) + next_cursor is non-None only for created_at sort when more rows remain. """ jobs = [] @@ -381,9 +420,50 @@ def get_all_jobs( total_count = len(jobs) - if offset > 0: + use_cursor = after is not None and sort_by == CURSOR_SORT_FIELD + cursor_payload = ( + decode_cursor(after, [CURSOR_SORT_FIELD], expected_order=sort_order) + if use_cursor + else None + ) + + # Early-out on an empty result set: nothing to page through and no cursor + # to mint, and downstream code never has to reason about indexing into an + # empty list. The cursor is still decoded above so a malformed `after` is + # rejected with INVALID_CURSOR even when there are no jobs. + if total_count == 0: + return JobsPage([], 0, False, None) + + if cursor_payload is not None: + ascending = sort_order == 'asc' + cursor_key = (decode_cursor_int(cursor_payload), cursor_payload.id) + jobs = [ + j for j in jobs + if (_job_keyset(j) > cursor_key if ascending else _job_keyset(j) < cursor_key) + ] + elif offset > 0: jobs = jobs[offset:] + + has_more = limit is not None and len(jobs) > limit if limit is not None: jobs = jobs[:limit] - return (jobs, total_count) + # Mint a forward cursor for the created_at timeline whenever more rows remain. + # Emitting it in offset mode too lets a client bootstrap into cursor pagination + # on its next request without a separate round trip. + next_cursor = None + if sort_by == CURSOR_SORT_FIELD and has_more and jobs: + last = jobs[-1] + next_cursor = encode_cursor( + CURSOR_SORT_FIELD, + str(last.get('create_time') or 0), + _job_id_key(last), + order=sort_order, + ) + + return JobsPage(jobs, total_count, has_more, next_cursor) + + +def _job_keyset(job: dict) -> tuple[int, str]: + """Keyset tuple matching the (create_time, id) ordering apply_sorting produces.""" + return (job.get('create_time') or 0, _job_id_key(job)) diff --git a/server.py b/server.py index a85c1e591..74b034a7d 100644 --- a/server.py +++ b/server.py @@ -9,6 +9,7 @@ import nodes import folder_paths import execution from comfy_execution.jobs import JobStatus, get_job, get_all_jobs +from utils.cursor import InvalidCursorError import uuid import urllib import json @@ -785,6 +786,8 @@ class PromptServer(): sort_order: Sort direction: asc, desc (default) limit: Max items to return (positive integer) offset: Items to skip (non-negative integer, default 0) + after: Opaque keyset cursor from a prior next_cursor; takes + precedence over offset. Honored only for created_at sort. """ query = request.rel_url.query @@ -792,6 +795,7 @@ class PromptServer(): workflow_id = query.get('workflow_id') sort_by = query.get('sort_by', 'created_at').lower() sort_order = query.get('sort_order', 'desc').lower() + after = query.get('after') status_filter = None if status_param: @@ -850,26 +854,35 @@ class PromptServer(): running = _remove_sensitive_from_queue(running) queued = _remove_sensitive_from_queue(queued) - jobs, total = get_all_jobs( - running, queued, history, - status_filter=status_filter, - workflow_id=workflow_id, - sort_by=sort_by, - sort_order=sort_order, - limit=limit, - offset=offset - ) + try: + jobs, total, has_more, next_cursor = get_all_jobs( + running, queued, history, + status_filter=status_filter, + workflow_id=workflow_id, + sort_by=sort_by, + sort_order=sort_order, + limit=limit, + offset=offset, + after=after + ) + except InvalidCursorError: + return web.json_response( + {"error": "Invalid pagination cursor", "code": "INVALID_CURSOR"}, + status=400 + ) - has_more = (offset + len(jobs)) < total + pagination = { + 'offset': offset, + 'limit': limit, + 'total': total, + 'has_more': has_more + } + if next_cursor is not None: + pagination['next_cursor'] = next_cursor return web.json_response({ 'jobs': jobs, - 'pagination': { - 'offset': offset, - 'limit': limit, - 'total': total, - 'has_more': has_more - } + 'pagination': pagination }) @routes.get("/api/jobs/{job_id}") diff --git a/tests-unit/assets_test/services/test_cursor.py b/tests-unit/assets_test/services/test_cursor.py index 47970e168..18863cd22 100644 --- a/tests-unit/assets_test/services/test_cursor.py +++ b/tests-unit/assets_test/services/test_cursor.py @@ -1,4 +1,4 @@ -"""Tests for app.assets.services.cursor. +"""Tests for utils.cursor. Cursors are opaque tokens internal to this server — these tests cover round-tripping, validation, and length caps, not any particular wire @@ -11,7 +11,7 @@ from datetime import datetime, timedelta, timezone import pytest -from app.assets.services.cursor import ( +from utils.cursor import ( MAX_CURSOR_ID_LENGTH, MAX_CURSOR_VALUE_LENGTH, MAX_ENCODED_CURSOR_LENGTH, diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 814af5c13..5815d5564 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -1,5 +1,7 @@ """Unit tests for comfy_execution/jobs.py""" +import pytest + from comfy_execution.jobs import ( JobStatus, is_previewable, @@ -9,8 +11,10 @@ from comfy_execution.jobs import ( normalize_outputs, get_outputs_summary, apply_sorting, + get_all_jobs, has_3d_extension, ) +from utils.cursor import InvalidCursorError class TestJobStatus: @@ -595,3 +599,93 @@ class TestNormalizeOutputs: 'result': ['data.json', [1, 2, 3]], } } + + +def _completed_history(jobs_by_id: dict) -> dict: + """Build a history dict of completed jobs keyed by id, with the given create_times.""" + return { + job_id: { + 'prompt': (0, '', {}, {'create_time': create_time}, {}), + 'status': {'status_str': 'success', 'messages': []}, + 'outputs': {}, + } + for job_id, create_time in jobs_by_id.items() + } + + +def _walk_cursor(history: dict, sort_order: str, limit: int) -> list[str]: + """Page through every job using only next_cursor, asserting the page invariants.""" + collected: list[str] = [] + seen: set[str] = set() + after = None + for _ in range(100): + jobs, _total, has_more, next_cursor = get_all_jobs( + [], [], history, sort_order=sort_order, limit=limit, after=after + ) + assert len(jobs) <= limit + for job in jobs: + assert job['id'] not in seen, f"{job['id']} returned on two pages" + seen.add(job['id']) + collected.append(job['id']) + if not has_more: + assert next_cursor is None, "final page must not emit a cursor" + return collected + assert next_cursor is not None, "non-final page must emit a cursor" + after = next_cursor + raise AssertionError("cursor paging did not terminate") + + +class TestGetAllJobsCursor: + """Cursor pagination on get_all_jobs().""" + + def test_round_trip_desc(self): + history = _completed_history({'j1': 100, 'j2': 200, 'j3': 300, 'j4': 400, 'j5': 500}) + assert _walk_cursor(history, 'desc', 2) == ['j5', 'j4', 'j3', 'j2', 'j1'] + + def test_round_trip_asc(self): + history = _completed_history({'j1': 100, 'j2': 200, 'j3': 300, 'j4': 400, 'j5': 500}) + assert _walk_cursor(history, 'asc', 2) == ['j1', 'j2', 'j3', 'j4', 'j5'] + + def test_tiebreaker_same_create_time(self): + """Rows sharing a create_time must page by the id tiebreaker with no gaps or repeats.""" + history = _completed_history({'a': 100, 'b': 100, 'c': 100}) + # Ground truth: a single page large enough to hold them all, same sort. + single, _total, _hm, _nc = get_all_jobs([], [], history, sort_order='desc', limit=10) + truth = [j['id'] for j in single] + assert _walk_cursor(history, 'desc', 1) == truth + assert sorted(truth) == ['a', 'b', 'c'] + + def test_final_page_omits_cursor(self): + history = _completed_history({'j1': 100, 'j2': 200, 'j3': 300}) + jobs, total, has_more, next_cursor = get_all_jobs( + [], [], history, sort_order='desc', limit=3 + ) + assert total == 3 + assert has_more is False + assert next_cursor is None + + def test_offset_mode_mints_bootstrap_cursor(self): + """First page in offset mode still emits a cursor so a client can switch to keyset.""" + history = _completed_history({'j1': 100, 'j2': 200, 'j3': 300}) + jobs, _total, has_more, next_cursor = get_all_jobs( + [], [], history, sort_order='desc', limit=2 + ) + assert [j['id'] for j in jobs] == ['j3', 'j2'] + assert has_more is True + assert next_cursor is not None + + def test_cursor_ignored_for_execution_duration_sort(self): + """execution_duration has no keyset; a cursor is ignored and none is minted.""" + history = _completed_history({'j1': 100, 'j2': 200, 'j3': 300}) + _first, _t, _hm, cursor = get_all_jobs([], [], history, sort_order='desc', limit=2) + jobs, _total, _has_more, next_cursor = get_all_jobs( + [], [], history, sort_by='execution_duration', sort_order='desc', + limit=2, after=cursor + ) + assert next_cursor is None + assert len(jobs) == 2 # offset mode, cursor disregarded + + def test_malformed_cursor_raises(self): + history = _completed_history({'j1': 100}) + with pytest.raises(InvalidCursorError): + get_all_jobs([], [], history, sort_order='desc', limit=2, after='not-a-cursor') diff --git a/app/assets/services/cursor.py b/utils/cursor.py similarity index 99% rename from app/assets/services/cursor.py rename to utils/cursor.py index 6c7791528..d140f8fef 100644 --- a/app/assets/services/cursor.py +++ b/utils/cursor.py @@ -1,4 +1,4 @@ -"""Opaque keyset-pagination cursor for /api/assets. +"""Opaque keyset-pagination cursor shared by /api/assets and /api/jobs. Payload JSON uses short keys to keep the encoded length small: