optimization: initial scan speed (batching tags)

bigcat88 2025-09-17 13:40:08 +03:00
parent 5b6810a2c6
commit 85ef08449d
GPG Key ID: 1F0BF0EC3CF22721
4 changed files with 29 additions and 17 deletions
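
In short: instead of issuing a tag-link INSERT per asset while seeding, the scan now collects each asset's tag-link rows and bulk-inserts them every 500 assets (and once more after the loop) before committing; tag names themselves are still ensured once up front from tag_pool, so the per-call skip_tag_ensure flag is dropped. A consolidated sketch of the new loop inside sync_seed_assets, pieced together from the hunks below (some seed_from_path arguments are elided here):

pending_tag_links: list[dict] = []
for ap, sz, mt, name, tags in new_specs:
    await seed_from_path(
        sess,
        # ... path / size / mtime arguments elided ...
        info_name=name,
        tags=tags,
        owner_id="",
        collected_tag_rows=pending_tag_links,  # rows are collected instead of inserted per asset
    )
    created += 1
    if created % 500 == 0:
        if pending_tag_links:
            # one multi-row INSERT for the whole batch of tag links
            await insert_tags_from_batch(sess, tag_rows=pending_tag_links)
            pending_tag_links.clear()
        await sess.commit()
# flush whatever is left from the last partial batch
if pending_tag_links:
    await insert_tags_from_batch(sess, tag_rows=pending_tag_links)
await sess.commit()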

View File

@@ -26,6 +26,7 @@ from .database.helpers import (
ensure_tags_exist,
escape_like_prefix,
fast_asset_file_check,
insert_tags_from_batch,
remove_missing_tag_for_asset_id,
)
from .database.models import Asset, AssetCacheState, AssetInfo
@@ -146,6 +147,8 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
async with await create_session() as sess:
if tag_pool:
await ensure_tags_exist(sess, tag_pool, tag_type="user")
pending_tag_links: list[dict] = []
for ap, sz, mt, name, tags in new_specs:
await seed_from_path(
sess,
@@ -155,12 +158,17 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
info_name=name,
tags=tags,
owner_id="",
skip_tag_ensure=True,
collected_tag_rows=pending_tag_links,
)
created += 1
if created % 500 == 0:
if pending_tag_links:
await insert_tags_from_batch(sess, tag_rows=pending_tag_links)
pending_tag_links.clear()
await sess.commit()
if pending_tag_links:
await insert_tags_from_batch(sess, tag_rows=pending_tag_links)
await sess.commit()
finally:
LOGGER.info(

View File

@@ -6,6 +6,7 @@ from .projection import is_scalar, project_kv
from .tags import (
add_missing_tag_for_asset_id,
ensure_tags_exist,
insert_tags_from_batch,
remove_missing_tag_for_asset_id,
)
@@ -19,5 +20,6 @@ __all__ = [
"ensure_tags_exist",
"add_missing_tag_for_asset_id",
"remove_missing_tag_for_asset_id",
"insert_tags_from_batch",
"visible_owner_clause",
]

View File

@@ -88,3 +88,19 @@ async def remove_missing_tag_for_asset_id(
AssetInfoTag.tag_name == "missing",
)
)
async def insert_tags_from_batch(session: AsyncSession, *, tag_rows: list[dict]) -> None:
if session.bind.dialect.name == "sqlite":
ins_links = (
d_sqlite.insert(AssetInfoTag)
.values(tag_rows)
.on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
)
else:
ins_links = (
d_pg.insert(AssetInfoTag)
.values(tag_rows)
.on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
)
await session.execute(ins_links)
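
The helper above issues a single multi-row INSERT with ON CONFLICT DO NOTHING on (asset_info_id, tag_name), choosing the SQLite or PostgreSQL insert construct from the session's bound dialect, so flushing a batch that already contains linked pairs does not raise a uniqueness error, and up to a whole flush interval of tag links goes out as one statement instead of one per tag. For illustration only (placeholder values; any further AssetInfoTag columns built by the seeding code are omitted), a flush of a collected batch would look like:

some_info_id = 42  # placeholder; real ids come from the seeded AssetInfo rows
rows = [
    {"asset_info_id": some_info_id, "tag_name": "cats"},
    {"asset_info_id": some_info_id, "tag_name": "dogs"},
]
# pairs that are already linked are skipped by ON CONFLICT DO NOTHING instead of raising
await insert_tags_from_batch(session, tag_rows=rows)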

View File

@@ -67,7 +67,7 @@ async def seed_from_path(
info_name: str,
tags: Sequence[str],
owner_id: str = "",
skip_tag_ensure: bool = False,
collected_tag_rows: list[dict],
) -> None:
"""Creates Asset(hash=NULL), AssetCacheState(file_path), and AssetInfo exist for the path."""
locator = os.path.abspath(abs_path)
@@ -154,8 +154,6 @@ async def seed_from_path(
if info_inserted:
want = normalize_tags(tags)
if want:
if not skip_tag_ensure:
await ensure_tags_exist(session, want, tag_type="user")
tag_rows = [
{
"asset_info_id": new_info_id,
@@ -165,19 +163,7 @@
}
for t in want
]
if dialect == "sqlite":
ins_links = (
d_sqlite.insert(AssetInfoTag)
.values(tag_rows)
.on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
)
else:
ins_links = (
d_pg.insert(AssetInfoTag)
.values(tag_rows)
.on_conflict_do_nothing(index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name])
)
await session.execute(ins_links)
collected_tag_rows.extend(tag_rows)
if fname: # simple filename projection with single row
meta_row = {