Clean up imports, remove section banners, and fix seed endpoint with autoscan disabled

Move inline/lazy imports to top-level now that blake3 is a main dependency and
circular-import concerns are resolved. Remove decorative section-header comments
in asset_reference queries. Allow the seed API endpoint to temporarily re-enable
the seeder when --disable-assets-autoscan is active so manual triggers still work.
Add retry-on-409 logic in test helpers and remove try/finally around prompt
execution in favour of sequential pause/resume.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Luke Mino-Altherr 2026-02-23 15:58:00 -08:00
parent cdc6c1c143
commit 196959472a
10 changed files with 44 additions and 90 deletions

View File

@ -1,3 +1,4 @@
import json
import logging import logging
import os import os
import urllib.parse import urllib.parse
@ -80,8 +81,6 @@ def _build_error_response(
def _build_validation_error_response(code: str, ve: ValidationError) -> web.Response: def _build_validation_error_response(code: str, ve: ValidationError) -> web.Response:
import json
errors = json.loads(ve.json()) errors = json.loads(ve.json())
return _build_error_response(400, code, "Validation failed.", {"errors": errors}) return _build_error_response(400, code, "Validation failed.", {"errors": errors})
@ -489,8 +488,6 @@ async def get_tags(request: web.Request) -> web.Response:
try: try:
query = schemas_in.TagsListQuery.model_validate(query_map) query = schemas_in.TagsListQuery.model_validate(query_map)
except ValidationError as e: except ValidationError as e:
import json
return _build_error_response( return _build_error_response(
400, 400,
"INVALID_QUERY", "INVALID_QUERY",
@ -631,12 +628,22 @@ async def seed_assets(request: web.Request) -> web.Response:
wait_param = request.query.get("wait", "").lower() wait_param = request.query.get("wait", "").lower()
should_wait = wait_param in ("true", "1", "yes") should_wait = wait_param in ("true", "1", "yes")
# Temporarily enable seeder for explicit API calls (--disable-assets-autoscan
# only prevents the automatic startup scan, not manual triggers)
was_disabled = asset_seeder.is_disabled()
if was_disabled:
asset_seeder.enable()
started = asset_seeder.start(roots=valid_roots) started = asset_seeder.start(roots=valid_roots)
if not started: if not started:
if was_disabled:
asset_seeder.disable()
return web.json_response({"status": "already_running"}, status=409) return web.json_response({"status": "already_running"}, status=409)
if should_wait: if should_wait:
asset_seeder.wait() asset_seeder.wait()
if was_disabled:
asset_seeder.disable()
status = asset_seeder.get_status() status = asset_seeder.get_status()
return web.json_response( return web.json_response(
{ {

View File

@ -3,7 +3,7 @@ from sqlalchemy import select
from sqlalchemy.dialects import sqlite from sqlalchemy.dialects import sqlite
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.assets.database.models import Asset from app.assets.database.models import Asset, AssetReference
from app.assets.database.queries.common import calculate_rows_per_statement, iter_chunks from app.assets.database.queries.common import calculate_rows_per_statement, iter_chunks
@ -130,8 +130,6 @@ def reassign_asset_references(
Used when merging a stub asset into an existing asset with the same hash. Used when merging a stub asset into an existing asset with the same hash.
""" """
from app.assets.database.models import AssetReference
ref = session.get(AssetReference, reference_id) ref = session.get(AssetReference, reference_id)
if ref: if ref:
ref.asset_id = to_asset_id ref.asset_id = to_asset_id

View File

@ -32,11 +32,6 @@ from app.assets.database.queries.common import (
from app.assets.helpers import escape_sql_like_string, get_utc_now, normalize_tags from app.assets.helpers import escape_sql_like_string, get_utc_now, normalize_tags
# =============================================================================
# Metadata conversion helpers (from former asset_info.py)
# =============================================================================
def _check_is_scalar(v): def _check_is_scalar(v):
if v is None: if v is None:
return True return True
@ -84,11 +79,6 @@ def convert_metadata_to_rows(key: str, value) -> list[dict]:
return [{"key": key, "ordinal": 0, "val_json": value}] return [{"key": key, "ordinal": 0, "val_json": value}]
# =============================================================================
# Filter helpers
# =============================================================================
def _apply_tag_filters( def _apply_tag_filters(
stmt: sa.sql.Select, stmt: sa.sql.Select,
include_tags: Sequence[str] | None = None, include_tags: Sequence[str] | None = None,
@ -168,11 +158,6 @@ def _apply_metadata_filter(
return stmt return stmt
# =============================================================================
# Basic CRUD operations
# =============================================================================
def get_reference_by_id( def get_reference_by_id(
session: Session, session: Session,
reference_id: str, reference_id: str,
@ -302,11 +287,6 @@ def update_reference_timestamps(
reference.updated_at = now reference.updated_at = now
# =============================================================================
# Listing and pagination
# =============================================================================
def list_references_page( def list_references_page(
session: Session, session: Session,
owner_id: str = "", owner_id: str = "",
@ -440,11 +420,6 @@ def fetch_reference_and_asset(
return pair[0], pair[1] return pair[0], pair[1]
# =============================================================================
# Timestamp updates
# =============================================================================
def update_reference_access_time( def update_reference_access_time(
session: Session, session: Session,
reference_id: str, reference_id: str,
@ -491,11 +466,6 @@ def update_reference_updated_at(
) )
# =============================================================================
# Metadata operations
# =============================================================================
def set_reference_metadata( def set_reference_metadata(
session: Session, session: Session,
reference_id: str, reference_id: str,
@ -538,11 +508,6 @@ def set_reference_metadata(
session.flush() session.flush()
# =============================================================================
# Delete operations
# =============================================================================
def delete_reference_by_id( def delete_reference_by_id(
session: Session, session: Session,
reference_id: str, reference_id: str,
@ -576,11 +541,6 @@ def set_reference_preview(
session.flush() session.flush()
# =============================================================================
# Cache state operations (from former cache_state.py)
# =============================================================================
class CacheStateRow(NamedTuple): class CacheStateRow(NamedTuple):
"""Row from reference query with cache state data.""" """Row from reference query with cache state data."""
@ -867,11 +827,6 @@ def delete_orphaned_seed_asset(session: Session, asset_id: str) -> bool:
return False return False
# =============================================================================
# Enrichment operations
# =============================================================================
class UnenrichedReferenceRow(NamedTuple): class UnenrichedReferenceRow(NamedTuple):
"""Row for references needing enrichment.""" """Row for references needing enrichment."""
@ -968,11 +923,6 @@ def bulk_update_enrichment_level(
return result.rowcount return result.rowcount
# =============================================================================
# Bulk operations
# =============================================================================
def bulk_insert_references_ignore_conflicts( def bulk_insert_references_ignore_conflicts(
session: Session, session: Session,
rows: list[dict], rows: list[dict],

View File

@ -249,6 +249,8 @@ class AssetSeeder:
def resume(self) -> bool: def resume(self) -> bool:
"""Resume a paused scan. """Resume a paused scan.
This is a no-op if the scan is not in the PAUSED state.
Returns: Returns:
True if resumed, False if not paused True if resumed, False if not paused
""" """

View File

@ -2,23 +2,10 @@ import asyncio
import os import os
from typing import IO from typing import IO
from blake3 import blake3
DEFAULT_CHUNK = 8 * 1024 * 1024 DEFAULT_CHUNK = 8 * 1024 * 1024
_blake3 = None
def _get_blake3():
global _blake3
if _blake3 is None:
try:
from blake3 import blake3 as _b3
_blake3 = _b3
except ImportError:
raise ImportError(
"blake3 is required for asset hashing. Install with: pip install blake3"
)
return _blake3
def compute_blake3_hash( def compute_blake3_hash(
fp: str | IO[bytes], fp: str | IO[bytes],
@ -55,7 +42,7 @@ def _hash_file_obj(file_obj: IO, chunk_size: int = DEFAULT_CHUNK) -> str:
if orig_pos != 0: if orig_pos != 0:
file_obj.seek(0) file_obj.seek(0)
h = _get_blake3()() h = blake3()
while True: while True:
chunk = file_obj.read(chunk_size) chunk = file_obj.read(chunk_size)
if not chunk: if not chunk:

View File

@ -13,6 +13,7 @@ from app.assets.database.queries import (
add_tags_to_reference, add_tags_to_reference,
fetch_reference_and_asset, fetch_reference_and_asset,
get_asset_by_hash, get_asset_by_hash,
get_reference_by_file_path,
get_reference_tags, get_reference_tags,
get_or_create_reference, get_or_create_reference,
remove_missing_tag_for_asset_id, remove_missing_tag_for_asset_id,
@ -83,7 +84,6 @@ def _ingest_file_from_path(
) )
# Get the reference we just created/updated # Get the reference we just created/updated
from app.assets.database.queries import get_reference_by_file_path
ref = get_reference_by_file_path(session, locator) ref = get_reference_by_file_path(session, locator)
if ref: if ref:
reference_id = ref.id reference_id = ref.id

View File

@ -3,7 +3,8 @@ from pathlib import Path
from typing import Literal from typing import Literal
import folder_paths import folder_paths
from app.assets.helpers import normalize_tags from app.assets.database.queries import list_references_by_asset_id
from app.assets.helpers import normalize_tags, select_best_live_path
def get_comfy_models_folders() -> list[tuple[str, list[str]]]: def get_comfy_models_folders() -> list[tuple[str, list[str]]]:
@ -157,9 +158,6 @@ def compute_filename_for_reference(session, ref) -> str | None:
def compute_filename_for_asset(session, asset_id: str) -> str | None: def compute_filename_for_asset(session, asset_id: str) -> str | None:
"""Compute the relative filename for an asset from its best live reference path.""" """Compute the relative filename for an asset from its best live reference path."""
from app.assets.database.queries import list_references_by_asset_id
from app.assets.helpers import select_best_live_path
primary_path = select_best_live_path( primary_path = select_best_live_path(
list_references_by_asset_id(session, asset_id=asset_id) list_references_by_asset_id(session, asset_id=asset_id)
) )

View File

@ -259,10 +259,10 @@ def prompt_worker(q, server_instance):
extra_data[k] = sensitive[k] extra_data[k] = sensitive[k]
asset_seeder.pause() asset_seeder.pause()
try:
e.execute(item[2], prompt_id, extra_data, item[4]) e.execute(item[2], prompt_id, extra_data, item[4])
finally:
asset_seeder.resume() asset_seeder.resume()
need_gc = True need_gc = True
remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] remove_sensitive = lambda prompt: prompt[:5] + prompt[6:]

View File

@ -1,14 +1,27 @@
"""Helper functions for assets integration tests.""" """Helper functions for assets integration tests."""
import time
import requests import requests
def trigger_sync_seed_assets(session: requests.Session, base_url: str) -> None: def trigger_sync_seed_assets(session: requests.Session, base_url: str) -> None:
"""Force a synchronous sync/seed pass by calling the seed endpoint with wait=true.""" """Force a synchronous sync/seed pass by calling the seed endpoint with wait=true.
session.post(
base_url + "/api/assets/seed?wait=true", Retries on 409 (already running) until the previous scan finishes.
json={"roots": ["models", "input", "output"]}, """
timeout=60, deadline = time.monotonic() + 60
) while True:
r = session.post(
base_url + "/api/assets/seed?wait=true",
json={"roots": ["models", "input", "output"]},
timeout=60,
)
if r.status_code != 409:
assert r.status_code == 200, f"seed endpoint returned {r.status_code}: {r.text}"
return
if time.monotonic() > deadline:
raise TimeoutError("seed endpoint stuck in 409 (already running)")
time.sleep(0.25)
def get_asset_filename(asset_hash: str, extension: str) -> str: def get_asset_filename(asset_hash: str, extension: str) -> str:

View File

@ -2,7 +2,6 @@
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
import pytest
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetReference from app.assets.database.models import Asset, AssetReference