From f9602457d6aeff401c3af5b8ac69c93555f1b070 Mon Sep 17 00:00:00 2001 From: bigcat88 Date: Wed, 17 Sep 2025 16:47:27 +0300 Subject: [PATCH] optimization: initial scan speed(batching metadata[filename]) --- app/assets_scanner.py | 8 ++++++++ app/database/helpers/__init__.py | 2 ++ app/database/helpers/meta.py | 30 ++++++++++++++++++++++++++++++ app/database/helpers/tags.py | 4 +++- app/database/services/content.py | 27 +++++++++++++-------------- 5 files changed, 56 insertions(+), 15 deletions(-) create mode 100644 app/database/helpers/meta.py diff --git a/app/assets_scanner.py b/app/assets_scanner.py index b90be1a12..e622d6e3c 100644 --- a/app/assets_scanner.py +++ b/app/assets_scanner.py @@ -26,6 +26,7 @@ from .database.helpers import ( ensure_tags_exist, escape_like_prefix, fast_asset_file_check, + insert_meta_from_batch, insert_tags_from_batch, remove_missing_tag_for_asset_id, ) @@ -149,6 +150,7 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None: await ensure_tags_exist(sess, tag_pool, tag_type="user") pending_tag_links: list[dict] = [] + pending_meta_rows: list[dict] = [] for ap, sz, mt, name, tags in new_specs: await seed_from_path( sess, @@ -159,6 +161,7 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None: tags=tags, owner_id="", collected_tag_rows=pending_tag_links, + collected_meta_rows=pending_meta_rows, ) created += 1 @@ -166,9 +169,14 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None: if pending_tag_links: await insert_tags_from_batch(sess, tag_rows=pending_tag_links) pending_tag_links.clear() + if pending_meta_rows: + await insert_meta_from_batch(sess, rows=pending_meta_rows) + pending_meta_rows.clear() await sess.commit() if pending_tag_links: await insert_tags_from_batch(sess, tag_rows=pending_tag_links) + if pending_meta_rows: + await insert_meta_from_batch(sess, rows=pending_meta_rows) await sess.commit() finally: LOGGER.info( diff --git a/app/database/helpers/__init__.py b/app/database/helpers/__init__.py index 6d3db744f..fda457ca9 100644 --- a/app/database/helpers/__init__.py +++ b/app/database/helpers/__init__.py @@ -1,6 +1,7 @@ from .escape_like import escape_like_prefix from .fast_check import fast_asset_file_check from .filters import apply_metadata_filter, apply_tag_filters +from .meta import insert_meta_from_batch from .ownership import visible_owner_clause from .projection import is_scalar, project_kv from .tags import ( @@ -20,6 +21,7 @@ __all__ = [ "ensure_tags_exist", "add_missing_tag_for_asset_id", "remove_missing_tag_for_asset_id", + "insert_meta_from_batch", "insert_tags_from_batch", "visible_owner_clause", ] diff --git a/app/database/helpers/meta.py b/app/database/helpers/meta.py new file mode 100644 index 000000000..a2c801a32 --- /dev/null +++ b/app/database/helpers/meta.py @@ -0,0 +1,30 @@ +from sqlalchemy.dialects import postgresql as d_pg +from sqlalchemy.dialects import sqlite as d_sqlite +from sqlalchemy.ext.asyncio import AsyncSession + +from ..models import AssetInfoMeta + + +async def insert_meta_from_batch(session: AsyncSession, *, rows: list[dict]) -> None: + """Bulk insert rows into asset_info_meta with ON CONFLICT DO NOTHING. + Each row should contain: asset_info_id, key, ordinal, val_str, val_num, val_bool, val_json + """ + if session.bind.dialect.name == "sqlite": + ins = ( + d_sqlite.insert(AssetInfoMeta) + .values(rows) + .on_conflict_do_nothing( + index_elements=[AssetInfoMeta.asset_info_id, AssetInfoMeta.key, AssetInfoMeta.ordinal] + ) + ) + elif session.bind.dialect.name == "postgresql": + ins = ( + d_pg.insert(AssetInfoMeta) + .values(rows) + .on_conflict_do_nothing( + index_elements=[AssetInfoMeta.asset_info_id, AssetInfoMeta.key, AssetInfoMeta.ordinal] + ) + ) + else: + raise NotImplementedError(f"Unsupported database dialect: {session.bind.dialect.name}") + await session.execute(ins) diff --git a/app/database/helpers/tags.py b/app/database/helpers/tags.py index 40e22ac07..5bc393a8b 100644 --- a/app/database/helpers/tags.py +++ b/app/database/helpers/tags.py @@ -97,10 +97,12 @@ async def insert_tags_from_batch(session: AsyncSession, *, tag_rows: list[dict]) .values(tag_rows) .on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]) ) - else: + elif session.bind.dialect.name == "postgresql": ins_links = ( d_pg.insert(AssetInfoTag) .values(tag_rows) .on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]) ) + else: + raise NotImplementedError(f"Unsupported database dialect: {session.bind.dialect.name}") await session.execute(ins_links) diff --git a/app/database/services/content.py b/app/database/services/content.py index ae50e29ec..903238c9f 100644 --- a/app/database/services/content.py +++ b/app/database/services/content.py @@ -20,7 +20,7 @@ from ..helpers import ( escape_like_prefix, remove_missing_tag_for_asset_id, ) -from ..models import Asset, AssetCacheState, AssetInfo, AssetInfoMeta, AssetInfoTag, Tag +from ..models import Asset, AssetCacheState, AssetInfo, AssetInfoTag, Tag from ..timeutil import utcnow from .info import replace_asset_info_metadata_projection from .queries import list_cache_states_by_asset_id, pick_best_live_path @@ -68,6 +68,7 @@ async def seed_from_path( tags: Sequence[str], owner_id: str = "", collected_tag_rows: list[dict], + collected_meta_rows: list[dict], ) -> None: """Creates Asset(hash=NULL), AssetCacheState(file_path), and AssetInfo exist for the path.""" locator = os.path.abspath(abs_path) @@ -166,19 +167,17 @@ async def seed_from_path( collected_tag_rows.extend(tag_rows) if fname: # simple filename projection with single row - meta_row = { - "asset_info_id": new_info_id, - "key": "filename", - "ordinal": 0, - "val_str": fname, - "val_num": None, - "val_bool": None, - "val_json": None, - } - if dialect == "sqlite": - await session.execute(d_sqlite.insert(AssetInfoMeta).values(**meta_row)) - else: - await session.execute(d_pg.insert(AssetInfoMeta).values(**meta_row)) + collected_meta_rows.append( + { + "asset_info_id": new_info_id, + "key": "filename", + "ordinal": 0, + "val_str": fname, + "val_num": None, + "val_bool": None, + "val_json": None, + } + ) async def redirect_all_references_then_delete_asset(