From 77332d30549845eb5dc9e91de0473f052dd657f6 Mon Sep 17 00:00:00 2001 From: bigcat88 Date: Tue, 16 Sep 2025 14:21:40 +0300 Subject: [PATCH] optimization: fast scan: commit to the DB in chunks --- app/assets_scanner.py | 92 +++++++++++++++++--------------- app/database/services/content.py | 1 + 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/app/assets_scanner.py b/app/assets_scanner.py index 29c77a7c6..1863cff92 100644 --- a/app/assets_scanner.py +++ b/app/assets_scanner.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import logging import os import time @@ -95,45 +96,55 @@ async def schedule_scans(roots: list[schemas_in.RootType]) -> schemas_out.AssetS async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None: - for r in roots: - try: - await _fast_db_consistency_pass(r) - except Exception as ex: - LOGGER.exception("fast DB reconciliation failed for %s: %s", r, ex) + t_total = time.perf_counter() + try: + for r in roots: + try: + await _fast_db_consistency_pass(r) + except Exception as ex: + LOGGER.exception("fast DB reconciliation failed for %s: %s", r, ex) - paths: list[str] = [] - if "models" in roots: - paths.extend(collect_models_files()) - if "input" in roots: - paths.extend(list_tree(folder_paths.get_input_directory())) - if "output" in roots: - paths.extend(list_tree(folder_paths.get_output_directory())) + paths: list[str] = [] + if "models" in roots: + paths.extend(collect_models_files()) + if "input" in roots: + paths.extend(list_tree(folder_paths.get_input_directory())) + if "output" in roots: + paths.extend(list_tree(folder_paths.get_output_directory())) - for p in paths: - try: - st = os.stat(p, follow_symlinks=True) - if not int(st.st_size or 0): - continue - size_bytes = int(st.st_size) - mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)) - name, tags = get_name_and_tags_from_asset_path(p) - await _seed_one_async(p, size_bytes, mtime_ns, name, tags) - except OSError: - continue + processed = 0 + async with await create_session() as sess: + for p in paths: + try: + st = os.stat(p, follow_symlinks=True) + if not int(st.st_size or 0): + continue + size_bytes = int(st.st_size) + mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)) + name, tags = get_name_and_tags_from_asset_path(p) + await ensure_seed_for_path( + sess, + abs_path=p, + size_bytes=size_bytes, + mtime_ns=mtime_ns, + info_name=name, + tags=tags, + owner_id="", + ) -async def _seed_one_async(p: str, size_bytes: int, mtime_ns: int, name: str, tags: list[str]) -> None: - async with await create_session() as sess: - await ensure_seed_for_path( - sess, - abs_path=p, - size_bytes=size_bytes, - mtime_ns=mtime_ns, - info_name=name, - tags=tags, - owner_id="", + processed += 1 + if processed % 500 == 0: + await sess.commit() + except OSError: + continue + await sess.commit() + finally: + LOGGER.info( + "Assets scan(roots=%s) completed in %.3f s", + roots, + time.perf_counter() - t_total, ) - await sess.commit() def _status_response_for(progresses: list[ScanProgress]) -> schemas_out.AssetScanStatusResponse: @@ -482,20 +493,13 @@ async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None: if any_fast_ok: # Remove 'missing' and delete just the stale state rows for st in missing_states: - try: + with contextlib.suppress(Exception): await sess.delete(await sess.get(AssetCacheState, st.id)) - except Exception: - pass - try: + with contextlib.suppress(Exception): await remove_missing_tag_for_asset_id(sess, asset_id=aid) - except Exception: - pass else: - # No fast-ok path: mark as missing - try: + with contextlib.suppress(Exception): await add_missing_tag_for_asset_id(sess, asset_id=aid, origin="automatic") - except Exception: - pass await sess.flush() await sess.commit() diff --git a/app/database/services/content.py b/app/database/services/content.py index 0b33e1eee..a8ce200d1 100644 --- a/app/database/services/content.py +++ b/app/database/services/content.py @@ -87,6 +87,7 @@ async def ensure_seed_for_path( state_row.needs_verify = True if asset_row.size_bytes == 0 and size_bytes > 0: asset_row.size_bytes = int(size_bytes) + await session.flush() return asset_row.id asset = Asset(hash=None, size_bytes=int(size_bytes), mime_type=None, created_at=now)