diff --git a/app/assets/database/queries/__init__.py b/app/assets/database/queries/__init__.py
index a24f82f3c..4edaed318 100644
--- a/app/assets/database/queries/__init__.py
+++ b/app/assets/database/queries/__init__.py
@@ -52,48 +52,48 @@ from app.assets.database.queries.tags import (
 )
 
 __all__ = [
-    "asset_exists_by_hash",
-    "get_asset_by_hash",
-    "upsert_asset",
-    "bulk_insert_assets",
-    "asset_info_exists_for_asset_id",
-    "get_asset_info_by_id",
-    "insert_asset_info",
-    "get_or_create_asset_info",
-    "update_asset_info_timestamps",
-    "list_asset_infos_page",
-    "fetch_asset_info_asset_and_tags",
-    "fetch_asset_info_and_asset",
-    "update_asset_info_access_time",
-    "update_asset_info_name",
-    "update_asset_info_updated_at",
-    "set_asset_info_metadata",
-    "delete_asset_info_by_id",
-    "set_asset_info_preview",
-    "bulk_insert_asset_infos_ignore_conflicts",
-    "get_asset_info_ids_by_ids",
-    "CacheStateRow",
-    "list_cache_states_by_asset_id",
-    "upsert_cache_state",
-    "delete_cache_states_outside_prefixes",
-    "get_orphaned_seed_asset_ids",
-    "delete_assets_by_ids",
-    "get_cache_states_for_prefixes",
-    "bulk_set_needs_verify",
-    "delete_cache_states_by_ids",
-    "delete_orphaned_seed_asset",
-    "bulk_insert_cache_states_ignore_conflicts",
-    "get_cache_states_by_paths_and_asset_ids",
-    "ensure_tags_exist",
-    "get_asset_tags",
-    "set_asset_info_tags",
-    "add_tags_to_asset_info",
-    "remove_tags_from_asset_info",
-    "add_missing_tag_for_asset_id",
-    "remove_missing_tag_for_asset_id",
-    "list_tags_with_usage",
-    "bulk_insert_tags_and_meta",
     "AddTagsDict",
+    "CacheStateRow",
     "RemoveTagsDict",
     "SetTagsDict",
+    "add_missing_tag_for_asset_id",
+    "add_tags_to_asset_info",
+    "asset_exists_by_hash",
+    "asset_info_exists_for_asset_id",
+    "bulk_insert_asset_infos_ignore_conflicts",
+    "bulk_insert_assets",
+    "bulk_insert_cache_states_ignore_conflicts",
+    "bulk_insert_tags_and_meta",
+    "bulk_set_needs_verify",
+    "delete_asset_info_by_id",
+    "delete_assets_by_ids",
+    "delete_cache_states_by_ids",
+    "delete_cache_states_outside_prefixes",
+    "delete_orphaned_seed_asset",
+    "ensure_tags_exist",
+    "fetch_asset_info_and_asset",
+    "fetch_asset_info_asset_and_tags",
+    "get_asset_by_hash",
+    "get_asset_info_by_id",
+    "get_asset_info_ids_by_ids",
+    "get_asset_tags",
+    "get_cache_states_by_paths_and_asset_ids",
+    "get_cache_states_for_prefixes",
+    "get_or_create_asset_info",
+    "get_orphaned_seed_asset_ids",
+    "insert_asset_info",
+    "list_asset_infos_page",
+    "list_cache_states_by_asset_id",
+    "list_tags_with_usage",
+    "remove_missing_tag_for_asset_id",
+    "remove_tags_from_asset_info",
+    "set_asset_info_metadata",
+    "set_asset_info_preview",
+    "set_asset_info_tags",
+    "update_asset_info_access_time",
+    "update_asset_info_name",
+    "update_asset_info_timestamps",
+    "update_asset_info_updated_at",
+    "upsert_asset",
+    "upsert_cache_state",
 ]
diff --git a/app/assets/scanner.py b/app/assets/scanner.py
index a5cceb88a..5cc9096cb 100644
--- a/app/assets/scanner.py
+++ b/app/assets/scanner.py
@@ -2,31 +2,27 @@ import contextlib
 import logging
 import os
 import time
-import uuid
 from typing import Literal
 
-from sqlalchemy.orm import Session
-
 import folder_paths
 from app.assets.database.queries import (
     add_missing_tag_for_asset_id,
-    bulk_insert_asset_infos_ignore_conflicts,
-    bulk_insert_assets,
-    bulk_insert_cache_states_ignore_conflicts,
-    bulk_insert_tags_and_meta,
     bulk_set_needs_verify,
-    delete_assets_by_ids,
     delete_cache_states_by_ids,
-    delete_cache_states_outside_prefixes,
     delete_orphaned_seed_asset,
     ensure_tags_exist,
-    get_asset_info_ids_by_ids,
-    get_cache_states_by_paths_and_asset_ids,
     get_cache_states_for_prefixes,
-    get_orphaned_seed_asset_ids,
     remove_missing_tag_for_asset_id,
 )
-from app.assets.helpers import get_utc_now
+from app.assets.services.bulk_ingest import (
+    batch_insert_seed_assets,
+    prune_orphaned_assets,
+)
+from app.assets.services.file_utils import (
+    get_mtime_ns,
+    list_files_recursively,
+    verify_file_unchanged,
+)
 from app.assets.services.path_utils import (
     compute_relative_filename,
     get_comfy_models_folders,
@@ -37,37 +33,6 @@ from app.database.db import create_session, dependencies_available
 RootType = Literal["models", "input", "output"]
 
 
-def verify_asset_file_unchanged(
-    mtime_db: int | None,
-    size_db: int | None,
-    stat_result: os.stat_result,
-) -> bool:
-    if mtime_db is None:
-        return False
-    actual_mtime_ns = getattr(
-        stat_result, "st_mtime_ns", int(stat_result.st_mtime * 1_000_000_000)
-    )
-    if int(mtime_db) != int(actual_mtime_ns):
-        return False
-    sz = int(size_db or 0)
-    if sz > 0:
-        return int(stat_result.st_size) == sz
-    return True
-
-
-def list_files_recursively(base_dir: str) -> list[str]:
-    out: list[str] = []
-    base_abs = os.path.abspath(base_dir)
-    if not os.path.isdir(base_abs):
-        return out
-    for dirpath, _subdirs, filenames in os.walk(
-        base_abs, topdown=True, followlinks=False
-    ):
-        for name in filenames:
-            out.append(os.path.abspath(os.path.join(dirpath, name)))
-    return out
-
-
 def get_prefixes_for_root(root: RootType) -> list[str]:
     if root == "models":
         bases: list[str] = []
@@ -102,170 +67,6 @@ def collect_models_files() -> list[str]:
     return out
 
 
-def _batch_insert_assets_from_paths(
-    session: Session,
-    specs: list[dict],
-    owner_id: str = "",
-) -> dict:
-    """Seed assets from filesystem specs in batch.
-
-    Each spec is a dict with keys:
-      - abs_path: str
-      - size_bytes: int
-      - mtime_ns: int
-      - info_name: str
-      - tags: list[str]
-      - fname: Optional[str]
-
-    This function orchestrates:
-      1. Insert seed Assets (hash=NULL)
-      2. Claim cache states with ON CONFLICT DO NOTHING
-      3. Query to find winners (paths where our asset_id was inserted)
-      4. Delete Assets for losers (path already claimed by another asset)
-      5. Insert AssetInfo for winners
-      6. Insert tags and metadata for successfully inserted AssetInfos
-
-    Returns:
-        dict with keys: inserted_infos, won_states, lost_states
-    """
-    if not specs:
-        return {"inserted_infos": 0, "won_states": 0, "lost_states": 0}
-
-    now = get_utc_now()
-    asset_rows: list[dict] = []
-    state_rows: list[dict] = []
-    path_to_asset: dict[str, str] = {}
-    asset_to_info: dict[str, dict] = {}
-    path_list: list[str] = []
-
-    for sp in specs:
-        ap = os.path.abspath(sp["abs_path"])
-        aid = str(uuid.uuid4())
-        iid = str(uuid.uuid4())
-        path_list.append(ap)
-        path_to_asset[ap] = aid
-
-        asset_rows.append(
-            {
-                "id": aid,
-                "hash": None,
-                "size_bytes": sp["size_bytes"],
-                "mime_type": None,
-                "created_at": now,
-            }
-        )
-        state_rows.append(
-            {
-                "asset_id": aid,
-                "file_path": ap,
-                "mtime_ns": sp["mtime_ns"],
-            }
-        )
-        asset_to_info[aid] = {
-            "id": iid,
-            "owner_id": owner_id,
-            "name": sp["info_name"],
-            "asset_id": aid,
-            "preview_id": None,
-            "user_metadata": {"filename": sp["fname"]} if sp["fname"] else None,
-            "created_at": now,
-            "updated_at": now,
-            "last_access_time": now,
-            "_tags": sp["tags"],
-            "_filename": sp["fname"],
-        }
-
-    bulk_insert_assets(session, asset_rows)
-    bulk_insert_cache_states_ignore_conflicts(session, state_rows)
-    winners_by_path = get_cache_states_by_paths_and_asset_ids(session, path_to_asset)
-
-    all_paths_set = set(path_list)
-    losers_by_path = all_paths_set - winners_by_path
-    lost_assets = [path_to_asset[p] for p in losers_by_path]
-
-    if lost_assets:
-        delete_assets_by_ids(session, lost_assets)
-
-    if not winners_by_path:
-        return {
-            "inserted_infos": 0,
-            "won_states": 0,
-            "lost_states": len(losers_by_path),
-        }
-
-    winner_info_rows = [asset_to_info[path_to_asset[p]] for p in winners_by_path]
-    db_info_rows = [
-        {
-            "id": row["id"],
-            "owner_id": row["owner_id"],
-            "name": row["name"],
-            "asset_id": row["asset_id"],
-            "preview_id": row["preview_id"],
-            "user_metadata": row["user_metadata"],
-            "created_at": row["created_at"],
-            "updated_at": row["updated_at"],
-            "last_access_time": row["last_access_time"],
-        }
-        for row in winner_info_rows
-    ]
-    bulk_insert_asset_infos_ignore_conflicts(session, db_info_rows)
-
-    all_info_ids = [row["id"] for row in winner_info_rows]
-    inserted_info_ids = get_asset_info_ids_by_ids(session, all_info_ids)
-
-    tag_rows: list[dict] = []
-    meta_rows: list[dict] = []
-    if inserted_info_ids:
-        for row in winner_info_rows:
-            iid = row["id"]
-            if iid not in inserted_info_ids:
-                continue
-            for t in row["_tags"]:
-                tag_rows.append(
-                    {
-                        "asset_info_id": iid,
-                        "tag_name": t,
-                        "origin": "automatic",
-                        "added_at": now,
-                    }
-                )
-            if row["_filename"]:
-                meta_rows.append(
-                    {
-                        "asset_info_id": iid,
-                        "key": "filename",
-                        "ordinal": 0,
-                        "val_str": row["_filename"],
-                        "val_num": None,
-                        "val_bool": None,
-                        "val_json": None,
-                    }
-                )
-
-    bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=meta_rows)
-
-    return {
-        "inserted_infos": len(inserted_info_ids),
-        "won_states": len(winners_by_path),
-        "lost_states": len(losers_by_path),
-    }
-
-
-def prune_orphaned_assets(session, valid_prefixes: list[str]) -> int:
-    """Prune cache states outside valid prefixes, then delete orphaned seed assets.
-
-    Args:
-        session: Database session
-        valid_prefixes: List of absolute directory prefixes that are valid
-
-    Returns:
-        Number of orphaned assets deleted
-    """
-    delete_cache_states_outside_prefixes(session, valid_prefixes)
-    orphan_ids = get_orphaned_seed_asset_ids(session)
-    return delete_assets_by_ids(session, orphan_ids)
-
-
 def sync_cache_states_with_filesystem(
     session,
     root: RootType,
@@ -305,7 +106,7 @@ def sync_cache_states_with_filesystem(
             fast_ok = False
             try:
                 exists = True
-                fast_ok = verify_asset_file_unchanged(
+                fast_ok = verify_file_unchanged(
                     mtime_db=row.mtime_ns,
                     size_db=acc["size_db"],
                     stat_result=os.stat(row.file_path, follow_symlinks=True),
@@ -447,9 +248,7 @@ def _build_asset_specs(
             {
                 "abs_path": abs_p,
                 "size_bytes": stat_p.st_size,
-                "mtime_ns": getattr(
-                    stat_p, "st_mtime_ns", int(stat_p.st_mtime * 1_000_000_000)
-                ),
+                "mtime_ns": get_mtime_ns(stat_p),
                 "info_name": name,
                 "tags": tags,
                 "fname": compute_relative_filename(abs_p),
@@ -467,9 +266,9 @@ def _insert_asset_specs(specs: list[dict], tag_pool: set[str]) -> int:
     with create_session() as sess:
         if tag_pool:
             ensure_tags_exist(sess, tag_pool, tag_type="user")
-        result = _batch_insert_assets_from_paths(sess, specs=specs, owner_id="")
+        result = batch_insert_seed_assets(sess, specs=specs, owner_id="")
         sess.commit()
-        return result["inserted_infos"]
+        return result.inserted_infos
 
 
 def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
diff --git a/app/assets/services/__init__.py b/app/assets/services/__init__.py
index a35c66b8a..843be2ff1 100644
--- a/app/assets/services/__init__.py
+++ b/app/assets/services/__init__.py
@@ -8,6 +8,17 @@ from app.assets.services.asset_management import (
     set_asset_preview,
     update_asset_metadata,
 )
+from app.assets.services.bulk_ingest import (
+    BulkInsertResult,
+    batch_insert_seed_assets,
+    prune_orphaned_assets,
+)
+from app.assets.services.file_utils import (
+    get_mtime_ns,
+    get_size_and_mtime_ns,
+    list_files_recursively,
+    verify_file_unchanged,
+)
 from app.assets.services.ingest import (
     DependencyMissingError,
     HashMismatchError,
@@ -39,29 +50,15 @@ from app.assets.services.tagging import (
 )
 
 __all__ = [
-    "ingest_file_from_path",
-    "register_existing_asset",
-    "upload_from_temp_path",
-    "create_from_hash",
-    "HashMismatchError",
-    "DependencyMissingError",
-    "asset_exists",
-    "get_asset_by_hash",
-    "get_asset_detail",
-    "list_assets_page",
-    "resolve_asset_for_download",
-    "update_asset_metadata",
-    "delete_asset_reference",
-    "set_asset_preview",
-    "apply_tags",
-    "remove_tags",
-    "list_tags",
     "AddTagsResult",
     "AssetData",
     "AssetDetailResult",
     "AssetInfoData",
     "AssetSummaryData",
+    "BulkInsertResult",
+    "DependencyMissingError",
     "DownloadResolutionResult",
+    "HashMismatchError",
     "IngestResult",
     "ListAssetsResult",
     "RegisterAssetResult",
@@ -70,4 +67,25 @@ __all__ = [
     "TagUsage",
     "UploadResult",
     "UserMetadata",
+    "apply_tags",
+    "asset_exists",
+    "batch_insert_seed_assets",
+    "create_from_hash",
+    "delete_asset_reference",
+    "get_asset_by_hash",
+    "get_asset_detail",
+    "get_mtime_ns",
+    "get_size_and_mtime_ns",
+    "ingest_file_from_path",
+    "list_assets_page",
+    "list_files_recursively",
+    "list_tags",
+    "prune_orphaned_assets",
+    "register_existing_asset",
+    "remove_tags",
+    "resolve_asset_for_download",
+    "set_asset_preview",
+    "update_asset_metadata",
+    "upload_from_temp_path",
+    "verify_file_unchanged",
 ]
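With the re-exports above, callers can reach the new helpers either through the `app.assets.services` package root or through the submodules directly. A minimal sketch; both forms resolve to the same functions introduced in this diff:

    from app.assets.services import batch_insert_seed_assets, verify_file_unchanged
    # equivalent to importing from the submodules directly:
    from app.assets.services.bulk_ingest import batch_insert_seed_assets
    from app.assets.services.file_utils import verify_file_unchanged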
diff --git a/app/assets/services/bulk_ingest.py b/app/assets/services/bulk_ingest.py
new file mode 100644
index 000000000..d786b967f
--- /dev/null
+++ b/app/assets/services/bulk_ingest.py
@@ -0,0 +1,191 @@
+import os
+import uuid
+from dataclasses import dataclass
+
+from sqlalchemy.orm import Session
+
+from app.assets.database.queries import (
+    bulk_insert_asset_infos_ignore_conflicts,
+    bulk_insert_assets,
+    bulk_insert_cache_states_ignore_conflicts,
+    bulk_insert_tags_and_meta,
+    delete_assets_by_ids,
+    delete_cache_states_outside_prefixes,
+    get_asset_info_ids_by_ids,
+    get_cache_states_by_paths_and_asset_ids,
+    get_orphaned_seed_asset_ids,
+)
+from app.assets.helpers import get_utc_now
+
+
+@dataclass
+class BulkInsertResult:
+    """Result of bulk asset insertion."""
+
+    inserted_infos: int
+    won_states: int
+    lost_states: int
+
+
+def batch_insert_seed_assets(
+    session: Session,
+    specs: list[dict],
+    owner_id: str = "",
+) -> BulkInsertResult:
+    """Seed assets from filesystem specs in batch.
+
+    Each spec is a dict with keys:
+      - abs_path: str
+      - size_bytes: int
+      - mtime_ns: int
+      - info_name: str
+      - tags: list[str]
+      - fname: Optional[str]
+
+    This function orchestrates:
+      1. Insert seed Assets (hash=NULL)
+      2. Claim cache states with ON CONFLICT DO NOTHING
+      3. Query to find winners (paths where our asset_id was inserted)
+      4. Delete Assets for losers (path already claimed by another asset)
+      5. Insert AssetInfo for winners
+      6. Insert tags and metadata for successfully inserted AssetInfos
+
+    Returns:
+        BulkInsertResult with inserted_infos, won_states, lost_states
+    """
+    if not specs:
+        return BulkInsertResult(inserted_infos=0, won_states=0, lost_states=0)
+
+    now = get_utc_now()
+    asset_rows: list[dict] = []
+    state_rows: list[dict] = []
+    path_to_asset: dict[str, str] = {}
+    asset_to_info: dict[str, dict] = {}
+    path_list: list[str] = []
+
+    for sp in specs:
+        ap = os.path.abspath(sp["abs_path"])
+        aid = str(uuid.uuid4())
+        iid = str(uuid.uuid4())
+        path_list.append(ap)
+        path_to_asset[ap] = aid
+
+        asset_rows.append(
+            {
+                "id": aid,
+                "hash": None,
+                "size_bytes": sp["size_bytes"],
+                "mime_type": None,
+                "created_at": now,
+            }
+        )
+        state_rows.append(
+            {
+                "asset_id": aid,
+                "file_path": ap,
+                "mtime_ns": sp["mtime_ns"],
+            }
+        )
+        asset_to_info[aid] = {
+            "id": iid,
+            "owner_id": owner_id,
+            "name": sp["info_name"],
+            "asset_id": aid,
+            "preview_id": None,
+            "user_metadata": {"filename": sp["fname"]} if sp["fname"] else None,
+            "created_at": now,
+            "updated_at": now,
+            "last_access_time": now,
+            "_tags": sp["tags"],
+            "_filename": sp["fname"],
+        }
+
+    bulk_insert_assets(session, asset_rows)
+    bulk_insert_cache_states_ignore_conflicts(session, state_rows)
+    winners_by_path = get_cache_states_by_paths_and_asset_ids(session, path_to_asset)
+
+    all_paths_set = set(path_list)
+    losers_by_path = all_paths_set - winners_by_path
+    lost_assets = [path_to_asset[p] for p in losers_by_path]
+
+    if lost_assets:
+        delete_assets_by_ids(session, lost_assets)
+
+    if not winners_by_path:
+        return BulkInsertResult(
+            inserted_infos=0,
+            won_states=0,
+            lost_states=len(losers_by_path),
+        )
+
+    winner_info_rows = [asset_to_info[path_to_asset[p]] for p in winners_by_path]
+    db_info_rows = [
+        {
+            "id": row["id"],
+            "owner_id": row["owner_id"],
+            "name": row["name"],
+            "asset_id": row["asset_id"],
+            "preview_id": row["preview_id"],
+            "user_metadata": row["user_metadata"],
+            "created_at": row["created_at"],
+            "updated_at": row["updated_at"],
+            "last_access_time": row["last_access_time"],
+        }
+        for row in winner_info_rows
+    ]
+    bulk_insert_asset_infos_ignore_conflicts(session, db_info_rows)
+
+    all_info_ids = [row["id"] for row in winner_info_rows]
+    inserted_info_ids = get_asset_info_ids_by_ids(session, all_info_ids)
+
+    tag_rows: list[dict] = []
+    meta_rows: list[dict] = []
+    if inserted_info_ids:
+        for row in winner_info_rows:
+            iid = row["id"]
+            if iid not in inserted_info_ids:
+                continue
+            for t in row["_tags"]:
+                tag_rows.append(
+                    {
+                        "asset_info_id": iid,
+                        "tag_name": t,
+                        "origin": "automatic",
+                        "added_at": now,
+                    }
+                )
+            if row["_filename"]:
+                meta_rows.append(
+                    {
+                        "asset_info_id": iid,
+                        "key": "filename",
+                        "ordinal": 0,
+                        "val_str": row["_filename"],
+                        "val_num": None,
+                        "val_bool": None,
+                        "val_json": None,
+                    }
+                )
+
+    bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=meta_rows)
+
+    return BulkInsertResult(
+        inserted_infos=len(inserted_info_ids),
+        won_states=len(winners_by_path),
+        lost_states=len(losers_by_path),
+    )
+
+
+def prune_orphaned_assets(session: Session, valid_prefixes: list[str]) -> int:
+    """Prune cache states outside valid prefixes, then delete orphaned seed assets.
+
+    Args:
+        session: Database session
+        valid_prefixes: List of absolute directory prefixes that are valid
+
+    Returns:
+        Number of orphaned assets deleted
+    """
+    delete_cache_states_outside_prefixes(session, valid_prefixes)
+    orphan_ids = get_orphaned_seed_asset_ids(session)
+    return delete_assets_by_ids(session, orphan_ids)
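A minimal usage sketch of the new entry point, assuming a single already-statted file. The spec keys follow the docstring above, `create_session` is the same factory scanner.py imports, and the file path here is hypothetical:

    import os

    from app.assets.services.bulk_ingest import batch_insert_seed_assets
    from app.assets.services.file_utils import get_size_and_mtime_ns
    from app.database.db import create_session

    path = "/abs/path/to/model.safetensors"  # hypothetical file
    size_bytes, mtime_ns = get_size_and_mtime_ns(path)
    spec = {
        "abs_path": path,
        "size_bytes": size_bytes,
        "mtime_ns": mtime_ns,
        "info_name": os.path.basename(path),
        "tags": ["models"],
        "fname": None,
    }
    with create_session() as sess:
        result = batch_insert_seed_assets(sess, specs=[spec], owner_id="")
        sess.commit()
    # BulkInsertResult replaces the dict the old scanner helper returned
    print(result.inserted_infos, result.won_states, result.lost_states)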
+ """ + if mtime_db is None: + return False + actual_mtime_ns = get_mtime_ns(stat_result) + if int(mtime_db) != int(actual_mtime_ns): + return False + sz = int(size_db or 0) + if sz > 0: + return int(stat_result.st_size) == sz + return True + + +def list_files_recursively(base_dir: str) -> list[str]: + """Recursively list all files in a directory.""" + out: list[str] = [] + base_abs = os.path.abspath(base_dir) + if not os.path.isdir(base_abs): + return out + for dirpath, _subdirs, filenames in os.walk( + base_abs, topdown=True, followlinks=False + ): + for name in filenames: + out.append(os.path.abspath(os.path.join(dirpath, name))) + return out diff --git a/app/assets/services/ingest.py b/app/assets/services/ingest.py index 461d226a6..eb547f9d3 100644 --- a/app/assets/services/ingest.py +++ b/app/assets/services/ingest.py @@ -23,6 +23,7 @@ from app.assets.database.queries import ( upsert_cache_state, ) from app.assets.helpers import normalize_tags +from app.assets.services.file_utils import get_size_and_mtime_ns from app.assets.services.path_utils import ( compute_filename_for_asset, resolve_destination_from_tags, @@ -233,11 +234,6 @@ def _update_metadata_with_filename( ) -def _get_size_mtime_ns(path: str) -> tuple[int, int]: - st = os.stat(path, follow_symlinks=True) - return st.st_size, getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)) - - def _sanitize_filename(name: str | None, fallback: str) -> str: n = os.path.basename((name or "").strip() or fallback) return n if n else fallback @@ -320,7 +316,7 @@ def upload_from_temp_path( raise RuntimeError(f"failed to move uploaded file into place: {e}") try: - size_bytes, mtime_ns = _get_size_mtime_ns(dest_abs) + size_bytes, mtime_ns = get_size_and_mtime_ns(dest_abs) except OSError as e: raise RuntimeError(f"failed to stat destination file: {e}")