From fa3749ced787cf15770c3ed2a9344ca6267122ab Mon Sep 17 00:00:00 2001 From: Luke Mino-Altherr Date: Wed, 4 Feb 2026 15:32:26 -0800 Subject: [PATCH] Add TypedDict types to scanner and bulk_ingest Amp-Thread-ID: https://ampcode.com/threads/T-019c2af9-4d41-73e9-b38d-78d06bc28a3f Co-authored-by: Amp --- app/assets/scanner.py | 25 ++++++++++++++++++++----- app/assets/services/bulk_ingest.py | 14 +++++++++++++- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/app/assets/scanner.py b/app/assets/scanner.py index 5cc9096cb..9c95736d4 100644 --- a/app/assets/scanner.py +++ b/app/assets/scanner.py @@ -2,7 +2,7 @@ import contextlib import logging import os import time -from typing import Literal +from typing import Literal, TypedDict import folder_paths from app.assets.database.queries import ( @@ -15,6 +15,7 @@ from app.assets.database.queries import ( remove_missing_tag_for_asset_id, ) from app.assets.services.bulk_ingest import ( + SeedAssetSpec, batch_insert_seed_assets, prune_orphaned_assets, ) @@ -30,6 +31,20 @@ from app.assets.services.path_utils import ( ) from app.database.db import create_session, dependencies_available + +class _StateInfo(TypedDict): + sid: int + fp: str + exists: bool + fast_ok: bool + needs_verify: bool + + +class _AssetAccumulator(TypedDict): + hash: str | None + size_db: int + states: list[_StateInfo] + RootType = Literal["models", "input", "output"] @@ -96,7 +111,7 @@ def sync_cache_states_with_filesystem( rows = get_cache_states_for_prefixes(session, prefixes) - by_asset: dict[str, dict] = {} + by_asset: dict[str, _AssetAccumulator] = {} for row in rows: acc = by_asset.get(row.asset_id) if acc is None: @@ -226,9 +241,9 @@ def _collect_paths_for_roots(roots: tuple[RootType, ...]) -> list[str]: def _build_asset_specs( paths: list[str], existing_paths: set[str], -) -> tuple[list[dict], set[str], int]: +) -> tuple[list[SeedAssetSpec], set[str], int]: """Build asset specs from paths, returning (specs, tag_pool, skipped_count).""" - specs: list[dict] = [] + specs: list[SeedAssetSpec] = [] tag_pool: set[str] = set() skipped = 0 @@ -259,7 +274,7 @@ def _build_asset_specs( return specs, tag_pool, skipped -def _insert_asset_specs(specs: list[dict], tag_pool: set[str]) -> int: +def _insert_asset_specs(specs: list[SeedAssetSpec], tag_pool: set[str]) -> int: """Insert asset specs into database, returning count of created infos.""" if not specs: return 0 diff --git a/app/assets/services/bulk_ingest.py b/app/assets/services/bulk_ingest.py index d786b967f..3f4904f84 100644 --- a/app/assets/services/bulk_ingest.py +++ b/app/assets/services/bulk_ingest.py @@ -1,9 +1,21 @@ import os import uuid from dataclasses import dataclass +from typing import TypedDict from sqlalchemy.orm import Session + +class SeedAssetSpec(TypedDict): + """Spec for seeding an asset from filesystem.""" + + abs_path: str + size_bytes: int + mtime_ns: int + info_name: str + tags: list[str] + fname: str + from app.assets.database.queries import ( bulk_insert_asset_infos_ignore_conflicts, bulk_insert_assets, @@ -29,7 +41,7 @@ class BulkInsertResult: def batch_insert_seed_assets( session: Session, - specs: list[dict], + specs: list[SeedAssetSpec], owner_id: str = "", ) -> BulkInsertResult: """Seed assets from filesystem specs in batch.