From 677a0e2508c46fedf9d3b759fead96761328647b Mon Sep 17 00:00:00 2001 From: bigcat88 Date: Tue, 16 Sep 2025 20:29:50 +0300 Subject: [PATCH] refactor(3): unite logic for Asset fast check --- app/assets_scanner.py | 182 ++++++++++++++++-------------------------- 1 file changed, 71 insertions(+), 111 deletions(-) diff --git a/app/assets_scanner.py b/app/assets_scanner.py index 967bc64bc..27bc52a25 100644 --- a/app/assets_scanner.py +++ b/app/assets_scanner.py @@ -105,7 +105,7 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None: existing_paths: set[str] = set() for r in roots: try: - survivors = await _fast_db_consistency_pass(r, collect_existing_paths=True) + survivors = await _fast_db_consistency_pass(r, collect_existing_paths=True, update_missing_tags=True) if survivors: existing_paths.update(survivors) except Exception as ex: @@ -183,72 +183,13 @@ def _scan_progress_to_scan_status_model(progress: ScanProgress) -> schemas_out.A ) -async def _refresh_verify_flags_for_root(root: schemas_in.RootType) -> None: - """Fast pass to mark verify candidates by comparing stored mtime_ns with on-disk mtime.""" - prefixes = prefixes_for_root(root) - if not prefixes: - return - - conds = [] - for p in prefixes: - base = os.path.abspath(p) - if not base.endswith(os.sep): - base += os.sep - escaped, esc = escape_like_prefix(base) - conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc)) - - async with await create_session() as sess: - rows = ( - await sess.execute( - sa.select( - AssetCacheState.id, - AssetCacheState.mtime_ns, - AssetCacheState.needs_verify, - Asset.hash, - Asset.size_bytes, - AssetCacheState.file_path, - ) - .join(Asset, Asset.id == AssetCacheState.asset_id) - .where(sa.or_(*conds)) - ) - ).all() - - to_set = [] - to_clear = [] - for sid, mtime_db, needs_verify, a_hash, size_db, fp in rows: - try: - st = os.stat(fp, follow_symlinks=True) - except OSError: - continue # Missing files are handled by missing-tag reconciliation later. - - if a_hash is not None: - if fast_asset_file_check(mtime_db=mtime_db, size_db=size_db, stat_result=st): - if needs_verify: - to_clear.append(sid) - elif not needs_verify: - to_set.append(sid) - if to_set: - await sess.execute( - sa.update(AssetCacheState) - .where(AssetCacheState.id.in_(to_set)) - .values(needs_verify=True) - ) - if to_clear: - await sess.execute( - sa.update(AssetCacheState) - .where(AssetCacheState.id.in_(to_clear)) - .values(needs_verify=False) - ) - await sess.commit() - - async def _run_hash_verify_pipeline(root: schemas_in.RootType, prog: ScanProgress, state: SlowQueueState) -> None: prog.status = "running" prog.started_at = time.time() try: prefixes = prefixes_for_root(root) - await _refresh_verify_flags_for_root(root) + await _fast_db_consistency_pass(root) # collect candidates from DB async with await create_session() as sess: @@ -417,17 +358,17 @@ def _append_error(prog: ScanProgress, *, path: str, message: str) -> None: async def _fast_db_consistency_pass( - root: schemas_in.RootType, *, collect_existing_paths: bool = False + root: schemas_in.RootType, + *, + collect_existing_paths: bool = False, + update_missing_tags: bool = False, ) -> Optional[set[str]]: - """ - Quick pass over asset_cache_state for `root`: - - If file missing and Asset.hash is NULL and the Asset has no other states, delete the Asset and its infos. - - If file missing and Asset.hash is NOT NULL: - * If at least one state for this Asset is fast-ok, delete the missing state. - * If none are fast-ok, add 'missing' tag to all AssetInfos for this Asset. - - If at least one state becomes fast-ok for a hashed Asset, remove the 'missing' tag. - When collect_existing_paths is True, returns a set of absolute file paths - that still have a live asset_cache_state row for this root after reconciliation. + """Fast DB+FS pass for a root: + - Toggle needs_verify per state using fast check + - For hashed assets with at least one fast-ok state in this root: delete stale missing states + - For seed assets with all states missing: delete Asset and its AssetInfos + - Optionally add/remove 'missing' tags based on fast-ok in this root + - Optionally return surviving absolute paths """ prefixes = prefixes_for_root(root) if not prefixes: @@ -442,22 +383,25 @@ async def _fast_db_consistency_pass( conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc)) async with await create_session() as sess: - if not conds: - return set() if collect_existing_paths else None - rows = ( await sess.execute( - sa.select(AssetCacheState, Asset.hash, Asset.size_bytes) + sa.select( + AssetCacheState.id, + AssetCacheState.file_path, + AssetCacheState.mtime_ns, + AssetCacheState.needs_verify, + AssetCacheState.asset_id, + Asset.hash, + Asset.size_bytes, + ) .join(Asset, Asset.id == AssetCacheState.asset_id) .where(sa.or_(*conds)) .order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc()) ) ).all() - # Group by asset_id with status per state by_asset: dict[str, dict] = {} - for st, a_hash, a_size in rows: - aid = st.asset_id + for sid, fp, mtime_db, needs_verify, aid, a_hash, a_size in rows: acc = by_asset.get(aid) if acc is None: acc = {"hash": a_hash, "size_db": int(a_size or 0), "states": []} @@ -465,33 +409,46 @@ async def _fast_db_consistency_pass( fast_ok = False try: - fast_ok = fast_asset_file_check( - mtime_db=st.mtime_ns, - size_db=acc["size_db"], - stat_result=os.stat(st.file_path, follow_symlinks=True), - ) exists = True + fast_ok = fast_asset_file_check( + mtime_db=mtime_db, + size_db=acc["size_db"], + stat_result=os.stat(fp, follow_symlinks=True), + ) except FileNotFoundError: exists = False - except OSError as ex: + except OSError: exists = False - LOGGER.debug("fast pass stat error for %s: %s", st.file_path, ex) - acc["states"].append({"obj": st, "exists": exists, "fast_ok": fast_ok}) + acc["states"].append({ + "sid": sid, + "fp": fp, + "exists": exists, + "fast_ok": fast_ok, + "needs_verify": bool(needs_verify), + }) - # Apply actions to_set_verify: list[int] = [] + to_clear_verify: list[int] = [] + stale_state_ids: list[int] = [] survivors: set[str] = set() + for aid, acc in by_asset.items(): a_hash = acc["hash"] states = acc["states"] any_fast_ok = any(s["fast_ok"] for s in states) all_missing = all(not s["exists"] for s in states) - missing_states = [s["obj"] for s in states if not s["exists"]] + + for s in states: + if not s["exists"]: + continue + if s["fast_ok"] and s["needs_verify"]: + to_clear_verify.append(s["sid"]) + if not s["fast_ok"] and not s["needs_verify"]: + to_set_verify.append(s["sid"]) if a_hash is None: - # Seed asset: if all states gone (and in practice there is only one), remove the whole Asset - if states and all_missing: + if states and all_missing: # remove seed Asset completely, if no valid AssetCache exists await sess.execute(sa.delete(AssetInfo).where(AssetInfo.asset_id == aid)) asset = await sess.get(Asset, aid) if asset: @@ -499,34 +456,37 @@ async def _fast_db_consistency_pass( else: for s in states: if s["exists"]: - survivors.add(os.path.abspath(s["obj"].file_path)) - else: - if any_fast_ok: - # Remove 'missing' and delete just the stale state rows - for st in missing_states: - with contextlib.suppress(Exception): - await sess.delete(await sess.get(AssetCacheState, st.id)) + survivors.add(os.path.abspath(s["fp"])) + continue + + if any_fast_ok: # if Asset has at least one valid AssetCache record, remove any invalid AssetCache records + for s in states: + if not s["exists"]: + stale_state_ids.append(s["sid"]) + if update_missing_tags: with contextlib.suppress(Exception): await remove_missing_tag_for_asset_id(sess, asset_id=aid) - for s in states: - if s["exists"]: - survivors.add(os.path.abspath(s["obj"].file_path)) - else: - with contextlib.suppress(Exception): - await add_missing_tag_for_asset_id(sess, asset_id=aid, origin="automatic") - for s in states: - if s["exists"]: - survivors.add(os.path.abspath(s["obj"].file_path)) - for s in states: - if s["exists"] and not s["fast_ok"]: - to_set_verify.append(s["obj"].id) - await sess.flush() + elif update_missing_tags: + with contextlib.suppress(Exception): + await add_missing_tag_for_asset_id(sess, asset_id=aid, origin="automatic") + + for s in states: + if s["exists"]: + survivors.add(os.path.abspath(s["fp"])) + + if stale_state_ids: + await sess.execute(sa.delete(AssetCacheState).where(AssetCacheState.id.in_(stale_state_ids))) if to_set_verify: await sess.execute( sa.update(AssetCacheState) .where(AssetCacheState.id.in_(to_set_verify)) .values(needs_verify=True) ) - await sess.flush() + if to_clear_verify: + await sess.execute( + sa.update(AssetCacheState) + .where(AssetCacheState.id.in_(to_clear_verify)) + .values(needs_verify=False) + ) await sess.commit() return survivors if collect_existing_paths else None