refactor(bulk_ingest): improve variable naming and add typed dicts

- Rename shorthand variables to explicit names (sp -> spec, aid -> asset_id, etc.)
- Move imports to top of file
- Add TypedDict definitions for AssetRow, CacheStateRow, AssetInfoRow, AssetInfoRowInternal, TagRow, MetadataRow
- Replace bare dict types with typed alternatives

Amp-Thread-ID: https://ampcode.com/threads/T-019c316d-13f7-77f8-b92b-ea7276c3e09c
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr 2026-02-05 21:32:01 -08:00
parent 82885b40b9
commit 58edc6fc6b

View File

@ -3,10 +3,24 @@ from __future__ import annotations
import os import os
import uuid import uuid
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, TypedDict from typing import TYPE_CHECKING, Any, TypedDict
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.assets.database.queries import (
bulk_insert_asset_infos_ignore_conflicts,
bulk_insert_assets,
bulk_insert_cache_states_ignore_conflicts,
bulk_insert_tags_and_meta,
delete_assets_by_ids,
get_asset_info_ids_by_ids,
get_cache_states_by_paths_and_asset_ids,
get_unreferenced_unhashed_asset_ids,
mark_cache_states_missing_outside_prefixes,
)
from app.assets.helpers import get_utc_now
if TYPE_CHECKING: if TYPE_CHECKING:
from app.assets.services.metadata_extract import ExtractedMetadata from app.assets.services.metadata_extract import ExtractedMetadata
@ -22,18 +36,75 @@ class SeedAssetSpec(TypedDict):
fname: str fname: str
metadata: ExtractedMetadata | None metadata: ExtractedMetadata | None
from app.assets.database.queries import (
bulk_insert_asset_infos_ignore_conflicts, class AssetRow(TypedDict):
bulk_insert_assets, """Row data for inserting an Asset."""
bulk_insert_cache_states_ignore_conflicts,
bulk_insert_tags_and_meta, id: str
delete_assets_by_ids, hash: str | None
get_asset_info_ids_by_ids, size_bytes: int
get_cache_states_by_paths_and_asset_ids, mime_type: str | None
get_unreferenced_unhashed_asset_ids, created_at: datetime
mark_cache_states_missing_outside_prefixes,
)
from app.assets.helpers import get_utc_now class CacheStateRow(TypedDict):
"""Row data for inserting a CacheState."""
asset_id: str
file_path: str
mtime_ns: int
class AssetInfoRow(TypedDict):
    """Row data for inserting an AssetInfo.

    All keys are required. The ``X | None`` annotations rely on the module's
    ``from __future__ import annotations`` so they are never evaluated at
    class-creation time.
    """

    id: str
    owner_id: str
    name: str
    asset_id: str
    preview_id: str | None
    user_metadata: dict[str, Any] | None
    created_at: datetime
    updated_at: datetime
    last_access_time: datetime


class AssetInfoRowInternal(AssetInfoRow):
    """Internal row data for AssetInfo with extra tracking fields.

    Inherits every persisted column from ``AssetInfoRow`` so the column list
    is declared in exactly one place and the two schemas cannot drift apart.
    Only the underscore-prefixed bookkeeping keys are added here; the batch
    insert in this module copies the inherited keys into the rows it sends to
    the database and reads the underscore keys afterwards to build tag and
    metadata rows.
    """

    # Tag names to attach once the AssetInfo row is confirmed inserted.
    _tags: list[str]
    # Original file name; used as the fallback "filename" metadata value.
    _filename: str
    # Extracted metadata object, or None when nothing was extracted.
    # ExtractedMetadata is imported under TYPE_CHECKING only.
    _extracted_metadata: ExtractedMetadata | None
class TagRow(TypedDict):
    """Row data for inserting a Tag."""

    # Id of the AssetInfo row the tag is attached to.
    asset_info_id: str
    # The tag text itself.
    tag_name: str
    # Provenance label; the seed-asset batch insert in this module always
    # writes "automatic". Other values are presumably set elsewhere - confirm.
    origin: str
    # Timestamp recorded for when the tag was attached.
    added_at: datetime
class MetadataRow(TypedDict):
    """Row data for inserting asset metadata."""

    # Id of the AssetInfo row this metadata entry belongs to.
    asset_info_id: str
    # Metadata key, e.g. "filename" for the fallback row built in this module.
    key: str
    # The fallback row uses 0; presumably the position among several values
    # stored under the same key - TODO confirm against the schema.
    ordinal: int
    # Typed value slots. The fallback row fills only val_str and leaves the
    # rest None; whether exactly one slot must be set is not visible here.
    val_str: str | None
    val_num: float | None
    val_bool: bool | None
    val_json: dict[str, Any] | None
@dataclass @dataclass
@ -74,139 +145,141 @@ def batch_insert_seed_assets(
if not specs: if not specs:
return BulkInsertResult(inserted_infos=0, won_states=0, lost_states=0) return BulkInsertResult(inserted_infos=0, won_states=0, lost_states=0)
now = get_utc_now() current_time = get_utc_now()
asset_rows: list[dict] = [] asset_rows: list[AssetRow] = []
state_rows: list[dict] = [] cache_state_rows: list[CacheStateRow] = []
path_to_asset: dict[str, str] = {} path_to_asset_id: dict[str, str] = {}
asset_to_info: dict[str, dict] = {} asset_id_to_info: dict[str, AssetInfoRowInternal] = {}
path_list: list[str] = [] absolute_path_list: list[str] = []
for sp in specs: for spec in specs:
ap = os.path.abspath(sp["abs_path"]) absolute_path = os.path.abspath(spec["abs_path"])
aid = str(uuid.uuid4()) asset_id = str(uuid.uuid4())
iid = str(uuid.uuid4()) asset_info_id = str(uuid.uuid4())
path_list.append(ap) absolute_path_list.append(absolute_path)
path_to_asset[ap] = aid path_to_asset_id[absolute_path] = asset_id
asset_rows.append( asset_rows.append(
{ {
"id": aid, "id": asset_id,
"hash": None, "hash": None,
"size_bytes": sp["size_bytes"], "size_bytes": spec["size_bytes"],
"mime_type": None, "mime_type": None,
"created_at": now, "created_at": current_time,
} }
) )
state_rows.append( cache_state_rows.append(
{ {
"asset_id": aid, "asset_id": asset_id,
"file_path": ap, "file_path": absolute_path,
"mtime_ns": sp["mtime_ns"], "mtime_ns": spec["mtime_ns"],
} }
) )
# Build user_metadata from extracted metadata or fallback to filename # Build user_metadata from extracted metadata or fallback to filename
extracted = sp.get("metadata") extracted_metadata = spec.get("metadata")
if extracted: if extracted_metadata:
user_metadata: dict[str, Any] | None = extracted.to_user_metadata() user_metadata: dict[str, Any] | None = extracted_metadata.to_user_metadata()
elif sp["fname"]: elif spec["fname"]:
user_metadata = {"filename": sp["fname"]} user_metadata = {"filename": spec["fname"]}
else: else:
user_metadata = None user_metadata = None
asset_to_info[aid] = { asset_id_to_info[asset_id] = {
"id": iid, "id": asset_info_id,
"owner_id": owner_id, "owner_id": owner_id,
"name": sp["info_name"], "name": spec["info_name"],
"asset_id": aid, "asset_id": asset_id,
"preview_id": None, "preview_id": None,
"user_metadata": user_metadata, "user_metadata": user_metadata,
"created_at": now, "created_at": current_time,
"updated_at": now, "updated_at": current_time,
"last_access_time": now, "last_access_time": current_time,
"_tags": sp["tags"], "_tags": spec["tags"],
"_filename": sp["fname"], "_filename": spec["fname"],
"_extracted_metadata": extracted, "_extracted_metadata": extracted_metadata,
} }
bulk_insert_assets(session, asset_rows) bulk_insert_assets(session, asset_rows)
bulk_insert_cache_states_ignore_conflicts(session, state_rows) bulk_insert_cache_states_ignore_conflicts(session, cache_state_rows)
winners_by_path = get_cache_states_by_paths_and_asset_ids(session, path_to_asset) winning_paths = get_cache_states_by_paths_and_asset_ids(session, path_to_asset_id)
all_paths_set = set(path_list) all_paths_set = set(absolute_path_list)
losers_by_path = all_paths_set - winners_by_path losing_paths = all_paths_set - winning_paths
lost_assets = [path_to_asset[p] for p in losers_by_path] lost_asset_ids = [path_to_asset_id[path] for path in losing_paths]
if lost_assets: if lost_asset_ids:
delete_assets_by_ids(session, lost_assets) delete_assets_by_ids(session, lost_asset_ids)
if not winners_by_path: if not winning_paths:
return BulkInsertResult( return BulkInsertResult(
inserted_infos=0, inserted_infos=0,
won_states=0, won_states=0,
lost_states=len(losers_by_path), lost_states=len(losing_paths),
) )
winner_info_rows = [asset_to_info[path_to_asset[p]] for p in winners_by_path] winner_info_rows = [
db_info_rows = [ asset_id_to_info[path_to_asset_id[path]] for path in winning_paths
{
"id": row["id"],
"owner_id": row["owner_id"],
"name": row["name"],
"asset_id": row["asset_id"],
"preview_id": row["preview_id"],
"user_metadata": row["user_metadata"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"last_access_time": row["last_access_time"],
}
for row in winner_info_rows
] ]
bulk_insert_asset_infos_ignore_conflicts(session, db_info_rows) database_info_rows: list[AssetInfoRow] = [
{
"id": info_row["id"],
"owner_id": info_row["owner_id"],
"name": info_row["name"],
"asset_id": info_row["asset_id"],
"preview_id": info_row["preview_id"],
"user_metadata": info_row["user_metadata"],
"created_at": info_row["created_at"],
"updated_at": info_row["updated_at"],
"last_access_time": info_row["last_access_time"],
}
for info_row in winner_info_rows
]
bulk_insert_asset_infos_ignore_conflicts(session, database_info_rows)
all_info_ids = [row["id"] for row in winner_info_rows] all_info_ids = [info_row["id"] for info_row in winner_info_rows]
inserted_info_ids = get_asset_info_ids_by_ids(session, all_info_ids) inserted_info_ids = get_asset_info_ids_by_ids(session, all_info_ids)
tag_rows: list[dict] = [] tag_rows: list[TagRow] = []
meta_rows: list[dict] = [] metadata_rows: list[MetadataRow] = []
if inserted_info_ids: if inserted_info_ids:
for row in winner_info_rows: for info_row in winner_info_rows:
iid = row["id"] info_id = info_row["id"]
if iid not in inserted_info_ids: if info_id not in inserted_info_ids:
continue continue
for t in row["_tags"]: for tag in info_row["_tags"]:
tag_rows.append( tag_rows.append(
{ {
"asset_info_id": iid, "asset_info_id": info_id,
"tag_name": t, "tag_name": tag,
"origin": "automatic", "origin": "automatic",
"added_at": now, "added_at": current_time,
} }
) )
# Use extracted metadata for meta rows if available # Use extracted metadata for meta rows if available
extracted = row.get("_extracted_metadata") extracted_metadata = info_row.get("_extracted_metadata")
if extracted: if extracted_metadata:
meta_rows.extend(extracted.to_meta_rows(iid)) metadata_rows.extend(extracted_metadata.to_meta_rows(info_id))
elif row["_filename"]: elif info_row["_filename"]:
# Fallback: just store filename # Fallback: just store filename
meta_rows.append( metadata_rows.append(
{ {
"asset_info_id": iid, "asset_info_id": info_id,
"key": "filename", "key": "filename",
"ordinal": 0, "ordinal": 0,
"val_str": row["_filename"], "val_str": info_row["_filename"],
"val_num": None, "val_num": None,
"val_bool": None, "val_bool": None,
"val_json": None, "val_json": None,
} }
) )
bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=meta_rows) bulk_insert_tags_and_meta(session, tag_rows=tag_rows, meta_rows=metadata_rows)
return BulkInsertResult( return BulkInsertResult(
inserted_infos=len(inserted_info_ids), inserted_infos=len(inserted_info_ids),
won_states=len(winners_by_path), won_states=len(winning_paths),
lost_states=len(losers_by_path), lost_states=len(losing_paths),
) )