corrected detection of missing files for assets

bigcat88 2025-09-07 17:53:18 +03:00
parent b8ef9bb92c
commit 6282d495ca
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721
2 changed files with 46 additions and 34 deletions

View File

@@ -16,7 +16,7 @@ from ._assets_helpers import get_comfy_models_folders
 from .database.db import create_session
 from .database.services import (
     check_fs_asset_exists_quick,
-    list_cache_states_under_prefixes,
+    list_cache_states_with_asset_under_prefixes,
     add_missing_tag_for_asset_hash,
     remove_missing_tag_for_asset_hash,
 )
@@ -194,6 +194,7 @@ async def _pipeline_for_root(
     prog.started_at = time.time()
     try:
+        await _reconcile_missing_tags_for_root(root, prog)
         await _fast_reconcile_into_queue(root, prog, state, progress_cb=progress_cb)
         _start_slow_workers(root, prog, state, progress_cb=progress_cb)
         await _await_workers_then_finish(root, prog, state, progress_cb=progress_cb)
@@ -302,16 +303,17 @@ async def _fast_reconcile_into_queue(
             "queued": queued,
             "discovered": prog.discovered,
         })
-    await _reconcile_missing_tags_for_root(root, prog)
     state.closed = True


 async def _reconcile_missing_tags_for_root(root: RootType, prog: ScanProgress) -> None:
     """
-    For every AssetCacheState under the root's base directories:
-      - if at least one recorded file_path exists for a hash -> remove 'missing'
-      - if none of the recorded file_paths exist for a hash -> add 'missing'
+    Logic for detecting missing Assets files:
+      - Clear 'missing' only if at least one cached path passes fast check:
+        exists AND mtime_ns matches AND size matches.
+      - Otherwise set 'missing'.
+    Files that exist but fail fast check will be slow-hashed by the normal pipeline,
+    and ingest_fs_asset will clear 'missing' if they truly match.
     """
     if root == "models":
         bases: list[str] = []
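
The fast check described in the new docstring amounts to a small per-path predicate. A minimal standalone sketch, assuming the DB-recorded mtime_ns and size for a cached path (the helper name and signature are hypothetical, not part of the commit):

    import os

    def fast_check(path: str, db_mtime_ns: int | None, db_size: int) -> bool:
        # Restates the commit's fast check: the path passes only if it
        # exists AND its mtime_ns matches AND its size matches the DB.
        try:
            st = os.stat(path, follow_symlinks=True)
        except OSError:
            return False  # missing or unreadable -> not fast-ok
        mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))
        return (
            db_mtime_ns is not None
            and int(db_mtime_ns) == int(mtime_ns)
            and db_size > 0
            and st.st_size == db_size
        )

A path that exists but fails this predicate is not tagged immediately; per the docstring, it falls through to the slow hashing pipeline, which clears 'missing' if the content truly matches.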
@@ -324,33 +326,41 @@ async def _reconcile_missing_tags_for_root(root: RootType, prog: ScanProgress) -> None:
     try:
         async with await create_session() as sess:
-            states = await list_cache_states_under_prefixes(sess, prefixes=bases)
+            rows = await list_cache_states_with_asset_under_prefixes(sess, prefixes=bases)
-            present: set[str] = set()
-            missing: set[str] = set()
-            for s in states:
+            by_hash: dict[str, dict[str, bool]] = {}  # {hash: {"any_fast_ok": bool}}
+            for state, size_db in rows:
+                h = state.asset_hash
+                acc = by_hash.get(h)
+                if acc is None:
+                    acc = {"any_fast_ok": False}
+                    by_hash[h] = acc
                 try:
-                    if os.path.isfile(s.file_path):
-                        present.add(s.asset_hash)
+                    st = os.stat(state.file_path, follow_symlinks=True)
+                    actual_mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))
+                    fast_ok = False
+                    if state.mtime_ns is not None and int(state.mtime_ns) == int(actual_mtime_ns):
+                        if int(size_db) > 0 and int(st.st_size) == int(size_db):
+                            fast_ok = True
+                    if fast_ok:
+                        acc["any_fast_ok"] = True
+                except FileNotFoundError:
+                    pass  # not fast_ok
+                except OSError as e:
+                    _append_error(prog, phase="fast", path=state.file_path, message=str(e))
+            for h, acc in by_hash.items():
+                try:
+                    if acc["any_fast_ok"]:
+                        await remove_missing_tag_for_asset_hash(sess, asset_hash=h)
                     else:
-                        missing.add(s.asset_hash)
-                except Exception as e:
-                    _append_error(prog, phase="fast", path=s.file_path, message=f"stat error: {e}")
-            only_missing = missing - present
-            for h in present:
-                with contextlib.suppress(Exception):
-                    await remove_missing_tag_for_asset_hash(sess, asset_hash=h)
-            for h in only_missing:
-                with contextlib.suppress(Exception):
-                    await add_missing_tag_for_asset_hash(sess, asset_hash=h, origin="automatic")
+                        await add_missing_tag_for_asset_hash(sess, asset_hash=h, origin="automatic")
+                except Exception as ex:
+                    _append_error(prog, phase="fast", path="", message=f"reconcile {h[:18]}: {ex}")
             await sess.commit()
     except Exception as e:
-        _append_error(prog, phase="fast", path="", message=f"missing-tag reconcile failed: {e}")
+        _append_error(prog, phase="fast", path="", message=f"reconcile failed: {e}")


 def _start_slow_workers(
@@ -406,6 +416,7 @@ async def _await_workers_then_finish(
 ) -> None:
     if state.workers:
         await asyncio.gather(*state.workers, return_exceptions=True)
+    await _reconcile_missing_tags_for_root(root, prog)
     prog.finished_at = time.time()
     prog.status = "completed"
     if progress_cb:
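
The reconciliation is two-phase: stat every cached path once while accumulating a per-hash any_fast_ok flag, then tag each hash. That gives OR semantics across duplicate paths for the same content, so a hash with one stale copy and one intact copy is not tagged missing. A toy sketch with hypothetical data:

    rows = [
        ("abc123", False),  # stale copy: mtime/size mismatch
        ("abc123", True),   # intact copy: fast check passes
        ("def456", False),  # the only path for this hash failed
    ]

    any_fast_ok: dict[str, bool] = {}
    for h, fast_ok in rows:
        any_fast_ok[h] = any_fast_ok.get(h, False) or fast_ok

    assert any_fast_ok == {"abc123": True, "def456": False}
    # abc123 -> remove 'missing'; def456 -> add 'missing'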

View File

@@ -157,7 +157,7 @@ async def ingest_fs_asset(
         out["state_created"] = True
     if not out["state_created"]:
-        # most likely a unique(file_path) conflict; update that row
+        # unique(file_path) conflict -> update that row
         state = (
             await session.execute(
                 select(AssetCacheState).where(AssetCacheState.file_path == locator).limit(1)
@@ -1044,12 +1044,12 @@ async def remove_missing_tag_for_asset_hash(
     return int(res.rowcount or 0)


-async def list_cache_states_under_prefixes(
+async def list_cache_states_with_asset_under_prefixes(
     session: AsyncSession,
     *,
     prefixes: Sequence[str],
-) -> list[AssetCacheState]:
-    """Return AssetCacheState rows whose file_path starts with any of the given absolute prefixes."""
+) -> list[tuple[AssetCacheState, int]]:
+    """Return (AssetCacheState, size_bytes) tuples for rows whose file_path starts with any of the absolute prefixes."""
     if not prefixes:
         return []
@@ -1067,9 +1067,10 @@ async def list_cache_states_under_prefixes(
     rows = (
         await session.execute(
-            select(AssetCacheState)
+            select(AssetCacheState, Asset.size_bytes)
+            .join(Asset, Asset.hash == AssetCacheState.asset_hash)
             .where(sa.or_(*conds))
             .order_by(AssetCacheState.id.asc())
         )
-    ).scalars().all()
-    return list(rows)
+    ).all()
+    return [(r[0], int(r[1] or 0)) for r in rows]
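
The switch from .scalars().all() to .all() follows from selecting two columns: SQLAlchemy returns Row tuples, and .scalars() keeps only the first element of each row (the AssetCacheState), which would silently drop size_bytes. A minimal sketch of the same pattern, assuming an AsyncSession and the Asset/AssetCacheState models as in this repo:

    async def load_state_size_pairs(session):
        result = await session.execute(
            select(AssetCacheState, Asset.size_bytes)
            .join(Asset, Asset.hash == AssetCacheState.asset_hash)
        )
        rows = result.all()  # Row tuples: (AssetCacheState, int | None)
        # A NULL size_bytes is coerced to 0 so the caller's fast check
        # (which requires size > 0) treats it as "cannot fast-verify".
        return [(state, int(size or 0)) for state, size in rows]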