diff --git a/app/database/services.py b/app/database/services.py index 0b7e3711c..42f647d91 100644 --- a/app/database/services.py +++ b/app/database/services.py @@ -10,6 +10,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete, func from sqlalchemy.orm import contains_eager, noload from sqlalchemy.exc import IntegrityError +from sqlalchemy.dialects import sqlite as d_sqlite +from sqlalchemy.dialects import postgresql as d_pg from .models import Asset, AssetInfo, AssetInfoTag, AssetCacheState, Tag, AssetInfoMeta, AssetLocation from .timeutil import utcnow @@ -143,39 +145,49 @@ async def ingest_fs_asset( else: logging.error("Asset %s not found after PK conflict; skipping update.", asset_hash) - # ---- Step 2: INSERT/UPDATE AssetCacheState (mtime_ns, file_path) ---- - with contextlib.suppress(IntegrityError): - async with session.begin_nested(): - session.add( - AssetCacheState( - asset_hash=asset_hash, - file_path=locator, - mtime_ns=int(mtime_ns), + # ---- Step 2: UPSERT AssetCacheState (mtime_ns, file_path) ---- + dialect = session.bind.dialect.name # "sqlite" or "postgresql" + vals = { + "asset_hash": asset_hash, + "file_path": locator, + "mtime_ns": int(mtime_ns), + } + # 2-step idempotent write so we can set flags deterministically: + # INSERT ... ON CONFLICT(file_path) DO NOTHING + # if conflicted, UPDATE only when values actually change + if dialect == "sqlite": + ins = ( + d_sqlite.insert(AssetCacheState) + .values(**vals) + .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path]) + ) + elif dialect == "postgresql": + ins = ( + d_pg.insert(AssetCacheState) + .values(**vals) + .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path]) + ) + else: + raise NotImplementedError(f"Unsupported database dialect: {dialect}") + res = await session.execute(ins) + if int(res.rowcount or 0) > 0: + out["state_created"] = True + else: + upd = ( + sa.update(AssetCacheState) + .where(AssetCacheState.file_path == locator) + .where( + sa.or_( + AssetCacheState.asset_hash != asset_hash, + AssetCacheState.mtime_ns.is_(None), + AssetCacheState.mtime_ns != int(mtime_ns), ) ) - await session.flush() - out["state_created"] = True - - if not out["state_created"]: - # unique(file_path) conflict -> update that row - state = ( - await session.execute( - select(AssetCacheState).where(AssetCacheState.file_path == locator).limit(1) - ) - ).scalars().first() - if state is not None: - changed = False - if state.asset_hash != asset_hash: - state.asset_hash = asset_hash - changed = True - if state.mtime_ns != int(mtime_ns): - state.mtime_ns = int(mtime_ns) - changed = True - if changed: - await session.flush() - out["state_updated"] = True - else: - logging.error("Locator state missing for %s after conflict; skipping update.", asset_hash) + .values(asset_hash=asset_hash, mtime_ns=int(mtime_ns)) + ) + res2 = await session.execute(upd) + if int(res2.rowcount or 0) > 0: + out["state_updated"] = True # ---- Optional: AssetInfo + tag links ---- if info_name: