refactor(4): ensure all asset tags exist in the DB with a single query

This commit is contained in:
bigcat88 2025-09-16 21:18:18 +03:00
parent 677a0e2508
commit d0aa64d57b
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721
2 changed files with 50 additions and 34 deletions

View File

@ -23,6 +23,7 @@ from .api import schemas_in, schemas_out
from .database.db import create_session
from .database.helpers import (
add_missing_tag_for_asset_id,
ensure_tags_exist,
escape_like_prefix,
fast_asset_file_check,
remove_missing_tag_for_asset_id,
@ -118,38 +119,52 @@ async def sync_seed_assets(roots: list[schemas_in.RootType]) -> None:
if "output" in roots:
paths.extend(list_tree(folder_paths.get_output_directory()))
new_specs: list[tuple[str, int, int, str, list[str]]] = []
tag_pool: set[str] = set()
for p in paths:
ap = os.path.abspath(p)
if ap in existing_paths:
skipped_existing += 1
continue
try:
st = os.stat(p, follow_symlinks=True)
except OSError:
continue
if not int(st.st_size or 0):
continue
name, tags = get_name_and_tags_from_asset_path(ap)
new_specs.append((
ap,
int(st.st_size),
getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)),
name,
tags,
))
for t in tags:
tag_pool.add(t)
async with await create_session() as sess:
for p in paths:
try:
if os.path.abspath(p) in existing_paths:
skipped_existing += 1
continue
st = os.stat(p, follow_symlinks=True)
if not int(st.st_size or 0):
continue
size_bytes = int(st.st_size)
mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))
name, tags = get_name_and_tags_from_asset_path(p)
if tag_pool:
await ensure_tags_exist(sess, tag_pool, tag_type="user")
for ap, sz, mt, name, tags in new_specs:
await ensure_seed_for_path(
sess,
abs_path=ap,
size_bytes=sz,
mtime_ns=mt,
info_name=name,
tags=tags,
owner_id="",
skip_tag_ensure=True,
)
await ensure_seed_for_path(
sess,
abs_path=p,
size_bytes=size_bytes,
mtime_ns=mtime_ns,
info_name=name,
tags=tags,
owner_id="",
)
created += 1
if created % 500 == 0:
await sess.commit()
except OSError:
continue
created += 1
if created % 500 == 0:
await sess.commit()
await sess.commit()
finally:
LOGGER.info(
"Assets scan(roots=%s) completed in %.3f s (created=%d, skipped_existing=%d, total_seen=%d)",
"Assets scan(roots=%s) completed in %.3fs (created=%d, skipped_existing=%d, total_seen=%d)",
roots,
time.perf_counter() - t_total,
created,

View File

@ -65,6 +65,7 @@ async def ensure_seed_for_path(
info_name: str,
tags: Sequence[str],
owner_id: str = "",
skip_tag_ensure: bool = False,
) -> str:
"""Ensure: Asset(hash=NULL), AssetCacheState(file_path), and AssetInfo exist for the path. Returns asset_id."""
locator = os.path.abspath(abs_path)
@ -81,20 +82,20 @@ async def ensure_seed_for_path(
if state:
state_row: AssetCacheState = state[0]
asset_row: Asset = state[1]
changed = state_row.mtime_ns is None or int(state_row.mtime_ns) != int(mtime_ns)
changed = state_row.mtime_ns is None or int(state_row.mtime_ns) != mtime_ns
if changed:
state_row.mtime_ns = int(mtime_ns)
state_row.mtime_ns = mtime_ns
state_row.needs_verify = True
if asset_row.size_bytes == 0 and size_bytes > 0:
asset_row.size_bytes = int(size_bytes)
asset_row.size_bytes = size_bytes
await session.flush()
return asset_row.id
asset = Asset(hash=None, size_bytes=int(size_bytes), mime_type=None, created_at=now)
asset = Asset(hash=None, size_bytes=size_bytes, mime_type=None, created_at=now)
session.add(asset)
await session.flush() # to get id
cs = AssetCacheState(asset_id=asset.id, file_path=locator, mtime_ns=int(mtime_ns), needs_verify=False)
cs = AssetCacheState(asset_id=asset.id, file_path=locator, mtime_ns=mtime_ns, needs_verify=False)
session.add(cs)
info = AssetInfo(
@ -120,12 +121,12 @@ async def ensure_seed_for_path(
want = normalize_tags(tags)
if want:
await ensure_tags_exist(session, want, tag_type="user")
if not skip_tag_ensure:
await ensure_tags_exist(session, want, tag_type="user")
session.add_all([
AssetInfoTag(asset_info_id=info.id, tag_name=t, origin="automatic", added_at=now)
for t in want
])
await session.flush()
return asset.id