From 39abd769b10b8d95e4c43cecd903d3ee73b73e5c Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Wed, 20 May 2026 12:58:55 -0700 Subject: [PATCH] feat(assets): thread cursor through schemas, service, and query layer list_assets_page accepts an opaque 'after' cursor and returns next_cursor when more pages are available. The query applies a keyset WHERE clause and a secondary ORDER BY id for deterministic tiebreak. Cursor sort field is validated against the request sort, and a last_access_time sort (OSS-only) falls back to offset/limit. Offset is ignored whenever a cursor is supplied. --- app/assets/api/schemas_in.py | 4 ++ app/assets/api/schemas_out.py | 2 + .../database/queries/asset_reference.py | 35 ++++++++- app/assets/services/asset_management.py | 71 ++++++++++++++++++- app/assets/services/schemas.py | 1 + 5 files changed, 110 insertions(+), 3 deletions(-) diff --git a/app/assets/api/schemas_in.py b/app/assets/api/schemas_in.py index 186a6ae1e..fdb3902ba 100644 --- a/app/assets/api/schemas_in.py +++ b/app/assets/api/schemas_in.py @@ -59,6 +59,10 @@ class ListAssetsQuery(BaseModel): limit: conint(ge=1, le=500) = 20 offset: conint(ge=0) = 0 + # Opaque keyset cursor. When supplied, `offset` is ignored. Cursor pagination + # is supported for sort values `created_at`, `updated_at`, `name`, `size`; + # `last_access_time` falls back to offset/limit. + after: str | None = None sort: Literal["name", "created_at", "updated_at", "size", "last_access_time"] = ( "created_at" diff --git a/app/assets/api/schemas_out.py b/app/assets/api/schemas_out.py index d99b1098d..49b3a26cd 100644 --- a/app/assets/api/schemas_out.py +++ b/app/assets/api/schemas_out.py @@ -40,6 +40,8 @@ class AssetsList(BaseModel): assets: list[Asset] total: int has_more: bool + # Opaque cursor for the next page. Omitted when there are no more results. + next_cursor: str | None = None class TagUsage(BaseModel): diff --git a/app/assets/database/queries/asset_reference.py b/app/assets/database/queries/asset_reference.py index 8b90ae511..792411800 100644 --- a/app/assets/database/queries/asset_reference.py +++ b/app/assets/database/queries/asset_reference.py @@ -266,9 +266,18 @@ def list_references_page( metadata_filter: dict | None = None, sort: str | None = None, order: str | None = None, + after_cursor_value: object | None = None, + after_cursor_id: str | None = None, ) -> tuple[list[AssetReference], dict[str, list[str]], int]: """List references with pagination, filtering, and sorting. + When ``after_cursor_value``/``after_cursor_id`` are supplied the query uses + keyset pagination — ``offset`` is ignored and a WHERE clause selects rows + strictly after the given ``(sort_col, id)`` position in the active sort + direction. The cursor value must already be typed for the column + (datetime for time sorts, int for size, str for name); the caller decodes + the opaque cursor string and resolves to the typed value. + Returns (references, tag_map, total_count). """ base = ( @@ -297,9 +306,31 @@ def list_references_page( "size": Asset.size_bytes, } sort_col = sort_map.get(sort, AssetReference.created_at) - sort_exp = sort_col.desc() if order == "desc" else sort_col.asc() + descending = order == "desc" - base = base.order_by(sort_exp).limit(limit).offset(offset) + # Keyset WHERE: (sort_col, id) strictly less-than / greater-than the cursor. + # Equivalent to: sort_col v OR (sort_col = v AND id cursor_id). + if after_cursor_value is not None and after_cursor_id is not None: + if descending: + keyset = sa.or_( + sort_col < after_cursor_value, + sa.and_(sort_col == after_cursor_value, AssetReference.id < after_cursor_id), + ) + else: + keyset = sa.or_( + sort_col > after_cursor_value, + sa.and_(sort_col == after_cursor_value, AssetReference.id > after_cursor_id), + ) + base = base.where(keyset) + + # Secondary ORDER BY id (matching the primary direction) gives the keyset + # comparison a deterministic tiebreaker on duplicate sort_col values. + id_exp = AssetReference.id.desc() if descending else AssetReference.id.asc() + sort_exp = sort_col.desc() if descending else sort_col.asc() + + base = base.order_by(sort_exp, id_exp).limit(limit) + if after_cursor_id is None: + base = base.offset(offset) count_stmt = ( select(sa.func.count()) diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py index 5aefd9956..730384e8c 100644 --- a/app/assets/services/asset_management.py +++ b/app/assets/services/asset_management.py @@ -1,8 +1,19 @@ import contextlib import mimetypes import os +from datetime import timezone from typing import Sequence +from app.assets.services.cursor import ( + CursorPayload, + InvalidCursorError, + decode_cursor, + decode_cursor_int, + decode_cursor_time, + encode_cursor, + encode_cursor_from_time, +) + from app.assets.database.models import Asset from app.assets.database.queries import ( @@ -242,6 +253,12 @@ def get_asset_by_hash(asset_hash: str) -> AssetData | None: return extract_asset_data(asset) +# Sort fields that support cursor pagination. Mirrors cloud's allowlist +# (created_at, updated_at, name, size). `last_access_time` is OSS-only and +# falls back to offset/limit — no cloud contract to match. +_CURSOR_SORT_FIELDS = ("created_at", "updated_at", "name", "size") + + def list_assets_page( owner_id: str = "", include_tags: Sequence[str] | None = None, @@ -252,7 +269,30 @@ def list_assets_page( offset: int = 0, sort: str = "created_at", order: str = "desc", + after: str | None = None, ) -> ListAssetsResult: + """List assets with optional cursor pagination. + + When ``after`` is supplied it overrides ``offset``. The cursor's sort field + must match ``sort`` and be in the cursor-supported allowlist; mismatches + raise InvalidCursorError so the handler can map to 400 INVALID_CURSOR. + """ + cursor_value: object | None = None + cursor_id: str | None = None + use_cursor_mode = after is not None and sort in _CURSOR_SORT_FIELDS + + if after is not None: + if sort not in _CURSOR_SORT_FIELDS: + raise InvalidCursorError( + f"cursor pagination is not supported for sort={sort!r}" + ) + payload = decode_cursor(after, _CURSOR_SORT_FIELDS) + if payload.sort_field != sort: + raise InvalidCursorError( + f"cursor sort field {payload.sort_field!r} does not match request sort {sort!r}" + ) + cursor_value, cursor_id = _resolve_cursor_value(payload), payload.id + with create_session() as session: refs, tag_map, total = list_references_page( session, @@ -265,6 +305,8 @@ def list_assets_page( offset=offset, sort=sort, order=order, + after_cursor_value=cursor_value, + after_cursor_id=cursor_id, ) items: list[AssetSummaryData] = [] @@ -277,7 +319,34 @@ def list_assets_page( ) ) - return ListAssetsResult(items=items, total=total) + next_cursor: str | None = None + if use_cursor_mode and len(refs) == limit: + next_cursor = _encode_next_cursor(refs[-1], sort) + + return ListAssetsResult(items=items, total=total, next_cursor=next_cursor) + + +def _resolve_cursor_value(payload: CursorPayload) -> object: + """Map a decoded cursor payload to a column-typed Python value.""" + if payload.sort_field in ("created_at", "updated_at"): + # DB stores naive UTC; strip tzinfo so the comparison binds against a + # `TIMESTAMP WITHOUT TIME ZONE` column without an offset shift. + return decode_cursor_time(payload).replace(tzinfo=None) + if payload.sort_field == "size": + return decode_cursor_int(payload) + return payload.value # name, str-typed + + +def _encode_next_cursor(ref, sort: str) -> str: + """Mint a cursor pointing at *ref* for the given sort dimension.""" + if sort == "name": + return encode_cursor("name", ref.name, ref.id) + if sort == "size": + size = ref.asset.size_bytes if ref.asset is not None else 0 + return encode_cursor("size", str(size), ref.id) + # created_at / updated_at — DB datetimes are naive UTC; attach tz before encoding. + value = ref.created_at if sort == "created_at" else ref.updated_at + return encode_cursor_from_time(sort, value.replace(tzinfo=timezone.utc), ref.id) def resolve_hash_to_path( diff --git a/app/assets/services/schemas.py b/app/assets/services/schemas.py index 0eb128f58..fb304fca7 100644 --- a/app/assets/services/schemas.py +++ b/app/assets/services/schemas.py @@ -71,6 +71,7 @@ class AssetSummaryData: class ListAssetsResult: items: list[AssetSummaryData] total: int + next_cursor: str | None = None @dataclass(frozen=True)