refactor(2)/fix: skip double-checking existing files during the fast check

bigcat88 2025-09-16 19:50:21 +03:00
parent a336c7c165
commit 31ec744317
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721

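In short: _fast_db_consistency_pass can now report which files already have a live asset_cache_state row, and sync_seed_assets skips those paths in its seeding loop instead of stat'ing and re-inserting them. Below is a minimal, self-contained sketch of that pattern; it reuses the two function names from this commit, but FAKE_CACHE_STATE, the signatures, and the inline placeholders are simplified stand-ins, not the module's real schema or API.

import os
from typing import Optional

# In-memory stand-in for the asset_cache_state table: root -> tracked file paths.
# This only illustrates the commit's idea; the real code talks to the database.
FAKE_CACHE_STATE: dict[str, list[str]] = {}


async def fast_db_consistency_pass(root: str, *, collect_existing_paths: bool = False) -> Optional[set[str]]:
    """Drop entries whose file is gone; optionally report the paths that survived."""
    survivors: set[str] = set()
    kept: list[str] = []
    for path in FAKE_CACHE_STATE.get(root, []):
        if os.path.isfile(path):
            kept.append(path)
            survivors.add(os.path.abspath(path))
        # else: the real pass deletes the stale row or tags the asset as 'missing'
    FAKE_CACHE_STATE[root] = kept
    return survivors if collect_existing_paths else None


async def sync_seed_assets(roots: list[str], candidates: list[str]) -> tuple[int, int]:
    """Seed only files that are not already tracked; returns (created, skipped_existing)."""
    existing_paths: set[str] = set()
    for r in roots:
        survivors = await fast_db_consistency_pass(r, collect_existing_paths=True)
        if survivors:
            existing_paths.update(survivors)

    created = skipped_existing = 0
    for p in candidates:
        if os.path.abspath(p) in existing_paths:  # the new fast path: no second stat, no insert attempt
            skipped_existing += 1
            continue
        # the real loop would os.stat(p) and insert the asset row here
        created += 1
    return created, skipped_existing

The point, as the commit title says, is to avoid double-checking: the fast pass has already looked at these files, so the seeding loop no longer needs to stat them or attempt inserts for paths that are already known.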

@@ -98,14 +98,19 @@ async def schedule_scans(roots: list[schemas_in.RootType]) -> schemas_out.AssetS
 async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
     t_total = time.perf_counter()
+    created = 0
+    skipped_existing = 0
+    paths: list[str] = []
     try:
+        existing_paths: set[str] = set()
         for r in roots:
             try:
-                await _fast_db_consistency_pass(r)
+                survivors = await _fast_db_consistency_pass(r, collect_existing_paths=True)
+                if survivors:
+                    existing_paths.update(survivors)
             except Exception as ex:
                 LOGGER.exception("fast DB reconciliation failed for %s: %s", r, ex)
-        paths: list[str] = []
         if "models" in roots:
             paths.extend(collect_models_files())
         if "input" in roots:
@@ -113,10 +118,12 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
         if "output" in roots:
             paths.extend(list_tree(folder_paths.get_output_directory()))
-        processed = 0
         async with await create_session() as sess:
             for p in paths:
                 try:
+                    if os.path.abspath(p) in existing_paths:
+                        skipped_existing += 1
+                        continue
                     st = os.stat(p, follow_symlinks=True)
                     if not int(st.st_size or 0):
                         continue
@@ -134,17 +141,20 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
                         owner_id="",
                     )
-                    processed += 1
-                    if processed % 500 == 0:
+                    created += 1
+                    if created % 500 == 0:
                         await sess.commit()
                 except OSError:
                     continue
             await sess.commit()
     finally:
         LOGGER.info(
-            "Assets scan(roots=%s) completed in %.3f s",
+            "Assets scan(roots=%s) completed in %.3f s (created=%d, skipped_existing=%d, total_seen=%d)",
             roots,
             time.perf_counter() - t_total,
+            created,
+            skipped_existing,
+            len(paths),
         )
@@ -406,7 +416,9 @@ def _append_error(prog: ScanProgress, *, path: str, message: str) -> None:
     })
 
 
-async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None:
+async def _fast_db_consistency_pass(
+    root: schemas_in.RootType, *, collect_existing_paths: bool = False
+) -> Optional[set[str]]:
     """
     Quick pass over asset_cache_state for `root`:
     - If file missing and Asset.hash is NULL and the Asset has no other states, delete the Asset and its infos.
@@ -414,10 +426,12 @@ async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None:
       * If at least one state for this Asset is fast-ok, delete the missing state.
      * If none are fast-ok, add 'missing' tag to all AssetInfos for this Asset.
     - If at least one state becomes fast-ok for a hashed Asset, remove the 'missing' tag.
+    When collect_existing_paths is True, returns a set of absolute file paths
+    that still have a live asset_cache_state row for this root after reconciliation.
     """
     prefixes = prefixes_for_root(root)
     if not prefixes:
-        return
+        return set() if collect_existing_paths else None
 
     conds = []
     for p in prefixes:
@@ -429,7 +443,7 @@ async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None:
     async with await create_session() as sess:
         if not conds:
-            return
+            return set() if collect_existing_paths else None
 
         rows = (
             await sess.execute(
@@ -467,6 +481,7 @@ async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None:
         # Apply actions
         to_set_verify: list[int] = []
+        survivors: set[str] = set()
        for aid, acc in by_asset.items():
             a_hash = acc["hash"]
             states = acc["states"]
@@ -481,7 +496,10 @@ async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None:
                     asset = await sess.get(Asset, aid)
                     if asset:
                         await sess.delete(asset)
-                # else leave it for the slow scan to verify/rehash
+                else:
+                    for s in states:
+                        if s["exists"]:
+                            survivors.add(os.path.abspath(s["obj"].file_path))
             else:
                 if any_fast_ok:
                     # Remove 'missing' and delete just the stale state rows
@@ -490,9 +508,15 @@ async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None:
                         await sess.delete(await sess.get(AssetCacheState, st.id))
                     with contextlib.suppress(Exception):
                         await remove_missing_tag_for_asset_id(sess, asset_id=aid)
+                    for s in states:
+                        if s["exists"]:
+                            survivors.add(os.path.abspath(s["obj"].file_path))
                 else:
                     with contextlib.suppress(Exception):
                         await add_missing_tag_for_asset_id(sess, asset_id=aid, origin="automatic")
+                    for s in states:
+                        if s["exists"]:
+                            survivors.add(os.path.abspath(s["obj"].file_path))
                 for s in states:
                     if s["exists"] and not s["fast_ok"]:
                         to_set_verify.append(s["obj"].id)
@@ -505,3 +529,4 @@ async def _fast_db_consistency_pass(root: schemas_in.RootType) -> None:
             )
         await sess.flush()
         await sess.commit()
+    return survivors if collect_existing_paths else None
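Worth noting about the new return contract: the pass still returns None for callers that do not opt in, and an opted-in caller always gets a set (possibly empty, e.g. when the root has no prefixes), which is why the `if survivors:` guard in sync_seed_assets covers both shapes. A tiny demo continuing the simplified sketch above (assumed names and data, not part of this commit):

import asyncio

async def demo() -> None:
    # Pretend this very file is already tracked under the "models" root.
    FAKE_CACHE_STATE["models"] = [__file__]

    assert await fast_db_consistency_pass("models") is None  # default: no set is returned
    created, skipped = await sync_seed_assets(
        ["models"], candidates=[__file__, "new_model.safetensors"]
    )
    assert (created, skipped) == (1, 1)  # the tracked file is skipped, the new path would be seeded

asyncio.run(demo())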