optimization: initial scan speed (batching metadata[filename])

This commit is contained in:
bigcat88 2025-09-17 16:47:27 +03:00
parent 85ef08449d
commit f9602457d6
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721
5 changed files with 56 additions and 15 deletions

View File

@ -26,6 +26,7 @@ from .database.helpers import (
ensure_tags_exist, ensure_tags_exist,
escape_like_prefix, escape_like_prefix,
fast_asset_file_check, fast_asset_file_check,
insert_meta_from_batch,
insert_tags_from_batch, insert_tags_from_batch,
remove_missing_tag_for_asset_id, remove_missing_tag_for_asset_id,
) )
@ -149,6 +150,7 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
await ensure_tags_exist(sess, tag_pool, tag_type="user") await ensure_tags_exist(sess, tag_pool, tag_type="user")
pending_tag_links: list[dict] = [] pending_tag_links: list[dict] = []
pending_meta_rows: list[dict] = []
for ap, sz, mt, name, tags in new_specs: for ap, sz, mt, name, tags in new_specs:
await seed_from_path( await seed_from_path(
sess, sess,
@ -159,6 +161,7 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
tags=tags, tags=tags,
owner_id="", owner_id="",
collected_tag_rows=pending_tag_links, collected_tag_rows=pending_tag_links,
collected_meta_rows=pending_meta_rows,
) )
created += 1 created += 1
@ -166,9 +169,14 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
if pending_tag_links: if pending_tag_links:
await insert_tags_from_batch(sess, tag_rows=pending_tag_links) await insert_tags_from_batch(sess, tag_rows=pending_tag_links)
pending_tag_links.clear() pending_tag_links.clear()
if pending_meta_rows:
await insert_meta_from_batch(sess, rows=pending_meta_rows)
pending_meta_rows.clear()
await sess.commit() await sess.commit()
if pending_tag_links: if pending_tag_links:
await insert_tags_from_batch(sess, tag_rows=pending_tag_links) await insert_tags_from_batch(sess, tag_rows=pending_tag_links)
if pending_meta_rows:
await insert_meta_from_batch(sess, rows=pending_meta_rows)
await sess.commit() await sess.commit()
finally: finally:
LOGGER.info( LOGGER.info(

View File

@ -1,6 +1,7 @@
from .escape_like import escape_like_prefix from .escape_like import escape_like_prefix
from .fast_check import fast_asset_file_check from .fast_check import fast_asset_file_check
from .filters import apply_metadata_filter, apply_tag_filters from .filters import apply_metadata_filter, apply_tag_filters
from .meta import insert_meta_from_batch
from .ownership import visible_owner_clause from .ownership import visible_owner_clause
from .projection import is_scalar, project_kv from .projection import is_scalar, project_kv
from .tags import ( from .tags import (
@ -20,6 +21,7 @@ __all__ = [
"ensure_tags_exist", "ensure_tags_exist",
"add_missing_tag_for_asset_id", "add_missing_tag_for_asset_id",
"remove_missing_tag_for_asset_id", "remove_missing_tag_for_asset_id",
"insert_meta_from_batch",
"insert_tags_from_batch", "insert_tags_from_batch",
"visible_owner_clause", "visible_owner_clause",
] ]

View File

@ -0,0 +1,30 @@
from sqlalchemy.dialects import postgresql as d_pg
from sqlalchemy.dialects import sqlite as d_sqlite
from sqlalchemy.ext.asyncio import AsyncSession
from ..models import AssetInfoMeta
async def insert_meta_from_batch(session: AsyncSession, *, rows: list[dict]) -> None:
    """Bulk insert rows into asset_info_meta with ON CONFLICT DO NOTHING.

    Each row should contain: asset_info_id, key, ordinal, val_str, val_num, val_bool, val_json

    Rows that collide with an existing (asset_info_id, key, ordinal) entry are
    skipped rather than raising. The statement is executed on ``session`` but
    not committed; committing is left to the caller.

    Args:
        session: Async session whose bind determines the SQL dialect used.
        rows: Row dictionaries to insert. An empty list is a no-op.

    Raises:
        NotImplementedError: If the session's dialect is neither ``sqlite``
            nor ``postgresql`` (only checked when there are rows to insert).
    """
    if not rows:
        # Nothing to insert: skip building/executing a statement with an
        # empty VALUES list and avoid a pointless database round-trip.
        return

    dialect_name = session.bind.dialect.name
    # ON CONFLICT support lives on the dialect-specific insert constructs,
    # so pick the matching one; the rest of the statement is identical.
    if dialect_name == "sqlite":
        dialect_insert = d_sqlite.insert
    elif dialect_name == "postgresql":
        dialect_insert = d_pg.insert
    else:
        raise NotImplementedError(f"Unsupported database dialect: {dialect_name}")

    ins = (
        dialect_insert(AssetInfoMeta)
        .values(rows)
        .on_conflict_do_nothing(
            index_elements=[AssetInfoMeta.asset_info_id, AssetInfoMeta.key, AssetInfoMeta.ordinal]
        )
    )
    await session.execute(ins)

View File

@ -97,10 +97,12 @@ async def insert_tags_from_batch(session: AsyncSession, *, tag_rows: list[dict])
.values(tag_rows) .values(tag_rows)
.on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]) .on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
) )
else: elif session.bind.dialect.name == "postgresql":
ins_links = ( ins_links = (
d_pg.insert(AssetInfoTag) d_pg.insert(AssetInfoTag)
.values(tag_rows) .values(tag_rows)
.on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name]) .on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
) )
else:
raise NotImplementedError(f"Unsupported database dialect: {session.bind.dialect.name}")
await session.execute(ins_links) await session.execute(ins_links)

View File

@ -20,7 +20,7 @@ from ..helpers import (
escape_like_prefix, escape_like_prefix,
remove_missing_tag_for_asset_id, remove_missing_tag_for_asset_id,
) )
from ..models import Asset, AssetCacheState, AssetInfo, AssetInfoMeta, AssetInfoTag, Tag from ..models import Asset, AssetCacheState, AssetInfo, AssetInfoTag, Tag
from ..timeutil import utcnow from ..timeutil import utcnow
from .info import replace_asset_info_metadata_projection from .info import replace_asset_info_metadata_projection
from .queries import list_cache_states_by_asset_id, pick_best_live_path from .queries import list_cache_states_by_asset_id, pick_best_live_path
@ -68,6 +68,7 @@ async def seed_from_path(
tags: Sequence[str], tags: Sequence[str],
owner_id: str = "", owner_id: str = "",
collected_tag_rows: list[dict], collected_tag_rows: list[dict],
collected_meta_rows: list[dict],
) -> None: ) -> None:
"""Creates Asset(hash=NULL), AssetCacheState(file_path), and AssetInfo exist for the path.""" """Creates Asset(hash=NULL), AssetCacheState(file_path), and AssetInfo exist for the path."""
locator = os.path.abspath(abs_path) locator = os.path.abspath(abs_path)
@ -166,19 +167,17 @@ async def seed_from_path(
collected_tag_rows.extend(tag_rows) collected_tag_rows.extend(tag_rows)
if fname: # simple filename projection with single row if fname: # simple filename projection with single row
meta_row = { collected_meta_rows.append(
"asset_info_id": new_info_id, {
"key": "filename", "asset_info_id": new_info_id,
"ordinal": 0, "key": "filename",
"val_str": fname, "ordinal": 0,
"val_num": None, "val_str": fname,
"val_bool": None, "val_num": None,
"val_json": None, "val_bool": None,
} "val_json": None,
if dialect == "sqlite": }
await session.execute(d_sqlite.insert(AssetInfoMeta).values(**meta_row)) )
else:
await session.execute(d_pg.insert(AssetInfoMeta).values(**meta_row))
async def redirect_all_references_then_delete_asset( async def redirect_all_references_then_delete_asset(