fix(assets): seed added_at past max(existing) to survive Windows clock collisions
Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Build package / Build Test (3.10) (push) Has been cancelled
Build package / Build Test (3.11) (push) Has been cancelled
Build package / Build Test (3.12) (push) Has been cancelled
Build package / Build Test (3.13) (push) Has been cancelled
Build package / Build Test (3.14) (push) Has been cancelled

The per-tag microsecond stagger preserves intra-batch order, but two
back-to-back write batches on the same reference (e.g.
set_reference_tags for path tags, then add_tags_to_reference for user
tags) call get_utc_now() independently. On Windows the system clock can
return the same datetime for both calls if no OS tick elapsed between
the commits — both batches end up sharing microseconds and
ORDER BY added_at, tag_name falls back to the alphabetic tiebreaker,
sorting user tags ahead of path tags they were meant to follow.

Add _next_added_at_base(reference_id) that reads max(existing added_at)
and returns max(existing + 1us, get_utc_now()), guaranteeing the new
batch sorts strictly after anything previously written for that
reference. Used by set_reference_tags and add_tags_to_reference;
batch_insert_seed_assets stays on raw get_utc_now() since seed inserts
are always the first writes for a new reference.

The accompanying regression test pins get_utc_now() to a frozen value
so the previously-Windows-only race becomes a platform-independent
failure mode under test.
This commit is contained in:
Matt Miller 2026-05-20 20:32:36 -07:00
parent 2d21956ac7
commit dc6190e8ba
2 changed files with 65 additions and 5 deletions

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import timedelta from datetime import datetime, timedelta
from typing import Iterable, Sequence from typing import Iterable, Sequence
import sqlalchemy as sa import sqlalchemy as sa
@ -50,6 +50,26 @@ class SetTagsResult:
total: list[str] total: list[str]
def _next_added_at_base(session: Session, reference_id: str) -> datetime:
"""Return a timestamp strictly greater than any existing
`added_at` for this reference. On platforms where the wall clock
has insufficient resolution between back-to-back commits (notably
Windows), two write batches on the same reference can otherwise
share a microsecond the `ORDER BY added_at, tag_name` retrieval
then falls back to the alphabetic tiebreaker and user-tier tags
sort ahead of path-tier tags they were meant to follow.
"""
existing_max = session.execute(
sa.select(sa.func.max(AssetReferenceTag.added_at)).where(
AssetReferenceTag.asset_reference_id == reference_id
)
).scalar()
now = get_utc_now()
if existing_max is None:
return now
return max(existing_max + timedelta(microseconds=1), now)
def validate_tags_exist(session: Session, tags: list[str]) -> None: def validate_tags_exist(session: Session, tags: list[str]) -> None:
"""Raise ValueError if any of the given tag names do not exist.""" """Raise ValueError if any of the given tag names do not exist."""
existing_tag_names = set( existing_tag_names = set(
@ -114,8 +134,9 @@ def set_reference_tags(
# added_at preserves input order. Per-tag get_utc_now() calls can # added_at preserves input order. Per-tag get_utc_now() calls can
# collide at microsecond resolution on fast machines, dropping the # collide at microsecond resolution on fast machines, dropping the
# query to the tag_name alphabetical tiebreaker — same fix as in # query to the tag_name alphabetical tiebreaker — same fix as in
# batch_insert_seed_assets. # batch_insert_seed_assets. Read max(existing) so this batch sorts
base_ts = get_utc_now() # strictly after any prior batch on the same reference.
base_ts = _next_added_at_base(session, reference_id)
session.add_all( session.add_all(
[ [
AssetReferenceTag( AssetReferenceTag(
@ -172,8 +193,9 @@ def add_tags_to_reference(
to_add = [t for t in norm if t not in current] to_add = [t for t in norm if t not in current]
if to_add: if to_add:
# See set_reference_tags for the rationale behind the per-tag stagger. # See set_reference_tags for the rationale behind the per-tag stagger
base_ts = get_utc_now() # and the max(existing) seed.
base_ts = _next_added_at_base(session, reference_id)
with session.begin_nested() as nested: with session.begin_nested() as nested:
try: try:
session.add_all( session.add_all(

View File

@ -239,6 +239,44 @@ class TestTagRetrievalOrder:
assert tags[0:2] == ["models", "checkpoints"] assert tags[0:2] == ["models", "checkpoints"]
assert set(tags[2:]) == {"zzz-z", "favorite", "experiment-q4"} assert set(tags[2:]) == {"zzz-z", "favorite", "experiment-q4"}
def test_user_batch_lands_after_path_batch_under_clock_collision(
self, session: Session, monkeypatch: pytest.MonkeyPatch
):
"""Windows-specific race: when two back-to-back commits share the
same datetime.now() microsecond, the path-tier and user-tier
added_at values used to collide and alphabetic tiebreak would
hoist user tags ahead of path tags. The fix reads
max(existing_added_at) for the reference and seeds the next batch
past it, deterministically restoring insertion order.
This test simulates the collision by pinning get_utc_now() so the
platform-dependent race becomes a platform-independent failure.
"""
ref = self._make_ref(session)
from datetime import datetime
from app.assets.database import queries as queries_pkg
from app.assets.database.queries import tags as tags_module
frozen = datetime(2026, 1, 1, 0, 0, 0)
monkeypatch.setattr(tags_module, "get_utc_now", lambda: frozen)
monkeypatch.setattr(queries_pkg, "get_utc_now", lambda: frozen, raising=False)
set_reference_tags(session, reference_id=ref.id, tags=["models", "checkpoints"])
session.commit()
# Same frozen timestamp — without the max(existing) seed, the
# user batch would share added_at with the path batch and
# `aaa-user-tag` would sort to position 0 via the alphabetic
# tiebreaker.
add_tags_to_reference(
session, reference_id=ref.id, tags=["aaa-user-tag"], origin="manual"
)
session.commit()
_, tag_map, _ = list_references_page(session)
assert tag_map[ref.id] == ["models", "checkpoints", "aaa-user-tag"]
def test_remove_then_add_does_not_disrupt_path_tag_positions( def test_remove_then_add_does_not_disrupt_path_tag_positions(
self, session: Session self, session: Session
): ):