refactor: move scanner to services layer with pure query extraction

- Move app/assets/scanner.py to app/assets/services/scanner.py
- Extract pure queries from fast_db_consistency_pass:
  - get_cache_states_for_prefixes()
  - bulk_set_needs_verify()
  - delete_cache_states_by_ids()
  - delete_orphaned_seed_asset()
- Split prune_orphaned_assets into pure queries:
  - delete_cache_states_outside_prefixes()
  - get_orphaned_seed_asset_ids()
  - delete_assets_by_ids()
- Add reconcile_cache_states_for_root() service function
- Add prune_orphaned_assets() service function
- Remove function injection pattern
- Update imports in main.py, server.py, routes.py

Amp-Thread-ID: https://ampcode.com/threads/T-019c24f1-3385-701b-87e0-8b6bc87e841b
Co-authored-by: Amp <amp@ampcode.com>
Luke Mino-Altherr 2026-02-03 11:27:07 -08:00
parent fba4570e49
commit 64d2f51dfc
7 changed files with 400 additions and 263 deletions
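
For orientation, a minimal sketch of the pattern this refactor moves to: pure query functions receive an open Session and never commit, while the service layer composes them and the caller owns the transaction. Names are taken from the diff below; the prefix value is a hypothetical example.

    from app.database.db import create_session
    from app.assets.services.scanner import prune_orphaned_assets

    with create_session() as sess:
        # hypothetical prefix list; real callers build it via prefixes_for_root()
        deleted = prune_orphaned_assets(sess, ["/abs/path/to/models/"])
        sess.commit()  # the transaction boundary stays with the caller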

app/assets/api/routes.py

@@ -11,7 +11,7 @@ import app.assets.manager as manager
 from app import user_manager
 from app.assets.api import schemas_in
 from app.assets.helpers import get_query_dict
-from app.assets.scanner import seed_assets
+from app.assets.services.scanner import seed_assets
 import folder_paths

app/assets/database/queries/__init__.py

@@ -23,10 +23,16 @@ from app.assets.database.queries.asset_info import (
 )
 from app.assets.database.queries.cache_state import (
+    CacheStateRow,
     list_cache_states_by_asset_id,
     upsert_cache_state,
-    prune_orphaned_assets,
-    fast_db_consistency_pass,
+    delete_cache_states_outside_prefixes,
+    get_orphaned_seed_asset_ids,
+    delete_assets_by_ids,
+    get_cache_states_for_prefixes,
+    bulk_set_needs_verify,
+    delete_cache_states_by_ids,
+    delete_orphaned_seed_asset,
 )
 from app.assets.database.queries.tags import (
@@ -59,10 +65,16 @@ __all__ = [
     "delete_asset_info_by_id",
     "set_asset_info_preview",
     # cache_state.py
+    "CacheStateRow",
     "list_cache_states_by_asset_id",
     "upsert_cache_state",
-    "prune_orphaned_assets",
-    "fast_db_consistency_pass",
+    "delete_cache_states_outside_prefixes",
+    "get_orphaned_seed_asset_ids",
+    "delete_assets_by_ids",
+    "get_cache_states_for_prefixes",
+    "bulk_set_needs_verify",
+    "delete_cache_states_by_ids",
+    "delete_orphaned_seed_asset",
     # tags.py
     "ensure_tags_exist",
     "get_asset_tags",

app/assets/database/queries/cache_state.py

@@ -1,6 +1,5 @@
-import contextlib
 import os
-from typing import Sequence
+from typing import NamedTuple, Sequence
 
 import sqlalchemy as sa
 from sqlalchemy import select
@@ -10,6 +9,31 @@ from sqlalchemy.orm import Session
 from app.assets.database.models import Asset, AssetCacheState, AssetInfo
 from app.assets.helpers import escape_like_prefix
 
+__all__ = [
+    "CacheStateRow",
+    "list_cache_states_by_asset_id",
+    "upsert_cache_state",
+    "delete_cache_states_outside_prefixes",
+    "get_orphaned_seed_asset_ids",
+    "delete_assets_by_ids",
+    "get_cache_states_for_prefixes",
+    "bulk_set_needs_verify",
+    "delete_cache_states_by_ids",
+    "delete_orphaned_seed_asset",
+]
+
+
+class CacheStateRow(NamedTuple):
+    """Row from cache state query with joined asset data."""
+
+    state_id: int
+    file_path: str
+    mtime_ns: int | None
+    needs_verify: bool
+    asset_id: str
+    asset_hash: str | None
+    size_bytes: int
+
 
 def list_cache_states_by_asset_id(
     session: Session, *, asset_id: str
@@ -63,19 +87,17 @@ def upsert_cache_state(
     return False, updated
 
 
-def prune_orphaned_assets(session: Session, roots: tuple[str, ...], prefixes_for_root_fn) -> int:
-    """Prune cache states outside configured prefixes, then delete orphaned seed assets.
+def delete_cache_states_outside_prefixes(session: Session, valid_prefixes: list[str]) -> int:
+    """Delete cache states with file_path not matching any of the valid prefixes.
 
     Args:
         session: Database session
-        roots: Tuple of root types to prune
-        prefixes_for_root_fn: Function to get prefixes for a root type
+        valid_prefixes: List of absolute directory prefixes that are valid
 
     Returns:
-        Number of orphaned assets deleted
+        Number of cache states deleted
     """
-    all_prefixes = [os.path.abspath(p) for r in roots for p in prefixes_for_root_fn(r)]
-    if not all_prefixes:
+    if not valid_prefixes:
         return 0
 
     def make_prefix_condition(prefix: str):
@@ -83,153 +105,131 @@ def prune_orphaned_assets(session: Session, roots: tuple[str, ...], prefixes_for_root_fn) -> int:
         escaped, esc = escape_like_prefix(base)
         return AssetCacheState.file_path.like(escaped + "%", escape=esc)
 
-    matches_valid_prefix = sa.or_(*[make_prefix_condition(p) for p in all_prefixes])
+    matches_valid_prefix = sa.or_(*[make_prefix_condition(p) for p in valid_prefixes])
+    result = session.execute(sa.delete(AssetCacheState).where(~matches_valid_prefix))
+    return result.rowcount
+
+
+def get_orphaned_seed_asset_ids(session: Session) -> list[str]:
+    """Get IDs of seed assets (hash is None) with no remaining cache states.
+
+    Returns:
+        List of asset IDs that are orphaned
+    """
     orphan_subq = (
         sa.select(Asset.id)
         .outerjoin(AssetCacheState, AssetCacheState.asset_id == Asset.id)
         .where(Asset.hash.is_(None), AssetCacheState.id.is_(None))
-    ).scalar_subquery()
-
-    session.execute(sa.delete(AssetCacheState).where(~matches_valid_prefix))
-    session.execute(sa.delete(AssetInfo).where(AssetInfo.asset_id.in_(orphan_subq)))
-    result = session.execute(sa.delete(Asset).where(Asset.id.in_(orphan_subq)))
+    )
+    return [row[0] for row in session.execute(orphan_subq).all()]
+
+
+def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
+    """Delete assets and their AssetInfos by ID.
+
+    Args:
+        session: Database session
+        asset_ids: List of asset IDs to delete
+
+    Returns:
+        Number of assets deleted
+    """
+    if not asset_ids:
+        return 0
+    session.execute(sa.delete(AssetInfo).where(AssetInfo.asset_id.in_(asset_ids)))
+    result = session.execute(sa.delete(Asset).where(Asset.id.in_(asset_ids)))
     return result.rowcount
 
 
-def fast_db_consistency_pass(
+def get_cache_states_for_prefixes(
     session: Session,
-    root: str,
-    prefixes_for_root_fn,
-    escape_like_prefix_fn,
-    fast_asset_file_check_fn,
-    add_missing_tag_fn,
-    remove_missing_tag_fn,
-    collect_existing_paths: bool = False,
-    update_missing_tags: bool = False,
-) -> set[str] | None:
-    """Fast DB+FS pass for a root:
-    - Toggle needs_verify per state using fast check
-    - For hashed assets with at least one fast-ok state in this root: delete stale missing states
-    - For seed assets with all states missing: delete Asset and its AssetInfos
-    - Optionally add/remove 'missing' tags based on fast-ok in this root
-    - Optionally return surviving absolute paths
+    prefixes: list[str],
+) -> list[CacheStateRow]:
+    """Get all cache states with paths matching any of the given prefixes.
+
+    Args:
+        session: Database session
+        prefixes: List of absolute directory prefixes to match
+
+    Returns:
+        List of cache state rows with joined asset data, ordered by asset_id, state_id
     """
-    prefixes = prefixes_for_root_fn(root)
     if not prefixes:
-        return set() if collect_existing_paths else None
+        return []
 
     conds = []
     for p in prefixes:
         base = os.path.abspath(p)
         if not base.endswith(os.sep):
             base += os.sep
-        escaped, esc = escape_like_prefix_fn(base)
+        escaped, esc = escape_like_prefix(base)
         conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc))
 
-    rows = (
-        session.execute(
-            sa.select(
-                AssetCacheState.id,
-                AssetCacheState.file_path,
-                AssetCacheState.mtime_ns,
-                AssetCacheState.needs_verify,
-                AssetCacheState.asset_id,
-                Asset.hash,
-                Asset.size_bytes,
-            )
-            .join(Asset, Asset.id == AssetCacheState.asset_id)
-            .where(sa.or_(*conds))
-            .order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
+    rows = session.execute(
+        sa.select(
+            AssetCacheState.id,
+            AssetCacheState.file_path,
+            AssetCacheState.mtime_ns,
+            AssetCacheState.needs_verify,
+            AssetCacheState.asset_id,
+            Asset.hash,
+            Asset.size_bytes,
         )
+        .join(Asset, Asset.id == AssetCacheState.asset_id)
+        .where(sa.or_(*conds))
+        .order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
     ).all()
 
-    by_asset: dict[str, dict] = {}
-    for sid, fp, mtime_db, needs_verify, aid, a_hash, a_size in rows:
-        acc = by_asset.get(aid)
-        if acc is None:
-            acc = {"hash": a_hash, "size_db": int(a_size or 0), "states": []}
-            by_asset[aid] = acc
-
-        fast_ok = False
-        try:
-            exists = True
-            fast_ok = fast_asset_file_check_fn(
-                mtime_db=mtime_db,
-                size_db=acc["size_db"],
-                stat_result=os.stat(fp, follow_symlinks=True),
-            )
-        except FileNotFoundError:
-            exists = False
-        except OSError:
-            exists = False
-
-        acc["states"].append({
-            "sid": sid,
-            "fp": fp,
-            "exists": exists,
-            "fast_ok": fast_ok,
-            "needs_verify": bool(needs_verify),
-        })
-
-    to_set_verify: list[int] = []
-    to_clear_verify: list[int] = []
-    stale_state_ids: list[int] = []
-    survivors: set[str] = set()
-
-    for aid, acc in by_asset.items():
-        a_hash = acc["hash"]
-        states = acc["states"]
-        any_fast_ok = any(s["fast_ok"] for s in states)
-        all_missing = all(not s["exists"] for s in states)
-
-        for s in states:
-            if not s["exists"]:
-                continue
-            if s["fast_ok"] and s["needs_verify"]:
-                to_clear_verify.append(s["sid"])
-            if not s["fast_ok"] and not s["needs_verify"]:
-                to_set_verify.append(s["sid"])
-
-        if a_hash is None:
-            if states and all_missing:
-                session.execute(sa.delete(AssetInfo).where(AssetInfo.asset_id == aid))
-                asset = session.get(Asset, aid)
-                if asset:
-                    session.delete(asset)
-            else:
-                for s in states:
-                    if s["exists"]:
-                        survivors.add(os.path.abspath(s["fp"]))
-            continue
-
-        if any_fast_ok:
-            for s in states:
-                if not s["exists"]:
-                    stale_state_ids.append(s["sid"])
-            if update_missing_tags:
-                with contextlib.suppress(Exception):
-                    remove_missing_tag_fn(session, asset_id=aid)
-        elif update_missing_tags:
-            with contextlib.suppress(Exception):
-                add_missing_tag_fn(session, asset_id=aid, origin="automatic")
-
-        for s in states:
-            if s["exists"]:
-                survivors.add(os.path.abspath(s["fp"]))
-
-    if stale_state_ids:
-        session.execute(sa.delete(AssetCacheState).where(AssetCacheState.id.in_(stale_state_ids)))
-    if to_set_verify:
-        session.execute(
-            sa.update(AssetCacheState)
-            .where(AssetCacheState.id.in_(to_set_verify))
-            .values(needs_verify=True)
-        )
-    if to_clear_verify:
-        session.execute(
-            sa.update(AssetCacheState)
-            .where(AssetCacheState.id.in_(to_clear_verify))
-            .values(needs_verify=False)
-        )
-
-    return survivors if collect_existing_paths else None
+    return [
+        CacheStateRow(
+            state_id=row[0],
+            file_path=row[1],
+            mtime_ns=row[2],
+            needs_verify=row[3],
+            asset_id=row[4],
+            asset_hash=row[5],
+            size_bytes=int(row[6] or 0),
+        )
+        for row in rows
+    ]
+
+
+def bulk_set_needs_verify(session: Session, state_ids: list[int], value: bool) -> int:
+    """Set needs_verify flag for multiple cache states.
+
+    Returns: Number of rows updated
+    """
+    if not state_ids:
+        return 0
+    result = session.execute(
+        sa.update(AssetCacheState)
+        .where(AssetCacheState.id.in_(state_ids))
+        .values(needs_verify=value)
+    )
+    return result.rowcount
+
+
+def delete_cache_states_by_ids(session: Session, state_ids: list[int]) -> int:
+    """Delete cache states by their IDs.
+
+    Returns: Number of rows deleted
+    """
+    if not state_ids:
+        return 0
+    result = session.execute(
+        sa.delete(AssetCacheState).where(AssetCacheState.id.in_(state_ids))
+    )
+    return result.rowcount
+
+
+def delete_orphaned_seed_asset(session: Session, asset_id: str) -> bool:
+    """Delete a seed asset (hash is None) and its AssetInfos.
+
+    Returns: True if asset was deleted, False if not found
+    """
+    session.execute(sa.delete(AssetInfo).where(AssetInfo.asset_id == asset_id))
+    asset = session.get(Asset, asset_id)
+    if asset:
+        session.delete(asset)
+        return True
+    return False
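
A hedged usage sketch of the extracted pure queries above: each takes an open Session plus plain IDs, returns a rowcount, and never commits, so several can be batched in one transaction. The state_ids values below are hypothetical.

    from app.database.db import create_session
    from app.assets.database.queries import (
        bulk_set_needs_verify,
        delete_cache_states_by_ids,
    )

    with create_session() as sess:
        flagged = bulk_set_needs_verify(sess, [101, 102], value=True)
        removed = delete_cache_states_by_ids(sess, [103])
        sess.commit()  # one commit covers both statements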

app/assets/scanner.py (deleted)

@@ -1,121 +0,0 @@
-import time
-import logging
-import os
-
-import folder_paths
-
-from app.database.db import create_session, dependencies_available
-from app.assets.helpers import (
-    collect_models_files, compute_relative_filename, fast_asset_file_check, get_name_and_tags_from_asset_path,
-    list_tree, prefixes_for_root, escape_like_prefix,
-    RootType
-)
-from app.assets.database.queries import (
-    add_missing_tag_for_asset_id,
-    ensure_tags_exist,
-    remove_missing_tag_for_asset_id,
-    prune_orphaned_assets,
-    fast_db_consistency_pass,
-)
-from app.assets.database.bulk_ops import seed_from_paths_batch
-
-
-def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
-    """
-    Scan the given roots and seed the assets into the database.
-    """
-    if not dependencies_available():
-        if enable_logging:
-            logging.warning("Database dependencies not available, skipping assets scan")
-        return
-
-    t_start = time.perf_counter()
-    created = 0
-    skipped_existing = 0
-    orphans_pruned = 0
-    paths: list[str] = []
-    try:
-        existing_paths: set[str] = set()
-        for r in roots:
-            try:
-                with create_session() as sess:
-                    survivors: set[str] = fast_db_consistency_pass(
-                        sess,
-                        r,
-                        prefixes_for_root_fn=prefixes_for_root,
-                        escape_like_prefix_fn=escape_like_prefix,
-                        fast_asset_file_check_fn=fast_asset_file_check,
-                        add_missing_tag_fn=add_missing_tag_for_asset_id,
-                        remove_missing_tag_fn=remove_missing_tag_for_asset_id,
-                        collect_existing_paths=True,
-                        update_missing_tags=True,
-                    )
-                    sess.commit()
-                if survivors:
-                    existing_paths.update(survivors)
-            except Exception as e:
-                logging.exception("fast DB scan failed for %s: %s", r, e)
-
-        try:
-            with create_session() as sess:
-                orphans_pruned = prune_orphaned_assets(sess, roots, prefixes_for_root)
-                sess.commit()
-        except Exception as e:
-            logging.exception("orphan pruning failed: %s", e)
-
-        if "models" in roots:
-            paths.extend(collect_models_files())
-        if "input" in roots:
-            paths.extend(list_tree(folder_paths.get_input_directory()))
-        if "output" in roots:
-            paths.extend(list_tree(folder_paths.get_output_directory()))
-
-        specs: list[dict] = []
-        tag_pool: set[str] = set()
-        for p in paths:
-            abs_p = os.path.abspath(p)
-            if abs_p in existing_paths:
-                skipped_existing += 1
-                continue
-            try:
-                stat_p = os.stat(abs_p, follow_symlinks=False)
-            except OSError:
-                continue
-            # skip empty files
-            if not stat_p.st_size:
-                continue
-            name, tags = get_name_and_tags_from_asset_path(abs_p)
-            specs.append(
-                {
-                    "abs_path": abs_p,
-                    "size_bytes": stat_p.st_size,
-                    "mtime_ns": getattr(stat_p, "st_mtime_ns", int(stat_p.st_mtime * 1_000_000_000)),
-                    "info_name": name,
-                    "tags": tags,
-                    "fname": compute_relative_filename(abs_p),
-                }
-            )
-            for t in tags:
-                tag_pool.add(t)
-
-        # if no file specs, nothing to do
-        if not specs:
-            return
-
-        with create_session() as sess:
-            if tag_pool:
-                ensure_tags_exist(sess, tag_pool, tag_type="user")
-            result = seed_from_paths_batch(sess, specs=specs, owner_id="")
-            created += result["inserted_infos"]
-            sess.commit()
-    finally:
-        if enable_logging:
-            logging.info(
-                "Assets scan(roots=%s) completed in %.3fs (created=%d, skipped_existing=%d, orphans_pruned=%d, total_seen=%d)",
-                roots,
-                time.perf_counter() - t_start,
-                created,
-                skipped_existing,
-                orphans_pruned,
-                len(paths),
-            )

app/assets/services/scanner.py (new)

@@ -0,0 +1,246 @@
+import contextlib
+import logging
+import os
+import time
+
+import folder_paths
+
+from app.assets.database.bulk_ops import seed_from_paths_batch
+from app.assets.database.queries import (
+    add_missing_tag_for_asset_id,
+    ensure_tags_exist,
+    remove_missing_tag_for_asset_id,
+    delete_cache_states_outside_prefixes,
+    get_orphaned_seed_asset_ids,
+    delete_assets_by_ids,
+    get_cache_states_for_prefixes,
+    bulk_set_needs_verify,
+    delete_cache_states_by_ids,
+    delete_orphaned_seed_asset,
+)
+from app.assets.helpers import (
+    collect_models_files,
+    compute_relative_filename,
+    fast_asset_file_check,
+    get_name_and_tags_from_asset_path,
+    list_tree,
+    prefixes_for_root,
+    RootType,
+)
+from app.database.db import create_session, dependencies_available
+
+
+def prune_orphaned_assets(session, valid_prefixes: list[str]) -> int:
+    """Prune cache states outside valid prefixes, then delete orphaned seed assets.
+
+    Args:
+        session: Database session
+        valid_prefixes: List of absolute directory prefixes that are valid
+
+    Returns:
+        Number of orphaned assets deleted
+    """
+    delete_cache_states_outside_prefixes(session, valid_prefixes)
+    orphan_ids = get_orphaned_seed_asset_ids(session)
+    return delete_assets_by_ids(session, orphan_ids)
+
+
+def reconcile_cache_states_for_root(
+    session,
+    root: RootType,
+    collect_existing_paths: bool = False,
+    update_missing_tags: bool = False,
+) -> set[str] | None:
+    """Reconcile cache states with filesystem for a root.
+
+    - Toggle needs_verify per state using fast mtime/size check
+    - For hashed assets with at least one fast-ok state in this root: delete stale missing states
+    - For seed assets with all states missing: delete Asset and its AssetInfos
+    - Optionally add/remove 'missing' tags based on fast-ok in this root
+    - Optionally return surviving absolute paths
+
+    Args:
+        session: Database session
+        root: Root type to scan
+        collect_existing_paths: If True, return set of surviving file paths
+        update_missing_tags: If True, update 'missing' tags based on file status
+
+    Returns:
+        Set of surviving absolute paths if collect_existing_paths=True, else None
+    """
+    prefixes = prefixes_for_root(root)
+    if not prefixes:
+        return set() if collect_existing_paths else None
+
+    rows = get_cache_states_for_prefixes(session, prefixes)
+
+    by_asset: dict[str, dict] = {}
+    for row in rows:
+        acc = by_asset.get(row.asset_id)
+        if acc is None:
+            acc = {"hash": row.asset_hash, "size_db": row.size_bytes, "states": []}
+            by_asset[row.asset_id] = acc
+
+        fast_ok = False
+        try:
+            exists = True
+            fast_ok = fast_asset_file_check(
+                mtime_db=row.mtime_ns,
+                size_db=acc["size_db"],
+                stat_result=os.stat(row.file_path, follow_symlinks=True),
+            )
+        except FileNotFoundError:
+            exists = False
+        except OSError:
+            exists = False
+
+        acc["states"].append({
+            "sid": row.state_id,
+            "fp": row.file_path,
+            "exists": exists,
+            "fast_ok": fast_ok,
+            "needs_verify": row.needs_verify,
+        })
+
+    to_set_verify: list[int] = []
+    to_clear_verify: list[int] = []
+    stale_state_ids: list[int] = []
+    survivors: set[str] = set()
+
+    for aid, acc in by_asset.items():
+        a_hash = acc["hash"]
+        states = acc["states"]
+        any_fast_ok = any(s["fast_ok"] for s in states)
+        all_missing = all(not s["exists"] for s in states)
+
+        for s in states:
+            if not s["exists"]:
+                continue
+            if s["fast_ok"] and s["needs_verify"]:
+                to_clear_verify.append(s["sid"])
+            if not s["fast_ok"] and not s["needs_verify"]:
+                to_set_verify.append(s["sid"])
+
+        if a_hash is None:
+            if states and all_missing:
+                delete_orphaned_seed_asset(session, aid)
+            else:
+                for s in states:
+                    if s["exists"]:
+                        survivors.add(os.path.abspath(s["fp"]))
+            continue
+
+        if any_fast_ok:
+            for s in states:
+                if not s["exists"]:
+                    stale_state_ids.append(s["sid"])
+            if update_missing_tags:
+                with contextlib.suppress(Exception):
+                    remove_missing_tag_for_asset_id(session, asset_id=aid)
+        elif update_missing_tags:
+            with contextlib.suppress(Exception):
+                add_missing_tag_for_asset_id(session, asset_id=aid, origin="automatic")
+
+        for s in states:
+            if s["exists"]:
+                survivors.add(os.path.abspath(s["fp"]))
+
+    delete_cache_states_by_ids(session, stale_state_ids)
+    bulk_set_needs_verify(session, to_set_verify, value=True)
+    bulk_set_needs_verify(session, to_clear_verify, value=False)
+
+    return survivors if collect_existing_paths else None
+
+
+def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
+    """Scan the given roots and seed the assets into the database."""
+    if not dependencies_available():
+        if enable_logging:
+            logging.warning("Database dependencies not available, skipping assets scan")
+        return
+
+    t_start = time.perf_counter()
+    created = 0
+    skipped_existing = 0
+    orphans_pruned = 0
+    paths: list[str] = []
+    try:
+        existing_paths: set[str] = set()
+        for r in roots:
+            try:
+                with create_session() as sess:
+                    survivors = reconcile_cache_states_for_root(
+                        sess,
+                        r,
+                        collect_existing_paths=True,
+                        update_missing_tags=True,
+                    )
+                    sess.commit()
+                if survivors:
+                    existing_paths.update(survivors)
+            except Exception as e:
+                logging.exception("fast DB scan failed for %s: %s", r, e)
+
+        try:
+            with create_session() as sess:
+                all_prefixes = [
+                    os.path.abspath(p) for r in roots for p in prefixes_for_root(r)
+                ]
+                orphans_pruned = prune_orphaned_assets(sess, all_prefixes)
+                sess.commit()
+        except Exception as e:
+            logging.exception("orphan pruning failed: %s", e)
+
+        if "models" in roots:
+            paths.extend(collect_models_files())
+        if "input" in roots:
+            paths.extend(list_tree(folder_paths.get_input_directory()))
+        if "output" in roots:
+            paths.extend(list_tree(folder_paths.get_output_directory()))
+
+        specs: list[dict] = []
+        tag_pool: set[str] = set()
+        for p in paths:
+            abs_p = os.path.abspath(p)
+            if abs_p in existing_paths:
+                skipped_existing += 1
+                continue
+            try:
+                stat_p = os.stat(abs_p, follow_symlinks=False)
+            except OSError:
+                continue
+            if not stat_p.st_size:
+                continue
+            name, tags = get_name_and_tags_from_asset_path(abs_p)
+            specs.append({
+                "abs_path": abs_p,
+                "size_bytes": stat_p.st_size,
+                "mtime_ns": getattr(stat_p, "st_mtime_ns", int(stat_p.st_mtime * 1_000_000_000)),
+                "info_name": name,
+                "tags": tags,
+                "fname": compute_relative_filename(abs_p),
+            })
+            for t in tags:
+                tag_pool.add(t)
+
+        if not specs:
+            return
+
+        with create_session() as sess:
+            if tag_pool:
+                ensure_tags_exist(sess, tag_pool, tag_type="user")
+            result = seed_from_paths_batch(sess, specs=specs, owner_id="")
+            created += result["inserted_infos"]
+            sess.commit()
+    finally:
+        if enable_logging:
+            logging.info(
+                "Assets scan(roots=%s) completed in %.3fs (created=%d, skipped_existing=%d, orphans_pruned=%d, total_seen=%d)",
+                roots,
+                time.perf_counter() - t_start,
+                created,
+                skipped_existing,
+                orphans_pruned,
+                len(paths),
+            )
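
A minimal sketch of how the moved entry point is invoked after this change. The root tuple mirrors the "models"/"input"/"output" checks inside seed_assets; calling it at startup is an assumption based on the main.py and server.py imports below.

    from app.assets.services.scanner import seed_assets

    # scan all three roots and log a summary line when done
    seed_assets(("models", "input", "output"), enable_logging=True)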

main.py

@@ -7,7 +7,7 @@ import folder_paths
 import time
 from comfy.cli_args import args, enables_dynamic_vram
 from app.logger import setup_logger
-from app.assets.scanner import seed_assets
+from app.assets.services.scanner import seed_assets
 import itertools
 import utils.extra_config
 import logging

server.py

@@ -33,7 +33,7 @@ import node_helpers
 from comfyui_version import __version__
 from app.frontend_management import FrontendManager, parse_version
 from comfy_api.internal import _ComfyNodeInternal
-from app.assets.scanner import seed_assets
+from app.assets.services.scanner import seed_assets
 from app.assets.api.routes import register_assets_system
 from app.user_manager import UserManager