diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index 68126b6a5..1155fa503 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -401,12 +401,16 @@ async def upload_asset(request: web.Request) -> web.Response: ) if spec.tags and spec.tags[0] == "models": + # tag[1] may be the standalone category ("checkpoints") or the + # slash-joined shape ("checkpoints/flux/...") that + # `get_name_and_tags_from_asset_path` and cloud both emit. Match + # `resolve_destination_from_tags` by extracting the first segment. + category = spec.tags[1].split("/", 1)[0] if len(spec.tags) >= 2 else "" if ( len(spec.tags) < 2 - or spec.tags[1] not in folder_paths.folder_names_and_paths + or category not in folder_paths.folder_names_and_paths ): delete_temp_file_if_exists(parsed.tmp_path) - category = spec.tags[1] if len(spec.tags) >= 2 else "" return _build_error_response( 400, "INVALID_BODY", f"unknown models category '{category}'" ) diff --git a/app/assets/database/queries/asset_reference.py b/app/assets/database/queries/asset_reference.py index 8b90ae511..2ef5c210e 100644 --- a/app/assets/database/queries/asset_reference.py +++ b/app/assets/database/queries/asset_reference.py @@ -327,7 +327,12 @@ def list_references_page( select(AssetReferenceTag.asset_reference_id, Tag.name) .join(Tag, Tag.name == AssetReferenceTag.tag_name) .where(AssetReferenceTag.asset_reference_id.in_(id_list)) - .order_by(AssetReferenceTag.tag_name.asc()) + # Preserve insertion order so the structural first tag (the root + # category like "models") stays in position 0 and the path-derived + # sub-path tag stays in position 1, matching cloud's behavior. + # tag_name is a deterministic tiebreaker when multiple tags share + # an added_at (same-batch insert via set_reference_tags). + .order_by(AssetReferenceTag.added_at.asc(), AssetReferenceTag.tag_name.asc()) ) for ref_id, tag_name in rows.all(): tag_map[ref_id].append(tag_name) @@ -355,7 +360,8 @@ def fetch_reference_asset_and_tags( build_visible_owner_clause(owner_id), ) .options(noload(AssetReference.tags)) - .order_by(Tag.name.asc()) + # See list_references_page for the rationale behind ordering by added_at. + .order_by(AssetReferenceTag.added_at.asc(), Tag.name.asc()) ) rows = session.execute(stmt).all() diff --git a/app/assets/database/queries/tags.py b/app/assets/database/queries/tags.py index f4126dba8..41d852be6 100644 --- a/app/assets/database/queries/tags.py +++ b/app/assets/database/queries/tags.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from datetime import datetime, timedelta from typing import Iterable, Sequence import sqlalchemy as sa @@ -20,7 +21,12 @@ from app.assets.database.queries.common import ( build_visible_owner_clause, iter_row_chunks, ) -from app.assets.helpers import escape_sql_like_string, get_utc_now, normalize_tags +from app.assets.helpers import ( + escape_sql_like_string, + expand_bucket_prefixes, + get_utc_now, + normalize_tags, +) @dataclass(frozen=True) @@ -44,6 +50,26 @@ class SetTagsResult: total: list[str] +def _next_added_at_base(session: Session, reference_id: str) -> datetime: + """Return a timestamp strictly greater than any existing + `added_at` for this reference. On platforms where the wall clock + has insufficient resolution between back-to-back commits (notably + Windows), two write batches on the same reference can otherwise + share a microsecond — the `ORDER BY added_at, tag_name` retrieval + then falls back to the alphabetic tiebreaker and user-tier tags + sort ahead of path-tier tags they were meant to follow. + """ + existing_max = session.execute( + sa.select(sa.func.max(AssetReferenceTag.added_at)).where( + AssetReferenceTag.asset_reference_id == reference_id + ) + ).scalar() + now = get_utc_now() + if existing_max is None: + return now + return max(existing_max + timedelta(microseconds=1), now) + + def validate_tags_exist(session: Session, tags: list[str]) -> None: """Raise ValueError if any of the given tag names do not exist.""" existing_tag_names = set( @@ -77,7 +103,13 @@ def get_reference_tags(session: Session, reference_id: str) -> list[str]: session.execute( select(AssetReferenceTag.tag_name) .where(AssetReferenceTag.asset_reference_id == reference_id) - .order_by(AssetReferenceTag.tag_name.asc()) + # Match the response-path ordering used by + # list_references_page / fetch_reference_asset_and_tags so + # upload responses and subsequent GETs agree on tag order. + .order_by( + AssetReferenceTag.added_at.asc(), + AssetReferenceTag.tag_name.asc(), + ) ) ).all() ] @@ -89,7 +121,7 @@ def set_reference_tags( tags: Sequence[str], origin: str = "manual", ) -> SetTagsResult: - desired = normalize_tags(tags) + desired = expand_bucket_prefixes(normalize_tags(tags)) current = set(get_reference_tags(session, reference_id)) @@ -98,15 +130,22 @@ def set_reference_tags( if to_add: ensure_tags_exist(session, to_add, tag_type="user") + # Stagger added_at by microsecond per tag so the retrieval ORDER BY + # added_at preserves input order. Per-tag get_utc_now() calls can + # collide at microsecond resolution on fast machines, dropping the + # query to the tag_name alphabetical tiebreaker — same fix as in + # batch_insert_seed_assets. Read max(existing) so this batch sorts + # strictly after any prior batch on the same reference. + base_ts = _next_added_at_base(session, reference_id) session.add_all( [ AssetReferenceTag( asset_reference_id=reference_id, tag_name=t, origin=origin, - added_at=get_utc_now(), + added_at=base_ts + timedelta(microseconds=i), ) - for t in to_add + for i, t in enumerate(to_add) ] ) session.flush() @@ -136,7 +175,7 @@ def add_tags_to_reference( if not ref: raise ValueError(f"AssetReference {reference_id} not found") - norm = normalize_tags(tags) + norm = expand_bucket_prefixes(normalize_tags(tags)) if not norm: total = get_reference_tags(session, reference_id=reference_id) return AddTagsResult(added=[], already_present=[], total_tags=total) @@ -146,10 +185,17 @@ def add_tags_to_reference( current = set(get_reference_tags(session, reference_id)) + # Preserve the caller's insertion order rather than alphabetizing — + # the retrieval ORDER BY added_at + microsecond stagger only meaningfully + # preserves insertion order if "the order we insert in" actually matches + # the caller's intent. want = set(norm) - to_add = sorted(want - current) + to_add = [t for t in norm if t not in current] if to_add: + # See set_reference_tags for the rationale behind the per-tag stagger + # and the max(existing) seed. + base_ts = _next_added_at_base(session, reference_id) with session.begin_nested() as nested: try: session.add_all( @@ -158,9 +204,9 @@ def add_tags_to_reference( asset_reference_id=reference_id, tag_name=t, origin=origin, - added_at=get_utc_now(), + added_at=base_ts + timedelta(microseconds=i), ) - for t in to_add + for i, t in enumerate(to_add) ] ) session.flush() diff --git a/app/assets/helpers.py b/app/assets/helpers.py index 3798f3933..2f9e9a0ce 100644 --- a/app/assets/helpers.py +++ b/app/assets/helpers.py @@ -47,6 +47,50 @@ def normalize_tags(tags: list[str] | None) -> list[str]: return list(dict.fromkeys(t.strip().lower() for t in (tags or []) if (t or "").strip())) +def _known_bucket_prefixes() -> set[str]: + """Lowercased model-category names eligible for standalone-prefix + expansion. Tags whose first slash segment matches one of these get + the bucket inserted as a separate token, so FE filters like + ``include_tags=models,checkpoints`` keep matching even when the + asset lives in a nested subfolder (`models/checkpoints/flux/foo`). + + Bare user labels with slashes whose first segment is not a registered + bucket (e.g. ``my-org/team-a``) pass through unchanged. + """ + try: + import folder_paths + + return { + name.lower() + for name in folder_paths.folder_names_and_paths.keys() + if name != "custom_nodes" + } + except Exception: + return set() + + +def expand_bucket_prefixes(tags: list[str]) -> list[str]: + """Insert standalone bucket tokens after any slash-joined tag whose + first segment is a registered model category. Preserves caller order + and is idempotent (existing bucket tokens are not duplicated). + """ + if not tags: + return list(tags) + buckets = _known_bucket_prefixes() + if not buckets: + return list(tags) + seen = set(tags) + result: list[str] = [] + for t in tags: + result.append(t) + if "/" in t: + prefix = t.split("/", 1)[0] + if prefix.lower() in buckets and prefix not in seen: + result.append(prefix) + seen.add(prefix) + return result + + def validate_blake3_hash(s: str) -> str: """Validate and normalize a blake3 hash string. diff --git a/app/assets/services/bulk_ingest.py b/app/assets/services/bulk_ingest.py index 67aad838f..a5ff19c6b 100644 --- a/app/assets/services/bulk_ingest.py +++ b/app/assets/services/bulk_ingest.py @@ -3,7 +3,7 @@ from __future__ import annotations import os import uuid from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, TypedDict from sqlalchemy.orm import Session @@ -13,13 +13,14 @@ from app.assets.database.queries import ( bulk_insert_references_ignore_conflicts, bulk_insert_tags_and_meta, delete_assets_by_ids, + ensure_tags_exist, get_existing_asset_ids, get_reference_ids_by_ids, get_references_by_paths_and_asset_ids, get_unreferenced_unhashed_asset_ids, restore_references_by_paths, ) -from app.assets.helpers import get_utc_now +from app.assets.helpers import expand_bucket_prefixes, get_utc_now if TYPE_CHECKING: from app.assets.services.metadata_extract import ExtractedMetadata @@ -233,13 +234,20 @@ def batch_insert_seed_assets( if ref_id not in inserted_ref_ids: continue - for tag in ref_data["tags"]: + # Stagger added_at by microsecond per tag within a reference so + # the retrieval ORDER BY added_at preserves the input list order + # (the path-derived root category stays at position 0). Without + # this, every tag in a bulk-insert batch shares current_time and + # the tag_name tiebreaker sorts them alphabetically — putting the + # subpath tag ahead of "models" since "c"/"d"/"l" < "m". + ref_tags = expand_bucket_prefixes(ref_data["tags"]) + for tag_idx, tag in enumerate(ref_tags): tag_rows.append( { "asset_reference_id": ref_id, "tag_name": tag, "origin": "automatic", - "added_at": current_time, + "added_at": current_time + timedelta(microseconds=tag_idx), } ) @@ -261,6 +269,16 @@ def batch_insert_seed_assets( } ) + if tag_rows: + # Bucket-prefix expansion may have introduced tags the caller did + # not register via the upstream tag_pool (e.g. `checkpoints` for a + # nested `checkpoints/flux/foo` path). Pre-register the full set so + # the AssetReferenceTag.tag_name FK is satisfied; the underlying + # insert is ON CONFLICT DO NOTHING so re-registration is idempotent. + ensure_tags_exist( + session, {row["tag_name"] for row in tag_rows}, tag_type="user" + ) + bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=metadata_rows) return BulkInsertResult( diff --git a/app/assets/services/path_utils.py b/app/assets/services/path_utils.py index 892140ffb..c85bd95d8 100644 --- a/app/assets/services/path_utils.py +++ b/app/assets/services/path_utils.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import Literal import folder_paths -from app.assets.helpers import normalize_tags _NON_MODEL_FOLDER_NAMES = frozenset({"custom_nodes"}) @@ -27,27 +26,51 @@ def get_comfy_models_folders() -> list[tuple[str, list[str]]]: def resolve_destination_from_tags(tags: list[str]) -> tuple[str, list[str]]: - """Validates and maps tags -> (base_dir, subdirs_for_fs)""" + """Validates and maps tags -> (base_dir, subdirs_for_fs). + + Accepts both the legacy one-tag-per-directory shape + (``["models", "diffusers", "Kolors", "text_encoder"]``) and the + slash-joined shape emitted by :func:`get_name_and_tags_from_asset_path` + (``["models", "diffusers/Kolors/text_encoder"]``). Hybrid shapes that + mix the two within a single call (e.g. + ``["models", "diffusers", "Kolors/text_encoder"]``) are also + accepted: each entry after ``tags[0]`` is split on ``/`` and + concatenated, so the two shapes — and any mix of them — resolve to + the same destination. The same safety checks are applied to each + component after expansion. + """ if not tags: raise ValueError("tags must not be empty") root = tags[0].lower() + + # Expand any slash-joined entries into individual path components so + # the rest of the function can treat both tag shapes uniformly. Each + # component is also stripped, so " a / b " behaves like ["a", "b"]. + expanded: list[str] = [] + for t in tags[1:]: + for part in str(t).split("/"): + part = part.strip() + if part: + expanded.append(part) + if root == "models": - if len(tags) < 2: + if not expanded: raise ValueError("at least two tags required for model asset") + category = expanded[0] try: - bases = folder_paths.folder_names_and_paths[tags[1]][0] + bases = folder_paths.folder_names_and_paths[category][0] except KeyError: - raise ValueError(f"unknown model category '{tags[1]}'") + raise ValueError(f"unknown model category '{category}'") if not bases: - raise ValueError(f"no base path configured for category '{tags[1]}'") + raise ValueError(f"no base path configured for category '{category}'") base_dir = os.path.abspath(bases[0]) - raw_subdirs = tags[2:] + raw_subdirs = expanded[1:] elif root == "input": base_dir = os.path.abspath(folder_paths.get_input_directory()) - raw_subdirs = tags[1:] + raw_subdirs = expanded elif root == "output": base_dir = os.path.abspath(folder_paths.get_output_directory()) - raw_subdirs = tags[1:] + raw_subdirs = expanded else: raise ValueError(f"unknown root tag '{tags[0]}'; expected 'models', 'input', or 'output'") _sep_chars = frozenset(("/", "\\", os.sep)) @@ -160,7 +183,21 @@ def get_name_and_tags_from_asset_path(file_path: str) -> tuple[str, list[str]]: """Return (name, tags) derived from a filesystem path. - name: base filename with extension - - tags: [root_category] + parent folder names in order + - tags: [root_category] for paths with no parent subdirectories, + [root_category, slash_joined_subpath] otherwise. The parent subpath + (everything between the root category and the filename) is collapsed + into a single tag rather than emitted as one tag per directory, so + consumers can use ``tags[1]`` as a stable category identifier that + survives nested directory layouts (e.g. diffusers components). + + The subpath is lowercased to match the canonicalization applied by + :func:`ensure_tags_exist`; without that, the + ``asset_reference_tags.tag_name`` FK to the lowercased ``tags.name`` + would fail for any path containing uppercase letters. The root + category is lowercase by construction in + :func:`get_asset_category_and_relative_path`, so no separate cast + is applied here. Consumers that need to look up providers keyed on + original-case paths should normalize their lookup key to lowercase. Raises: ValueError: path does not belong to any known root. @@ -170,4 +207,7 @@ def get_name_and_tags_from_asset_path(file_path: str) -> tuple[str, list[str]]: parent_parts = [ part for part in p.parent.parts if part not in (".", "..", p.anchor) ] - return p.name, list(dict.fromkeys(normalize_tags([root_category, *parent_parts]))) + tags = [root_category] + if parent_parts: + tags.append("/".join(parent_parts).lower()) + return p.name, list(dict.fromkeys(t.strip() for t in tags if t.strip())) diff --git a/tests-unit/assets_test/queries/test_asset_info.py b/tests-unit/assets_test/queries/test_asset_info.py index fe510e342..62e59fe72 100644 --- a/tests-unit/assets_test/queries/test_asset_info.py +++ b/tests-unit/assets_test/queries/test_asset_info.py @@ -21,6 +21,7 @@ from app.assets.database.queries import ( get_reference_ids_by_ids, ensure_tags_exist, add_tags_to_reference, + set_reference_tags, ) from app.assets.helpers import get_utc_now @@ -159,6 +160,153 @@ class TestListReferencesPage: assert refs[0].name == "large" +class TestTagRetrievalOrder: + """End-to-end check: tags written through the public write paths come + back from the public read paths in insertion order rather than the + composite-PK alphabetical order SQLite would otherwise impose. + + Each test deliberately picks tag names that would sort differently + under alphabetical vs insertion order, so an alphabetical regression + fails loudly. + """ + + def _make_ref(self, session: Session) -> AssetReference: + asset = _make_asset(session, "h1") + return _make_reference(session, asset, name="x.bin") + + def test_set_reference_tags_preserves_input_order_in_list(self, session: Session): + ref = self._make_ref(session) + # "checkpoints" < "models" alphabetically; if added_at stagger + # works, list_references_page returns insertion order. + set_reference_tags(session, reference_id=ref.id, tags=["models", "checkpoints"]) + session.commit() + + _, tag_map, _ = list_references_page(session) + assert tag_map[ref.id] == ["models", "checkpoints"] + + def test_set_reference_tags_preserves_input_order_in_fetch(self, session: Session): + ref = self._make_ref(session) + # Subpath tag sorts before "models" alphabetically. + set_reference_tags( + session, + reference_id=ref.id, + tags=["models", "diffusers/kolors/text_encoder"], + ) + session.commit() + + result = fetch_reference_asset_and_tags(session, ref.id) + assert result is not None + _, _, tags = result + # Bucket-prefix expansion appends the standalone `diffusers` token + # at path-tier (microsecond stagger) so FE set-membership filters + # match nested category paths. + assert tags == ["models", "diffusers/kolors/text_encoder", "diffusers"] + + def test_add_tags_to_reference_lands_after_path_tags(self, session: Session): + ref = self._make_ref(session) + set_reference_tags(session, reference_id=ref.id, tags=["models", "checkpoints"]) + session.commit() + + # "aaa-..." sorts before both path tags alphabetically. If added_at + # stagger is missing, alphabetic tiebreak would hoist it to tags[0]. + add_tags_to_reference( + session, reference_id=ref.id, tags=["aaa-user-tag"], origin="manual" + ) + session.commit() + + _, tag_map, _ = list_references_page(session) + assert tag_map[ref.id] == ["models", "checkpoints", "aaa-user-tag"] + + def test_multi_tag_batch_lands_after_path_tags(self, session: Session): + ref = self._make_ref(session) + set_reference_tags(session, reference_id=ref.id, tags=["models", "checkpoints"]) + session.commit() + + # Three user tags inserted in non-alphabetical input order. Per-tag + # microsecond stagger should preserve at least the "user batch is + # after path tags" property; within the user batch insertion order + # is also preserved. + add_tags_to_reference( + session, + reference_id=ref.id, + tags=["zzz-z", "favorite", "experiment-q4"], + origin="manual", + ) + session.commit() + + _, tag_map, _ = list_references_page(session) + tags = tag_map[ref.id] + assert tags[0:2] == ["models", "checkpoints"] + assert set(tags[2:]) == {"zzz-z", "favorite", "experiment-q4"} + + def test_user_batch_lands_after_path_batch_under_clock_collision( + self, session: Session, monkeypatch: pytest.MonkeyPatch + ): + """Windows-specific race: when two back-to-back commits share the + same datetime.now() microsecond, the path-tier and user-tier + added_at values used to collide and alphabetic tiebreak would + hoist user tags ahead of path tags. The fix reads + max(existing_added_at) for the reference and seeds the next batch + past it, deterministically restoring insertion order. + + This test simulates the collision by pinning get_utc_now() so the + platform-dependent race becomes a platform-independent failure. + """ + ref = self._make_ref(session) + + from datetime import datetime + from app.assets.database import queries as queries_pkg + from app.assets.database.queries import tags as tags_module + + frozen = datetime(2026, 1, 1, 0, 0, 0) + monkeypatch.setattr(tags_module, "get_utc_now", lambda: frozen) + monkeypatch.setattr(queries_pkg, "get_utc_now", lambda: frozen, raising=False) + + set_reference_tags(session, reference_id=ref.id, tags=["models", "checkpoints"]) + session.commit() + + # Same frozen timestamp — without the max(existing) seed, the + # user batch would share added_at with the path batch and + # `aaa-user-tag` would sort to position 0 via the alphabetic + # tiebreaker. + add_tags_to_reference( + session, reference_id=ref.id, tags=["aaa-user-tag"], origin="manual" + ) + session.commit() + + _, tag_map, _ = list_references_page(session) + assert tag_map[ref.id] == ["models", "checkpoints", "aaa-user-tag"] + + def test_remove_then_add_does_not_disrupt_path_tag_positions( + self, session: Session + ): + ref = self._make_ref(session) + set_reference_tags( + session, + reference_id=ref.id, + tags=["models", "loras/my/custom/path"], + ) + session.commit() + add_tags_to_reference(session, reference_id=ref.id, tags=["temp-tag"]) + session.commit() + from app.assets.database.queries import remove_tags_from_reference + + remove_tags_from_reference(session, reference_id=ref.id, tags=["temp-tag"]) + session.commit() + add_tags_to_reference(session, reference_id=ref.id, tags=["second-tag"]) + session.commit() + + _, tag_map, _ = list_references_page(session) + # `loras` is expanded from the nested category path; user-added + # tags trail behind it via the microsecond stagger. + assert tag_map[ref.id] == [ + "models", + "loras/my/custom/path", + "loras", + "second-tag", + ] + + class TestFetchReferenceAssetAndTags: def test_returns_none_for_nonexistent(self, session: Session): result = fetch_reference_asset_and_tags(session, "nonexistent") diff --git a/tests-unit/assets_test/queries/test_tags.py b/tests-unit/assets_test/queries/test_tags.py index 4ed99aa37..bf71d2962 100644 --- a/tests-unit/assets_test/queries/test_tags.py +++ b/tests-unit/assets_test/queries/test_tags.py @@ -160,6 +160,120 @@ class TestAddTagsToReference: add_tags_to_reference(session, reference_id="nonexistent", tags=["x"]) +class TestBucketPrefixExpansion: + """The standalone bucket token must appear in the asset's tag set for + nested category paths so FE filters like + `include_tags=models,checkpoints` continue to match. + """ + + def test_set_reference_tags_inserts_bucket_for_nested_path( + self, session: Session + ): + asset = _make_asset(session, "hash-nested") + ref = _make_reference(session, asset) + + result = set_reference_tags( + session, + reference_id=ref.id, + tags=["models", "checkpoints/flux"], + ) + session.commit() + + assert set(result.total) == {"models", "checkpoints/flux", "checkpoints"} + stored = get_reference_tags(session, reference_id=ref.id) + # tag[1] keeps the slash-joined positional contract; the standalone + # bucket lands after it via path-tier microsecond stagger so user + # tags remain at the tail. + assert stored[:3] == ["models", "checkpoints/flux", "checkpoints"] + + def test_set_reference_tags_idempotent_on_replay(self, session: Session): + asset = _make_asset(session, "hash-replay") + ref = _make_reference(session, asset) + + set_reference_tags( + session, + reference_id=ref.id, + tags=["models", "checkpoints/flux"], + ) + # Replay with the same caller-supplied set; expansion is already + # baked in, so nothing should be added or removed. + result = set_reference_tags( + session, + reference_id=ref.id, + tags=["models", "checkpoints/flux"], + ) + session.commit() + + assert result.added == [] + assert result.removed == [] + assert set(result.total) == {"models", "checkpoints/flux", "checkpoints"} + + def test_add_tags_to_reference_expands_bucket(self, session: Session): + asset = _make_asset(session, "hash-add") + ref = _make_reference(session, asset) + + result = add_tags_to_reference( + session, + reference_id=ref.id, + tags=["loras/style/v2"], + ) + session.commit() + + assert set(result.added) == {"loras/style/v2", "loras"} + stored = get_reference_tags(session, reference_id=ref.id) + assert "loras" in stored + assert "loras/style/v2" in stored + + def test_add_tags_does_not_duplicate_existing_bucket(self, session: Session): + asset = _make_asset(session, "hash-dedupe") + ref = _make_reference(session, asset) + + add_tags_to_reference( + session, reference_id=ref.id, tags=["models", "checkpoints"] + ) + result = add_tags_to_reference( + session, reference_id=ref.id, tags=["checkpoints/flux"] + ) + session.commit() + + # `checkpoints` was already there from the first add; only the + # slash-joined token is genuinely new. + assert result.added == ["checkpoints/flux"] + assert "checkpoints" in result.already_present + + def test_flat_category_is_unaffected(self, session: Session): + asset = _make_asset(session, "hash-flat") + ref = _make_reference(session, asset) + + result = set_reference_tags( + session, + reference_id=ref.id, + tags=["models", "checkpoints"], + ) + session.commit() + + assert set(result.total) == {"models", "checkpoints"} + assert get_reference_tags(session, reference_id=ref.id) == [ + "models", + "checkpoints", + ] + + def test_unknown_prefix_passes_through(self, session: Session): + asset = _make_asset(session, "hash-user") + ref = _make_reference(session, asset) + + # `my-org` isn't a registered bucket — the slash-joined user tag + # should not trigger bucket expansion. + result = set_reference_tags( + session, + reference_id=ref.id, + tags=["my-org/team-a"], + ) + session.commit() + + assert result.total == ["my-org/team-a"] + + class TestRemoveTagsFromReference: def test_removes_tags(self, session: Session): asset = _make_asset(session, "hash1") diff --git a/tests-unit/assets_test/services/test_bulk_ingest.py b/tests-unit/assets_test/services/test_bulk_ingest.py index 26e22a01d..4ba6db717 100644 --- a/tests-unit/assets_test/services/test_bulk_ingest.py +++ b/tests-unit/assets_test/services/test_bulk_ingest.py @@ -4,7 +4,7 @@ from pathlib import Path from sqlalchemy.orm import Session -from app.assets.database.models import Asset, AssetReference +from app.assets.database.models import Asset, AssetReference, AssetReferenceTag from app.assets.services.bulk_ingest import SeedAssetSpec, batch_insert_seed_assets @@ -102,6 +102,82 @@ class TestBatchInsertSeedAssets: assert asset.mime_type == expected_mime, f"Expected {expected_mime} for {filename}, got {asset.mime_type}" +class TestBucketPrefixExpansionOnIngest: + """Path-scanning ingest must persist the standalone bucket token for + nested category paths so the FE set-membership filter + (`include_tags=models,checkpoints`) matches assets organized into + subfolders (`models/checkpoints/flux/foo.safetensors`). + """ + + def test_nested_path_inserts_standalone_bucket( + self, session: Session, temp_dir: Path + ): + file_path = temp_dir / "flux.safetensors" + file_path.write_bytes(b"content") + + specs: list[SeedAssetSpec] = [ + { + "abs_path": str(file_path), + "size_bytes": 7, + "mtime_ns": 1234567890000000000, + "info_name": "flux", + # Shape emitted by get_name_and_tags_from_asset_path for a + # nested model path. + "tags": ["models", "checkpoints/flux"], + "fname": "flux.safetensors", + "metadata": None, + "hash": None, + "mime_type": "application/safetensors", + } + ] + + result = batch_insert_seed_assets(session, specs=specs, owner_id="") + + assert result.inserted_refs == 1 + ref = session.query(AssetReference).filter_by(name="flux").one() + stored = [ + row.tag_name + for row in session.query(AssetReferenceTag) + .filter_by(asset_reference_id=ref.id) + .order_by(AssetReferenceTag.added_at.asc()) + .all() + ] + assert stored == ["models", "checkpoints/flux", "checkpoints"] + + def test_flat_path_remains_two_tags( + self, session: Session, temp_dir: Path + ): + file_path = temp_dir / "vanilla.safetensors" + file_path.write_bytes(b"content") + + specs: list[SeedAssetSpec] = [ + { + "abs_path": str(file_path), + "size_bytes": 7, + "mtime_ns": 1234567890000000000, + "info_name": "vanilla", + "tags": ["models", "checkpoints"], + "fname": "vanilla.safetensors", + "metadata": None, + "hash": None, + "mime_type": "application/safetensors", + } + ] + + batch_insert_seed_assets(session, specs=specs, owner_id="") + + ref = session.query(AssetReference).filter_by(name="vanilla").one() + stored = { + row.tag_name + for row in session.query(AssetReferenceTag) + .filter_by(asset_reference_id=ref.id) + .all() + } + # Dedupe means flat layouts don't pick up a redundant `checkpoints` + # row — tag[1] already serves both positional and set-membership. + assert stored == {"models", "checkpoints"} + + class TestMetadataExtraction: def test_extracts_mime_type_for_model_files(self, temp_dir: Path): """Verify metadata extraction returns correct mime_type for model files.""" diff --git a/tests-unit/assets_test/services/test_path_utils.py b/tests-unit/assets_test/services/test_path_utils.py index 3fa905f9a..210aa6230 100644 --- a/tests-unit/assets_test/services/test_path_utils.py +++ b/tests-unit/assets_test/services/test_path_utils.py @@ -6,7 +6,11 @@ from unittest.mock import patch import pytest -from app.assets.services.path_utils import get_asset_category_and_relative_path +from app.assets.services.path_utils import ( + get_asset_category_and_relative_path, + get_name_and_tags_from_asset_path, + resolve_destination_from_tags, +) @pytest.fixture @@ -38,6 +42,50 @@ def fake_dirs(): } +@pytest.fixture +def fake_dirs_multi_bucket(): + """Variant fixture with multiple model buckets (checkpoints + diffusers + loras).""" + with tempfile.TemporaryDirectory() as root: + root_path = Path(root) + input_dir = root_path / "input" + output_dir = root_path / "output" + temp_dir = root_path / "temp" + checkpoints_dir = root_path / "models" / "checkpoints" + diffusers_dir = root_path / "models" / "diffusers" + loras_dir = root_path / "models" / "loras" + for d in ( + input_dir, + output_dir, + temp_dir, + checkpoints_dir, + diffusers_dir, + loras_dir, + ): + d.mkdir(parents=True) + + with patch("app.assets.services.path_utils.folder_paths") as mock_fp: + mock_fp.get_input_directory.return_value = str(input_dir) + mock_fp.get_output_directory.return_value = str(output_dir) + mock_fp.get_temp_directory.return_value = str(temp_dir) + + with patch( + "app.assets.services.path_utils.get_comfy_models_folders", + return_value=[ + ("checkpoints", [str(checkpoints_dir)]), + ("diffusers", [str(diffusers_dir)]), + ("loras", [str(loras_dir)]), + ], + ): + yield { + "input": input_dir, + "output": output_dir, + "temp": temp_dir, + "checkpoints": checkpoints_dir, + "diffusers": diffusers_dir, + "loras": loras_dir, + } + + class TestGetAssetCategoryAndRelativePath: def test_input_file(self, fake_dirs): f = fake_dirs["input"] / "photo.png" @@ -79,3 +127,161 @@ class TestGetAssetCategoryAndRelativePath: def test_unknown_path_raises(self, fake_dirs): with pytest.raises(ValueError, match="not within"): get_asset_category_and_relative_path("/some/random/path.png") + + +class TestGetNameAndTagsFromAssetPath: + """tags collapse the parent subpath into a single slash-joined tag. + + Consumers should be able to read ``tags[1]`` as a stable category + identifier regardless of how deep the file lives in the bucket. + """ + + def test_flat_input(self, fake_dirs_multi_bucket): + f = fake_dirs_multi_bucket["input"] / "photo.png" + f.touch() + name, tags = get_name_and_tags_from_asset_path(str(f)) + assert name == "photo.png" + assert tags == ["input"] + + def test_flat_output(self, fake_dirs_multi_bucket): + f = fake_dirs_multi_bucket["output"] / "result_00001.png" + f.touch() + name, tags = get_name_and_tags_from_asset_path(str(f)) + assert name == "result_00001.png" + assert tags == ["output"] + + def test_flat_models_checkpoint(self, fake_dirs_multi_bucket): + f = fake_dirs_multi_bucket["checkpoints"] / "flux.safetensors" + f.touch() + name, tags = get_name_and_tags_from_asset_path(str(f)) + assert name == "flux.safetensors" + assert tags == ["models", "checkpoints"] + + def test_diffusers_nested_subpath_slash_joined(self, fake_dirs_multi_bucket): + """Diffusers components live in nested directories — the full subpath + must collapse into one tag so consumers can look up the model category + via tags[1] regardless of nesting depth. + + The subpath is lowercased to match the canonicalization + :func:`ensure_tags_exist` applies on the write side; without that, + the asset_reference_tags.tag_name FK to tags.name would fail for + any path containing uppercase letters. + """ + nested = ( + fake_dirs_multi_bucket["diffusers"] + / "Kolors" + / "text_encoder" + ) + nested.mkdir(parents=True) + f = nested / "model.safetensors" + f.touch() + name, tags = get_name_and_tags_from_asset_path(str(f)) + assert name == "model.safetensors" + assert tags == ["models", "diffusers/kolors/text_encoder"] + + def test_deep_lora_user_subpath_slash_joined(self, fake_dirs_multi_bucket): + """User-created subdirectories under a model bucket also collapse to a + single tag rather than one tag per directory.""" + nested = ( + fake_dirs_multi_bucket["loras"] + / "my" + / "custom" + / "path" + ) + nested.mkdir(parents=True) + f = nested / "v0001.safetensors" + f.touch() + name, tags = get_name_and_tags_from_asset_path(str(f)) + assert name == "v0001.safetensors" + assert tags == ["models", "loras/my/custom/path"] + + +class TestResolveDestinationFromTags: + """resolve_destination_from_tags must accept both the legacy + one-tag-per-directory shape and the new slash-joined shape so that an + upload using the tags it just read back from /api/assets round-trips + to the right on-disk destination. + """ + + @pytest.fixture + def resolve_dirs(self): + with tempfile.TemporaryDirectory() as root: + root_path = Path(root) + input_dir = root_path / "input" + output_dir = root_path / "output" + checkpoints_dir = root_path / "models" / "checkpoints" + diffusers_dir = root_path / "models" / "diffusers" + loras_dir = root_path / "models" / "loras" + for d in (input_dir, output_dir, checkpoints_dir, diffusers_dir, loras_dir): + d.mkdir(parents=True) + with patch("app.assets.services.path_utils.folder_paths") as mock_fp: + mock_fp.get_input_directory.return_value = str(input_dir) + mock_fp.get_output_directory.return_value = str(output_dir) + mock_fp.folder_names_and_paths = { + "checkpoints": ([str(checkpoints_dir)], None), + "diffusers": ([str(diffusers_dir)], None), + "loras": ([str(loras_dir)], None), + } + yield { + "input": input_dir, + "output": output_dir, + "checkpoints": checkpoints_dir, + "diffusers": diffusers_dir, + "loras": loras_dir, + } + + def test_models_flat_category(self, resolve_dirs): + base, subdirs = resolve_destination_from_tags(["models", "checkpoints"]) + assert base == str(resolve_dirs["checkpoints"]) + assert subdirs == [] + + def test_models_slash_joined_new_shape(self, resolve_dirs): + # The shape get_name_and_tags_from_asset_path now emits. + base, subdirs = resolve_destination_from_tags( + ["models", "diffusers/kolors/text_encoder"] + ) + assert base == str(resolve_dirs["diffusers"]) + assert subdirs == ["kolors", "text_encoder"] + + def test_models_legacy_one_tag_per_dir(self, resolve_dirs): + # The legacy shape must still resolve identically. + base, subdirs = resolve_destination_from_tags( + ["models", "diffusers", "kolors", "text_encoder"] + ) + assert base == str(resolve_dirs["diffusers"]) + assert subdirs == ["kolors", "text_encoder"] + + def test_models_loras_slash_joined(self, resolve_dirs): + base, subdirs = resolve_destination_from_tags( + ["models", "loras/my/custom/path"] + ) + assert base == str(resolve_dirs["loras"]) + assert subdirs == ["my", "custom", "path"] + + def test_input_no_subdir(self, resolve_dirs): + base, subdirs = resolve_destination_from_tags(["input"]) + assert base == str(resolve_dirs["input"]) + assert subdirs == [] + + def test_input_slash_joined_subdir(self, resolve_dirs): + base, subdirs = resolve_destination_from_tags(["input", "portraits/2026"]) + assert base == str(resolve_dirs["input"]) + assert subdirs == ["portraits", "2026"] + + def test_output_slash_joined_subdir(self, resolve_dirs): + base, subdirs = resolve_destination_from_tags(["output", "runs/abc"]) + assert base == str(resolve_dirs["output"]) + assert subdirs == ["runs", "abc"] + + def test_unknown_category_rejected(self, resolve_dirs): + with pytest.raises(ValueError, match="unknown model category"): + resolve_destination_from_tags(["models", "not_a_real_category"]) + + def test_unknown_category_via_slash_joined(self, resolve_dirs): + # First segment of a slash-joined tag must still match a registered category. + with pytest.raises(ValueError, match="unknown model category 'bogus'"): + resolve_destination_from_tags(["models", "bogus/sub/path"]) + + def test_traversal_in_subdir_rejected(self, resolve_dirs): + with pytest.raises(ValueError, match="invalid path component"): + resolve_destination_from_tags(["models", "checkpoints/..", "evil"]) diff --git a/tests-unit/assets_test/test_assets_missing_sync.py b/tests-unit/assets_test/test_assets_missing_sync.py index 47dc130cb..f87846a1c 100644 --- a/tests-unit/assets_test/test_assets_missing_sync.py +++ b/tests-unit/assets_test/test_assets_missing_sync.py @@ -32,7 +32,7 @@ def test_seed_asset_removed_when_file_is_deleted( # Verify it is visible via API and carries no hash (seed) r1 = http.get( api_base + "/api/assets", - params={"include_tags": "unit-tests,syncseed", "name_contains": name}, + params={"include_tags": "unit-tests/syncseed", "name_contains": name}, timeout=120, ) body1 = r1.json() @@ -52,7 +52,7 @@ def test_seed_asset_removed_when_file_is_deleted( # It should disappear (AssetInfo and seed Asset gone) r2 = http.get( api_base + "/api/assets", - params={"include_tags": "unit-tests,syncseed", "name_contains": name}, + params={"include_tags": "unit-tests/syncseed", "name_contains": name}, timeout=120, ) body2 = r2.json() @@ -332,7 +332,7 @@ def test_fastpass_removes_stale_state_row_no_missing( rl = http.get( api_base + "/api/assets", - params={"include_tags": f"unit-tests,{scope}"}, + params={"include_tags": f"unit-tests/{scope}"}, timeout=120, ) bl = rl.json() diff --git a/tests-unit/assets_test/test_crud.py b/tests-unit/assets_test/test_crud.py index 07310223e..7fc1139fe 100644 --- a/tests-unit/assets_test/test_crud.py +++ b/tests-unit/assets_test/test_crud.py @@ -280,9 +280,15 @@ def test_metadata_filename_is_set_for_seed_asset_without_hash( trigger_sync_seed_assets(http, api_base) + # Scanner emits tags as ``[root, "//..."]`` — the second tag + # is the slash-joined parent subpath. For ``/unit-tests//a/b/`` + # the second tag is ``"unit-tests//a/b"``. r1 = http.get( api_base + "/api/assets", - params={"include_tags": f"unit-tests,{scope}", "name_contains": name}, + params={ + "include_tags": f"unit-tests/{scope}/a/b", + "name_contains": name, + }, timeout=120, ) body = r1.json() diff --git a/tests-unit/assets_test/test_helpers.py b/tests-unit/assets_test/test_helpers.py new file mode 100644 index 000000000..c950b726b --- /dev/null +++ b/tests-unit/assets_test/test_helpers.py @@ -0,0 +1,69 @@ +"""Unit tests for app.assets.helpers.""" + +from app.assets.helpers import expand_bucket_prefixes + + +class TestExpandBucketPrefixes: + def test_flat_category_unchanged(self): + # `checkpoints` is already a standalone token, no expansion needed. + assert expand_bucket_prefixes(["models", "checkpoints"]) == [ + "models", + "checkpoints", + ] + + def test_nested_category_inserts_bucket(self): + # Path-derived shape for `models/checkpoints/flux/foo.safetensors` — + # the standalone bucket has to be present so the FE set-membership + # filter (`include_tags=models,checkpoints`) matches the asset. + assert expand_bucket_prefixes(["models", "checkpoints/flux"]) == [ + "models", + "checkpoints/flux", + "checkpoints", + ] + + def test_deeply_nested_only_first_segment_expands(self): + # Only the FIRST slash segment ever gets emitted as a standalone — + # intermediate path segments don't have routing significance. + assert expand_bucket_prefixes( + ["models", "diffusers/kolors/text_encoder"] + ) == ["models", "diffusers/kolors/text_encoder", "diffusers"] + + def test_unknown_prefix_does_not_expand(self): + # Free-form user labels with slashes whose first segment is not a + # registered bucket pass through opaquely. + assert expand_bucket_prefixes(["models", "my-org/team-a"]) == [ + "models", + "my-org/team-a", + ] + + def test_idempotent(self): + # Re-applying the helper is a no-op once the bucket is in the set. + expanded = expand_bucket_prefixes(["models", "checkpoints/flux"]) + assert expand_bucket_prefixes(expanded) == expanded + + def test_does_not_duplicate_existing_bucket(self): + # If the caller already supplied the standalone bucket, don't add a + # second copy. + assert expand_bucket_prefixes( + ["models", "checkpoints/flux", "checkpoints"] + ) == ["models", "checkpoints/flux", "checkpoints"] + + def test_preserves_caller_order(self): + # User tags after path tags must stay after; the inserted bucket + # token slots in immediately after its slash-joined parent so the + # microsecond stagger lands it at path-tier before user-tier. + assert expand_bucket_prefixes( + ["models", "loras/style", "favorite", "v2"] + ) == ["models", "loras/style", "loras", "favorite", "v2"] + + def test_empty_input(self): + assert expand_bucket_prefixes([]) == [] + + def test_input_root_with_subpath_no_expansion(self): + # `portraits` isn't a registered model category, so the input + # subpath stays opaque (FE filter doesn't have a checkpoint-loader + # analogue for input subfolders). + assert expand_bucket_prefixes(["input", "portraits/2026"]) == [ + "input", + "portraits/2026", + ] diff --git a/tests-unit/assets_test/test_prune_orphaned_assets.py b/tests-unit/assets_test/test_prune_orphaned_assets.py index 1fbd4d4e2..8e26697f3 100644 --- a/tests-unit/assets_test/test_prune_orphaned_assets.py +++ b/tests-unit/assets_test/test_prune_orphaned_assets.py @@ -29,7 +29,10 @@ def create_seed_file(comfy_tmp_base_dir: Path): def find_asset(http: requests.Session, api_base: str): """Query API for assets matching scope and optional name.""" def _find(scope: str, name: str | None = None) -> list[dict]: - params = {"include_tags": f"unit-tests,{scope}"} + # Scanner now emits tags as ``[root, "//..."]`` rather than + # one tag per directory. For files at ``/unit-tests//...`` + # the second tag is exactly ``"unit-tests/"``. + params = {"include_tags": f"unit-tests/{scope}"} if name: params["name_contains"] = name r = http.get(f"{api_base}/api/assets", params=params, timeout=120) @@ -138,4 +141,7 @@ def test_special_chars_in_path_escaped_correctly( trigger_sync_seed_assets(http, api_base) trigger_sync_seed_assets(http, api_base) - assert find_asset(scope.split("/")[0], fp.name), "Asset with special chars should survive" + # Scanner emits the full parent subpath as a single slash-joined tag, so + # the lookup tag is ``unit-tests/`` even when itself + # contains a slash (parent + special-char dirname). + assert find_asset(scope, fp.name), "Asset with special chars should survive" diff --git a/tests-unit/assets_test/test_user_tag_http_smoke.py b/tests-unit/assets_test/test_user_tag_http_smoke.py new file mode 100644 index 000000000..c461f5a05 --- /dev/null +++ b/tests-unit/assets_test/test_user_tag_http_smoke.py @@ -0,0 +1,135 @@ +"""HTTP-layer smoke test: user-added tags via POST /api/assets/{id}/tags +land after path tags when read back via GET /api/assets. + +Exercises the full route handler -> service -> query path that the unit +tests at tests-unit/assets_test/queries/test_asset_info.py only cover at +the service layer. +""" +import json + +import pytest +import requests + + +@pytest.fixture +def smoke_asset(http: requests.Session, api_base: str): + """Upload a single asset into models/checkpoints/unit-tests/smoke + and delete it on teardown.""" + name = "smoke_user_tag.safetensors" + tags = ["models", "checkpoints", "unit-tests", "smoke"] + files = {"file": (name, b"S" * 4096, "application/octet-stream")} + form_data = { + "tags": json.dumps(tags), + "name": name, + "user_metadata": json.dumps({}), + } + r = http.post(api_base + "/api/assets", files=files, data=form_data, timeout=120) + assert r.status_code == 201, r.text + body = r.json() + yield body + http.delete( + f"{api_base}/api/assets/{body['id']}?delete_content=true", timeout=30 + ) + + +def _fetch_asset_tags(http, api_base, ref_id): + r = http.get(f"{api_base}/api/assets/{ref_id}", timeout=30) + assert r.status_code == 200, r.text + return r.json()["tags"] + + +def test_user_tag_lands_after_path_tags_via_http( + http: requests.Session, api_base: str, smoke_asset: dict +): + ref_id = smoke_asset["id"] + + initial_tags = _fetch_asset_tags(http, api_base, ref_id) + # Path tags should already be at the front in upload order. + assert initial_tags[:2] == ["models", "checkpoints"] + + # Add a user tag that would jump to position 0 under alphabetical sort. + r = http.post( + f"{api_base}/api/assets/{ref_id}/tags", + json={"tags": ["aaa-user-tag"]}, + timeout=30, + ) + assert r.status_code in (200, 201), r.text + + tags_after = _fetch_asset_tags(http, api_base, ref_id) + # Path tags must still be at the front; user tag goes to the end. + assert tags_after[0] == "models" + assert tags_after[1] == "checkpoints" + assert "aaa-user-tag" in tags_after + assert tags_after[-1] == "aaa-user-tag" + + +def test_user_tag_batch_lands_after_path_tags_via_http( + http: requests.Session, api_base: str, smoke_asset: dict +): + ref_id = smoke_asset["id"] + + # Add three user tags in a single request, in non-alphabetical input + # order. They should all land after the path tags (microsecond stagger + # in set_reference_tags / add_tags_to_reference is what makes this + # work — without it, "aaa" would jump to position 0). + r = http.post( + f"{api_base}/api/assets/{ref_id}/tags", + json={"tags": ["zzz-z", "favorite", "aaa-experiment"]}, + timeout=30, + ) + assert r.status_code in (200, 201), r.text + + tags_after = _fetch_asset_tags(http, api_base, ref_id) + assert tags_after[0] == "models" + assert tags_after[1] == "checkpoints" + user_tail = tags_after[len({"models", "checkpoints", "unit-tests", "smoke"}):] + assert set(user_tail) >= {"zzz-z", "favorite", "aaa-experiment"} + # Critically: alphabetical sort would put 'aaa-experiment' at position 0. + assert tags_after.index("aaa-experiment") > tags_after.index("models") + assert tags_after.index("aaa-experiment") > tags_after.index("checkpoints") + + +@pytest.fixture +def nested_checkpoint_asset(http: requests.Session, api_base: str): + """Upload a checkpoint at the slash-joined path shape cloud emits + (`models/checkpoints/flux/...`), then delete it on teardown. + """ + name = "nested_checkpoint.safetensors" + tags = ["models", "checkpoints/flux"] + files = {"file": (name, b"S" * 4096, "application/octet-stream")} + form_data = { + "tags": json.dumps(tags), + "name": name, + "user_metadata": json.dumps({}), + } + r = http.post(api_base + "/api/assets", files=files, data=form_data, timeout=120) + assert r.status_code == 201, r.text + body = r.json() + yield body + http.delete( + f"{api_base}/api/assets/{body['id']}?delete_content=true", timeout=30 + ) + + +def test_nested_checkpoint_satisfies_fe_set_filter( + http: requests.Session, api_base: str, nested_checkpoint_asset: dict +): + """The case Simon flagged: a nested-path checkpoint must still match + `include_tags=models,checkpoints` — the FE combo-widget filter. + """ + ref_id = nested_checkpoint_asset["id"] + + stored = _fetch_asset_tags(http, api_base, ref_id) + # tag[1] keeps cloud's slash-joined positional contract; tag[2] holds + # the standalone bucket the FE filter looks for. + assert stored[:3] == ["models", "checkpoints/flux", "checkpoints"] + + # The actual FE query — exact set-membership across both tokens. + r = http.get( + f"{api_base}/api/assets", + params=[("include_tags", "models"), ("include_tags", "checkpoints")], + timeout=30, + ) + assert r.status_code == 200, r.text + returned_ids = {a["id"] for a in r.json()["assets"]} + assert ref_id in returned_ids