diff --git a/app/assets_scanner.py b/app/assets_scanner.py
index 33efbf047..a77f87771 100644
--- a/app/assets_scanner.py
+++ b/app/assets_scanner.py
@@ -16,7 +16,7 @@ from ._assets_helpers import get_comfy_models_folders
 from .database.db import create_session
 from .database.services import (
     check_fs_asset_exists_quick,
-    list_cache_states_under_prefixes,
+    list_cache_states_with_asset_under_prefixes,
     add_missing_tag_for_asset_hash,
     remove_missing_tag_for_asset_hash,
 )
@@ -194,6 +194,7 @@ async def _pipeline_for_root(
     prog.started_at = time.time()
 
     try:
+        await _reconcile_missing_tags_for_root(root, prog)
         await _fast_reconcile_into_queue(root, prog, state, progress_cb=progress_cb)
         _start_slow_workers(root, prog, state, progress_cb=progress_cb)
         await _await_workers_then_finish(root, prog, state, progress_cb=progress_cb)
@@ -302,16 +303,17 @@ async def _fast_reconcile_into_queue(
                 "queued": queued,
                 "discovered": prog.discovered,
             })
-
-    await _reconcile_missing_tags_for_root(root, prog)
     state.closed = True
 
 
 async def _reconcile_missing_tags_for_root(root: RootType, prog: ScanProgress) -> None:
     """
-    For every AssetCacheState under the root's base directories:
-      - if at least one recorded file_path exists for a hash -> remove 'missing'
-      - if none of the recorded file_paths exist for a hash -> add 'missing'
+    Logic for detecting missing asset files:
+      - Clear 'missing' only if at least one cached path passes the fast check:
+        it exists AND its mtime_ns matches AND its size matches.
+      - Otherwise set 'missing'.
+    Files that exist but fail the fast check will be slow-hashed by the normal pipeline,
+    and ingest_fs_asset will clear 'missing' if they truly match.
     """
     if root == "models":
         bases: list[str] = []
@@ -324,33 +326,41 @@ async def _reconcile_missing_tags_for_root(root: RootType, prog: ScanProgress) -> None:
 
     try:
         async with await create_session() as sess:
-            states = await list_cache_states_under_prefixes(sess, prefixes=bases)
+            rows = await list_cache_states_with_asset_under_prefixes(sess, prefixes=bases)
 
-            present: set[str] = set()
-            missing: set[str] = set()
-
-            for s in states:
+            by_hash: dict[str, dict[str, bool]] = {}  # {hash: {"any_fast_ok": bool}}
+            for state, size_db in rows:
+                h = state.asset_hash
+                acc = by_hash.get(h)
+                if acc is None:
+                    acc = {"any_fast_ok": False}
+                    by_hash[h] = acc
                 try:
-                    if os.path.isfile(s.file_path):
-                        present.add(s.asset_hash)
+                    st = os.stat(state.file_path, follow_symlinks=True)
+                    actual_mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))
+                    fast_ok = False
+                    if state.mtime_ns is not None and int(state.mtime_ns) == int(actual_mtime_ns):
+                        if int(size_db) > 0 and int(st.st_size) == int(size_db):
+                            fast_ok = True
+                    if fast_ok:
+                        acc["any_fast_ok"] = True
+                except FileNotFoundError:
+                    pass  # not fast_ok
+                except OSError as e:
+                    _append_error(prog, phase="fast", path=state.file_path, message=str(e))
+
+            for h, acc in by_hash.items():
+                try:
+                    if acc["any_fast_ok"]:
+                        await remove_missing_tag_for_asset_hash(sess, asset_hash=h)
                     else:
-                        missing.add(s.asset_hash)
-                except Exception as e:
-                    _append_error(prog, phase="fast", path=s.file_path, message=f"stat error: {e}")
-
-            only_missing = missing - present
-
-            for h in present:
-                with contextlib.suppress(Exception):
-                    await remove_missing_tag_for_asset_hash(sess, asset_hash=h)
-
-            for h in only_missing:
-                with contextlib.suppress(Exception):
-                    await add_missing_tag_for_asset_hash(sess, asset_hash=h, origin="automatic")
+                        await add_missing_tag_for_asset_hash(sess, asset_hash=h, origin="automatic")
+                except Exception as ex:
+                    _append_error(prog, phase="fast", path="", message=f"reconcile {h[:18]}: {ex}")
 
             await sess.commit()
     except Exception as e:
-        _append_error(prog, phase="fast", path="", message=f"missing-tag reconcile failed: {e}")
+        _append_error(prog, phase="fast", path="", message=f"reconcile failed: {e}")
 
 
 def _start_slow_workers(
@@ -406,6 +416,7 @@ async def _await_workers_then_finish(
 ) -> None:
     if state.workers:
         await asyncio.gather(*state.workers, return_exceptions=True)
+    await _reconcile_missing_tags_for_root(root, prog)
     prog.finished_at = time.time()
     prog.status = "completed"
     if progress_cb:
diff --git a/app/database/services.py b/app/database/services.py
index ceed3749a..0b7e3711c 100644
--- a/app/database/services.py
+++ b/app/database/services.py
@@ -157,7 +157,7 @@ async def ingest_fs_asset(
             out["state_created"] = True
 
     if not out["state_created"]:
-        # most likely a unique(file_path) conflict; update that row
+        # unique(file_path) conflict -> update that row
         state = (
             await session.execute(
                 select(AssetCacheState).where(AssetCacheState.file_path == locator).limit(1)
@@ -1044,12 +1044,12 @@ async def remove_missing_tag_for_asset_hash(
     return int(res.rowcount or 0)
 
 
-async def list_cache_states_under_prefixes(
+async def list_cache_states_with_asset_under_prefixes(
     session: AsyncSession,
     *,
     prefixes: Sequence[str],
-) -> list[AssetCacheState]:
-    """Return AssetCacheState rows whose file_path starts with any of the given absolute prefixes."""
+) -> list[tuple[AssetCacheState, int]]:
+    """Return (AssetCacheState, size_bytes) tuples for rows whose file_path starts with any of the absolute prefixes."""
     if not prefixes:
         return []
 
@@ -1067,9 +1067,10 @@ async def list_cache_states_under_prefixes(
 
     rows = (
         await session.execute(
-            select(AssetCacheState)
+            select(AssetCacheState, Asset.size_bytes)
+            .join(Asset, Asset.hash == AssetCacheState.asset_hash)
             .where(sa.or_(*conds))
             .order_by(AssetCacheState.id.asc())
         )
-    ).scalars().all()
-    return list(rows)
+    ).all()
+    return [(r[0], int(r[1] or 0)) for r in rows]
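
Note on the fast check: the reconcile pass treats a cached path as present only when it exists and both its mtime_ns and its size match the database row. The sketch below restates that predicate as a standalone function for clarity; fast_check, its signature, and its argument names are illustrative only and do not appear in the patch.

    import os

    def fast_check(path: str, cached_mtime_ns: int | None, size_db: int) -> bool:
        """True only when the file exists and matches the cached metadata."""
        try:
            # follow_symlinks=True mirrors the os.stat() call in the patch
            st = os.stat(path, follow_symlinks=True)
        except FileNotFoundError:
            return False  # gone: this path cannot clear 'missing'
        if cached_mtime_ns is None or int(cached_mtime_ns) != st.st_mtime_ns:
            return False  # touched or rewritten: defer to the slow-hash workers
        # a stored size of 0 is treated as unknown and never passes, as in the patch
        return int(size_db) > 0 and st.st_size == int(size_db)

A hash keeps (or gains) the 'missing' tag only when none of its cached paths pass this check. Paths that fail it are not dropped: they fall through to the slow-hash pipeline, and ingest_fs_asset clears the tag again if the content still matches.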