refactor: flatten nested try blocks and if statements in assets package

Extract helper functions to eliminate nested try-except blocks in scanner.py
and remove duplicated type-checking logic in asset_info.py. Simplify nested
conditionals in asset_management.py for clearer control flow.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Luke Mino-Altherr 2026-02-03 14:28:16 -08:00
parent fef2f01671
commit ed60e93696
3 changed files with 71 additions and 71 deletions

View File

@ -31,57 +31,41 @@ def check_is_scalar(v):
return False return False
def convert_metadata_to_rows(key: str, value): def _scalar_to_row(key: str, ordinal: int, value) -> dict:
"""Convert a scalar value to a typed projection row."""
if value is None:
return {
"key": key, "ordinal": ordinal,
"val_str": None, "val_num": None, "val_bool": None, "val_json": None
}
if isinstance(value, bool):
return {"key": key, "ordinal": ordinal, "val_bool": bool(value)}
if isinstance(value, (int, float, Decimal)):
num = value if isinstance(value, Decimal) else Decimal(str(value))
return {"key": key, "ordinal": ordinal, "val_num": num}
if isinstance(value, str):
return {"key": key, "ordinal": ordinal, "val_str": value}
return {"key": key, "ordinal": ordinal, "val_json": value}
def convert_metadata_to_rows(key: str, value) -> list[dict]:
""" """
Turn a metadata key/value into typed projection rows. Turn a metadata key/value into typed projection rows.
Returns list[dict] with keys: Returns list[dict] with keys:
key, ordinal, and one of val_str / val_num / val_bool / val_json (others None) key, ordinal, and one of val_str / val_num / val_bool / val_json (others None)
""" """
rows: list[dict] = []
def _null_row(ordinal: int) -> dict:
return {
"key": key, "ordinal": ordinal,
"val_str": None, "val_num": None, "val_bool": None, "val_json": None
}
if value is None: if value is None:
rows.append(_null_row(0)) return [_scalar_to_row(key, 0, None)]
return rows
if check_is_scalar(value): if check_is_scalar(value):
if isinstance(value, bool): return [_scalar_to_row(key, 0, value)]
rows.append({"key": key, "ordinal": 0, "val_bool": bool(value)})
elif isinstance(value, (int, float, Decimal)):
num = value if isinstance(value, Decimal) else Decimal(str(value))
rows.append({"key": key, "ordinal": 0, "val_num": num})
elif isinstance(value, str):
rows.append({"key": key, "ordinal": 0, "val_str": value})
else:
rows.append({"key": key, "ordinal": 0, "val_json": value})
return rows
if isinstance(value, list): if isinstance(value, list):
if all(check_is_scalar(x) for x in value): if all(check_is_scalar(x) for x in value):
for i, x in enumerate(value): return [_scalar_to_row(key, i, x) for i, x in enumerate(value)]
if x is None: return [{"key": key, "ordinal": i, "val_json": x} for i, x in enumerate(value)]
rows.append(_null_row(i))
elif isinstance(x, bool):
rows.append({"key": key, "ordinal": i, "val_bool": bool(x)})
elif isinstance(x, (int, float, Decimal)):
num = x if isinstance(x, Decimal) else Decimal(str(x))
rows.append({"key": key, "ordinal": i, "val_num": num})
elif isinstance(x, str):
rows.append({"key": key, "ordinal": i, "val_str": x})
else:
rows.append({"key": key, "ordinal": i, "val_json": x})
return rows
for i, x in enumerate(value):
rows.append({"key": key, "ordinal": i, "val_json": x})
return rows
rows.append({"key": key, "ordinal": 0, "val_json": value}) return [{"key": key, "ordinal": 0, "val_json": value}]
return rows
MAX_BIND_PARAMS = 800 MAX_BIND_PARAMS = 800

View File

@ -79,24 +79,22 @@ def update_asset_metadata(
# Compute filename from best live path # Compute filename from best live path
computed_filename = _compute_filename_for_asset(session, info.asset_id) computed_filename = _compute_filename_for_asset(session, info.asset_id)
# Determine if metadata needs updating
new_meta: dict | None = None
if user_metadata is not None: if user_metadata is not None:
new_meta = dict(user_metadata) new_meta = dict(user_metadata)
elif computed_filename:
current_meta = info.user_metadata or {}
if current_meta.get("filename") != computed_filename:
new_meta = dict(current_meta)
if new_meta is not None:
if computed_filename: if computed_filename:
new_meta["filename"] = computed_filename new_meta["filename"] = computed_filename
set_asset_info_metadata( set_asset_info_metadata(
session, asset_info_id=asset_info_id, user_metadata=new_meta session, asset_info_id=asset_info_id, user_metadata=new_meta
) )
touched = True touched = True
else:
if computed_filename:
current_meta = info.user_metadata or {}
if current_meta.get("filename") != computed_filename:
new_meta = dict(current_meta)
new_meta["filename"] = computed_filename
set_asset_info_metadata(
session, asset_info_id=asset_info_id, user_metadata=new_meta
)
touched = True
if tags is not None: if tags is not None:
set_asset_info_tags( set_asset_info_tags(

View File

@ -367,6 +367,41 @@ def sync_cache_states_with_filesystem(
return survivors if collect_existing_paths else None return survivors if collect_existing_paths else None
def _sync_root_safely(root: RootType) -> set[str]:
"""Sync a single root's cache states with the filesystem.
Returns survivors (existing paths) or empty set on failure.
"""
try:
with create_session() as sess:
survivors = sync_cache_states_with_filesystem(
sess,
root,
collect_existing_paths=True,
update_missing_tags=True,
)
sess.commit()
return survivors or set()
except Exception as e:
logging.exception("fast DB scan failed for %s: %s", root, e)
return set()
def _prune_orphans_safely(prefixes: list[str]) -> int:
"""Prune orphaned assets outside the given prefixes.
Returns count pruned or 0 on failure.
"""
try:
with create_session() as sess:
count = prune_orphaned_assets(sess, prefixes)
sess.commit()
return count
except Exception as e:
logging.exception("orphan pruning failed: %s", e)
return 0
def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None: def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
"""Scan the given roots and seed the assets into the database.""" """Scan the given roots and seed the assets into the database."""
if not dependencies_available(): if not dependencies_available():
@ -383,29 +418,12 @@ def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> No
try: try:
existing_paths: set[str] = set() existing_paths: set[str] = set()
for r in roots: for r in roots:
try: existing_paths.update(_sync_root_safely(r))
with create_session() as sess:
survivors = sync_cache_states_with_filesystem(
sess,
r,
collect_existing_paths=True,
update_missing_tags=True,
)
sess.commit()
if survivors:
existing_paths.update(survivors)
except Exception as e:
logging.exception("fast DB scan failed for %s: %s", r, e)
try: all_prefixes = [
with create_session() as sess: os.path.abspath(p) for r in roots for p in get_prefixes_for_root(r)
all_prefixes = [ ]
os.path.abspath(p) for r in roots for p in get_prefixes_for_root(r) orphans_pruned = _prune_orphans_safely(all_prefixes)
]
orphans_pruned = prune_orphaned_assets(sess, all_prefixes)
sess.commit()
except Exception as e:
logging.exception("orphan pruning failed: %s", e)
if "models" in roots: if "models" in roots:
paths.extend(collect_models_files()) paths.extend(collect_models_files())