fix: use UPSERT to eliminate a rare race condition when ingesting many small files in parallel

This commit is contained in:
bigcat88 2025-09-08 18:13:32 +03:00
parent 6282d495ca
commit 3fa0fc496c
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721

View File

@ -10,6 +10,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func from sqlalchemy import select, delete, func
from sqlalchemy.orm import contains_eager, noload from sqlalchemy.orm import contains_eager, noload
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.dialects import sqlite as d_sqlite
from sqlalchemy.dialects import postgresql as d_pg
from .models import Asset, AssetInfo, AssetInfoTag, AssetCacheState, Tag, AssetInfoMeta, AssetLocation from .models import Asset, AssetInfo, AssetInfoTag, AssetCacheState, Tag, AssetInfoMeta, AssetLocation
from .timeutil import utcnow from .timeutil import utcnow
@ -143,39 +145,49 @@ async def ingest_fs_asset(
else: else:
logging.error("Asset %s not found after PK conflict; skipping update.", asset_hash) logging.error("Asset %s not found after PK conflict; skipping update.", asset_hash)
# ---- Step 2: INSERT/UPDATE AssetCacheState (mtime_ns, file_path) ---- # ---- Step 2: UPSERT AssetCacheState (mtime_ns, file_path) ----
with contextlib.suppress(IntegrityError): dialect = session.bind.dialect.name # "sqlite" or "postgresql"
async with session.begin_nested(): vals = {
session.add( "asset_hash": asset_hash,
AssetCacheState( "file_path": locator,
asset_hash=asset_hash, "mtime_ns": int(mtime_ns),
file_path=locator, }
mtime_ns=int(mtime_ns), # 2-step idempotent write so we can set flags deterministically:
# INSERT ... ON CONFLICT(file_path) DO NOTHING
# if conflicted, UPDATE only when values actually change
if dialect == "sqlite":
ins = (
d_sqlite.insert(AssetCacheState)
.values(**vals)
.on_conflict_do_nothing(index_elements=[AssetCacheState.file_path])
)
elif dialect == "postgresql":
ins = (
d_pg.insert(AssetCacheState)
.values(**vals)
.on_conflict_do_nothing(index_elements=[AssetCacheState.file_path])
)
else:
raise NotImplementedError(f"Unsupported database dialect: {dialect}")
res = await session.execute(ins)
if int(res.rowcount or 0) > 0:
out["state_created"] = True
else:
upd = (
sa.update(AssetCacheState)
.where(AssetCacheState.file_path == locator)
.where(
sa.or_(
AssetCacheState.asset_hash != asset_hash,
AssetCacheState.mtime_ns.is_(None),
AssetCacheState.mtime_ns != int(mtime_ns),
) )
) )
await session.flush() .values(asset_hash=asset_hash, mtime_ns=int(mtime_ns))
out["state_created"] = True )
res2 = await session.execute(upd)
if not out["state_created"]: if int(res2.rowcount or 0) > 0:
# unique(file_path) conflict -> update that row out["state_updated"] = True
state = (
await session.execute(
select(AssetCacheState).where(AssetCacheState.file_path == locator).limit(1)
)
).scalars().first()
if state is not None:
changed = False
if state.asset_hash != asset_hash:
state.asset_hash = asset_hash
changed = True
if state.mtime_ns != int(mtime_ns):
state.mtime_ns = int(mtime_ns)
changed = True
if changed:
await session.flush()
out["state_updated"] = True
else:
logging.error("Locator state missing for %s after conflict; skipping update.", asset_hash)
# ---- Optional: AssetInfo + tag links ---- # ---- Optional: AssetInfo + tag links ----
if info_name: if info_name: