import os
import uuid
from typing import Iterable

import sqlalchemy
from sqlalchemy.dialects import sqlite
from sqlalchemy.orm import Session

from app.assets.database.models import Asset, AssetCacheState, AssetInfo, AssetInfoMeta, AssetInfoTag
from app.assets.helpers import utcnow

# Keep each statement's bind-parameter count safely below SQLite's limit.
MAX_BIND_PARAMS = 800


def _iter_chunks(seq, n: int):
    """Yield successive slices of *seq* holding at most *n* items each."""
    for i in range(0, len(seq), n):
        yield seq[i:i + n]


def _rows_per_stmt(cols: int) -> int:
    """Return the max rows per INSERT so cols * rows stays within MAX_BIND_PARAMS."""
    return max(1, MAX_BIND_PARAMS // max(1, cols))


def _chunk_rows(rows: list[dict], cols_per_row: int, max_bind_params: int) -> Iterable[list[dict]]:
    """Yield chunks of *rows* so each statement binds at most *max_bind_params* parameters.

    Same chunking as ``_iter_chunks`` but sized from a per-row column count and
    an explicit parameter budget instead of the module constant.
    """
    if not rows:
        # Bare return: the original's `return []` only set StopIteration.value,
        # which no caller can observe from a for-loop.
        return
    yield from _iter_chunks(rows, max(1, max_bind_params // max(1, cols_per_row)))


# Columns actually mapped on AssetInfo; the prepared rows also carry private
# "_tags"/"_filename" helper keys that must NOT reach session.execute()
# (SQLAlchemy rejects unmapped keys with "Unconsumed column names").
_INFO_COLS = (
    "id",
    "owner_id",
    "name",
    "asset_id",
    "preview_id",
    "user_metadata",
    "created_at",
    "updated_at",
    "last_access_time",
)


def seed_from_paths_batch(
    session: Session,
    *,
    specs: list[dict],
    owner_id: str = "",
) -> dict:
    """Seed Asset/AssetCacheState/AssetInfo rows for a batch of filesystem paths.

    Each spec is a dict with keys:
      - abs_path: str
      - size_bytes: int
      - mtime_ns: int
      - info_name: str
      - tags: list[str]
      - fname: Optional[str]

    Strategy: insert seed Assets unconditionally, race for the per-path
    AssetCacheState rows with ON CONFLICT DO NOTHING, delete the Assets that
    lost the race, then create AssetInfo (+ tags/meta) only for the winners.

    Returns: {"inserted_infos": int, "won_states": int, "lost_states": int}

    NOTE(review): if two specs share the same abs_path, ``path_to_asset`` keeps
    only the last asset id for that path, so an earlier duplicate's Asset row
    can leak — confirm callers de-duplicate paths before calling.
    """
    if not specs:
        return {"inserted_infos": 0, "won_states": 0, "lost_states": 0}

    now = utcnow()
    asset_rows: list[dict] = []
    state_rows: list[dict] = []
    path_to_asset: dict[str, str] = {}
    asset_to_info: dict[str, dict] = {}  # asset_id -> prepared AssetInfo row
    path_list: list[str] = []

    for sp in specs:
        ap = os.path.abspath(sp["abs_path"])
        aid = str(uuid.uuid4())
        iid = str(uuid.uuid4())
        path_list.append(ap)
        path_to_asset[ap] = aid
        asset_rows.append(
            {
                "id": aid,
                "hash": None,  # seed rows are unhashed; filled in later elsewhere
                "size_bytes": sp["size_bytes"],
                "mime_type": None,
                "created_at": now,
            }
        )
        state_rows.append(
            {
                "asset_id": aid,
                "file_path": ap,
                "mtime_ns": sp["mtime_ns"],
            }
        )
        asset_to_info[aid] = {
            "id": iid,
            "owner_id": owner_id,
            "name": sp["info_name"],
            "asset_id": aid,
            "preview_id": None,
            "user_metadata": {"filename": sp["fname"]} if sp["fname"] else None,
            "created_at": now,
            "updated_at": now,
            "last_access_time": now,
            # Private helper keys, stripped before INSERT (see _INFO_COLS).
            "_tags": sp["tags"],
            "_filename": sp["fname"],
        }

    # Insert all seed Assets (hash=NULL); 5 columns per row.
    ins_asset = sqlite.insert(Asset)
    for chunk in _iter_chunks(asset_rows, _rows_per_stmt(5)):
        session.execute(ins_asset, chunk)

    # Race to claim AssetCacheState rows (unique on file_path): insert with
    # ON CONFLICT DO NOTHING, then query back to learn which paths we won.
    ins_state = (
        sqlite.insert(AssetCacheState)
        .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path])
    )
    for chunk in _iter_chunks(state_rows, _rows_per_stmt(3)):
        session.execute(ins_state, chunk)

    # A path is "won" when the surviving cache-state row carries one of OUR
    # freshly generated asset ids; pre-existing rows hold foreign asset ids,
    # so the asset_id filter excludes them.
    winners_by_path: set[str] = set()
    for chunk in _iter_chunks(path_list, MAX_BIND_PARAMS):
        result = session.execute(
            sqlalchemy.select(AssetCacheState.file_path)
            .where(AssetCacheState.file_path.in_(chunk))
            .where(AssetCacheState.asset_id.in_([path_to_asset[p] for p in chunk]))
        )
        winners_by_path.update(result.scalars().all())

    losers_by_path = set(path_list) - winners_by_path
    lost_assets = [path_to_asset[p] for p in losers_by_path]
    if lost_assets:
        # Losers' seed Assets are orphans now; remove them.
        for id_chunk in _iter_chunks(lost_assets, MAX_BIND_PARAMS):
            session.execute(sqlalchemy.delete(Asset).where(Asset.id.in_(id_chunk)))

    if not winners_by_path:
        return {"inserted_infos": 0, "won_states": 0, "lost_states": len(losers_by_path)}

    # Insert AssetInfo only for winners, again with ON CONFLICT DO NOTHING;
    # strip the private "_tags"/"_filename" keys so only the 9 mapped columns
    # are bound (matches _rows_per_stmt(9)).
    winner_info_rows = [asset_to_info[path_to_asset[p]] for p in winners_by_path]
    ins_info = (
        sqlite.insert(AssetInfo)
        .on_conflict_do_nothing(index_elements=[AssetInfo.asset_id, AssetInfo.owner_id, AssetInfo.name])
    )
    for chunk in _iter_chunks(winner_info_rows, _rows_per_stmt(9)):
        session.execute(ins_info, [{k: r[k] for k in _INFO_COLS} for r in chunk])

    # Detect which AssetInfo rows actually landed by probing for our
    # freshly generated primary keys.
    all_info_ids = [row["id"] for row in winner_info_rows]
    inserted_info_ids: set[str] = set()
    for chunk in _iter_chunks(all_info_ids, MAX_BIND_PARAMS):
        result = session.execute(
            sqlalchemy.select(AssetInfo.id).where(AssetInfo.id.in_(chunk))
        )
        inserted_info_ids.update(result.scalars().all())

    # Build tag + meta child rows for the infos that were inserted.
    tag_rows: list[dict] = []
    meta_rows: list[dict] = []
    if inserted_info_ids:
        for row in winner_info_rows:
            iid = row["id"]
            if iid not in inserted_info_ids:
                continue
            for t in row["_tags"]:
                tag_rows.append(
                    {
                        "asset_info_id": iid,
                        "tag_name": t,
                        "origin": "automatic",
                        "added_at": now,
                    }
                )
            if row["_filename"]:
                meta_rows.append(
                    {
                        "asset_info_id": iid,
                        "key": "filename",
                        "ordinal": 0,
                        "val_str": row["_filename"],
                        "val_num": None,
                        "val_bool": None,
                        "val_json": None,
                    }
                )
    bulk_insert_tags_and_meta(
        session, tag_rows=tag_rows, meta_rows=meta_rows, max_bind_params=MAX_BIND_PARAMS
    )

    return {
        "inserted_infos": len(inserted_info_ids),
        "won_states": len(winners_by_path),
        "lost_states": len(losers_by_path),
    }


def bulk_insert_tags_and_meta(
    session: Session,
    *,
    tag_rows: list[dict],
    meta_rows: list[dict],
    max_bind_params: int,
) -> None:
    """Batch insert into asset_info_tags and asset_info_meta with ON CONFLICT DO NOTHING.

    - tag_rows keys: asset_info_id, tag_name, origin, added_at
    - meta_rows keys: asset_info_id, key, ordinal, val_str, val_num, val_bool, val_json
    """
    if tag_rows:
        ins_links = (
            sqlite.insert(AssetInfoTag)
            .on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
        )
        for chunk in _chunk_rows(tag_rows, cols_per_row=4, max_bind_params=max_bind_params):
            session.execute(ins_links, chunk)
    if meta_rows:
        ins_meta = (
            sqlite.insert(AssetInfoMeta)
            .on_conflict_do_nothing(
                index_elements=[AssetInfoMeta.asset_info_id, AssetInfoMeta.key, AssetInfoMeta.ordinal]
            )
        )
        for chunk in _chunk_rows(meta_rows, cols_per_row=7, max_bind_params=max_bind_params):
            session.execute(ins_meta, chunk)