diff --git a/alembic_db/versions/0001_assets.py b/alembic_db/versions/0001_assets.py
index f3b3ee0bf..bc98b5acf 100644
--- a/alembic_db/versions/0001_assets.py
+++ b/alembic_db/versions/0001_assets.py
@@ -99,21 +99,6 @@ def upgrade() -> None:
     op.create_index("ix_asset_info_meta_key_val_num", "asset_info_meta", ["key", "val_num"])
     op.create_index("ix_asset_info_meta_key_val_bool", "asset_info_meta", ["key", "val_bool"])
 
-    # ASSET_LOCATIONS: remote locations per asset
-    op.create_table(
-        "asset_locations",
-        sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
-        sa.Column("asset_hash", sa.String(length=256), sa.ForeignKey("assets.hash", ondelete="CASCADE"), nullable=False),
-        sa.Column("provider", sa.String(length=32), nullable=False),  # e.g., "gcs"
-        sa.Column("locator", sa.Text(), nullable=False),  # e.g., "gs://bucket/path/to/blob"
-        sa.Column("expected_size_bytes", sa.BigInteger(), nullable=True),
-        sa.Column("etag", sa.String(length=256), nullable=True),
-        sa.Column("last_modified", sa.String(length=128), nullable=True),
-        sa.UniqueConstraint("asset_hash", "provider", "locator", name="uq_asset_locations_triplet"),
-    )
-    op.create_index("ix_asset_locations_hash", "asset_locations", ["asset_hash"])
-    op.create_index("ix_asset_locations_provider", "asset_locations", ["provider"])
-
     # Tags vocabulary for models
     tags_table = sa.table(
         "tags",
@@ -158,10 +143,6 @@ def upgrade() -> None:
 
 
 def downgrade() -> None:
-    op.drop_index("ix_asset_locations_provider", table_name="asset_locations")
-    op.drop_index("ix_asset_locations_hash", table_name="asset_locations")
-    op.drop_table("asset_locations")
-
     op.drop_index("ix_asset_info_meta_key_val_bool", table_name="asset_info_meta")
     op.drop_index("ix_asset_info_meta_key_val_num", table_name="asset_info_meta")
     op.drop_index("ix_asset_info_meta_key_val_str", table_name="asset_info_meta")
diff --git a/app/assets_fetcher.py b/app/assets_fetcher.py
deleted file mode 100644
index 36fa64ca9..000000000
--- a/app/assets_fetcher.py
+++ /dev/null
@@ -1,137 +0,0 @@
-from __future__ import annotations
-import asyncio
-import os
-import tempfile
-from typing import Optional
-import mimetypes
-import aiohttp
-
-from .storage.hashing import blake3_hash_sync
-from .database.db import create_session
-from .database.services import ingest_fs_asset, list_cache_states_by_asset_hash
-from .resolvers import resolve_asset
-from ._assets_helpers import resolve_destination_from_tags, ensure_within_base
-
-_FETCH_LOCKS: dict[str, asyncio.Lock] = {}
-
-
-def _sanitize_filename(name: str) -> str:
-    return os.path.basename((name or "").strip()) or "file"
-
-
-async def ensure_asset_cached(
-    asset_hash: str,
-    *,
-    preferred_name: Optional[str] = None,
-    tags_hint: Optional[list[str]] = None,
-) -> str:
-    """
-    Ensure there is a verified local file for asset_hash in the correct Comfy folder.
-
-    Fast path:
-      - If any cache_state row has a file_path that exists, return it immediately.
-        Preference order is the oldest ID first for stability.
-
-    Slow path:
-      - Resolve remote location + placement tags.
-      - Download to the correct folder, verify hash, move into place.
-      - Ingest identity + cache state so future fast passes can skip hashing.
-    """
-    lock = _FETCH_LOCKS.setdefault(asset_hash, asyncio.Lock())
-    async with lock:
-        # 1) If we already have any cache_state path present on disk, use it (oldest-first)
-        async with await create_session() as sess:
-            states = await list_cache_states_by_asset_hash(sess, asset_hash=asset_hash)
-            for s in states:
-                if s and s.file_path and os.path.isfile(s.file_path):
-                    return s.file_path
-
-        # 2) Resolve remote location + placement hints (must include valid tags)
-        res = await resolve_asset(asset_hash)
-        if not res:
-            raise FileNotFoundError(f"No resolver/locations for {asset_hash}")
-
-        placement_tags = tags_hint or res.tags
-        if not placement_tags:
-            raise ValueError(f"Resolver did not provide placement tags for {asset_hash}")
-
-        name_hint = res.filename or preferred_name or asset_hash.replace(":", "_")
-        safe_name = _sanitize_filename(name_hint)
-
-        # 3) Map tags -> destination (strict: raises if invalid root or models category)
-        base_dir, subdirs = resolve_destination_from_tags(placement_tags)  # may raise
-        dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir
-        os.makedirs(dest_dir, exist_ok=True)
-
-        final_path = os.path.abspath(os.path.join(dest_dir, safe_name))
-        ensure_within_base(final_path, base_dir)
-
-        # 4) If target path exists, try to reuse; else delete invalid cache
-        if os.path.exists(final_path) and os.path.isfile(final_path):
-            existing_digest = blake3_hash_sync(final_path)
-            if f"blake3:{existing_digest}" == asset_hash:
-                size_bytes = os.path.getsize(final_path)
-                mtime_ns = getattr(os.stat(final_path), "st_mtime_ns", int(os.path.getmtime(final_path) * 1_000_000_000))
-                async with await create_session() as sess:
-                    await ingest_fs_asset(
-                        sess,
-                        asset_hash=asset_hash,
-                        abs_path=final_path,
-                        size_bytes=size_bytes,
-                        mtime_ns=mtime_ns,
-                        mime_type=None,
-                        info_name=None,
-                        tags=(),
-                    )
-                    await sess.commit()
-                return final_path
-            else:
-                # Invalid cache: remove before re-downloading
-                os.remove(final_path)
-
-        # 5) Download to temp next to destination
-        timeout = aiohttp.ClientTimeout(total=60 * 30)
-        async with aiohttp.ClientSession(timeout=timeout) as session:
-            async with session.get(res.download_url, headers=dict(res.headers)) as resp:
-                resp.raise_for_status()
-                cl = resp.headers.get("Content-Length")
-                if res.expected_size and cl and int(cl) != int(res.expected_size):
-                    raise ValueError("server Content-Length does not match expected size")
-                with tempfile.NamedTemporaryFile("wb", delete=False, dir=dest_dir) as tmp:
-                    tmp_path = tmp.name
-                    async for chunk in resp.content.iter_chunked(8 * 1024 * 1024):
-                        if chunk:
-                            tmp.write(chunk)
-
-        # 6) Verify content hash
-        digest = blake3_hash_sync(tmp_path)
-        canonical = f"blake3:{digest}"
-        if canonical != asset_hash:
-            try:
-                os.remove(tmp_path)
-            finally:
-                raise ValueError(f"Hash mismatch: expected {asset_hash}, got {canonical}")
-
-        # 7) Atomically move into place
-        if os.path.exists(final_path):
-            os.remove(final_path)
-        os.replace(tmp_path, final_path)
-
-        # 8) Record identity + cache state (+ mime type)
-        size_bytes = os.path.getsize(final_path)
-        mtime_ns = getattr(os.stat(final_path), "st_mtime_ns", int(os.path.getmtime(final_path) * 1_000_000_000))
-        mime_type = mimetypes.guess_type(safe_name, strict=False)[0]
-        async with await create_session() as sess:
-            await ingest_fs_asset(
-                sess,
-                asset_hash=asset_hash,
-                abs_path=final_path,
-                size_bytes=size_bytes,
-                mtime_ns=mtime_ns,
-                mime_type=mime_type,
-                info_name=None,
-                tags=(),
-            )
-            await sess.commit()
-
-        return final_path
diff --git a/app/assets_manager.py b/app/assets_manager.py
index a2a73773a..423a860e0 100644
--- a/app/assets_manager.py
+++ b/app/assets_manager.py
@@ -36,7 +36,6 @@ from ._assets_helpers import (
     ensure_within_base,
     resolve_destination_from_tags,
 )
-from .assets_fetcher import ensure_asset_cached
 
 
 async def asset_exists(*, asset_hash: str) -> bool:
@@ -180,9 +179,9 @@ async def resolve_asset_content_for_download(
     """
     Returns (abs_path, content_type, download_name) for the given AssetInfo id and touches last_access_time.
    Also touches last_access_time (only_if_newer).
-    Ensures the local cache is present (uses resolver if needed).
     Raises:
        ValueError if AssetInfo cannot be found
+       FileNotFoundError if file for Asset cannot be found
     """
     async with await create_session() as session:
         pair = await fetch_asset_info_and_asset(session, asset_info_id=asset_info_id, owner_id=owner_id)
@@ -190,13 +189,15 @@ async def resolve_asset_content_for_download(
             raise ValueError(f"AssetInfo {asset_info_id} not found")
         info, asset = pair
 
-        tag_names = await get_asset_tags(session, asset_info_id=info.id)
+        states = await list_cache_states_by_asset_hash(session, asset_hash=info.asset_hash)
+        abs_path = ""
+        for s in states:
+            if s and s.file_path and os.path.isfile(s.file_path):
+                abs_path = s.file_path
+                break
+        if not abs_path:
+            raise FileNotFoundError
 
-    # Ensure cached (download if missing)
-    preferred_name = info.name or info.asset_hash.split(":", 1)[-1]
-    abs_path = await ensure_asset_cached(info.asset_hash, preferred_name=preferred_name, tags_hint=tag_names)
-
-    async with await create_session() as session:
         await touch_asset_info_by_id(session, asset_info_id=asset_info_id)
         await session.commit()
 
diff --git a/app/resolvers/__init__.py b/app/resolvers/__init__.py
deleted file mode 100644
index c489ebad7..000000000
--- a/app/resolvers/__init__.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import contextlib
-from dataclasses import dataclass
-from typing import Protocol, Optional, Mapping
-
-
-@dataclass
-class ResolveResult:
-    provider: str  # e.g., "gcs"
-    download_url: str  # fully-qualified URL to fetch bytes
-    headers: Mapping[str, str]  # optional auth headers etc
-    expected_size: Optional[int] = None
-    tags: Optional[list[str]] = None  # e.g. ["models","vae","subdir"]
-    filename: Optional[str] = None  # preferred basename
-
-class AssetResolver(Protocol):
-    provider: str
-    async def resolve(self, asset_hash: str) -> Optional[ResolveResult]: ...
-
-
-_REGISTRY: list[AssetResolver] = []
-
-
-def register_resolver(resolver: AssetResolver) -> None:
-    """Append Resolver with simple de-dup per provider."""
-    global _REGISTRY
-    _REGISTRY = [r for r in _REGISTRY if r.provider != resolver.provider] + [resolver]
-
-
-async def resolve_asset(asset_hash: str) -> Optional[ResolveResult]:
-    for r in _REGISTRY:
-        with contextlib.suppress(Exception):  # For Resolver failure we just try the next one
-            res = await r.resolve(asset_hash)
-            if res:
-                return res
-    return None