refactor(3): unite logic for Asset fast check

This commit is contained in:
bigcat88 2025-09-16 20:29:50 +03:00
parent 31ec744317
commit 677a0e2508
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721

View File

@ -105,7 +105,7 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
existing_paths: set[str] = set() existing_paths: set[str] = set()
for r in roots: for r in roots:
try: try:
survivors = await _fast_db_consistency_pass(r, collect_existing_paths=True) survivors = await _fast_db_consistency_pass(r, collect_existing_paths=True, update_missing_tags=True)
if survivors: if survivors:
existing_paths.update(survivors) existing_paths.update(survivors)
except Exception as ex: except Exception as ex:
@ -183,72 +183,13 @@ def _scan_progress_to_scan_status_model(progress: ScanProgress) -> schemas_out.A
) )
async def _refresh_verify_flags_for_root(root: schemas_in.RootType) -> None:
"""Fast pass to mark verify candidates by comparing stored mtime_ns with on-disk mtime."""
prefixes = prefixes_for_root(root)
if not prefixes:
return
conds = []
for p in prefixes:
base = os.path.abspath(p)
if not base.endswith(os.sep):
base += os.sep
escaped, esc = escape_like_prefix(base)
conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc))
async with await create_session() as sess:
rows = (
await sess.execute(
sa.select(
AssetCacheState.id,
AssetCacheState.mtime_ns,
AssetCacheState.needs_verify,
Asset.hash,
Asset.size_bytes,
AssetCacheState.file_path,
)
.join(Asset, Asset.id == AssetCacheState.asset_id)
.where(sa.or_(*conds))
)
).all()
to_set = []
to_clear = []
for sid, mtime_db, needs_verify, a_hash, size_db, fp in rows:
try:
st = os.stat(fp, follow_symlinks=True)
except OSError:
continue # Missing files are handled by missing-tag reconciliation later.
if a_hash is not None:
if fast_asset_file_check(mtime_db=mtime_db, size_db=size_db, stat_result=st):
if needs_verify:
to_clear.append(sid)
elif not needs_verify:
to_set.append(sid)
if to_set:
await sess.execute(
sa.update(AssetCacheState)
.where(AssetCacheState.id.in_(to_set))
.values(needs_verify=True)
)
if to_clear:
await sess.execute(
sa.update(AssetCacheState)
.where(AssetCacheState.id.in_(to_clear))
.values(needs_verify=False)
)
await sess.commit()
async def _run_hash_verify_pipeline(root: schemas_in.RootType, prog: ScanProgress, state: SlowQueueState) -> None: async def _run_hash_verify_pipeline(root: schemas_in.RootType, prog: ScanProgress, state: SlowQueueState) -> None:
prog.status = "running" prog.status = "running"
prog.started_at = time.time() prog.started_at = time.time()
try: try:
prefixes = prefixes_for_root(root) prefixes = prefixes_for_root(root)
await _refresh_verify_flags_for_root(root) await _fast_db_consistency_pass(root)
# collect candidates from DB # collect candidates from DB
async with await create_session() as sess: async with await create_session() as sess:
@ -417,17 +358,17 @@ def _append_error(prog: ScanProgress, *, path: str, message: str) -> None:
async def _fast_db_consistency_pass( async def _fast_db_consistency_pass(
root: schemas_in.RootType, *, collect_existing_paths: bool = False root: schemas_in.RootType,
*,
collect_existing_paths: bool = False,
update_missing_tags: bool = False,
) -> Optional[set[str]]: ) -> Optional[set[str]]:
""" """Fast DB+FS pass for a root:
Quick pass over asset_cache_state for `root`: - Toggle needs_verify per state using fast check
- If file missing and Asset.hash is NULL and the Asset has no other states, delete the Asset and its infos. - For hashed assets with at least one fast-ok state in this root: delete stale missing states
- If file missing and Asset.hash is NOT NULL: - For seed assets with all states missing: delete Asset and its AssetInfos
* If at least one state for this Asset is fast-ok, delete the missing state. - Optionally add/remove 'missing' tags based on fast-ok in this root
* If none are fast-ok, add 'missing' tag to all AssetInfos for this Asset. - Optionally return surviving absolute paths
- If at least one state becomes fast-ok for a hashed Asset, remove the 'missing' tag.
When collect_existing_paths is True, returns a set of absolute file paths
that still have a live asset_cache_state row for this root after reconciliation.
""" """
prefixes = prefixes_for_root(root) prefixes = prefixes_for_root(root)
if not prefixes: if not prefixes:
@ -442,22 +383,25 @@ async def _fast_db_consistency_pass(
conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc)) conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc))
async with await create_session() as sess: async with await create_session() as sess:
if not conds:
return set() if collect_existing_paths else None
rows = ( rows = (
await sess.execute( await sess.execute(
sa.select(AssetCacheState, Asset.hash, Asset.size_bytes) sa.select(
AssetCacheState.id,
AssetCacheState.file_path,
AssetCacheState.mtime_ns,
AssetCacheState.needs_verify,
AssetCacheState.asset_id,
Asset.hash,
Asset.size_bytes,
)
.join(Asset, Asset.id == AssetCacheState.asset_id) .join(Asset, Asset.id == AssetCacheState.asset_id)
.where(sa.or_(*conds)) .where(sa.or_(*conds))
.order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc()) .order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
) )
).all() ).all()
# Group by asset_id with status per state
by_asset: dict[str, dict] = {} by_asset: dict[str, dict] = {}
for st, a_hash, a_size in rows: for sid, fp, mtime_db, needs_verify, aid, a_hash, a_size in rows:
aid = st.asset_id
acc = by_asset.get(aid) acc = by_asset.get(aid)
if acc is None: if acc is None:
acc = {"hash": a_hash, "size_db": int(a_size or 0), "states": []} acc = {"hash": a_hash, "size_db": int(a_size or 0), "states": []}
@ -465,33 +409,46 @@ async def _fast_db_consistency_pass(
fast_ok = False fast_ok = False
try: try:
fast_ok = fast_asset_file_check(
mtime_db=st.mtime_ns,
size_db=acc["size_db"],
stat_result=os.stat(st.file_path, follow_symlinks=True),
)
exists = True exists = True
fast_ok = fast_asset_file_check(
mtime_db=mtime_db,
size_db=acc["size_db"],
stat_result=os.stat(fp, follow_symlinks=True),
)
except FileNotFoundError: except FileNotFoundError:
exists = False exists = False
except OSError as ex: except OSError:
exists = False exists = False
LOGGER.debug("fast pass stat error for %s: %s", st.file_path, ex)
acc["states"].append({"obj": st, "exists": exists, "fast_ok": fast_ok}) acc["states"].append({
"sid": sid,
"fp": fp,
"exists": exists,
"fast_ok": fast_ok,
"needs_verify": bool(needs_verify),
})
# Apply actions
to_set_verify: list[int] = [] to_set_verify: list[int] = []
to_clear_verify: list[int] = []
stale_state_ids: list[int] = []
survivors: set[str] = set() survivors: set[str] = set()
for aid, acc in by_asset.items(): for aid, acc in by_asset.items():
a_hash = acc["hash"] a_hash = acc["hash"]
states = acc["states"] states = acc["states"]
any_fast_ok = any(s["fast_ok"] for s in states) any_fast_ok = any(s["fast_ok"] for s in states)
all_missing = all(not s["exists"] for s in states) all_missing = all(not s["exists"] for s in states)
missing_states = [s["obj"] for s in states if not s["exists"]]
for s in states:
if not s["exists"]:
continue
if s["fast_ok"] and s["needs_verify"]:
to_clear_verify.append(s["sid"])
if not s["fast_ok"] and not s["needs_verify"]:
to_set_verify.append(s["sid"])
if a_hash is None: if a_hash is None:
# Seed asset: if all states gone (and in practice there is only one), remove the whole Asset if states and all_missing: # remove seed Asset completely, if no valid AssetCache exists
if states and all_missing:
await sess.execute(sa.delete(AssetInfo).where(AssetInfo.asset_id == aid)) await sess.execute(sa.delete(AssetInfo).where(AssetInfo.asset_id == aid))
asset = await sess.get(Asset, aid) asset = await sess.get(Asset, aid)
if asset: if asset:
@ -499,34 +456,37 @@ async def _fast_db_consistency_pass(
else: else:
for s in states: for s in states:
if s["exists"]: if s["exists"]:
survivors.add(os.path.abspath(s["obj"].file_path)) survivors.add(os.path.abspath(s["fp"]))
else: continue
if any_fast_ok:
# Remove 'missing' and delete just the stale state rows if any_fast_ok: # if Asset has at least one valid AssetCache record, remove any invalid AssetCache records
for st in missing_states: for s in states:
with contextlib.suppress(Exception): if not s["exists"]:
await sess.delete(await sess.get(AssetCacheState, st.id)) stale_state_ids.append(s["sid"])
if update_missing_tags:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await remove_missing_tag_for_asset_id(sess, asset_id=aid) await remove_missing_tag_for_asset_id(sess, asset_id=aid)
for s in states: elif update_missing_tags:
if s["exists"]:
survivors.add(os.path.abspath(s["obj"].file_path))
else:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await add_missing_tag_for_asset_id(sess, asset_id=aid, origin="automatic") await add_missing_tag_for_asset_id(sess, asset_id=aid, origin="automatic")
for s in states: for s in states:
if s["exists"]: if s["exists"]:
survivors.add(os.path.abspath(s["obj"].file_path)) survivors.add(os.path.abspath(s["fp"]))
for s in states:
if s["exists"] and not s["fast_ok"]: if stale_state_ids:
to_set_verify.append(s["obj"].id) await sess.execute(sa.delete(AssetCacheState).where(AssetCacheState.id.in_(stale_state_ids)))
await sess.flush()
if to_set_verify: if to_set_verify:
await sess.execute( await sess.execute(
sa.update(AssetCacheState) sa.update(AssetCacheState)
.where(AssetCacheState.id.in_(to_set_verify)) .where(AssetCacheState.id.in_(to_set_verify))
.values(needs_verify=True) .values(needs_verify=True)
) )
await sess.flush() if to_clear_verify:
await sess.execute(
sa.update(AssetCacheState)
.where(AssetCacheState.id.in_(to_clear_verify))
.values(needs_verify=False)
)
await sess.commit() await sess.commit()
return survivors if collect_existing_paths else None return survivors if collect_existing_paths else None