diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index d8786d52c..20abde960 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -39,6 +39,7 @@ from app.assets.services import ( update_asset_metadata, upload_from_temp_path, ) +from app.assets.services.cursor import InvalidCursorError from app.assets.services.path_utils import compute_paths_for_response from app.assets.services.tagging import list_tag_histogram @@ -182,7 +183,7 @@ def _build_asset_response(result: schemas.AssetDetailResult | schemas.UploadResu user_metadata=result.ref.user_metadata or {}, metadata=result.ref.system_metadata, job_id=result.ref.job_id, - prompt_id=result.ref.job_id, # deprecated: mirrors job_id for cloud compat + prompt_id=result.ref.job_id, # deprecated alias of job_id, kept for compatibility created_at=result.ref.created_at, updated_at=result.ref.updated_at, last_access_time=result.ref.last_access_time, @@ -219,24 +220,37 @@ async def list_assets_route(request: web.Request) -> web.Response: order_candidate = (q.order or "desc").lower() order = order_candidate if order_candidate in {"asc", "desc"} else "desc" - result = list_assets_page( - owner_id=USER_MANAGER.get_request_user_id(request), - include_tags=q.include_tags, - exclude_tags=q.exclude_tags, - name_contains=q.name_contains, - metadata_filter=q.metadata_filter, - limit=q.limit, - offset=q.offset, - sort=sort, - order=order, - ) + try: + result = list_assets_page( + owner_id=USER_MANAGER.get_request_user_id(request), + include_tags=q.include_tags, + exclude_tags=q.exclude_tags, + name_contains=q.name_contains, + metadata_filter=q.metadata_filter, + limit=q.limit, + offset=q.offset, + sort=sort, + order=order, + after=q.after, + ) + except InvalidCursorError as e: + return _build_error_response(400, "INVALID_CURSOR", str(e)) summaries = [_build_asset_response(item) for item in result.items] + # has_more semantics differ by mode: + # - cursor mode: a non-empty next_cursor means there are more results. + # - offset mode: derived from total - (offset + page size). + if q.after is not None: + has_more = result.next_cursor is not None + else: + has_more = (q.offset + len(summaries)) < result.total + payload = schemas_out.AssetsList( assets=summaries, total=result.total, - has_more=(q.offset + len(summaries)) < result.total, + has_more=has_more, + next_cursor=result.next_cursor, ) return web.json_response(payload.model_dump(mode="json", exclude_none=True)) diff --git a/app/assets/api/schemas_in.py b/app/assets/api/schemas_in.py index 186a6ae1e..af666746d 100644 --- a/app/assets/api/schemas_in.py +++ b/app/assets/api/schemas_in.py @@ -59,6 +59,11 @@ class ListAssetsQuery(BaseModel): limit: conint(ge=1, le=500) = 20 offset: conint(ge=0) = 0 + # Opaque keyset cursor. When supplied, `offset` is ignored. Cursor pagination + # is supported for sort values `created_at`, `updated_at`, `name`, `size`. + # Supplying `after` together with `sort=last_access_time` returns + # 400 INVALID_CURSOR; that sort only supports offset/limit. + after: str | None = None sort: Literal["name", "created_at", "updated_at", "size", "last_access_time"] = ( "created_at" diff --git a/app/assets/api/schemas_out.py b/app/assets/api/schemas_out.py index e6b0df0d9..6b2a82438 100644 --- a/app/assets/api/schemas_out.py +++ b/app/assets/api/schemas_out.py @@ -43,6 +43,8 @@ class AssetsList(BaseModel): assets: list[Asset] total: int has_more: bool + # Opaque cursor for the next page. Omitted when there are no more results. + next_cursor: str | None = None class TagUsage(BaseModel): diff --git a/app/assets/database/queries/asset_reference.py b/app/assets/database/queries/asset_reference.py index 2ef5c210e..e0a01e871 100644 --- a/app/assets/database/queries/asset_reference.py +++ b/app/assets/database/queries/asset_reference.py @@ -266,9 +266,18 @@ def list_references_page( metadata_filter: dict | None = None, sort: str | None = None, order: str | None = None, + after_cursor_value: object | None = None, + after_cursor_id: str | None = None, ) -> tuple[list[AssetReference], dict[str, list[str]], int]: """List references with pagination, filtering, and sorting. + When ``after_cursor_value``/``after_cursor_id`` are supplied the query uses + keyset pagination — ``offset`` is ignored and a WHERE clause selects rows + strictly after the given ``(sort_col, id)`` position in the active sort + direction. The cursor value must already be typed for the column + (datetime for time sorts, int for size, str for name); the caller decodes + the opaque cursor string and resolves to the typed value. + Returns (references, tag_map, total_count). """ base = ( @@ -297,9 +306,31 @@ def list_references_page( "size": Asset.size_bytes, } sort_col = sort_map.get(sort, AssetReference.created_at) - sort_exp = sort_col.desc() if order == "desc" else sort_col.asc() + descending = order == "desc" - base = base.order_by(sort_exp).limit(limit).offset(offset) + # Keyset WHERE: (sort_col, id) strictly less-than / greater-than the cursor. + # Equivalent to: sort_col v OR (sort_col = v AND id cursor_id). + if after_cursor_value is not None and after_cursor_id is not None: + if descending: + keyset = sa.or_( + sort_col < after_cursor_value, + sa.and_(sort_col == after_cursor_value, AssetReference.id < after_cursor_id), + ) + else: + keyset = sa.or_( + sort_col > after_cursor_value, + sa.and_(sort_col == after_cursor_value, AssetReference.id > after_cursor_id), + ) + base = base.where(keyset) + + # Secondary ORDER BY id (matching the primary direction) gives the keyset + # comparison a deterministic tiebreaker on duplicate sort_col values. + id_exp = AssetReference.id.desc() if descending else AssetReference.id.asc() + sort_exp = sort_col.desc() if descending else sort_col.asc() + + base = base.order_by(sort_exp, id_exp).limit(limit) + if after_cursor_id is None: + base = base.offset(offset) count_stmt = ( select(sa.func.count()) diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py index 5aefd9956..1072c95fa 100644 --- a/app/assets/services/asset_management.py +++ b/app/assets/services/asset_management.py @@ -1,8 +1,19 @@ import contextlib import mimetypes import os +from datetime import timezone from typing import Sequence +from app.assets.services.cursor import ( + CursorPayload, + InvalidCursorError, + decode_cursor, + decode_cursor_int, + decode_cursor_time, + encode_cursor, + encode_cursor_from_time, +) + from app.assets.database.models import Asset from app.assets.database.queries import ( @@ -242,6 +253,11 @@ def get_asset_by_hash(asset_hash: str) -> AssetData | None: return extract_asset_data(asset) +# Sort fields that support cursor pagination. `last_access_time` is not +# in this list — it falls back to offset/limit. +_CURSOR_SORT_FIELDS = ("created_at", "updated_at", "name", "size") + + def list_assets_page( owner_id: str = "", include_tags: Sequence[str] | None = None, @@ -252,7 +268,39 @@ def list_assets_page( offset: int = 0, sort: str = "created_at", order: str = "desc", + after: str | None = None, ) -> ListAssetsResult: + """List assets with optional cursor pagination. + + When ``after`` is supplied it overrides ``offset``. The cursor's sort field + must match ``sort`` and be in the cursor-supported allowlist; mismatches + raise InvalidCursorError so the handler can map to 400 INVALID_CURSOR. + """ + cursor_value: object | None = None + cursor_id: str | None = None + # Mint next_cursor on every page where the sort is cursor-supported, not + # only when the request itself arrived with a cursor. Otherwise a first + # request (no `after`) returns next_cursor=None and the client can never + # enter cursor mode. + mint_cursor = sort in _CURSOR_SORT_FIELDS + + if after is not None: + if sort not in _CURSOR_SORT_FIELDS: + raise InvalidCursorError( + f"cursor pagination is not supported for sort={sort!r}" + ) + payload = decode_cursor(after, _CURSOR_SORT_FIELDS, expected_order=order) + if payload.sort_field != sort: + raise InvalidCursorError( + f"cursor sort field {payload.sort_field!r} does not match request sort {sort!r}" + ) + cursor_value, cursor_id = _resolve_cursor_value(payload), payload.id + + # Over-fetch by one row so we can distinguish "exactly `limit` rows total + # remaining" from "more rows past this page" without a second query. Drop + # the sentinel before returning. + fetch_limit = limit + 1 if mint_cursor else limit + with create_session() as session: refs, tag_map, total = list_references_page( session, @@ -261,12 +309,22 @@ def list_assets_page( exclude_tags=exclude_tags, name_contains=name_contains, metadata_filter=metadata_filter, - limit=limit, + limit=fetch_limit, offset=offset, sort=sort, order=order, + after_cursor_value=cursor_value, + after_cursor_id=cursor_id, ) + next_cursor: str | None = None + if mint_cursor and len(refs) > limit: + # There's at least one more row past this page — mint a cursor from + # the last row of the page (i.e. index `limit - 1`, since we + # over-fetched), and drop the sentinel. + next_cursor = _encode_next_cursor(refs[limit - 1], sort, order) + refs = refs[:limit] + items: list[AssetSummaryData] = [] for ref in refs: items.append( @@ -277,7 +335,39 @@ def list_assets_page( ) ) - return ListAssetsResult(items=items, total=total) + return ListAssetsResult(items=items, total=total, next_cursor=next_cursor) + + +def _resolve_cursor_value(payload: CursorPayload) -> object: + """Map a decoded cursor payload to a column-typed Python value.""" + if payload.sort_field in ("created_at", "updated_at"): + # DB stores naive UTC; strip tzinfo so the comparison binds against a + # `TIMESTAMP WITHOUT TIME ZONE` column without an offset shift. + return decode_cursor_time(payload).replace(tzinfo=None) + if payload.sort_field == "size": + return decode_cursor_int(payload) + return payload.value # name, str-typed + + +def _encode_next_cursor(ref, sort: str, order: str) -> str | None: + """Mint a cursor pointing at *ref* for the given sort dimension. + + Returns None when the boundary row carries a NULL sort value (e.g. an asset + record whose size_bytes hasn't been backfilled). Continuing pagination + across a NULL boundary is undefined under keyset ordering — better to + truncate cleanly here than to mint a cursor that mis-positions. + """ + if sort == "name": + return encode_cursor("name", ref.name, ref.id, order=order) + if sort == "size": + if ref.asset is None or ref.asset.size_bytes is None: + return None + return encode_cursor("size", str(ref.asset.size_bytes), ref.id, order=order) + # created_at / updated_at — DB datetimes are naive UTC; attach tz before encoding. + value = ref.created_at if sort == "created_at" else ref.updated_at + if value is None: + return None + return encode_cursor_from_time(sort, value.replace(tzinfo=timezone.utc), ref.id, order=order) def resolve_hash_to_path( diff --git a/app/assets/services/cursor.py b/app/assets/services/cursor.py new file mode 100644 index 000000000..41ad99573 --- /dev/null +++ b/app/assets/services/cursor.py @@ -0,0 +1,225 @@ +"""Opaque keyset-pagination cursor for /api/assets. + +Payload JSON uses short keys to keep the encoded length small: + + {"s": , "v": , "id": , "o": } + +The `o` key binds the cursor to the sort direction it was minted under, +so replaying a `desc` cursor against an `asc` request fails with +``INVALID_CURSOR`` rather than silently walking the wrong direction. +`o` is mandatory on every payload — a cursor without it is rejected as +malformed. + +Encoding is base64url with no padding. JSON serialization escapes `<`, +`>`, `&`, U+2028, and U+2029 in encoded string values so asset names +containing those characters produce a stable, byte-identical wire form +across any compatible implementation of the same payload format. + +Time values are serialized as Unix microseconds (UTC) — microsecond +precision is sufficient to round-trip the timestamps stored by the +database without rounding rows in the same millisecond bucket. +""" +from __future__ import annotations + +import base64 +import json +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Iterable, Optional + + +class InvalidCursorError(ValueError): + """Raised on a malformed, oversized, or unsupported-sort-field cursor. + + Map to a 400 response with code ``INVALID_CURSOR`` at the handler. + """ + + +# Wire-format length caps. Cursors are user-controlled, so caps protect the +# decode path from oversized allocations and downstream SQL predicates from +# unbounded strings. +# +# MAX_CURSOR_VALUE_LENGTH is 512 to fit the `AssetReference.name` column max +# (`String(512)`) — otherwise a long-named asset would mint a cursor the same +# server then refuses on the next request. +MAX_ENCODED_CURSOR_LENGTH = 1024 +MAX_CURSOR_VALUE_LENGTH = 512 +MAX_CURSOR_ID_LENGTH = 128 + + +@dataclass(frozen=True) +class CursorPayload: + sort_field: str + value: str + id: str + order: str + + +_VALID_ORDERS = ("asc", "desc") + + +def encode_cursor(sort_field: str, value: str, id: str, order: str = "desc") -> str: + """Encode a cursor payload as a base64url (no-padding) string. + + `order` binds the cursor to the sort direction it was minted under so a + later request with a flipped `order` query parameter is rejected with + ``INVALID_CURSOR`` rather than silently walking the wrong direction. + """ + if order not in _VALID_ORDERS: + raise InvalidCursorError(f"order must be one of {_VALID_ORDERS}, got {order!r}") + # Symmetric input validation: the encoder must reject anything the + # decoder rejects, or the same server will mint cursors it then 400s on + # the next request. + if not id: + raise InvalidCursorError("id must be non-empty") + if len(id) > MAX_CURSOR_ID_LENGTH: + raise InvalidCursorError("id exceeds maximum length") + if len(value) > MAX_CURSOR_VALUE_LENGTH: + raise InvalidCursorError("value exceeds maximum length") + payload = {"s": sort_field, "v": value, "id": id, "o": order} + raw = json.dumps(payload, separators=(",", ":"), ensure_ascii=False) + # Match the default JSON escaping of HTML-significant characters and JS + # line/paragraph separators (U+2028 / U+2029) so an asset name carrying + # any of them encodes to identical bytes across runtimes. None of these + # characters appear in JSON structural syntax, so a global replace on the + # serialized output can only touch encoded values. Use explicit \uXXXX + # escapes for U+2028 / U+2029 so the source survives any editor / git + # tooling that normalizes invisible separators. + raw = ( + raw.replace("<", "\\u003c") + .replace(">", "\\u003e") + .replace("&", "\\u0026") + .replace("\u2028", "\\u2028") + .replace("\u2029", "\\u2029") + ) + encoded = base64.urlsafe_b64encode(raw.encode("utf-8")).rstrip(b"=").decode("ascii") + # Final wire-size guard: the per-field caps above are char-counted, but the + # wire cap applies to the base64url of the UTF-8-encoded, escape-expanded + # payload. A value full of multibyte or HTML-significant characters (e.g. + # 512 \u00d7 "\u00e9" or 512 \u00d7 "<") inflates well past MAX_ENCODED_CURSOR_LENGTH even + # though it passes the char-count check. Refuse to mint a cursor the decoder + # on the next request would reject. + if len(encoded) > MAX_ENCODED_CURSOR_LENGTH: + raise InvalidCursorError("encoded cursor exceeds maximum length") + return encoded + + +def encode_cursor_from_time(sort_field: str, t: datetime, id: str, order: str = "desc") -> str: + """Encode a time-typed cursor at Unix microsecond precision. + + Accepts an aware datetime (any timezone) and normalizes to UTC. Naive + datetimes are rejected so callers can't accidentally encode the local + wall-clock value of a UTC-stored timestamp. + """ + if t.tzinfo is None: + raise ValueError("encode_cursor_from_time requires an aware datetime") + micros = _datetime_to_unix_micros(t.astimezone(timezone.utc)) + return encode_cursor(sort_field, str(micros), id, order=order) + + +def decode_cursor( + cursor: str, + allowed_sort_fields: Iterable[str], + expected_order: str | None = None, +) -> CursorPayload: + """Parse an opaque cursor. + + ``allowed_sort_fields`` is the endpoint's accepted sort-field list — a + cursor carrying a field outside this set is rejected so a cursor minted + for one column can't be replayed against another (e.g. a ``created_at`` + timestamp string compared against a ``name`` column). + + ``expected_order`` (``"asc"``/``"desc"``), when supplied, must match the + payload's ``o`` field. ``o`` is required on every payload; a cursor + missing it is rejected as malformed. + + Passing no allowed fields rejects every cursor. + """ + if len(cursor) > MAX_ENCODED_CURSOR_LENGTH: + raise InvalidCursorError("cursor exceeds maximum length") + + try: + # urlsafe_b64decode requires correct padding; we strip on encode, so + # restore the trailing '=' pad here. + padding = "=" * (-len(cursor) % 4) + raw = base64.urlsafe_b64decode(cursor + padding) + except (ValueError, base64.binascii.Error) as e: + raise InvalidCursorError(f"encoding: {e}") from e + + try: + decoded = json.loads(raw) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + raise InvalidCursorError(f"payload: {e}") from e + + if not isinstance(decoded, dict): + raise InvalidCursorError("payload: expected object") + + sort_field = decoded.get("s") + value = decoded.get("v") + id = decoded.get("id") + order = decoded.get("o") + + if not isinstance(sort_field, str) or not isinstance(value, str) or not isinstance(id, str): + raise InvalidCursorError("payload: missing or non-string s/v/id") + + if id == "": + raise InvalidCursorError("missing id") + if len(id) > MAX_CURSOR_ID_LENGTH: + raise InvalidCursorError("id exceeds maximum length") + if len(value) > MAX_CURSOR_VALUE_LENGTH: + raise InvalidCursorError("value exceeds maximum length") + + if sort_field not in allowed_sort_fields: + raise InvalidCursorError(f"unsupported sort field {sort_field!r}") + + if not isinstance(order, str): + raise InvalidCursorError("missing or non-string o") + if order not in _VALID_ORDERS: + raise InvalidCursorError(f"unsupported order {order!r}") + if expected_order is not None and order != expected_order: + raise InvalidCursorError( + f"cursor order {order!r} does not match request order {expected_order!r}" + ) + + return CursorPayload(sort_field=sort_field, value=value, id=id, order=order) + + +def decode_cursor_time(payload: Optional[CursorPayload]) -> datetime: + """Parse a time-typed cursor value as Unix microseconds, returning UTC.""" + if payload is None: + raise InvalidCursorError("nil cursor payload") + try: + micros = int(payload.value) + except ValueError as e: + raise InvalidCursorError(f"value is not a valid timestamp: {e}") from e + try: + return _unix_micros_to_datetime(micros) + except (OverflowError, OSError, ValueError) as e: + # Crafted out-of-range microseconds (e.g. > datetime.MAX_YEAR) blow up + # in fromtimestamp / datetime construction. Map to 400, not 500. + raise InvalidCursorError(f"value is out of representable range: {e}") from e + + +def decode_cursor_int(payload: Optional[CursorPayload]) -> int: + """Parse a cursor value as a base-10 integer.""" + if payload is None: + raise InvalidCursorError("nil cursor payload") + try: + return int(payload.value) + except ValueError as e: + raise InvalidCursorError(f"value is not a valid integer: {e}") from e + + +_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) + + +def _datetime_to_unix_micros(t: datetime) -> int: + """Convert an aware UTC datetime to Unix microseconds (integer math).""" + delta = t - _EPOCH + return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds + + +def _unix_micros_to_datetime(micros: int) -> datetime: + """Convert Unix microseconds to a UTC datetime, preserving precision.""" + seconds, micro_remainder = divmod(micros, 1_000_000) + return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=micro_remainder) diff --git a/app/assets/services/schemas.py b/app/assets/services/schemas.py index 0eb128f58..fb304fca7 100644 --- a/app/assets/services/schemas.py +++ b/app/assets/services/schemas.py @@ -71,6 +71,7 @@ class AssetSummaryData: class ListAssetsResult: items: list[AssetSummaryData] total: int + next_cursor: str | None = None @dataclass(frozen=True) diff --git a/openapi.yaml b/openapi.yaml index 81d34ee83..6eb5e2d1d 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -1517,6 +1517,22 @@ paths: schema: type: integer default: 0 + description: | + Offset-based pagination. Cursor pagination via `after` is preferred + for sequential walks (stable across concurrent inserts/deletes) but + `offset` remains fully supported for random access (jump-to-page + UIs, "showing items X–Y of N" displays). When both are supplied, + `after` wins and `offset` is ignored. + - name: after + in: query + schema: + type: string + description: | + Opaque cursor for keyset pagination. Pass the `next_cursor` value + from a previous response to fetch the next page. Stable across + inserts/deletes between pages. Supported with `sort` values + `created_at`, `updated_at`, `name`, and `size`. Malformed or + unsupported cursors return 400 with `INVALID_CURSOR`. - name: include_tags in: query schema: @@ -1575,6 +1591,12 @@ paths: application/json: schema: $ref: "#/components/schemas/ListAssetsResponse" + "400": + description: Malformed query or cursor (e.g. `INVALID_CURSOR`) + content: + application/json: + schema: + $ref: "#/components/schemas/AssetsApiError" post: operationId: createAsset tags: [assets] @@ -6761,6 +6783,42 @@ components: type: integer has_more: type: boolean + next_cursor: + type: string + description: | + Opaque cursor to fetch the next page. Pass back as the `after` + query parameter. Omitted when there are no more results. + + AssetsApiError: + type: object + description: Error envelope returned by the assets API on 400 responses. + required: + - error + properties: + error: + type: object + required: + - code + - message + - details + properties: + code: + type: string + description: | + Machine-readable error code. `INVALID_CURSOR` is returned when the + `after` cursor is malformed, oversized, or its sort field does + not match the request's `sort`. `INVALID_QUERY` covers other + Pydantic validation failures. + enum: [INVALID_CURSOR, INVALID_QUERY] + message: + type: string + details: + type: object + description: | + Free-form, code-specific context. `INVALID_QUERY` populates this + with an `errors` array of Pydantic validation entries; + `INVALID_CURSOR` returns an empty object. + additionalProperties: true TagInfo: type: object diff --git a/tests-unit/assets_test/queries/test_asset_reference_keyset.py b/tests-unit/assets_test/queries/test_asset_reference_keyset.py new file mode 100644 index 000000000..d143d60f9 --- /dev/null +++ b/tests-unit/assets_test/queries/test_asset_reference_keyset.py @@ -0,0 +1,112 @@ +"""Keyset-pagination tiebreaker tests for list_references_page. + +When multiple rows share the same primary sort value (e.g. four assets +created in the same microsecond), the secondary `ORDER BY id` is what keeps +keyset pagination from losing or repeating rows. This file exercises that +branch directly against an in-memory SQLite session — engineering identical +timestamps via HTTP is unreliable enough that we work at the query layer. +""" +import uuid +from datetime import datetime + +import pytest +from sqlalchemy.orm import Session + +from app.assets.database.models import Asset, AssetReference +from app.assets.database.queries.asset_reference import list_references_page + + +def _make_ref(session: Session, created_at: datetime, name: str, owner: str = "") -> AssetReference: + asset = Asset(hash=f"blake3:{uuid.uuid4().hex}", size_bytes=1024) + session.add(asset) + session.flush() + ref = AssetReference( + id=str(uuid.uuid4()), + asset_id=asset.id, + owner_id=owner, + name=name, + file_path=f"/tmp/{name}", + created_at=created_at, + updated_at=created_at, + last_access_time=created_at, + is_missing=False, + ) + session.add(ref) + return ref + + +@pytest.mark.parametrize("order", ["desc", "asc"]) +def test_tiebreaker_walks_duplicate_sort_values(session: Session, order: str): + """Four rows with the SAME created_at must paginate cleanly under cursor + mode — no row dropped, no row repeated, despite the primary sort column + being non-discriminating. + """ + shared_ts = datetime(2024, 5, 20, 12, 0, 0) # naive UTC, like the DB stores + refs = [_make_ref(session, shared_ts, f"tie_{i}.png") for i in range(4)] + session.commit() + + expected_ids = sorted([r.id for r in refs], reverse=(order == "desc")) + + # Walk the cursor by hand: page size 2, take 3 pages (2 + 2 + 0). + seen: list[str] = [] + after_value = None + after_id = None + for _ in range(4): # generous loop bound; ought to be 2 iterations + page, _tag_map, _total = list_references_page( + session, + limit=2, + sort="created_at", + order=order, + after_cursor_value=after_value, + after_cursor_id=after_id, + ) + if not page: + break + seen.extend(p.id for p in page) + # Use the last row's (created_at, id) as the next cursor input. + last = page[-1] + after_value, after_id = last.created_at, last.id + if len(page) < 2: + break + + assert seen == expected_ids, ( + f"keyset tiebreaker failed for order={order}: expected {expected_ids}, got {seen}" + ) + + +def test_tiebreaker_no_duplicates_under_mixed_collisions(session: Session): + """Some rows share a timestamp, some don't. The cursor must still walk + every row exactly once regardless of where ties sit relative to a + page boundary.""" + t1 = datetime(2024, 5, 20, 12, 0, 0) + t2 = datetime(2024, 5, 20, 12, 0, 1) + layout = [t1, t1, t1, t2, t2] # three rows at t1, two at t2 + refs = [_make_ref(session, ts, f"mix_{i}.png") for i, ts in enumerate(layout)] + session.commit() + + all_ids = {r.id for r in refs} + seen_set: set[str] = set() + seen_list: list[str] = [] + after_value = None + after_id = None + for _ in range(6): + page, _, _ = list_references_page( + session, + limit=2, + sort="created_at", + order="desc", + after_cursor_value=after_value, + after_cursor_id=after_id, + ) + if not page: + break + for p in page: + assert p.id not in seen_set, f"duplicate row {p.id} appeared in cursor walk" + seen_set.add(p.id) + seen_list.append(p.id) + last = page[-1] + after_value, after_id = last.created_at, last.id + if len(page) < 2: + break + + assert seen_set == all_ids, f"missing rows: expected {all_ids}, got {seen_set}" diff --git a/tests-unit/assets_test/services/test_cursor.py b/tests-unit/assets_test/services/test_cursor.py new file mode 100644 index 000000000..db114ad47 --- /dev/null +++ b/tests-unit/assets_test/services/test_cursor.py @@ -0,0 +1,354 @@ +"""Tests for app.assets.services.cursor. + +The byte-identity fixtures below pin the wire format so a parallel +implementation in another runtime can mint exchange-compatible cursors +for the same payload. Drift here would break frontend pagination against +any compatible backend. +""" +from __future__ import annotations + +import base64 +from datetime import datetime, timedelta, timezone + +import pytest + +from app.assets.services.cursor import ( + MAX_CURSOR_ID_LENGTH, + MAX_CURSOR_VALUE_LENGTH, + MAX_ENCODED_CURSOR_LENGTH, + CursorPayload, + InvalidCursorError, + decode_cursor, + decode_cursor_int, + decode_cursor_time, + encode_cursor, + encode_cursor_from_time, +) + + +ALLOWED = ("created_at", "updated_at", "name", "size") + + +class TestRoundTrip: + @pytest.mark.parametrize( + "sort_field, value, id", + [ + ("created_at", "1716200000000000", "a1b2c3d4-e5f6-7a89-b0c1-d2e3f4a5b6c7"), + ("size", "1024", "asset-123"), + ("name", "my-asset.png", "asset-abc"), + ("name", "résumé.txt", "asset-uni"), + ], + ) + def test_encode_decode(self, sort_field, value, id): + encoded = encode_cursor(sort_field, value, id) + assert encoded != "" + payload = decode_cursor(encoded, ALLOWED) + assert payload.sort_field == sort_field + assert payload.value == value + assert payload.id == id + + +class TestTimeCursor: + def test_microsecond_precision_preserved(self): + # Pick a time with non-zero microseconds — encoding at ms would lose the µs. + ts = datetime(2024, 5, 20, 12, 53, 20, 123456, tzinfo=timezone.utc) + encoded = encode_cursor_from_time("created_at", ts, "id-1") + payload = decode_cursor(encoded, ALLOWED) + # Value must be a microsecond integer string, not a millisecond one. + assert payload.value == "1716209600123456" + decoded = decode_cursor_time(payload) + assert decoded == ts + + def test_decode_returns_utc(self): + payload = CursorPayload(sort_field="created_at", value="1716200000123456", id="id-1", order="desc") + decoded = decode_cursor_time(payload) + assert decoded.tzinfo == timezone.utc + + def test_naive_datetime_rejected_on_encode(self): + naive = datetime(2024, 5, 20, 12, 0, 0) + with pytest.raises(ValueError): + encode_cursor_from_time("created_at", naive, "id-1") + + def test_non_integer_value_rejected_on_decode(self): + with pytest.raises(InvalidCursorError): + decode_cursor_time(CursorPayload("created_at", "not-a-number", "id-1", "desc")) + + def test_none_payload_rejected(self): + with pytest.raises(InvalidCursorError): + decode_cursor_time(None) + + def test_non_utc_aware_normalized(self): + # Same instant, different timezone — must encode to the same micros. + utc_ts = datetime(2024, 5, 20, 12, 0, 0, tzinfo=timezone.utc) + offset_ts = utc_ts.astimezone(timezone(timedelta(hours=-5))) + assert encode_cursor_from_time("created_at", utc_ts, "x") == encode_cursor_from_time( + "created_at", offset_ts, "x" + ) + + +class TestIntCursor: + def test_decode_int(self): + assert decode_cursor_int(CursorPayload("size", "1024", "id-1", "desc")) == 1024 + + def test_decode_int_rejects_non_int(self): + with pytest.raises(InvalidCursorError): + decode_cursor_int(CursorPayload("size", "abc", "id-1", "desc")) + + def test_decode_int_rejects_none(self): + with pytest.raises(InvalidCursorError): + decode_cursor_int(None) + + +class TestInvalidInputs: + def test_oversized_cursor(self): + oversized = "a" * (MAX_ENCODED_CURSOR_LENGTH + 1) + with pytest.raises(InvalidCursorError, match="maximum length"): + decode_cursor(oversized, ALLOWED) + + def test_not_base64(self): + with pytest.raises(InvalidCursorError): + decode_cursor("not base64!!!", ALLOWED) + + def test_not_json(self): + encoded = base64.urlsafe_b64encode(b"definitely not json").rstrip(b"=").decode("ascii") + with pytest.raises(InvalidCursorError): + decode_cursor(encoded, ALLOWED) + + def test_empty_id(self): + # Encoder rejects empty id symmetrically with the decoder, so build the + # payload manually to exercise the decoder's missing-id branch. + raw = b'{"s":"created_at","v":"1","id":"","o":"desc"}' + encoded = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") + with pytest.raises(InvalidCursorError, match="missing id"): + decode_cursor(encoded, ALLOWED) + + def test_oversized_id(self): + # Encoder enforces the cap symmetrically; hand-build to exercise decode. + big_id = "a" * (MAX_CURSOR_ID_LENGTH + 1) + raw = ('{"s":"created_at","v":"1","id":"' + big_id + '","o":"desc"}').encode("ascii") + encoded = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") + with pytest.raises(InvalidCursorError, match="id exceeds maximum length"): + decode_cursor(encoded, ALLOWED) + + def test_oversized_value(self): + # Encoder enforces the cap symmetrically; hand-build to exercise decode. + big_v = "v" * (MAX_CURSOR_VALUE_LENGTH + 1) + raw = ('{"s":"created_at","v":"' + big_v + '","id":"id-1","o":"desc"}').encode("ascii") + encoded = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") + with pytest.raises(InvalidCursorError, match="value exceeds maximum length"): + decode_cursor(encoded, ALLOWED) + + def test_unsupported_sort_field(self): + encoded = encode_cursor("execution_time", "1", "id-1") + with pytest.raises(InvalidCursorError, match="unsupported sort field"): + decode_cursor(encoded, ALLOWED) + + def test_no_allowed_fields_rejects_everything(self): + encoded = encode_cursor("created_at", "1", "id-1") + with pytest.raises(InvalidCursorError): + decode_cursor(encoded, ()) + + def test_non_dict_payload_rejected(self): + encoded = base64.urlsafe_b64encode(b'["array","not","dict"]').rstrip(b"=").decode("ascii") + with pytest.raises(InvalidCursorError, match="expected object"): + decode_cursor(encoded, ALLOWED) + + +class TestEncodeAtCapsFits: + def test_max_field_lengths_fit_wire_cap(self): + # Worst-case payload: value and id at their per-field caps, with a long + # sort field name. The encoded cursor must fit within MAX_ENCODED_CURSOR_LENGTH + # so the wire cap cannot reject a cursor the encoder mints at the per-field caps. + value = "v" * MAX_CURSOR_VALUE_LENGTH + id = "i" * MAX_CURSOR_ID_LENGTH + sort_field = "very_long_sort_field_name" + + encoded = encode_cursor(sort_field, value, id) + assert len(encoded) <= MAX_ENCODED_CURSOR_LENGTH + payload = decode_cursor(encoded, (sort_field,)) + assert payload.value == value + assert payload.id == id + + +class TestDatetimeOverflow: + """Crafted cursors with extreme micros must map to InvalidCursorError, + not OverflowError/OSError leaking as 500. + """ + + @pytest.mark.parametrize( + "micros_str", + [ + "999999999999999999999", # 10^21 µs — past datetime.MAX_YEAR by ~14 orders + "-999999999999999999999", # symmetric negative — pre-epoch overflow + ], + ) + def test_out_of_range_micros_rejected(self, micros_str): + encoded = encode_cursor("created_at", micros_str, "asset-x") + payload = decode_cursor(encoded, ALLOWED) + with pytest.raises(InvalidCursorError): + decode_cursor_time(payload) + + +class TestEncoderDecoderSymmetry: + """The encoder must reject inputs the decoder rejects, or the same server + will mint a cursor it then 400s on the next request. + """ + + def test_long_name_within_cap_round_trips(self): + """Assets allow names up to 512 chars (`String(512)`); the cursor + encoder must round-trip a value at that cap so a freshly minted + cursor never fails decode on the next request.""" + long_name = "n" * MAX_CURSOR_VALUE_LENGTH + encoded = encode_cursor("name", long_name, "asset-x") + payload = decode_cursor(encoded, ALLOWED) + assert payload.value == long_name + + def test_encoder_rejects_empty_id(self): + with pytest.raises(InvalidCursorError, match="id must be non-empty"): + encode_cursor("created_at", "1", "") + + def test_encoder_rejects_oversized_id(self): + with pytest.raises(InvalidCursorError, match="id exceeds maximum length"): + encode_cursor("created_at", "1", "a" * (MAX_CURSOR_ID_LENGTH + 1)) + + def test_encoder_rejects_oversized_value(self): + with pytest.raises(InvalidCursorError, match="value exceeds maximum length"): + encode_cursor("name", "v" * (MAX_CURSOR_VALUE_LENGTH + 1), "id-1") + + def test_encoder_rejects_multibyte_value_over_wire_cap(self): + """A value that passes the char-count cap can still inflate past the + wire cap once UTF-8-encoded. Asset name made of 512 × multibyte + characters (e.g. 'é' = 2 bytes) must be rejected at encode time, not + minted into a cursor the next request will 400.""" + with pytest.raises(InvalidCursorError, match="encoded cursor exceeds maximum length"): + encode_cursor("name", "é" * MAX_CURSOR_VALUE_LENGTH, "asset-multibyte") + + def test_encoder_rejects_escape_heavy_value_over_wire_cap(self): + """Same wire-cap concern via escape expansion: each `<` serializes to + the six-byte sequence `\\u003c`, so 512 of them blow past the encoded + cap even though the raw char count is within the per-field limit.""" + with pytest.raises(InvalidCursorError, match="encoded cursor exceeds maximum length"): + encode_cursor("name", "<" * MAX_CURSOR_VALUE_LENGTH, "asset-escape") + + +class TestOrderBinding: + def test_order_baked_into_payload(self): + encoded = encode_cursor("created_at", "1", "id-1", order="asc") + payload = decode_cursor(encoded, ALLOWED) + assert payload.order == "asc" + + def test_mismatched_order_rejected(self): + encoded = encode_cursor("created_at", "1", "id-1", order="desc") + with pytest.raises(InvalidCursorError, match="does not match request order"): + decode_cursor(encoded, ALLOWED, expected_order="asc") + + def test_matching_order_accepted(self): + encoded = encode_cursor("created_at", "1", "id-1", order="desc") + payload = decode_cursor(encoded, ALLOWED, expected_order="desc") + assert payload.order == "desc" + + def test_invalid_order_token_rejected_on_encode(self): + with pytest.raises(ValueError): + encode_cursor("created_at", "1", "id-1", order="sideways") + + def test_invalid_order_token_rejected_on_decode(self): + # Hand-craft a payload with an illegal `o` value. + raw = b'{"s":"name","v":"x","id":"id-1","o":"sideways"}' + encoded = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") + with pytest.raises(InvalidCursorError, match="unsupported order"): + decode_cursor(encoded, ALLOWED) + + def test_cursor_without_order_rejected(self): + """`o` is mandatory. A cursor minted without it is rejected as + malformed rather than silently walking the keyset in whatever + direction the request happens to ask for.""" + raw = b'{"s":"name","v":"x","id":"id-1"}' + encoded = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") + with pytest.raises(InvalidCursorError, match="missing or non-string o"): + decode_cursor(encoded, ALLOWED, expected_order="desc") + + +class TestHtmlSignificantCharEscaping: + """An asset name containing `<`, `>`, `&`, U+2028, or U+2029 must encode + to the same escaped wire bytes as any compatible implementation of the + same payload format. Drift here breaks cross-runtime byte-identity for + those characters. + """ + + @pytest.mark.parametrize( + "value, escaped_substring", + [ + ("foo.png", "\\u003c"), # `<` escaped + ("foo.png", "\\u003e"), # `>` escaped + ("foo&bar.png", "\\u0026"), + ("foo
bar.png", "\\u2028"), # JS line separator + ("foo
bar.png", "\\u2029"), # JS paragraph separator + ], + ) + def test_html_significant_chars_escaped(self, value, escaped_substring): + encoded = encode_cursor("name", value, "id-1") + decoded_bytes = base64.urlsafe_b64decode(encoded + "=" * (-len(encoded) % 4)) + assert escaped_substring in decoded_bytes.decode("ascii"), ( + f"Expected {escaped_substring!r} in serialized payload, got: {decoded_bytes!r}" + ) + + def test_value_round_trips_through_escape(self): + """Encoding then decoding a value with `<>&` should yield the original + string — the escape only affects the wire form, not the decoded value.""" + original = "foo<&>bar.png" + encoded = encode_cursor("name", original, "id-1") + payload = decode_cursor(encoded, ALLOWED) + assert payload.value == original + + +class TestByteIdentityFixtures: + """Pin the wire format so it doesn't drift silently. + + These fixtures assert exact byte equality of the encoded JSON payload — + a change in key order, escape choice, separator whitespace, or anything + else that shifts a byte fails the test loudly rather than diverging + silently from any external consumer of the same payload format. + """ + + @pytest.mark.parametrize( + "sort_field, value, id, order, expected_payload", + [ + ( + "created_at", + "1716200000000000", + "a1b2c3d4-e5f6-7a89-b0c1-d2e3f4a5b6c7", + "desc", + '{"s":"created_at","v":"1716200000000000","id":"a1b2c3d4-e5f6-7a89-b0c1-d2e3f4a5b6c7","o":"desc"}', + ), + ( + "size", + "1024", + "asset-123", + "asc", + '{"s":"size","v":"1024","id":"asset-123","o":"asc"}', + ), + ( + "name", + "my-asset.png", + "asset-abc", + "desc", + '{"s":"name","v":"my-asset.png","id":"asset-abc","o":"desc"}', + ), + ( + "name", + "foo&baz.png", + "asset-html", + "desc", + # `<`, `>`, `&` escape to <, >, & in the value. + '{"s":"name","v":"foo\\u003cbar\\u003e\\u0026baz.png","id":"asset-html","o":"desc"}', + ), + ], + ) + def test_encoded_payload_shape_pinned(self, sort_field, value, id, order, expected_payload): + encoded = encode_cursor(sort_field, value, id, order=order) + decoded_bytes = base64.urlsafe_b64decode(encoded + "=" * (-len(encoded) % 4)) + assert decoded_bytes.decode("utf-8") == expected_payload, ( + f"wire format drifted for sort={sort_field!r}, value={value!r}:\n" + f" expected: {expected_payload!r}\n" + f" actual: {decoded_bytes.decode('utf-8')!r}" + ) diff --git a/tests-unit/assets_test/test_list_cursor.py b/tests-unit/assets_test/test_list_cursor.py new file mode 100644 index 000000000..a37019fd6 --- /dev/null +++ b/tests-unit/assets_test/test_list_cursor.py @@ -0,0 +1,349 @@ +"""Integration tests for cursor-based pagination on GET /api/assets. + +These tests exercise the handler/service/query path end-to-end; +cursor-encoding-level tests live in +tests-unit/assets_test/services/test_cursor.py. +""" +import pytest +import requests + + +def _seed(asset_factory, make_asset_bytes, count: int, tag: str) -> list[str]: + names = [f"cursor_{i:02d}.safetensors" for i in range(count)] + for n in names: + asset_factory( + n, + ["models", "checkpoints", "unit-tests", tag], + {}, + make_asset_bytes(n, size=2048), + ) + return sorted(names) + + +def test_cursor_pages_all_items_in_order(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + names = _seed(asset_factory, make_asset_bytes, count=5, tag="cursor-walk") + + params = { + "include_tags": "unit-tests,cursor-walk", + "sort": "name", + "order": "asc", + "limit": "2", + } + + seen: list[str] = [] + after: str | None = None + pages = 0 + while True: + page_params = dict(params) + if after is not None: + page_params["after"] = after + r = http.get(api_base + "/api/assets", params=page_params, timeout=120) + assert r.status_code == 200, r.text + body = r.json() + seen.extend(a["name"] for a in body["assets"]) + pages += 1 + after = body.get("next_cursor") + if after is None: + break + assert body["has_more"] is True + assert pages < 10, "guard against runaway cursor loop" + + assert seen == names, f"expected {names}, got {seen}" + # Last page should have has_more False + assert body["has_more"] is False + assert "next_cursor" not in body + + +def test_cursor_invalid_returns_400(http: requests.Session, api_base: str): + r = http.get( + api_base + "/api/assets", + params={"after": "not-a-real-cursor", "sort": "created_at"}, + timeout=120, + ) + assert r.status_code == 400, r.text + body = r.json() + assert body["error"]["code"] == "INVALID_CURSOR" + + +def test_cursor_sort_mismatch_returns_400(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + _seed(asset_factory, make_asset_bytes, count=2, tag="cursor-mismatch") + + # Take a real cursor minted for sort=name. + r = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-mismatch", + "sort": "name", + "order": "asc", + "limit": "1", + }, + timeout=120, + ) + assert r.status_code == 200 + cursor = r.json()["next_cursor"] + assert cursor is not None + + # Replay against sort=created_at — should fail with INVALID_CURSOR. + r2 = http.get( + api_base + "/api/assets", + params={"after": cursor, "sort": "created_at"}, + timeout=120, + ) + assert r2.status_code == 400, r2.text + assert r2.json()["error"]["code"] == "INVALID_CURSOR" + + +def test_cursor_wins_over_offset(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + names = _seed(asset_factory, make_asset_bytes, count=4, tag="cursor-vs-offset") + + # Take a cursor that points past the first item. + r = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-vs-offset", + "sort": "name", + "order": "asc", + "limit": "1", + }, + timeout=120, + ) + assert r.status_code == 200, r.text + cursor = r.json()["next_cursor"] + assert cursor is not None + + # Pass both 'after' and a large offset. Cursor must win; offset is ignored. + r2 = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-vs-offset", + "sort": "name", + "order": "asc", + "limit": "1", + "after": cursor, + "offset": "999", + }, + timeout=120, + ) + assert r2.status_code == 200 + body = r2.json() + # Should land on the second name in sorted order — not skip ahead by 999. + assert [a["name"] for a in body["assets"]] == [names[1]] + + +def test_next_cursor_absent_when_no_more_results(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + _seed(asset_factory, make_asset_bytes, count=2, tag="cursor-exhaust") + + r = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-exhaust", + "sort": "name", + "order": "asc", + "limit": "50", + }, + timeout=120, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["has_more"] is False + assert "next_cursor" not in body + + +def test_cursor_pagination_first_page_mints_cursor(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + """First-page request (no `after`) must still return `next_cursor` when + more rows exist, or pagination is unreachable from a cold start. + """ + _seed(asset_factory, make_asset_bytes, count=3, tag="cursor-first-page") + r = http.get( + api_base + "/api/assets", + params={"include_tags": "unit-tests,cursor-first-page", "sort": "name", "order": "asc", "limit": "2"}, + timeout=120, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["has_more"] is True + assert body.get("next_cursor"), "first page must mint a cursor when more rows exist" + + +def test_cursor_no_spurious_cursor_when_page_size_equals_remainder(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + """When `total` is an exact multiple of `limit`, the final page must + NOT carry a next_cursor — there is nothing past it. + """ + _seed(asset_factory, make_asset_bytes, count=4, tag="cursor-exact-multiple") + # Page 1 + r = http.get( + api_base + "/api/assets", + params={"include_tags": "unit-tests,cursor-exact-multiple", "sort": "name", "order": "asc", "limit": "2"}, + timeout=120, + ) + assert r.status_code == 200, r.text + cursor = r.json()["next_cursor"] + assert cursor is not None + # Page 2 — should exhaust the set with no cursor for a phantom page 3 + r2 = http.get( + api_base + "/api/assets", + params={"include_tags": "unit-tests,cursor-exact-multiple", "sort": "name", "order": "asc", "limit": "2", "after": cursor}, + timeout=120, + ) + assert r2.status_code == 200, r2.text + body = r2.json() + assert len(body["assets"]) == 2 + assert body["has_more"] is False + assert "next_cursor" not in body + + +@pytest.mark.parametrize("sort_field", ["created_at", "updated_at", "size"]) +def test_cursor_walks_for_non_name_sorts(sort_field, http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + """Cursor pagination must work for every sort field the contract claims. + + Without this, the `created_at` / `updated_at` (time-encoded micros) and + `size` (int-encoded) cursor paths go entirely unexercised end-to-end. + """ + # Sizes increase strictly by index, so `size desc` has a deterministic + # expected order. Time-based sorts (created_at / updated_at) can tie when + # rows are inserted faster than the DB's timestamp resolution; for those + # we check coverage and no-duplicates and let the keyset tiebreaker do + # the rest, instead of sleeping between inserts and asserting an order + # that depends on clock granularity. + names = [] + for i in range(4): + n = f"cursor_{sort_field}_{i:02d}.safetensors" + asset_factory(n, ["models", "checkpoints", "unit-tests", f"cursor-{sort_field}"], {}, make_asset_bytes(n, size=2048 + i)) + names.append(n) + + params = { + "include_tags": f"unit-tests,cursor-{sort_field}", + "sort": sort_field, + "order": "desc", + "limit": "2", + } + seen: list[str] = [] + after: str | None = None + pages = 0 + while True: + page_params = dict(params) + if after is not None: + page_params["after"] = after + r = http.get(api_base + "/api/assets", params=page_params, timeout=120) + assert r.status_code == 200, r.text + body = r.json() + seen.extend(a["name"] for a in body["assets"]) + after = body.get("next_cursor") + pages += 1 + if after is None: + break + assert pages < 10, "guard against runaway cursor loop" + + # No duplicates: a faulty keyset boundary that returns the same row across + # two pages must fail this check. + assert len(seen) == len(set(seen)), ( + f"cursor walk repeated rows for sort={sort_field}: {seen}" + ) + # Full coverage: every seeded asset reached exactly once. + assert set(seen) == set(names), ( + f"missing items for sort={sort_field}: expected {set(names)}, got {set(seen)}" + ) + # Strict order check for the only field with a clock-independent ordering. + if sort_field == "size": + assert seen == list(reversed(names)), ( + f"size cursor walked out of order: got {seen}, expected {list(reversed(names))}" + ) + + +def test_cursor_order_mismatch_returns_400(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + """A cursor minted under desc order replayed against asc must 400, not + silently walk the wrong direction.""" + _seed(asset_factory, make_asset_bytes, count=3, tag="cursor-order-flip") + + r = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-order-flip", + "sort": "name", + "order": "desc", + "limit": "1", + }, + timeout=120, + ) + assert r.status_code == 200, r.text + cursor = r.json()["next_cursor"] + assert cursor is not None + + # Replay with order flipped to asc — server must reject the cursor. + r2 = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-order-flip", + "sort": "name", + "order": "asc", + "limit": "1", + "after": cursor, + }, + timeout=120, + ) + assert r2.status_code == 400, r2.text + assert r2.json()["error"]["code"] == "INVALID_CURSOR" + + +def test_cursor_invalid_cursor_at_microsecond_boundary(http: requests.Session, api_base: str): + """A cursor carrying an out-of-range microsecond timestamp must map to + 400 INVALID_CURSOR, not 500.""" + import base64 + import json + # 10^18 microseconds ≈ year 33658, well past datetime.MAX_YEAR. + # `o` and `order=` must be set; otherwise decode fails earlier on the + # missing-order branch and the µs-overflow path is never exercised. + payload = {"s": "created_at", "o": "desc", "v": "999999999999999999999", "id": "asset-x"} + raw = json.dumps(payload, separators=(",", ":")).encode("utf-8") + cursor = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii") + r = http.get( + api_base + "/api/assets", + params={"after": cursor, "sort": "created_at", "order": "desc"}, + timeout=120, + ) + assert r.status_code == 400, r.text + assert r.json()["error"]["code"] == "INVALID_CURSOR" + + +def test_cursor_pagination_stable_after_delete(http: requests.Session, api_base: str, asset_factory, make_asset_bytes): + names = _seed(asset_factory, make_asset_bytes, count=4, tag="cursor-delete") + + # Page 1. + r = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-delete", + "sort": "name", + "order": "asc", + "limit": "2", + }, + timeout=120, + ) + assert r.status_code == 200 + body = r.json() + page1_names = [a["name"] for a in body["assets"]] + cursor = body["next_cursor"] + assert cursor is not None + assert page1_names == names[:2] + + # Delete an item from page 1 (already returned) — cursor should still + # locate the next page from where it was minted, not re-index. + target_id = body["assets"][0]["id"] + d = http.delete(api_base + f"/api/assets/{target_id}", timeout=120) + assert d.status_code in (200, 204), d.text + + # Page 2 via cursor. + r2 = http.get( + api_base + "/api/assets", + params={ + "include_tags": "unit-tests,cursor-delete", + "sort": "name", + "order": "asc", + "limit": "2", + "after": cursor, + }, + timeout=120, + ) + assert r2.status_code == 200, r2.text + body2 = r2.json() + assert [a["name"] for a in body2["assets"]] == names[2:]