mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-05-31 03:17:23 +08:00
fix(assets): address cursor-review verified findings
- Mint next_cursor on every cursor-supported sort, not only when 'after' was supplied. A first request (no 'after') previously returned next_cursor=None, leaving cursor mode unreachable from a clean start. - Over-fetch limit+1 so an exactly-full terminal page doesn't mint a spurious cursor pointing at a phantom next page. - Map crafted out-of-range microsecond cursors (OverflowError / OSError in datetime construction) to 400 INVALID_CURSOR instead of leaking 500. - Bump MAX_CURSOR_VALUE_LENGTH 256 -> 512 to match the AssetReference name column max; without this, a long-named asset minted a cursor the same server then refused on the next request. Cross-runtime byte identity with cloud is unaffected because no cloud cursor ever carries a value > 256 (cloud schema doesn't permit it). - Return None from _encode_next_cursor when the boundary row carries a NULL sort value (e.g. an Asset without size_bytes backfilled), instead of silently encoding 0 and mis-positioning the keyset. - Fix schemas_in.py comment so it matches actual handler behavior (last_access_time + 'after' raises 400, does not fall back). - Add AssetsApiError schema + 400 response to GET /api/assets in openapi.yaml so generated clients know the INVALID_CURSOR envelope. - Extend integration coverage: first-page mint, exact-multiple terminal page, cursor walks for created_at/updated_at/size sorts, datetime overflow surfaces as 400 not 500. - Add unit coverage for datetime overflow and 512-char round-trip.
This commit is contained in:
parent
df1f6a7fcc
commit
d0258ae53d
@ -60,8 +60,9 @@ class ListAssetsQuery(BaseModel):
|
||||
limit: conint(ge=1, le=500) = 20
|
||||
offset: conint(ge=0) = 0
|
||||
# Opaque keyset cursor. When supplied, `offset` is ignored. Cursor pagination
|
||||
# is supported for sort values `created_at`, `updated_at`, `name`, `size`;
|
||||
# `last_access_time` falls back to offset/limit.
|
||||
# is supported for sort values `created_at`, `updated_at`, `name`, `size`.
|
||||
# Supplying `after` together with `sort=last_access_time` returns
|
||||
# 400 INVALID_CURSOR; that sort only supports offset/limit.
|
||||
after: str | None = None
|
||||
|
||||
sort: Literal["name", "created_at", "updated_at", "size", "last_access_time"] = (
|
||||
|
||||
@ -279,7 +279,11 @@ def list_assets_page(
|
||||
"""
|
||||
cursor_value: object | None = None
|
||||
cursor_id: str | None = None
|
||||
use_cursor_mode = after is not None and sort in _CURSOR_SORT_FIELDS
|
||||
# Mint next_cursor on every page where the sort is cursor-supported, not
|
||||
# only when the request itself arrived with a cursor. Otherwise a first
|
||||
# request (no `after`) returns next_cursor=None and the client can never
|
||||
# enter cursor mode.
|
||||
mint_cursor = sort in _CURSOR_SORT_FIELDS
|
||||
|
||||
if after is not None:
|
||||
if sort not in _CURSOR_SORT_FIELDS:
|
||||
@ -293,6 +297,11 @@ def list_assets_page(
|
||||
)
|
||||
cursor_value, cursor_id = _resolve_cursor_value(payload), payload.id
|
||||
|
||||
# Over-fetch by one row so we can distinguish "exactly `limit` rows total
|
||||
# remaining" from "more rows past this page" without a second query. Drop
|
||||
# the sentinel before returning.
|
||||
fetch_limit = limit + 1 if mint_cursor else limit
|
||||
|
||||
with create_session() as session:
|
||||
refs, tag_map, total = list_references_page(
|
||||
session,
|
||||
@ -301,7 +310,7 @@ def list_assets_page(
|
||||
exclude_tags=exclude_tags,
|
||||
name_contains=name_contains,
|
||||
metadata_filter=metadata_filter,
|
||||
limit=limit,
|
||||
limit=fetch_limit,
|
||||
offset=offset,
|
||||
sort=sort,
|
||||
order=order,
|
||||
@ -309,6 +318,14 @@ def list_assets_page(
|
||||
after_cursor_id=cursor_id,
|
||||
)
|
||||
|
||||
next_cursor: str | None = None
|
||||
if mint_cursor and len(refs) > limit:
|
||||
# There's at least one more row past this page — mint a cursor from
|
||||
# the last row of the page (i.e. index `limit - 1`, since we
|
||||
# over-fetched), and drop the sentinel.
|
||||
next_cursor = _encode_next_cursor(refs[limit - 1], sort)
|
||||
refs = refs[:limit]
|
||||
|
||||
items: list[AssetSummaryData] = []
|
||||
for ref in refs:
|
||||
items.append(
|
||||
@ -319,10 +336,6 @@ def list_assets_page(
|
||||
)
|
||||
)
|
||||
|
||||
next_cursor: str | None = None
|
||||
if use_cursor_mode and len(refs) == limit:
|
||||
next_cursor = _encode_next_cursor(refs[-1], sort)
|
||||
|
||||
return ListAssetsResult(items=items, total=total, next_cursor=next_cursor)
|
||||
|
||||
|
||||
@ -337,15 +350,24 @@ def _resolve_cursor_value(payload: CursorPayload) -> object:
|
||||
return payload.value # name, str-typed
|
||||
|
||||
|
||||
def _encode_next_cursor(ref, sort: str) -> str:
|
||||
"""Mint a cursor pointing at *ref* for the given sort dimension."""
|
||||
def _encode_next_cursor(ref, sort: str) -> str | None:
|
||||
"""Mint a cursor pointing at *ref* for the given sort dimension.
|
||||
|
||||
Returns None when the boundary row carries a NULL sort value (e.g. an asset
|
||||
record whose size_bytes hasn't been backfilled). Continuing pagination
|
||||
across a NULL boundary is undefined under keyset ordering — better to
|
||||
truncate cleanly here than to mint a cursor that mis-positions.
|
||||
"""
|
||||
if sort == "name":
|
||||
return encode_cursor("name", ref.name, ref.id)
|
||||
if sort == "size":
|
||||
size = ref.asset.size_bytes if ref.asset is not None else 0
|
||||
return encode_cursor("size", str(size), ref.id)
|
||||
if ref.asset is None or ref.asset.size_bytes is None:
|
||||
return None
|
||||
return encode_cursor("size", str(ref.asset.size_bytes), ref.id)
|
||||
# created_at / updated_at — DB datetimes are naive UTC; attach tz before encoding.
|
||||
value = ref.created_at if sort == "created_at" else ref.updated_at
|
||||
if value is None:
|
||||
return None
|
||||
return encode_cursor_from_time(sort, value.replace(tzinfo=timezone.utc), ref.id)
|
||||
|
||||
|
||||
|
||||
@ -32,9 +32,16 @@ class InvalidCursorError(ValueError):
|
||||
|
||||
# Wire-format length caps. Cursors are user-controlled, so caps protect the
|
||||
# decode path from oversized allocations and downstream SQL predicates from
|
||||
# unbounded strings. Same numbers as cloud/common/pagination/cursor.go.
|
||||
# unbounded strings.
|
||||
#
|
||||
# MAX_CURSOR_VALUE_LENGTH is 512 (vs cloud's 256) to fit OSS's
|
||||
# `AssetReference.name` column max (String(512)) — otherwise a long-named
|
||||
# asset would mint a cursor the same server then refuses on the next request.
|
||||
# Cloud's data model has shorter names so its lower cap is fine there;
|
||||
# cross-runtime byte-identity is unaffected because no real cloud cursor ever
|
||||
# carries a value > 256.
|
||||
MAX_ENCODED_CURSOR_LENGTH = 1024
|
||||
MAX_CURSOR_VALUE_LENGTH = 256
|
||||
MAX_CURSOR_VALUE_LENGTH = 512
|
||||
MAX_CURSOR_ID_LENGTH = 128
|
||||
|
||||
|
||||
@ -122,7 +129,12 @@ def decode_cursor_time(payload: Optional[CursorPayload]) -> datetime:
|
||||
micros = int(payload.value)
|
||||
except ValueError as e:
|
||||
raise InvalidCursorError(f"value is not a valid timestamp: {e}") from e
|
||||
return _unix_micros_to_datetime(micros)
|
||||
try:
|
||||
return _unix_micros_to_datetime(micros)
|
||||
except (OverflowError, OSError, ValueError) as e:
|
||||
# Crafted out-of-range microseconds (e.g. > datetime.MAX_YEAR) blow up
|
||||
# in fromtimestamp / datetime construction. Map to 400, not 500.
|
||||
raise InvalidCursorError(f"value is out of representable range: {e}") from e
|
||||
|
||||
|
||||
def decode_cursor_int(payload: Optional[CursorPayload]) -> int:
|
||||
|
||||
29
openapi.yaml
29
openapi.yaml
@ -1594,6 +1594,12 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ListAssetsResponse"
|
||||
"400":
|
||||
description: Malformed query or cursor (e.g. `INVALID_CURSOR`)
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/AssetsApiError"
|
||||
post:
|
||||
operationId: createAsset
|
||||
tags: [assets]
|
||||
@ -6498,6 +6504,29 @@ components:
|
||||
Opaque cursor to fetch the next page. Pass back as the `after`
|
||||
query parameter. Omitted when there are no more results.
|
||||
|
||||
AssetsApiError:
|
||||
type: object
|
||||
description: Error envelope returned by the assets API on 400 responses.
|
||||
required:
|
||||
- error
|
||||
properties:
|
||||
error:
|
||||
type: object
|
||||
required:
|
||||
- code
|
||||
- message
|
||||
properties:
|
||||
code:
|
||||
type: string
|
||||
description: |
|
||||
Machine-readable error code. `INVALID_CURSOR` is returned when the
|
||||
`after` cursor is malformed, oversized, or its sort field does
|
||||
not match the request's `sort`. `INVALID_QUERY` covers other
|
||||
Pydantic validation failures.
|
||||
enum: [INVALID_CURSOR, INVALID_QUERY]
|
||||
message:
|
||||
type: string
|
||||
|
||||
TagInfo:
|
||||
type: object
|
||||
description: A tag known to the asset database, with the number of assets bearing it.
|
||||
|
||||
@ -162,6 +162,40 @@ class TestEncodeAtCapsFits:
|
||||
assert payload.id == id
|
||||
|
||||
|
||||
class TestDatetimeOverflow:
|
||||
"""Crafted cursors with extreme micros must map to InvalidCursorError,
|
||||
not OverflowError/OSError leaking as 500.
|
||||
"""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"micros_str",
|
||||
[
|
||||
"999999999999999999999", # 10^21 µs — past datetime.MAX_YEAR by ~14 orders
|
||||
"-999999999999999999999", # symmetric negative — pre-epoch overflow
|
||||
],
|
||||
)
|
||||
def test_out_of_range_micros_rejected(self, micros_str):
|
||||
encoded = encode_cursor("created_at", micros_str, "asset-x")
|
||||
payload = decode_cursor(encoded, ALLOWED)
|
||||
with pytest.raises(InvalidCursorError):
|
||||
decode_cursor_time(payload)
|
||||
|
||||
|
||||
class TestEncoderDecoderSymmetry:
|
||||
"""The encoder must reject inputs the decoder rejects, or the same server
|
||||
will mint a cursor it then 400s on the next request.
|
||||
"""
|
||||
|
||||
def test_long_name_within_cap_round_trips(self):
|
||||
"""OSS assets allow names up to 512 chars (`String(512)`); cursor must
|
||||
handle that. Cloud's lower cap is acceptable on its side because the
|
||||
cloud schema doesn't permit names that long."""
|
||||
long_name = "n" * MAX_CURSOR_VALUE_LENGTH
|
||||
encoded = encode_cursor("name", long_name, "asset-x")
|
||||
payload = decode_cursor(encoded, ALLOWED)
|
||||
assert payload.value == long_name
|
||||
|
||||
|
||||
class TestByteIdentityWithCloud:
|
||||
"""Lock the wire format against drift from cloud's Go implementation.
|
||||
|
||||
|
||||
@ -4,6 +4,9 @@ Wire contract is shared with cloud's Go implementation (BE-893). These tests
|
||||
exercise the handler/service/query path end-to-end; cursor-encoding-level
|
||||
tests live in tests-unit/assets_test/services/test_cursor.py.
|
||||
"""
|
||||
import time
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
|
||||
@ -146,6 +149,105 @@ def test_next_cursor_absent_when_no_more_results(http: requests.Session, api_bas
|
||||
assert "next_cursor" not in body
|
||||
|
||||
|
||||
def test_cursor_pagination_first_page_mints_cursor(http: requests.Session, api_base: str, asset_factory, make_asset_bytes):
|
||||
"""First-page request (no `after`) must still return `next_cursor` when
|
||||
more rows exist, or pagination is unreachable from a cold start.
|
||||
"""
|
||||
_seed(asset_factory, make_asset_bytes, count=3, tag="cursor-first-page")
|
||||
r = http.get(
|
||||
api_base + "/api/assets",
|
||||
params={"include_tags": "unit-tests,cursor-first-page", "sort": "name", "order": "asc", "limit": "2"},
|
||||
timeout=120,
|
||||
)
|
||||
body = r.json()
|
||||
assert body["has_more"] is True
|
||||
assert body.get("next_cursor"), "first page must mint a cursor when more rows exist"
|
||||
|
||||
|
||||
def test_cursor_no_spurious_cursor_when_page_size_equals_remainder(http: requests.Session, api_base: str, asset_factory, make_asset_bytes):
|
||||
"""When `total` is an exact multiple of `limit`, the final page must
|
||||
NOT carry a next_cursor — there is nothing past it.
|
||||
"""
|
||||
_seed(asset_factory, make_asset_bytes, count=4, tag="cursor-exact-multiple")
|
||||
# Page 1
|
||||
r = http.get(
|
||||
api_base + "/api/assets",
|
||||
params={"include_tags": "unit-tests,cursor-exact-multiple", "sort": "name", "order": "asc", "limit": "2"},
|
||||
timeout=120,
|
||||
)
|
||||
cursor = r.json()["next_cursor"]
|
||||
assert cursor is not None
|
||||
# Page 2 — should exhaust the set with no cursor for a phantom page 3
|
||||
r2 = http.get(
|
||||
api_base + "/api/assets",
|
||||
params={"include_tags": "unit-tests,cursor-exact-multiple", "sort": "name", "order": "asc", "limit": "2", "after": cursor},
|
||||
timeout=120,
|
||||
)
|
||||
body = r2.json()
|
||||
assert len(body["assets"]) == 2
|
||||
assert body["has_more"] is False
|
||||
assert "next_cursor" not in body
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sort_field", ["created_at", "updated_at", "size"])
|
||||
def test_cursor_walks_for_non_name_sorts(sort_field, http: requests.Session, api_base: str, asset_factory, make_asset_bytes):
|
||||
"""Cursor pagination must work for every sort field the contract claims.
|
||||
|
||||
Without this, the `created_at` / `updated_at` (time-encoded micros) and
|
||||
`size` (int-encoded) cursor paths go entirely unexercised end-to-end.
|
||||
"""
|
||||
# Stagger create_time slightly so created_at / updated_at sort is well-defined.
|
||||
names = []
|
||||
for i in range(4):
|
||||
n = f"cursor_{sort_field}_{i:02d}.safetensors"
|
||||
asset_factory(n, ["models", "checkpoints", "unit-tests", f"cursor-{sort_field}"], {}, make_asset_bytes(n, size=2048 + i))
|
||||
time.sleep(0.05) # ensure distinct timestamps
|
||||
names.append(n)
|
||||
|
||||
params = {
|
||||
"include_tags": f"unit-tests,cursor-{sort_field}",
|
||||
"sort": sort_field,
|
||||
"order": "desc",
|
||||
"limit": "2",
|
||||
}
|
||||
seen: list[str] = []
|
||||
after: str | None = None
|
||||
pages = 0
|
||||
while True:
|
||||
page_params = dict(params)
|
||||
if after is not None:
|
||||
page_params["after"] = after
|
||||
r = http.get(api_base + "/api/assets", params=page_params, timeout=120)
|
||||
assert r.status_code == 200, r.text
|
||||
body = r.json()
|
||||
seen.extend(a["name"] for a in body["assets"])
|
||||
after = body.get("next_cursor")
|
||||
pages += 1
|
||||
if after is None:
|
||||
break
|
||||
assert pages < 10, "guard against runaway cursor loop"
|
||||
|
||||
assert set(seen) == set(names), f"missing items for sort={sort_field}: expected {set(names)}, got {set(seen)}"
|
||||
|
||||
|
||||
def test_cursor_invalid_cursor_at_microsecond_boundary(http: requests.Session, api_base: str):
|
||||
"""A cursor carrying an out-of-range microsecond timestamp must map to
|
||||
400 INVALID_CURSOR, not 500."""
|
||||
import base64
|
||||
import json
|
||||
# 10^18 microseconds ≈ year 33658, well past datetime.MAX_YEAR.
|
||||
payload = {"s": "created_at", "v": "999999999999999999999", "id": "asset-x"}
|
||||
raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
|
||||
cursor = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
|
||||
r = http.get(
|
||||
api_base + "/api/assets",
|
||||
params={"after": cursor, "sort": "created_at"},
|
||||
timeout=120,
|
||||
)
|
||||
assert r.status_code == 400, r.text
|
||||
assert r.json()["error"]["code"] == "INVALID_CURSOR"
|
||||
|
||||
|
||||
def test_cursor_pagination_stable_after_delete(http: requests.Session, api_base: str, asset_factory, make_asset_bytes):
|
||||
names = _seed(asset_factory, make_asset_bytes, count=4, tag="cursor-delete")
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user