diff --git a/app/__init__.py b/app/__init__.py index e69de29bb..5fade97a4 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -0,0 +1,5 @@ +from .database.db import init_db_engine +from .assets_scanner import start_background_assets_scan + + +__all__ = ["init_db_engine", "start_background_assets_scan"] diff --git a/app/assets_scanner.py b/app/assets_scanner.py index ccfc8e9e5..86e8b23cd 100644 --- a/app/assets_scanner.py +++ b/app/assets_scanner.py @@ -55,8 +55,8 @@ PROGRESS_BY_ROOT: dict[RootType, ScanProgress] = {} SLOW_STATE_BY_ROOT: dict[RootType, SlowQueueState] = {} -def _new_scan_id(root: RootType) -> str: - return f"scan-{root}-{uuid.uuid4().hex[:8]}" +async def start_background_assets_scan(): + await fast_reconcile_and_kickoff(progress_cb=_console_cb) def current_statuses() -> schemas_out.AssetScanStatusResponse: @@ -108,7 +108,7 @@ async def schedule_scans(roots: Sequence[str]) -> schemas_out.AssetScanStatusRes async def fast_reconcile_and_kickoff( roots: Sequence[str] | None = None, *, - progress_cb: Optional[Callable[[dict], None]] = None, + progress_cb: Optional[Callable[[str, str, int, bool, dict], None]] = None, ) -> schemas_out.AssetScanStatusResponse: """ Startup helper: do the fast pass now (so we know queue size), @@ -179,7 +179,7 @@ def _scan_progress_to_scan_status_model(progress: ScanProgress) -> schemas_out.A async def _pipeline_for_root( root: RootType, prog: ScanProgress, - progress_cb: Optional[Callable[[dict], None]], + progress_cb: Optional[Callable[[str, str, int, bool, dict], None]], ) -> None: state = SLOW_STATE_BY_ROOT.get(root) or SlowQueueState(queue=asyncio.Queue()) SLOW_STATE_BY_ROOT[root] = state @@ -208,7 +208,7 @@ async def _fast_reconcile_into_queue( prog: ScanProgress, state: SlowQueueState, *, - progress_cb: Optional[Callable[[dict], None]], + progress_cb: Optional[Callable[[str, str, int, bool, dict], None]], ) -> None: """ Enumerate files, set 'discovered' to total files seen, increment 'processed' for fast-matched files, @@ -281,29 +281,22 @@ async def _fast_reconcile_into_queue( prog.slow_queue_total += 1 if progress_cb: - progress_cb({ - "root": root, - "phase": "fast", + progress_cb(root, "fast", prog.processed, False, { "checked": checked, "clean": clean, "queued": queued, "discovered": prog.discovered, - "processed": prog.processed, }) prog._fast_total_seen = checked prog._fast_clean = clean if progress_cb: - progress_cb({ - "root": root, - "phase": "fast", + progress_cb(root, "fast", prog.processed, True, { "checked": checked, "clean": clean, "queued": queued, "discovered": prog.discovered, - "processed": prog.processed, - "done": True, }) state.closed = True @@ -314,7 +307,7 @@ def _start_slow_workers( prog: ScanProgress, state: SlowQueueState, *, - progress_cb: Optional[Callable[[dict], None]], + progress_cb: Optional[Callable[[str, str, int, bool, dict], None]], ) -> None: if state.workers: return @@ -334,10 +327,7 @@ def _start_slow_workers( prog.slow_queue_finished += 1 prog.processed += 1 if progress_cb: - progress_cb({ - "root": root, - "phase": "slow", - "processed": prog.processed, + progress_cb(root, "slow", prog.processed, False, { "slow_queue_finished": prog.slow_queue_finished, "slow_queue_total": prog.slow_queue_total, }) @@ -361,20 +351,16 @@ async def _await_workers_then_finish( prog: ScanProgress, state: SlowQueueState, *, - progress_cb: Optional[Callable[[dict], None]], + progress_cb: Optional[Callable[[str, str, int, bool, dict], None]], ) -> None: if state.workers: await asyncio.gather(*state.workers, return_exceptions=True) prog.finished_at = time.time() prog.status = "completed" if progress_cb: - progress_cb({ - "root": root, - "phase": "slow", - "processed": prog.processed, + progress_cb(root, "slow", prog.processed, True, { "slow_queue_finished": prog.slow_queue_finished, "slow_queue_total": prog.slow_queue_total, - "done": True, }) @@ -453,3 +439,41 @@ def _ts_to_iso(ts: Optional[float]) -> Optional[str]: return datetime.fromtimestamp(float(ts), tz=timezone.utc).replace(tzinfo=None).isoformat() except Exception: return None + + +def _new_scan_id(root: RootType) -> str: + return f"scan-{root}-{uuid.uuid4().hex[:8]}" + + +def _console_cb(root: str, phase: str, total_processed: int, finished: bool, e: dict): + if phase == "fast": + if finished: + logging.info( + "[assets][%s] fast done: processed=%s/%s queued=%s", + root, + total_processed, + e["discovered"], + e["queued"], + ) + elif e.get("checked", 0) % 500 == 0: # do not spam with fast progress + logging.info( + "[assets][%s] fast progress: processed=%s/%s", + root, + total_processed, + e["discovered"], + ) + elif phase == "slow": + if finished: + logging.info( + "[assets][%s] slow done: %s/%s", + root, + e.get("slow_queue_finished", 0), + e.get("slow_queue_total", 0), + ) + elif e.get('slow_queue_finished', 0) % 3 == 0: + logging.info( + "[assets][%s] slow progress: %s/%s", + root, + e.get("slow_queue_finished", 0), + e.get("slow_queue_total", 0), + ) diff --git a/app/database/db.py b/app/database/db.py index 2a619f13b..67ddf412b 100644 --- a/app/database/db.py +++ b/app/database/db.py @@ -4,44 +4,18 @@ import shutil from contextlib import asynccontextmanager from typing import Optional -from app.logger import log_startup_warning -from utils.install_util import get_missing_requirements_message from comfy.cli_args import args - +from alembic import command +from alembic.config import Config +from alembic.runtime.migration import MigrationContext +from alembic.script import ScriptDirectory +from sqlalchemy import create_engine, text +from sqlalchemy.engine import make_url +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine LOGGER = logging.getLogger(__name__) - -# Attempt imports which may not exist in some environments -try: - from alembic import command - from alembic.config import Config - from alembic.runtime.migration import MigrationContext - from alembic.script import ScriptDirectory - from sqlalchemy import create_engine, text - from sqlalchemy.engine import make_url - from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - - _DB_AVAILABLE = True - ENGINE: AsyncEngine | None = None - SESSION: async_sessionmaker | None = None -except ImportError as e: - log_startup_warning( - ( - "------------------------------------------------------------------------\n" - f"Error importing DB dependencies: {e}\n" - f"{get_missing_requirements_message()}\n" - "This error is happening because ComfyUI now uses a local database.\n" - "------------------------------------------------------------------------" - ).strip() - ) - _DB_AVAILABLE = False - ENGINE = None - SESSION = None - - -def dependencies_available() -> bool: - """Check if DB dependencies are importable.""" - return _DB_AVAILABLE +ENGINE: Optional[AsyncEngine] = None +SESSION: Optional[async_sessionmaker] = None def _root_paths(): @@ -115,9 +89,6 @@ async def init_db_engine() -> None: """ global ENGINE, SESSION - if not dependencies_available(): - raise RuntimeError("Database dependencies are not available.") - if ENGINE is not None: return diff --git a/main.py b/main.py index 017f88a63..3485a7c76 100644 --- a/main.py +++ b/main.py @@ -279,37 +279,12 @@ def cleanup_temp(): shutil.rmtree(temp_dir, ignore_errors=True) async def setup_database(): - def _console_cb(e: dict): - root = e.get("root") - phase = e.get("phase") - if phase == "fast": - if e.get("done"): - logging.info( - f"[assets][{root}] fast done: processed={e['processed']}/{e['discovered']} queued={e['queued']}" - ) - elif e.get("checked", 0) % 500 == 0: # do not spam with fast progress - logging.info(f"[assets][{root}] fast progress: processed={e['processed']}/{e['discovered']}" - ) - elif phase == "slow": - if e.get("done"): - logging.info( - f"[assets][{root}] slow done: {e.get('slow_queue_finished', 0)}/{e.get('slow_queue_total', 0)}" - ) - else: - logging.info( - f"[assets][{root}] slow progress: {e.get('slow_queue_finished', 0)}/{e.get('slow_queue_total', 0)}" - ) + from app import init_db_engine, start_background_assets_scan - try: - from app.database.db import init_db_engine, dependencies_available - if dependencies_available(): - await init_db_engine() - if not args.disable_assets_autoscan: - from app import assets_scanner + await init_db_engine() + if not args.disable_assets_autoscan: + await start_background_assets_scan() - await assets_scanner.fast_reconcile_and_kickoff(progress_cb=_console_cb) - except Exception as e: - logging.error(f"Failed to initialize database. Please ensure you have installed the latest requirements. If the error persists, please report this as in future the database will be required: {e}") def start_comfyui(asyncio_loop=None): """