diff --git a/app/assets/api/schemas_in.py b/app/assets/api/schemas_in.py index fdb3902ba..af666746d 100644 --- a/app/assets/api/schemas_in.py +++ b/app/assets/api/schemas_in.py @@ -60,8 +60,9 @@ class ListAssetsQuery(BaseModel): limit: conint(ge=1, le=500) = 20 offset: conint(ge=0) = 0 # Opaque keyset cursor. When supplied, `offset` is ignored. Cursor pagination - # is supported for sort values `created_at`, `updated_at`, `name`, `size`; - # `last_access_time` falls back to offset/limit. + # is supported for sort values `created_at`, `updated_at`, `name`, `size`. + # Supplying `after` together with `sort=last_access_time` returns + # 400 INVALID_CURSOR; that sort only supports offset/limit. after: str | None = None sort: Literal["name", "created_at", "updated_at", "size", "last_access_time"] = ( diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py index 730384e8c..93d32aaa8 100644 --- a/app/assets/services/asset_management.py +++ b/app/assets/services/asset_management.py @@ -279,7 +279,11 @@ def list_assets_page( """ cursor_value: object | None = None cursor_id: str | None = None - use_cursor_mode = after is not None and sort in _CURSOR_SORT_FIELDS + # Mint next_cursor on every page where the sort is cursor-supported, not + # only when the request itself arrived with a cursor. Otherwise a first + # request (no `after`) returns next_cursor=None and the client can never + # enter cursor mode. + mint_cursor = sort in _CURSOR_SORT_FIELDS if after is not None: if sort not in _CURSOR_SORT_FIELDS: @@ -293,6 +297,11 @@ def list_assets_page( ) cursor_value, cursor_id = _resolve_cursor_value(payload), payload.id + # Over-fetch by one row so we can distinguish "exactly `limit` rows total + # remaining" from "more rows past this page" without a second query. Drop + # the sentinel before returning. 
+ fetch_limit = limit + 1 if mint_cursor else limit + with create_session() as session: refs, tag_map, total = list_references_page( session, @@ -301,7 +310,7 @@ def list_assets_page( exclude_tags=exclude_tags, name_contains=name_contains, metadata_filter=metadata_filter, - limit=limit, + limit=fetch_limit, offset=offset, sort=sort, order=order, @@ -309,6 +318,14 @@ def list_assets_page( after_cursor_id=cursor_id, ) + next_cursor: str | None = None + if mint_cursor and len(refs) > limit: + # There's at least one more row past this page — mint a cursor from + # the last row of the page (i.e. index `limit - 1`, since we + # over-fetched), and drop the sentinel. + next_cursor = _encode_next_cursor(refs[limit - 1], sort) + refs = refs[:limit] + items: list[AssetSummaryData] = [] for ref in refs: items.append( @@ -319,10 +336,6 @@ def list_assets_page( ) ) - next_cursor: str | None = None - if use_cursor_mode and len(refs) == limit: - next_cursor = _encode_next_cursor(refs[-1], sort) - return ListAssetsResult(items=items, total=total, next_cursor=next_cursor) @@ -337,15 +350,24 @@ def _resolve_cursor_value(payload: CursorPayload) -> object: return payload.value # name, str-typed -def _encode_next_cursor(ref, sort: str) -> str: - """Mint a cursor pointing at *ref* for the given sort dimension.""" +def _encode_next_cursor(ref, sort: str) -> str | None: + """Mint a cursor pointing at *ref* for the given sort dimension. + + Returns None when the boundary row carries a NULL sort value (e.g. an asset + record whose size_bytes hasn't been backfilled). Continuing pagination + across a NULL boundary is undefined under keyset ordering — better to + truncate cleanly here than to mint a cursor that mis-positions. 
+ """ if sort == "name": return encode_cursor("name", ref.name, ref.id) if sort == "size": - size = ref.asset.size_bytes if ref.asset is not None else 0 - return encode_cursor("size", str(size), ref.id) + if ref.asset is None or ref.asset.size_bytes is None: + return None + return encode_cursor("size", str(ref.asset.size_bytes), ref.id) # created_at / updated_at — DB datetimes are naive UTC; attach tz before encoding. value = ref.created_at if sort == "created_at" else ref.updated_at + if value is None: + return None return encode_cursor_from_time(sort, value.replace(tzinfo=timezone.utc), ref.id) diff --git a/app/assets/services/cursor.py b/app/assets/services/cursor.py index 51faa05e5..70b359f3e 100644 --- a/app/assets/services/cursor.py +++ b/app/assets/services/cursor.py @@ -32,9 +32,16 @@ class InvalidCursorError(ValueError): # Wire-format length caps. Cursors are user-controlled, so caps protect the # decode path from oversized allocations and downstream SQL predicates from -# unbounded strings. Same numbers as cloud/common/pagination/cursor.go. +# unbounded strings. +# +# MAX_CURSOR_VALUE_LENGTH is 512 (vs cloud's 256) to fit OSS's +# `AssetReference.name` column max (String(512)) — otherwise a long-named +# asset would mint a cursor the same server then refuses on the next request. +# Cloud's data model has shorter names so its lower cap is fine there; +# cross-runtime byte-identity is unaffected because no real cloud cursor ever +# carries a value > 256. 
MAX_ENCODED_CURSOR_LENGTH = 1024 -MAX_CURSOR_VALUE_LENGTH = 256 +MAX_CURSOR_VALUE_LENGTH = 512 MAX_CURSOR_ID_LENGTH = 128 @@ -122,7 +129,12 @@ def decode_cursor_time(payload: Optional[CursorPayload]) -> datetime: micros = int(payload.value) except ValueError as e: raise InvalidCursorError(f"value is not a valid timestamp: {e}") from e - return _unix_micros_to_datetime(micros) + try: + return _unix_micros_to_datetime(micros) + except (OverflowError, OSError, ValueError) as e: + # Crafted out-of-range microseconds (e.g. a year past datetime.MAXYEAR) blow up + # in fromtimestamp / datetime construction. Map to 400, not 500. + raise InvalidCursorError(f"value is out of representable range: {e}") from e def decode_cursor_int(payload: Optional[CursorPayload]) -> int: diff --git a/openapi.yaml b/openapi.yaml index d75b84b43..464751c4f 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -1594,6 +1594,12 @@ paths: application/json: schema: $ref: "#/components/schemas/ListAssetsResponse" + "400": + description: Malformed query or cursor (e.g. `INVALID_CURSOR`) + content: + application/json: + schema: + $ref: "#/components/schemas/AssetsApiError" post: operationId: createAsset tags: [assets] @@ -6498,6 +6504,29 @@ components: Opaque cursor to fetch the next page. Pass back as the `after` query parameter. Omitted when there are no more results. + AssetsApiError: + type: object + description: Error envelope returned by the assets API on 400 responses. + required: + - error + properties: + error: + type: object + required: + - code + - message + properties: + code: + type: string + description: | + Machine-readable error code. `INVALID_CURSOR` is returned when the + `after` cursor is malformed, oversized, or its sort field does + not match the request's `sort`. `INVALID_QUERY` covers other + Pydantic validation failures.
+ enum: [INVALID_CURSOR, INVALID_QUERY] + message: + type: string + TagInfo: type: object description: A tag known to the asset database, with the number of assets bearing it. diff --git a/tests-unit/assets_test/services/test_cursor.py b/tests-unit/assets_test/services/test_cursor.py index dcf41aaf7..c96cad12f 100644 --- a/tests-unit/assets_test/services/test_cursor.py +++ b/tests-unit/assets_test/services/test_cursor.py @@ -162,6 +162,40 @@ class TestEncodeAtCapsFits: assert payload.id == id +class TestDatetimeOverflow: + """Crafted cursors with extreme micros must map to InvalidCursorError, + not OverflowError/OSError leaking as 500. + """ + + @pytest.mark.parametrize( + "micros_str", + [ + "999999999999999999999", # ~10^21 µs (≈ year 31.7 million) — far past datetime.MAXYEAR (9999) + "-999999999999999999999", # symmetric negative — pre-epoch overflow + ], + ) + def test_out_of_range_micros_rejected(self, micros_str): + encoded = encode_cursor("created_at", micros_str, "asset-x") + payload = decode_cursor(encoded, ALLOWED) + with pytest.raises(InvalidCursorError): + decode_cursor_time(payload) + + +class TestEncoderDecoderSymmetry: + """The encoder must reject inputs the decoder rejects, or the same server + will mint a cursor it then 400s on the next request. + """ + + def test_long_name_within_cap_round_trips(self): + """OSS assets allow names up to 512 chars (`String(512)`); cursor must + handle that. Cloud's lower cap is acceptable on its side because the + cloud schema doesn't permit names that long.""" + long_name = "n" * MAX_CURSOR_VALUE_LENGTH + encoded = encode_cursor("name", long_name, "asset-x") + payload = decode_cursor(encoded, ALLOWED) + assert payload.value == long_name + + class TestByteIdentityWithCloud: """Lock the wire format against drift from cloud's Go implementation.
diff --git a/tests-unit/assets_test/test_list_cursor.py b/tests-unit/assets_test/test_list_cursor.py index 75f965100..87eae4c03 100644 --- a/tests-unit/assets_test/test_list_cursor.py +++ b/tests-unit/assets_test/test_list_cursor.py @@ -4,6 +4,9 @@ Wire contract is shared with cloud's Go implementation (BE-893). These tests exercise the handler/service/query path end-to-end; cursor-encoding-level tests live in tests-unit/assets_test/services/test_cursor.py. """ +import time + +import pytest import requests @@ -146,6 +149,105 @@ def test_next_cursor_absent_when_no_more_results(http: requests.Session, api_bas assert "next_cursor" not in body +def test_cursor_pagination_first_page_mints_cursor(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + """First-page request (no `after`) must still return `next_cursor` when + more rows exist, or pagination is unreachable from a cold start. + """ + _seed(asset_factory, make_asset_bytes, count=3, tag="cursor-first-page") + r = http.get( + api_base + "/api/assets", + params={"include_tags": "unit-tests,cursor-first-page", "sort": "name", "order": "asc", "limit": "2"}, + timeout=120, + ) + body = r.json() + assert body["has_more"] is True + assert body.get("next_cursor"), "first page must mint a cursor when more rows exist" + + +def test_cursor_no_spurious_cursor_when_page_size_equals_remainder(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + """When `total` is an exact multiple of `limit`, the final page must + NOT carry a next_cursor — there is nothing past it. 
+ """ + _seed(asset_factory, make_asset_bytes, count=4, tag="cursor-exact-multiple") + # Page 1 + r = http.get( + api_base + "/api/assets", + params={"include_tags": "unit-tests,cursor-exact-multiple", "sort": "name", "order": "asc", "limit": "2"}, + timeout=120, + ) + cursor = r.json()["next_cursor"] + assert cursor is not None + # Page 2 — should exhaust the set with no cursor for a phantom page 3 + r2 = http.get( + api_base + "/api/assets", + params={"include_tags": "unit-tests,cursor-exact-multiple", "sort": "name", "order": "asc", "limit": "2", "after": cursor}, + timeout=120, + ) + body = r2.json() + assert len(body["assets"]) == 2 + assert body["has_more"] is False + assert "next_cursor" not in body + + +@pytest.mark.parametrize("sort_field", ["created_at", "updated_at", "size"]) +def test_cursor_walks_for_non_name_sorts(sort_field, http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + """Cursor pagination must work for every sort field the contract claims. + + Without this, the `created_at` / `updated_at` (time-encoded micros) and + `size` (int-encoded) cursor paths go entirely unexercised end-to-end. + """ + # Stagger create_time slightly so created_at / updated_at sort is well-defined. 
+ names = [] + for i in range(4): + n = f"cursor_{sort_field}_{i:02d}.safetensors" + asset_factory(n, ["models", "checkpoints", "unit-tests", f"cursor-{sort_field}"], {}, make_asset_bytes(n, size=2048 + i)) + time.sleep(0.05) # ensure distinct timestamps + names.append(n) + + params = { + "include_tags": f"unit-tests,cursor-{sort_field}", + "sort": sort_field, + "order": "desc", + "limit": "2", + } + seen: list[str] = [] + after: str | None = None + pages = 0 + while True: + page_params = dict(params) + if after is not None: + page_params["after"] = after + r = http.get(api_base + "/api/assets", params=page_params, timeout=120) + assert r.status_code == 200, r.text + body = r.json() + seen.extend(a["name"] for a in body["assets"]) + after = body.get("next_cursor") + pages += 1 + if after is None: + break + assert pages < 10, "guard against runaway cursor loop" + + assert set(seen) == set(names), f"missing items for sort={sort_field}: expected {set(names)}, got {set(seen)}" + + +def test_cursor_invalid_cursor_at_microsecond_boundary(http: requests.Session, api_base: str): + """A cursor carrying an out-of-range microsecond timestamp must map to + 400 INVALID_CURSOR, not 500.""" + import base64 + import json + # ~10^21 microseconds ≈ year 31.7 million, well past datetime.MAXYEAR (9999). + payload = {"s": "created_at", "v": "999999999999999999999", "id": "asset-x"} + raw = json.dumps(payload, separators=(",", ":")).encode("utf-8") + cursor = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") + r = http.get( + api_base + "/api/assets", + params={"after": cursor, "sort": "created_at"}, + timeout=120, + ) + assert r.status_code == 400, r.text + assert r.json()["error"]["code"] == "INVALID_CURSOR" + + def test_cursor_pagination_stable_after_delete(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): names = _seed(asset_factory, make_asset_bytes, count=4, tag="cursor-delete")