assume that DB packages always present; refactoring & cleanup

This commit is contained in:
bigcat88 2025-09-06 17:43:46 +03:00
parent 84384ca0b4
commit 789a62ce35
No known key found for this signature in database
GPG Key ID: 1F0BF0EC3CF22721
4 changed files with 67 additions and 92 deletions

View File

@ -0,0 +1,5 @@
from .database.db import init_db_engine
from .assets_scanner import start_background_assets_scan
__all__ = ["init_db_engine", "start_background_assets_scan"]

View File

@ -55,8 +55,8 @@ PROGRESS_BY_ROOT: dict[RootType, ScanProgress] = {}
SLOW_STATE_BY_ROOT: dict[RootType, SlowQueueState] = {} SLOW_STATE_BY_ROOT: dict[RootType, SlowQueueState] = {}
def _new_scan_id(root: RootType) -> str: async def start_background_assets_scan():
return f"scan-{root}-{uuid.uuid4().hex[:8]}" await fast_reconcile_and_kickoff(progress_cb=_console_cb)
def current_statuses() -> schemas_out.AssetScanStatusResponse: def current_statuses() -> schemas_out.AssetScanStatusResponse:
@ -108,7 +108,7 @@ async def schedule_scans(roots: Sequence[str]) -> schemas_out.AssetScanStatusRes
async def fast_reconcile_and_kickoff( async def fast_reconcile_and_kickoff(
roots: Sequence[str] | None = None, roots: Sequence[str] | None = None,
*, *,
progress_cb: Optional[Callable[[dict], None]] = None, progress_cb: Optional[Callable[[str, str, int, bool, dict], None]] = None,
) -> schemas_out.AssetScanStatusResponse: ) -> schemas_out.AssetScanStatusResponse:
""" """
Startup helper: do the fast pass now (so we know queue size), Startup helper: do the fast pass now (so we know queue size),
@ -179,7 +179,7 @@ def _scan_progress_to_scan_status_model(progress: ScanProgress) -> schemas_out.A
async def _pipeline_for_root( async def _pipeline_for_root(
root: RootType, root: RootType,
prog: ScanProgress, prog: ScanProgress,
progress_cb: Optional[Callable[[dict], None]], progress_cb: Optional[Callable[[str, str, int, bool, dict], None]],
) -> None: ) -> None:
state = SLOW_STATE_BY_ROOT.get(root) or SlowQueueState(queue=asyncio.Queue()) state = SLOW_STATE_BY_ROOT.get(root) or SlowQueueState(queue=asyncio.Queue())
SLOW_STATE_BY_ROOT[root] = state SLOW_STATE_BY_ROOT[root] = state
@ -208,7 +208,7 @@ async def _fast_reconcile_into_queue(
prog: ScanProgress, prog: ScanProgress,
state: SlowQueueState, state: SlowQueueState,
*, *,
progress_cb: Optional[Callable[[dict], None]], progress_cb: Optional[Callable[[str, str, int, bool, dict], None]],
) -> None: ) -> None:
""" """
Enumerate files, set 'discovered' to total files seen, increment 'processed' for fast-matched files, Enumerate files, set 'discovered' to total files seen, increment 'processed' for fast-matched files,
@ -281,29 +281,22 @@ async def _fast_reconcile_into_queue(
prog.slow_queue_total += 1 prog.slow_queue_total += 1
if progress_cb: if progress_cb:
progress_cb({ progress_cb(root, "fast", prog.processed, False, {
"root": root,
"phase": "fast",
"checked": checked, "checked": checked,
"clean": clean, "clean": clean,
"queued": queued, "queued": queued,
"discovered": prog.discovered, "discovered": prog.discovered,
"processed": prog.processed,
}) })
prog._fast_total_seen = checked prog._fast_total_seen = checked
prog._fast_clean = clean prog._fast_clean = clean
if progress_cb: if progress_cb:
progress_cb({ progress_cb(root, "fast", prog.processed, True, {
"root": root,
"phase": "fast",
"checked": checked, "checked": checked,
"clean": clean, "clean": clean,
"queued": queued, "queued": queued,
"discovered": prog.discovered, "discovered": prog.discovered,
"processed": prog.processed,
"done": True,
}) })
state.closed = True state.closed = True
@ -314,7 +307,7 @@ def _start_slow_workers(
prog: ScanProgress, prog: ScanProgress,
state: SlowQueueState, state: SlowQueueState,
*, *,
progress_cb: Optional[Callable[[dict], None]], progress_cb: Optional[Callable[[str, str, int, bool, dict], None]],
) -> None: ) -> None:
if state.workers: if state.workers:
return return
@ -334,10 +327,7 @@ def _start_slow_workers(
prog.slow_queue_finished += 1 prog.slow_queue_finished += 1
prog.processed += 1 prog.processed += 1
if progress_cb: if progress_cb:
progress_cb({ progress_cb(root, "slow", prog.processed, False, {
"root": root,
"phase": "slow",
"processed": prog.processed,
"slow_queue_finished": prog.slow_queue_finished, "slow_queue_finished": prog.slow_queue_finished,
"slow_queue_total": prog.slow_queue_total, "slow_queue_total": prog.slow_queue_total,
}) })
@ -361,20 +351,16 @@ async def _await_workers_then_finish(
prog: ScanProgress, prog: ScanProgress,
state: SlowQueueState, state: SlowQueueState,
*, *,
progress_cb: Optional[Callable[[dict], None]], progress_cb: Optional[Callable[[str, str, int, bool, dict], None]],
) -> None: ) -> None:
if state.workers: if state.workers:
await asyncio.gather(*state.workers, return_exceptions=True) await asyncio.gather(*state.workers, return_exceptions=True)
prog.finished_at = time.time() prog.finished_at = time.time()
prog.status = "completed" prog.status = "completed"
if progress_cb: if progress_cb:
progress_cb({ progress_cb(root, "slow", prog.processed, True, {
"root": root,
"phase": "slow",
"processed": prog.processed,
"slow_queue_finished": prog.slow_queue_finished, "slow_queue_finished": prog.slow_queue_finished,
"slow_queue_total": prog.slow_queue_total, "slow_queue_total": prog.slow_queue_total,
"done": True,
}) })
@ -453,3 +439,41 @@ def _ts_to_iso(ts: Optional[float]) -> Optional[str]:
return datetime.fromtimestamp(float(ts), tz=timezone.utc).replace(tzinfo=None).isoformat() return datetime.fromtimestamp(float(ts), tz=timezone.utc).replace(tzinfo=None).isoformat()
except Exception: except Exception:
return None return None
def _new_scan_id(root: RootType) -> str:
return f"scan-{root}-{uuid.uuid4().hex[:8]}"
def _console_cb(root: str, phase: str, total_processed: int, finished: bool, e: dict):
if phase == "fast":
if finished:
logging.info(
"[assets][%s] fast done: processed=%s/%s queued=%s",
root,
total_processed,
e["discovered"],
e["queued"],
)
elif e.get("checked", 0) % 500 == 0: # do not spam with fast progress
logging.info(
"[assets][%s] fast progress: processed=%s/%s",
root,
total_processed,
e["discovered"],
)
elif phase == "slow":
if finished:
logging.info(
"[assets][%s] slow done: %s/%s",
root,
e.get("slow_queue_finished", 0),
e.get("slow_queue_total", 0),
)
elif e.get('slow_queue_finished', 0) % 3 == 0:
logging.info(
"[assets][%s] slow progress: %s/%s",
root,
e.get("slow_queue_finished", 0),
e.get("slow_queue_total", 0),
)

View File

@ -4,44 +4,18 @@ import shutil
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional from typing import Optional
from app.logger import log_startup_warning
from utils.install_util import get_missing_requirements_message
from comfy.cli_args import args from comfy.cli_args import args
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine, text
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
ENGINE: Optional[AsyncEngine] = None
# Attempt imports which may not exist in some environments SESSION: Optional[async_sessionmaker] = None
try:
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine, text
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
_DB_AVAILABLE = True
ENGINE: AsyncEngine | None = None
SESSION: async_sessionmaker | None = None
except ImportError as e:
log_startup_warning(
(
"------------------------------------------------------------------------\n"
f"Error importing DB dependencies: {e}\n"
f"{get_missing_requirements_message()}\n"
"This error is happening because ComfyUI now uses a local database.\n"
"------------------------------------------------------------------------"
).strip()
)
_DB_AVAILABLE = False
ENGINE = None
SESSION = None
def dependencies_available() -> bool:
"""Check if DB dependencies are importable."""
return _DB_AVAILABLE
def _root_paths(): def _root_paths():
@ -115,9 +89,6 @@ async def init_db_engine() -> None:
""" """
global ENGINE, SESSION global ENGINE, SESSION
if not dependencies_available():
raise RuntimeError("Database dependencies are not available.")
if ENGINE is not None: if ENGINE is not None:
return return

29
main.py
View File

@ -279,37 +279,12 @@ def cleanup_temp():
shutil.rmtree(temp_dir, ignore_errors=True) shutil.rmtree(temp_dir, ignore_errors=True)
async def setup_database(): async def setup_database():
def _console_cb(e: dict): from app import init_db_engine, start_background_assets_scan
root = e.get("root")
phase = e.get("phase")
if phase == "fast":
if e.get("done"):
logging.info(
f"[assets][{root}] fast done: processed={e['processed']}/{e['discovered']} queued={e['queued']}"
)
elif e.get("checked", 0) % 500 == 0: # do not spam with fast progress
logging.info(f"[assets][{root}] fast progress: processed={e['processed']}/{e['discovered']}"
)
elif phase == "slow":
if e.get("done"):
logging.info(
f"[assets][{root}] slow done: {e.get('slow_queue_finished', 0)}/{e.get('slow_queue_total', 0)}"
)
else:
logging.info(
f"[assets][{root}] slow progress: {e.get('slow_queue_finished', 0)}/{e.get('slow_queue_total', 0)}"
)
try:
from app.database.db import init_db_engine, dependencies_available
if dependencies_available():
await init_db_engine() await init_db_engine()
if not args.disable_assets_autoscan: if not args.disable_assets_autoscan:
from app import assets_scanner await start_background_assets_scan()
await assets_scanner.fast_reconcile_and_kickoff(progress_cb=_console_cb)
except Exception as e:
logging.error(f"Failed to initialize database. Please ensure you have installed the latest requirements. If the error persists, please report this as in future the database will be required: {e}")
def start_comfyui(asyncio_loop=None): def start_comfyui(asyncio_loop=None):
""" """