diff --git a/app/assets_scanner.py b/app/assets_scanner.py index 0f199719d..5ec1ebe88 100644 --- a/app/assets_scanner.py +++ b/app/assets_scanner.py @@ -31,11 +31,11 @@ from .database.helpers import ( from .database.models import Asset, AssetCacheState, AssetInfo from .database.services import ( compute_hash_and_dedup_for_cache_state, - ensure_seed_for_path, list_cache_states_by_asset_id, list_cache_states_with_asset_under_prefixes, list_unhashed_candidates_under_prefixes, list_verify_candidates_under_prefixes, + seed_from_path, ) LOGGER = logging.getLogger(__name__) @@ -147,7 +147,7 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None: if tag_pool: await ensure_tags_exist(sess, tag_pool, tag_type="user") for ap, sz, mt, name, tags in new_specs: - await ensure_seed_for_path( + await seed_from_path( sess, abs_path=ap, size_bytes=sz, diff --git a/app/database/services/__init__.py b/app/database/services/__init__.py index 88e97bfb0..fae9eb670 100644 --- a/app/database/services/__init__.py +++ b/app/database/services/__init__.py @@ -1,12 +1,12 @@ from .content import ( check_fs_asset_exists_quick, compute_hash_and_dedup_for_cache_state, - ensure_seed_for_path, ingest_fs_asset, list_cache_states_with_asset_under_prefixes, list_unhashed_candidates_under_prefixes, list_verify_candidates_under_prefixes, redirect_all_references_then_delete_asset, + seed_from_path, touch_asset_infos_by_fs_path, ) from .info import ( @@ -49,7 +49,7 @@ __all__ = [ "get_asset_tags", "list_tags_with_usage", "set_asset_info_preview", "fetch_asset_info_and_asset", "fetch_asset_info_asset_and_tags", # content - "check_fs_asset_exists_quick", "ensure_seed_for_path", + "check_fs_asset_exists_quick", "seed_from_path", "redirect_all_references_then_delete_asset", "compute_hash_and_dedup_for_cache_state", "list_unhashed_candidates_under_prefixes", "list_verify_candidates_under_prefixes", diff --git a/app/database/services/content.py b/app/database/services/content.py index 
298660129..84fa01f01 100644 --- a/app/database/services/content.py +++ b/app/database/services/content.py @@ -1,6 +1,7 @@ import contextlib import logging import os +import uuid from datetime import datetime from typing import Any, Optional, Sequence, Union @@ -19,7 +20,7 @@ from ..helpers import ( escape_like_prefix, remove_missing_tag_for_asset_id, ) -from ..models import Asset, AssetCacheState, AssetInfo, AssetInfoTag, Tag +from ..models import Asset, AssetCacheState, AssetInfo, AssetInfoMeta, AssetInfoTag, Tag from ..timeutil import utcnow from .info import replace_asset_info_metadata_projection from .queries import list_cache_states_by_asset_id, pick_best_live_path @@ -56,7 +57,7 @@ async def check_fs_asset_exists_quick( return row is not None -async def ensure_seed_for_path( +async def seed_from_path( session: AsyncSession, *, abs_path: str, @@ -66,69 +67,131 @@ async def ensure_seed_for_path( tags: Sequence[str], owner_id: str = "", skip_tag_ensure: bool = False, -) -> str: - """Ensure: Asset(hash=NULL), AssetCacheState(file_path), and AssetInfo exist for the path. 
Returns asset_id.""" +) -> None: + """Create Asset(hash=NULL), AssetCacheState(file_path), and AssetInfo for the path.""" locator = os.path.abspath(abs_path) now = utcnow() + dialect = session.bind.dialect.name - state = ( - await session.execute( - sa.select(AssetCacheState, Asset) - .join(Asset, Asset.id == AssetCacheState.asset_id) - .where(AssetCacheState.file_path == locator) - .limit(1) + new_asset_id = str(uuid.uuid4()) + new_info_id = str(uuid.uuid4()) + + # 1) Insert Asset (hash=NULL) – no conflict expected + asset_vals = { + "id": new_asset_id, + "hash": None, + "size_bytes": size_bytes, + "mime_type": None, + "created_at": now, + } + if dialect == "sqlite": + await session.execute(d_sqlite.insert(Asset).values(**asset_vals)) + elif dialect == "postgresql": + await session.execute(d_pg.insert(Asset).values(**asset_vals)) + else: + raise NotImplementedError(f"Unsupported database dialect: {dialect}") + + # 2) Try to claim file_path in AssetCacheState. Our concurrency gate. 
+ acs_vals = { + "asset_id": new_asset_id, + "file_path": locator, + "mtime_ns": mtime_ns, + } + if dialect == "sqlite": + ins_state = ( + d_sqlite.insert(AssetCacheState) + .values(**acs_vals) + .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path]) ) - ).first() - if state: - state_row: AssetCacheState = state[0] - asset_row: Asset = state[1] - changed = state_row.mtime_ns is None or int(state_row.mtime_ns) != mtime_ns - if changed: - state_row.mtime_ns = mtime_ns - state_row.needs_verify = True - if asset_row.size_bytes == 0 and size_bytes > 0: - asset_row.size_bytes = size_bytes - await session.flush() - return asset_row.id + state_inserted = int((await session.execute(ins_state)).rowcount or 0) > 0 + else: + ins_state = ( + d_pg.insert(AssetCacheState) + .values(**acs_vals) + .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path]) + .returning(AssetCacheState.id) + ) + state_inserted = (await session.execute(ins_state)).scalar_one_or_none() is not None - asset = Asset(hash=None, size_bytes=size_bytes, mime_type=None, created_at=now) - session.add(asset) - await session.flush() # to get id + if not state_inserted: + # Lost the race - clean up our orphan seed Asset and exit + with contextlib.suppress(Exception): + await session.execute(sa.delete(Asset).where(Asset.id == new_asset_id)) + return - cs = AssetCacheState(asset_id=asset.id, file_path=locator, mtime_ns=mtime_ns, needs_verify=False) - session.add(cs) + # 3) Create AssetInfo (unique(asset_id, owner_id, name)). 
+ fname = compute_relative_filename(locator) - info = AssetInfo( - owner_id=owner_id, - name=info_name, - asset_id=asset.id, - preview_id=None, - created_at=now, - updated_at=now, - last_access_time=now, - ) - session.add(info) - await session.flush() + info_vals = { + "id": new_info_id, + "owner_id": owner_id, + "name": info_name, + "asset_id": new_asset_id, + "preview_id": None, + "user_metadata": {"filename": fname} if fname else None, + "created_at": now, + "updated_at": now, + "last_access_time": now, + } + if dialect == "sqlite": + ins_info = ( + d_sqlite.insert(AssetInfo) + .values(**info_vals) + .on_conflict_do_nothing(index_elements=[AssetInfo.asset_id, AssetInfo.owner_id, AssetInfo.name]) + ) + info_inserted = int((await session.execute(ins_info)).rowcount or 0) > 0 + else: + ins_info = ( + d_pg.insert(AssetInfo) + .values(**info_vals) + .on_conflict_do_nothing(index_elements=[AssetInfo.asset_id, AssetInfo.owner_id, AssetInfo.name]) + .returning(AssetInfo.id) + ) + info_inserted = (await session.execute(ins_info)).scalar_one_or_none() is not None - with contextlib.suppress(Exception): - computed = compute_relative_filename(locator) - if computed: - await replace_asset_info_metadata_projection( - session, - asset_info_id=info.id, - user_metadata={"filename": computed}, - ) + # 4) If we actually inserted AssetInfo, attach tags and filename. 
+ if info_inserted: + want = normalize_tags(tags) + if want: + if not skip_tag_ensure: + await ensure_tags_exist(session, want, tag_type="user") + tag_rows = [ + { + "asset_info_id": new_info_id, + "tag_name": t, + "origin": "automatic", + "added_at": now, + } + for t in want + ] + if dialect == "sqlite": + ins_links = ( + d_sqlite.insert(AssetInfoTag) + .values(tag_rows) + .on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]) + ) + else: + ins_links = ( + d_pg.insert(AssetInfoTag) + .values(tag_rows) + .on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]) + ) + await session.execute(ins_links) - want = normalize_tags(tags) - if want: - if not skip_tag_ensure: - await ensure_tags_exist(session, want, tag_type="user") - session.add_all([ - AssetInfoTag(asset_info_id=info.id, tag_name=t, origin="automatic", added_at=now) - for t in want - ]) - await session.flush() - return asset.id + if fname: # simple filename projection with single row + meta_row = { + "asset_info_id": new_info_id, + "key": "filename", + "ordinal": 0, + "val_str": fname, + "val_num": None, + "val_bool": None, + "val_json": None, + } + if dialect == "sqlite": + await session.execute(d_sqlite.insert(AssetInfoMeta).values(**meta_row)) + else: + await session.execute(d_pg.insert(AssetInfoMeta).values(**meta_row)) async def redirect_all_references_then_delete_asset(