feat(assets): thread cursor through schemas, service, and query layer

list_assets_page accepts an opaque 'after' cursor and returns
next_cursor when more pages are available. The query applies a keyset
WHERE clause and a secondary ORDER BY id for deterministic tiebreak.

Cursor sort field is validated against the request sort, and a
last_access_time sort (OSS-only) falls back to offset/limit. Offset is
ignored whenever a cursor is supplied.
This commit is contained in:
Matt Miller 2026-05-20 12:58:55 -07:00
parent 5a70aeebe8
commit 39abd769b1
5 changed files with 110 additions and 3 deletions

View File

@ -59,6 +59,10 @@ class ListAssetsQuery(BaseModel):
limit: conint(ge=1, le=500) = 20
offset: conint(ge=0) = 0
# Opaque keyset cursor. When supplied, `offset` is ignored. Cursor pagination
# is supported for sort values `created_at`, `updated_at`, `name`, `size`;
# `last_access_time` falls back to offset/limit.
after: str | None = None
sort: Literal["name", "created_at", "updated_at", "size", "last_access_time"] = (
"created_at"

View File

@ -40,6 +40,8 @@ class AssetsList(BaseModel):
assets: list[Asset]
total: int
has_more: bool
# Opaque cursor for the next page. Omitted when there are no more results.
next_cursor: str | None = None
class TagUsage(BaseModel):

View File

@ -266,9 +266,18 @@ def list_references_page(
metadata_filter: dict | None = None,
sort: str | None = None,
order: str | None = None,
after_cursor_value: object | None = None,
after_cursor_id: str | None = None,
) -> tuple[list[AssetReference], dict[str, list[str]], int]:
"""List references with pagination, filtering, and sorting.
When ``after_cursor_value``/``after_cursor_id`` are supplied the query uses
keyset pagination ``offset`` is ignored and a WHERE clause selects rows
strictly after the given ``(sort_col, id)`` position in the active sort
direction. The cursor value must already be typed for the column
(datetime for time sorts, int for size, str for name); the caller decodes
the opaque cursor string and resolves to the typed value.
Returns (references, tag_map, total_count).
"""
base = (
@ -297,9 +306,31 @@ def list_references_page(
"size": Asset.size_bytes,
}
sort_col = sort_map.get(sort, AssetReference.created_at)
sort_exp = sort_col.desc() if order == "desc" else sort_col.asc()
descending = order == "desc"
base = base.order_by(sort_exp).limit(limit).offset(offset)
# Keyset WHERE: (sort_col, id) strictly less-than / greater-than the cursor.
# Equivalent to: sort_col <op> v OR (sort_col = v AND id <op> cursor_id).
if after_cursor_value is not None and after_cursor_id is not None:
if descending:
keyset = sa.or_(
sort_col < after_cursor_value,
sa.and_(sort_col == after_cursor_value, AssetReference.id < after_cursor_id),
)
else:
keyset = sa.or_(
sort_col > after_cursor_value,
sa.and_(sort_col == after_cursor_value, AssetReference.id > after_cursor_id),
)
base = base.where(keyset)
# Secondary ORDER BY id (matching the primary direction) gives the keyset
# comparison a deterministic tiebreaker on duplicate sort_col values.
id_exp = AssetReference.id.desc() if descending else AssetReference.id.asc()
sort_exp = sort_col.desc() if descending else sort_col.asc()
base = base.order_by(sort_exp, id_exp).limit(limit)
if after_cursor_id is None:
base = base.offset(offset)
count_stmt = (
select(sa.func.count())

View File

@ -1,8 +1,19 @@
import contextlib
import mimetypes
import os
from datetime import timezone
from typing import Sequence
from app.assets.services.cursor import (
CursorPayload,
InvalidCursorError,
decode_cursor,
decode_cursor_int,
decode_cursor_time,
encode_cursor,
encode_cursor_from_time,
)
from app.assets.database.models import Asset
from app.assets.database.queries import (
@ -242,6 +253,12 @@ def get_asset_by_hash(asset_hash: str) -> AssetData | None:
return extract_asset_data(asset)
# Sort fields that support cursor pagination. Mirrors cloud's allowlist
# (created_at, updated_at, name, size). `last_access_time` is OSS-only and
# falls back to offset/limit — no cloud contract to match.
_CURSOR_SORT_FIELDS = ("created_at", "updated_at", "name", "size")
def list_assets_page(
owner_id: str = "",
include_tags: Sequence[str] | None = None,
@ -252,7 +269,30 @@ def list_assets_page(
offset: int = 0,
sort: str = "created_at",
order: str = "desc",
after: str | None = None,
) -> ListAssetsResult:
"""List assets with optional cursor pagination.
When ``after`` is supplied it overrides ``offset``. The cursor's sort field
must match ``sort`` and be in the cursor-supported allowlist; mismatches
raise InvalidCursorError so the handler can map to 400 INVALID_CURSOR.
"""
cursor_value: object | None = None
cursor_id: str | None = None
use_cursor_mode = after is not None and sort in _CURSOR_SORT_FIELDS
if after is not None:
if sort not in _CURSOR_SORT_FIELDS:
raise InvalidCursorError(
f"cursor pagination is not supported for sort={sort!r}"
)
payload = decode_cursor(after, _CURSOR_SORT_FIELDS)
if payload.sort_field != sort:
raise InvalidCursorError(
f"cursor sort field {payload.sort_field!r} does not match request sort {sort!r}"
)
cursor_value, cursor_id = _resolve_cursor_value(payload), payload.id
with create_session() as session:
refs, tag_map, total = list_references_page(
session,
@ -265,6 +305,8 @@ def list_assets_page(
offset=offset,
sort=sort,
order=order,
after_cursor_value=cursor_value,
after_cursor_id=cursor_id,
)
items: list[AssetSummaryData] = []
@ -277,7 +319,34 @@ def list_assets_page(
)
)
return ListAssetsResult(items=items, total=total)
next_cursor: str | None = None
if use_cursor_mode and len(refs) == limit:
next_cursor = _encode_next_cursor(refs[-1], sort)
return ListAssetsResult(items=items, total=total, next_cursor=next_cursor)
def _resolve_cursor_value(payload: CursorPayload) -> object:
"""Map a decoded cursor payload to a column-typed Python value."""
if payload.sort_field in ("created_at", "updated_at"):
# DB stores naive UTC; strip tzinfo so the comparison binds against a
# `TIMESTAMP WITHOUT TIME ZONE` column without an offset shift.
return decode_cursor_time(payload).replace(tzinfo=None)
if payload.sort_field == "size":
return decode_cursor_int(payload)
return payload.value # name, str-typed
def _encode_next_cursor(ref, sort: str) -> str:
"""Mint a cursor pointing at *ref* for the given sort dimension."""
if sort == "name":
return encode_cursor("name", ref.name, ref.id)
if sort == "size":
size = ref.asset.size_bytes if ref.asset is not None else 0
return encode_cursor("size", str(size), ref.id)
# created_at / updated_at — DB datetimes are naive UTC; attach tz before encoding.
value = ref.created_at if sort == "created_at" else ref.updated_at
return encode_cursor_from_time(sort, value.replace(tzinfo=timezone.utc), ref.id)
def resolve_hash_to_path(

View File

@ -71,6 +71,7 @@ class AssetSummaryData:
class ListAssetsResult:
items: list[AssetSummaryData]
total: int
next_cursor: str | None = None
@dataclass(frozen=True)