feat: non-destructive asset pruning with is_missing flag

- Add is_missing column to AssetCacheState for soft-delete
- Replace hard-delete pruning with mark_cache_states_missing_outside_prefixes
- Auto-restore missing cache states when files are re-scanned
- Filter out missing cache states from queries by default
- Rename functions for clarity:
  - mark_cache_states_missing_outside_prefixes (was delete_cache_states_outside_prefixes)
  - get_unreferenced_unhashed_asset_ids (was get_orphaned_seed_asset_ids)
  - mark_assets_missing_outside_prefixes (was prune_orphaned_assets)
  - mark_missing_outside_prefixes_safely (was prune_orphans_safely)
- Add restore_cache_states_by_paths for explicit restoration
- Add cleanup_unreferenced_assets for explicit hard-delete when needed
- Update API endpoint /api/assets/prune to use new soft-delete behavior

This preserves user metadata (tags, etc.) when base directories change,
allowing assets to be restored when the original paths become available again.

Amp-Thread-ID: https://ampcode.com/threads/T-019c3114-bf28-73a9-a4d2-85b208fd5462
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr 2026-02-05 21:21:46 -08:00
parent b4f5bb2faa
commit a51bbd0b25
11 changed files with 282 additions and 103 deletions

View File

@ -0,0 +1,37 @@
"""
Add is_missing column to asset_cache_state for non-destructive soft-delete
Revision ID: 0002_add_is_missing
Revises: 0001_assets
Create Date: 2025-02-05 00:00:00
"""
from alembic import op
import sqlalchemy as sa
revision = "0002_add_is_missing"
down_revision = "0001_assets"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"asset_cache_state",
sa.Column(
"is_missing",
sa.Boolean(),
nullable=False,
server_default=sa.text("false"),
),
)
op.create_index(
"ix_asset_cache_state_is_missing",
"asset_cache_state",
["is_missing"],
)
def downgrade() -> None:
op.drop_index("ix_asset_cache_state_is_missing", table_name="asset_cache_state")
op.drop_column("asset_cache_state", "is_missing")

View File

@ -698,20 +698,21 @@ async def cancel_seed(request: web.Request) -> web.Response:
@ROUTES.post("/api/assets/prune") @ROUTES.post("/api/assets/prune")
async def prune_orphans(request: web.Request) -> web.Response: async def mark_missing_assets(request: web.Request) -> web.Response:
"""Prune orphaned assets that no longer exist on the filesystem. """Mark assets as missing when their cache states point to files outside all known root prefixes.
This removes assets whose cache states point to files outside all known This is a non-destructive soft-delete operation. Assets and their metadata
root prefixes (models, input, output). are preserved, but cache states are flagged as missing. They can be restored
if the file reappears in a future scan.
Returns: Returns:
200 OK with count of pruned assets 200 OK with count of marked assets
409 Conflict if a scan is currently running 409 Conflict if a scan is currently running
""" """
pruned = asset_seeder.prune_orphans() marked = asset_seeder.mark_missing_outside_prefixes()
if pruned == 0 and asset_seeder.get_status().state.value != "IDLE": if marked == 0 and asset_seeder.get_status().state.value != "IDLE":
return web.json_response( return web.json_response(
{"status": "scan_running", "pruned": 0}, {"status": "scan_running", "marked": 0},
status=409, status=409,
) )
return web.json_response({"status": "completed", "pruned": pruned}, status=200) return web.json_response({"status": "completed", "marked": marked}, status=200)

View File

@ -83,12 +83,14 @@ class AssetCacheState(Base):
file_path: Mapped[str] = mapped_column(Text, nullable=False) file_path: Mapped[str] = mapped_column(Text, nullable=False)
mtime_ns: Mapped[int | None] = mapped_column(BigInteger, nullable=True) mtime_ns: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
needs_verify: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) needs_verify: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_missing: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
asset: Mapped[Asset] = relationship(back_populates="cache_states") asset: Mapped[Asset] = relationship(back_populates="cache_states")
__table_args__ = ( __table_args__ = (
Index("ix_asset_cache_state_file_path", "file_path"), Index("ix_asset_cache_state_file_path", "file_path"),
Index("ix_asset_cache_state_asset_id", "asset_id"), Index("ix_asset_cache_state_asset_id", "asset_id"),
Index("ix_asset_cache_state_is_missing", "is_missing"),
CheckConstraint( CheckConstraint(
"(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg" "(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg"
), ),

View File

@ -28,12 +28,13 @@ from app.assets.database.queries.cache_state import (
bulk_set_needs_verify, bulk_set_needs_verify,
delete_assets_by_ids, delete_assets_by_ids,
delete_cache_states_by_ids, delete_cache_states_by_ids,
delete_cache_states_outside_prefixes,
delete_orphaned_seed_asset, delete_orphaned_seed_asset,
get_cache_states_by_paths_and_asset_ids, get_cache_states_by_paths_and_asset_ids,
get_cache_states_for_prefixes, get_cache_states_for_prefixes,
get_orphaned_seed_asset_ids, get_unreferenced_unhashed_asset_ids,
list_cache_states_by_asset_id, list_cache_states_by_asset_id,
mark_cache_states_missing_outside_prefixes,
restore_cache_states_by_paths,
upsert_cache_state, upsert_cache_state,
) )
from app.assets.database.queries.tags import ( from app.assets.database.queries.tags import (
@ -68,7 +69,6 @@ __all__ = [
"delete_asset_info_by_id", "delete_asset_info_by_id",
"delete_assets_by_ids", "delete_assets_by_ids",
"delete_cache_states_by_ids", "delete_cache_states_by_ids",
"delete_cache_states_outside_prefixes",
"delete_orphaned_seed_asset", "delete_orphaned_seed_asset",
"ensure_tags_exist", "ensure_tags_exist",
"fetch_asset_info_and_asset", "fetch_asset_info_and_asset",
@ -80,13 +80,15 @@ __all__ = [
"get_cache_states_by_paths_and_asset_ids", "get_cache_states_by_paths_and_asset_ids",
"get_cache_states_for_prefixes", "get_cache_states_for_prefixes",
"get_or_create_asset_info", "get_or_create_asset_info",
"get_orphaned_seed_asset_ids", "get_unreferenced_unhashed_asset_ids",
"insert_asset_info", "insert_asset_info",
"list_asset_infos_page", "list_asset_infos_page",
"list_cache_states_by_asset_id", "list_cache_states_by_asset_id",
"list_tags_with_usage", "list_tags_with_usage",
"mark_cache_states_missing_outside_prefixes",
"remove_missing_tag_for_asset_id", "remove_missing_tag_for_asset_id",
"remove_tags_from_asset_info", "remove_tags_from_asset_info",
"restore_cache_states_by_paths",
"set_asset_info_metadata", "set_asset_info_metadata",
"set_asset_info_preview", "set_asset_info_preview",
"set_asset_info_tags", "set_asset_info_tags",

View File

@ -49,11 +49,15 @@ def upsert_cache_state(
file_path: str, file_path: str,
mtime_ns: int, mtime_ns: int,
) -> tuple[bool, bool]: ) -> tuple[bool, bool]:
"""Upsert a cache state by file_path. Returns (created, updated).""" """Upsert a cache state by file_path. Returns (created, updated).
Also restores cache states that were previously marked as missing.
"""
vals = { vals = {
"asset_id": asset_id, "asset_id": asset_id,
"file_path": file_path, "file_path": file_path,
"mtime_ns": int(mtime_ns), "mtime_ns": int(mtime_ns),
"is_missing": False,
} }
ins = ( ins = (
sqlite.insert(AssetCacheState) sqlite.insert(AssetCacheState)
@ -74,26 +78,30 @@ def upsert_cache_state(
AssetCacheState.asset_id != asset_id, AssetCacheState.asset_id != asset_id,
AssetCacheState.mtime_ns.is_(None), AssetCacheState.mtime_ns.is_(None),
AssetCacheState.mtime_ns != int(mtime_ns), AssetCacheState.mtime_ns != int(mtime_ns),
AssetCacheState.is_missing == True, # noqa: E712
) )
) )
.values(asset_id=asset_id, mtime_ns=int(mtime_ns)) .values(asset_id=asset_id, mtime_ns=int(mtime_ns), is_missing=False)
) )
res2 = session.execute(upd) res2 = session.execute(upd)
updated = int(res2.rowcount or 0) > 0 updated = int(res2.rowcount or 0) > 0
return False, updated return False, updated
def delete_cache_states_outside_prefixes( def mark_cache_states_missing_outside_prefixes(
session: Session, valid_prefixes: list[str] session: Session, valid_prefixes: list[str]
) -> int: ) -> int:
"""Delete cache states with file_path not matching any of the valid prefixes. """Mark cache states as missing when file_path doesn't match any valid prefix.
This is a non-destructive soft-delete that preserves user metadata.
Cache states can be restored if the file reappears in a future scan.
Args: Args:
session: Database session session: Database session
valid_prefixes: List of absolute directory prefixes that are valid valid_prefixes: List of absolute directory prefixes that are valid
Returns: Returns:
Number of cache states deleted Number of cache states marked as missing
""" """
if not valid_prefixes: if not valid_prefixes:
return 0 return 0
@ -104,22 +112,59 @@ def delete_cache_states_outside_prefixes(
return AssetCacheState.file_path.like(escaped + "%", escape=esc) return AssetCacheState.file_path.like(escaped + "%", escape=esc)
matches_valid_prefix = sa.or_(*[make_prefix_condition(p) for p in valid_prefixes]) matches_valid_prefix = sa.or_(*[make_prefix_condition(p) for p in valid_prefixes])
result = session.execute(sa.delete(AssetCacheState).where(~matches_valid_prefix)) result = session.execute(
sa.update(AssetCacheState)
.where(~matches_valid_prefix)
.where(AssetCacheState.is_missing == False) # noqa: E712
.values(is_missing=True)
)
return result.rowcount return result.rowcount
def get_orphaned_seed_asset_ids(session: Session) -> list[str]: def restore_cache_states_by_paths(session: Session, file_paths: list[str]) -> int:
"""Get IDs of seed assets (hash is None) with no remaining cache states. """Restore cache states that were previously marked as missing.
Called when a file path is re-scanned and found to exist.
Args:
session: Database session
file_paths: List of file paths that exist and should be restored
Returns: Returns:
List of asset IDs that are orphaned Number of cache states restored
""" """
orphan_subq = ( if not file_paths:
sa.select(Asset.id) return 0
.outerjoin(AssetCacheState, AssetCacheState.asset_id == Asset.id)
.where(Asset.hash.is_(None), AssetCacheState.id.is_(None)) result = session.execute(
sa.update(AssetCacheState)
.where(AssetCacheState.file_path.in_(file_paths))
.where(AssetCacheState.is_missing == True) # noqa: E712
.values(is_missing=False)
) )
return [row[0] for row in session.execute(orphan_subq).all()] return result.rowcount
def get_unreferenced_unhashed_asset_ids(session: Session) -> list[str]:
"""Get IDs of unhashed assets (hash=None) with no active cache states.
An asset is considered unreferenced if it has no cache states,
or all its cache states are marked as missing.
Returns:
List of asset IDs that are unreferenced
"""
active_cache_state_exists = (
sa.select(sa.literal(1))
.where(AssetCacheState.asset_id == Asset.id)
.where(AssetCacheState.is_missing == False) # noqa: E712
.correlate(Asset)
.exists()
)
unreferenced_subq = sa.select(Asset.id).where(
Asset.hash.is_(None), ~active_cache_state_exists
)
return [row[0] for row in session.execute(unreferenced_subq).all()]
def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int: def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
@ -142,12 +187,15 @@ def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
def get_cache_states_for_prefixes( def get_cache_states_for_prefixes(
session: Session, session: Session,
prefixes: list[str], prefixes: list[str],
*,
include_missing: bool = False,
) -> list[CacheStateRow]: ) -> list[CacheStateRow]:
"""Get all cache states with paths matching any of the given prefixes. """Get all cache states with paths matching any of the given prefixes.
Args: Args:
session: Database session session: Database session
prefixes: List of absolute directory prefixes to match prefixes: List of absolute directory prefixes to match
include_missing: If False (default), exclude cache states marked as missing
Returns: Returns:
List of cache state rows with joined asset data, ordered by asset_id, state_id List of cache state rows with joined asset data, ordered by asset_id, state_id
@ -163,7 +211,7 @@ def get_cache_states_for_prefixes(
escaped, esc = escape_sql_like_string(base) escaped, esc = escape_sql_like_string(base)
conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc)) conds.append(AssetCacheState.file_path.like(escaped + "%", escape=esc))
rows = session.execute( query = (
sa.select( sa.select(
AssetCacheState.id, AssetCacheState.id,
AssetCacheState.file_path, AssetCacheState.file_path,
@ -175,7 +223,13 @@ def get_cache_states_for_prefixes(
) )
.join(Asset, Asset.id == AssetCacheState.asset_id) .join(Asset, Asset.id == AssetCacheState.asset_id)
.where(sa.or_(*conds)) .where(sa.or_(*conds))
.order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc()) )
if not include_missing:
query = query.where(AssetCacheState.is_missing == False) # noqa: E712
rows = session.execute(
query.order_by(AssetCacheState.asset_id.asc(), AssetCacheState.id.asc())
).all() ).all()
return [ return [
@ -240,13 +294,15 @@ def bulk_insert_cache_states_ignore_conflicts(
"""Bulk insert cache state rows with ON CONFLICT DO NOTHING on file_path. """Bulk insert cache state rows with ON CONFLICT DO NOTHING on file_path.
Each dict should have: asset_id, file_path, mtime_ns Each dict should have: asset_id, file_path, mtime_ns
The is_missing field is automatically set to False for new inserts.
""" """
if not rows: if not rows:
return return
enriched_rows = [{**row, "is_missing": False} for row in rows]
ins = sqlite.insert(AssetCacheState).on_conflict_do_nothing( ins = sqlite.insert(AssetCacheState).on_conflict_do_nothing(
index_elements=[AssetCacheState.file_path] index_elements=[AssetCacheState.file_path]
) )
for chunk in iter_chunks(rows, calculate_rows_per_statement(3)): for chunk in iter_chunks(enriched_rows, calculate_rows_per_statement(4)):
session.execute(ins, chunk) session.execute(ins, chunk)

View File

@ -17,7 +17,7 @@ from app.assets.database.queries import (
from app.assets.services.bulk_ingest import ( from app.assets.services.bulk_ingest import (
SeedAssetSpec, SeedAssetSpec,
batch_insert_seed_assets, batch_insert_seed_assets,
prune_orphaned_assets, mark_assets_missing_outside_prefixes,
) )
from app.assets.services.file_utils import ( from app.assets.services.file_utils import (
get_mtime_ns, get_mtime_ns,
@ -221,18 +221,18 @@ def sync_root_safely(root: RootType) -> set[str]:
return set() return set()
def prune_orphans_safely(prefixes: list[str]) -> int: def mark_missing_outside_prefixes_safely(prefixes: list[str]) -> int:
"""Prune orphaned assets outside the given prefixes. """Mark cache states as missing when outside the given prefixes.
Returns count pruned or 0 on failure. This is a non-destructive soft-delete. Returns count marked or 0 on failure.
""" """
try: try:
with create_session() as sess: with create_session() as sess:
count = prune_orphaned_assets(sess, prefixes) count = mark_assets_missing_outside_prefixes(sess, prefixes)
sess.commit() sess.commit()
return count return count
except Exception as e: except Exception as e:
logging.exception("orphan pruning failed: %s", e) logging.exception("marking missing assets failed: %s", e)
return 0 return 0
@ -319,7 +319,7 @@ def insert_asset_specs(specs: list[SeedAssetSpec], tag_pool: set[str]) -> int:
def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None: def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> None:
"""Scan the given roots and seed the assets into the database. """Scan the given roots and seed the assets into the database.
Note: This function does not prune orphaned assets. Call prune_orphaned_assets Note: This function does not mark missing assets. Call mark_missing_outside_prefixes_safely
separately if cleanup is needed. separately if cleanup is needed.
""" """
if not dependencies_available(): if not dependencies_available():

View File

@ -15,7 +15,7 @@ from app.assets.scanner import (
get_all_known_prefixes, get_all_known_prefixes,
get_prefixes_for_root, get_prefixes_for_root,
insert_asset_specs, insert_asset_specs,
prune_orphans_safely, mark_missing_outside_prefixes_safely,
sync_root_safely, sync_root_safely,
) )
from app.database.db import dependencies_available from app.database.db import dependencies_available
@ -174,35 +174,41 @@ class AssetSeeder:
with self._lock: with self._lock:
self._thread = None self._thread = None
def prune_orphans(self) -> int: def mark_missing_outside_prefixes(self) -> int:
"""Prune orphaned assets that are outside all known root prefixes. """Mark cache states as missing when outside all known root prefixes.
This is a non-destructive soft-delete operation. Assets and their
metadata are preserved, but cache states are flagged as missing.
They can be restored if the file reappears in a future scan.
This operation is decoupled from scanning to prevent partial scans This operation is decoupled from scanning to prevent partial scans
from accidentally deleting assets belonging to other roots. from accidentally marking assets belonging to other roots.
Should be called explicitly when cleanup is desired, typically after Should be called explicitly when cleanup is desired, typically after
a full scan of all roots or during maintenance. a full scan of all roots or during maintenance.
Returns: Returns:
Number of orphaned assets pruned, or 0 if dependencies unavailable Number of cache states marked as missing, or 0 if dependencies
or a scan is currently running unavailable or a scan is currently running
""" """
with self._lock: with self._lock:
if self._state != State.IDLE: if self._state != State.IDLE:
logging.warning("Cannot prune orphans while scan is running") logging.warning(
"Cannot mark missing assets while scan is running"
)
return 0 return 0
if not dependencies_available(): if not dependencies_available():
logging.warning( logging.warning(
"Database dependencies not available, skipping orphan pruning" "Database dependencies not available, skipping mark missing"
) )
return 0 return 0
all_prefixes = get_all_known_prefixes() all_prefixes = get_all_known_prefixes()
pruned = prune_orphans_safely(all_prefixes) marked = mark_missing_outside_prefixes_safely(all_prefixes)
if pruned > 0: if marked > 0:
logging.info("Pruned %d orphaned assets", pruned) logging.info("Marked %d cache states as missing", marked)
return pruned return marked
def _is_cancelled(self) -> bool: def _is_cancelled(self) -> bool:
"""Check if cancellation has been requested.""" """Check if cancellation has been requested."""
@ -290,9 +296,9 @@ class AssetSeeder:
if self._prune_first: if self._prune_first:
all_prefixes = get_all_known_prefixes() all_prefixes = get_all_known_prefixes()
pruned = prune_orphans_safely(all_prefixes) marked = mark_missing_outside_prefixes_safely(all_prefixes)
if pruned > 0: if marked > 0:
logging.info("Pruned %d orphaned assets before scan", pruned) logging.info("Marked %d cache states as missing before scan", marked)
if self._is_cancelled(): if self._is_cancelled():
logging.info("Asset scan cancelled after pruning phase") logging.info("Asset scan cancelled after pruning phase")

View File

@ -11,7 +11,8 @@ from app.assets.services.asset_management import (
from app.assets.services.bulk_ingest import ( from app.assets.services.bulk_ingest import (
BulkInsertResult, BulkInsertResult,
batch_insert_seed_assets, batch_insert_seed_assets,
prune_orphaned_assets, cleanup_unreferenced_assets,
mark_assets_missing_outside_prefixes,
) )
from app.assets.services.file_utils import ( from app.assets.services.file_utils import (
get_mtime_ns, get_mtime_ns,
@ -77,7 +78,8 @@ __all__ = [
"list_assets_page", "list_assets_page",
"list_files_recursively", "list_files_recursively",
"list_tags", "list_tags",
"prune_orphaned_assets", "cleanup_unreferenced_assets",
"mark_assets_missing_outside_prefixes",
"remove_tags", "remove_tags",
"resolve_asset_for_download", "resolve_asset_for_download",
"set_asset_preview", "set_asset_preview",

View File

@ -28,10 +28,10 @@ from app.assets.database.queries import (
bulk_insert_cache_states_ignore_conflicts, bulk_insert_cache_states_ignore_conflicts,
bulk_insert_tags_and_meta, bulk_insert_tags_and_meta,
delete_assets_by_ids, delete_assets_by_ids,
delete_cache_states_outside_prefixes,
get_asset_info_ids_by_ids, get_asset_info_ids_by_ids,
get_cache_states_by_paths_and_asset_ids, get_cache_states_by_paths_and_asset_ids,
get_orphaned_seed_asset_ids, get_unreferenced_unhashed_asset_ids,
mark_cache_states_missing_outside_prefixes,
) )
from app.assets.helpers import get_utc_now from app.assets.helpers import get_utc_now
@ -210,16 +210,37 @@ def batch_insert_seed_assets(
) )
def prune_orphaned_assets(session: Session, valid_prefixes: list[str]) -> int: def mark_assets_missing_outside_prefixes(
"""Prune cache states outside valid prefixes, then delete orphaned seed assets. session: Session, valid_prefixes: list[str]
) -> int:
"""Mark cache states as missing when outside valid prefixes.
This is a non-destructive operation that soft-deletes cache states
by setting is_missing=True. User metadata is preserved and assets
can be restored if the file reappears in a future scan.
Note: This does NOT delete
unreferenced unhashed assets. Those are preserved so user metadata
remains intact even when base directories change.
Args: Args:
session: Database session session: Database session
valid_prefixes: List of absolute directory prefixes that are valid valid_prefixes: List of absolute directory prefixes that are valid
Returns: Returns:
Number of orphaned assets deleted Number of cache states marked as missing
""" """
delete_cache_states_outside_prefixes(session, valid_prefixes) return mark_cache_states_missing_outside_prefixes(session, valid_prefixes)
orphan_ids = get_orphaned_seed_asset_ids(session)
return delete_assets_by_ids(session, orphan_ids)
def cleanup_unreferenced_assets(session: Session) -> int:
"""Hard-delete unhashed assets with no active cache states.
This is a destructive operation intended for explicit cleanup.
Only deletes assets where hash=None and all cache states are missing.
Returns:
Number of assets deleted
"""
unreferenced_ids = get_unreferenced_unhashed_asset_ids(session)
return delete_assets_by_ids(session, unreferenced_ids)

View File

@ -6,8 +6,7 @@ from app.assets.database.models import Asset, AssetCacheState, AssetInfo
from app.assets.database.queries import ( from app.assets.database.queries import (
list_cache_states_by_asset_id, list_cache_states_by_asset_id,
upsert_cache_state, upsert_cache_state,
delete_cache_states_outside_prefixes, get_unreferenced_unhashed_asset_ids,
get_orphaned_seed_asset_ids,
delete_assets_by_ids, delete_assets_by_ids,
get_cache_states_for_prefixes, get_cache_states_for_prefixes,
bulk_set_needs_verify, bulk_set_needs_verify,
@ -15,6 +14,8 @@ from app.assets.database.queries import (
delete_orphaned_seed_asset, delete_orphaned_seed_asset,
bulk_insert_cache_states_ignore_conflicts, bulk_insert_cache_states_ignore_conflicts,
get_cache_states_by_paths_and_asset_ids, get_cache_states_by_paths_and_asset_ids,
mark_cache_states_missing_outside_prefixes,
restore_cache_states_by_paths,
) )
from app.assets.helpers import select_best_live_path, get_utc_now from app.assets.helpers import select_best_live_path, get_utc_now
@ -168,9 +169,51 @@ class TestUpsertCacheState:
state = session.query(AssetCacheState).filter_by(file_path=file_path).one() state = session.query(AssetCacheState).filter_by(file_path=file_path).one()
assert state.mtime_ns == final_mtime assert state.mtime_ns == final_mtime
def test_upsert_restores_missing_state(self, session: Session):
"""Upserting a cache state that was marked missing should restore it."""
asset = _make_asset(session, "hash1")
file_path = "/restored/file.bin"
class TestDeleteCacheStatesOutsidePrefixes: state = _make_cache_state(session, asset, file_path, mtime_ns=100)
def test_deletes_states_outside_prefixes(self, session: Session, tmp_path): state.is_missing = True
session.commit()
created, updated = upsert_cache_state(
session, asset_id=asset.id, file_path=file_path, mtime_ns=100
)
session.commit()
assert created is False
assert updated is True
restored_state = session.query(AssetCacheState).filter_by(file_path=file_path).one()
assert restored_state.is_missing is False
class TestRestoreCacheStatesByPaths:
def test_restores_missing_states(self, session: Session):
asset = _make_asset(session, "hash1")
missing_path = "/missing/file.bin"
active_path = "/active/file.bin"
missing_state = _make_cache_state(session, asset, missing_path)
missing_state.is_missing = True
_make_cache_state(session, asset, active_path)
session.commit()
restored = restore_cache_states_by_paths(session, [missing_path])
session.commit()
assert restored == 1
state = session.query(AssetCacheState).filter_by(file_path=missing_path).one()
assert state.is_missing is False
def test_empty_list_restores_nothing(self, session: Session):
restored = restore_cache_states_by_paths(session, [])
assert restored == 0
class TestMarkCacheStatesMissingOutsidePrefixes:
def test_marks_states_missing_outside_prefixes(self, session: Session, tmp_path):
asset = _make_asset(session, "hash1") asset = _make_asset(session, "hash1")
valid_dir = tmp_path / "valid" valid_dir = tmp_path / "valid"
valid_dir.mkdir() valid_dir.mkdir()
@ -184,39 +227,48 @@ class TestDeleteCacheStatesOutsidePrefixes:
_make_cache_state(session, asset, invalid_path) _make_cache_state(session, asset, invalid_path)
session.commit() session.commit()
deleted = delete_cache_states_outside_prefixes(session, [str(valid_dir)]) marked = mark_cache_states_missing_outside_prefixes(session, [str(valid_dir)])
session.commit() session.commit()
assert deleted == 1 assert marked == 1
remaining = session.query(AssetCacheState).all() all_states = session.query(AssetCacheState).all()
assert len(remaining) == 1 assert len(all_states) == 2
assert remaining[0].file_path == valid_path
def test_empty_prefixes_deletes_nothing(self, session: Session): valid_state = next(s for s in all_states if s.file_path == valid_path)
invalid_state = next(s for s in all_states if s.file_path == invalid_path)
assert valid_state.is_missing is False
assert invalid_state.is_missing is True
def test_empty_prefixes_marks_nothing(self, session: Session):
asset = _make_asset(session, "hash1") asset = _make_asset(session, "hash1")
_make_cache_state(session, asset, "/some/path.bin") _make_cache_state(session, asset, "/some/path.bin")
session.commit() session.commit()
deleted = delete_cache_states_outside_prefixes(session, []) marked = mark_cache_states_missing_outside_prefixes(session, [])
assert deleted == 0 assert marked == 0
class TestGetOrphanedSeedAssetIds: class TestGetUnreferencedUnhashedAssetIds:
def test_returns_orphaned_seed_assets(self, session: Session): def test_returns_unreferenced_unhashed_assets(self, session: Session):
# Seed asset (hash=None) with no cache states # Unhashed asset (hash=None) with no cache states
orphan = _make_asset(session, hash_val=None) no_states = _make_asset(session, hash_val=None)
# Seed asset with cache state (not orphaned) # Unhashed asset with active cache state (not unreferenced)
with_state = _make_asset(session, hash_val=None) with_active_state = _make_asset(session, hash_val=None)
_make_cache_state(session, with_state, "/has/state.bin") _make_cache_state(session, with_active_state, "/has/state.bin")
# Unhashed asset with only missing cache state (should be unreferenced)
with_missing_state = _make_asset(session, hash_val=None)
missing_state = _make_cache_state(session, with_missing_state, "/missing/state.bin")
missing_state.is_missing = True
# Regular asset (hash not None) - should not be returned # Regular asset (hash not None) - should not be returned
_make_asset(session, hash_val="blake3:regular") _make_asset(session, hash_val="blake3:regular")
session.commit() session.commit()
orphaned = get_orphaned_seed_asset_ids(session) unreferenced = get_unreferenced_unhashed_asset_ids(session)
assert orphan.id in orphaned assert no_states.id in unreferenced
assert with_state.id not in orphaned assert with_missing_state.id in unreferenced
assert with_active_state.id not in unreferenced
class TestDeleteAssetsByIds: class TestDeleteAssetsByIds:

View File

@ -349,10 +349,10 @@ class TestSeederThreadSafety:
) )
class TestSeederPruneOrphans: class TestSeederMarkMissing:
"""Test prune_orphans behavior.""" """Test mark_missing_outside_prefixes behavior."""
def test_prune_orphans_when_idle(self, fresh_seeder: AssetSeeder): def test_mark_missing_when_idle(self, fresh_seeder: AssetSeeder):
with ( with (
patch("app.assets.seeder.dependencies_available", return_value=True), patch("app.assets.seeder.dependencies_available", return_value=True),
patch( patch(
@ -360,14 +360,14 @@ class TestSeederPruneOrphans:
return_value=["/models", "/input", "/output"], return_value=["/models", "/input", "/output"],
), ),
patch( patch(
"app.assets.seeder.prune_orphans_safely", return_value=5 "app.assets.seeder.mark_missing_outside_prefixes_safely", return_value=5
) as mock_prune, ) as mock_mark,
): ):
result = fresh_seeder.prune_orphans() result = fresh_seeder.mark_missing_outside_prefixes()
assert result == 5 assert result == 5
mock_prune.assert_called_once_with(["/models", "/input", "/output"]) mock_mark.assert_called_once_with(["/models", "/input", "/output"])
def test_prune_orphans_returns_zero_when_running( def test_mark_missing_returns_zero_when_running(
self, fresh_seeder: AssetSeeder, mock_dependencies self, fresh_seeder: AssetSeeder, mock_dependencies
): ):
barrier = threading.Event() barrier = threading.Event()
@ -382,35 +382,35 @@ class TestSeederPruneOrphans:
fresh_seeder.start(roots=("models",)) fresh_seeder.start(roots=("models",))
time.sleep(0.05) time.sleep(0.05)
result = fresh_seeder.prune_orphans() result = fresh_seeder.mark_missing_outside_prefixes()
assert result == 0 assert result == 0
barrier.set() barrier.set()
def test_prune_orphans_returns_zero_when_dependencies_unavailable( def test_mark_missing_returns_zero_when_dependencies_unavailable(
self, fresh_seeder: AssetSeeder self, fresh_seeder: AssetSeeder
): ):
with patch("app.assets.seeder.dependencies_available", return_value=False): with patch("app.assets.seeder.dependencies_available", return_value=False):
result = fresh_seeder.prune_orphans() result = fresh_seeder.mark_missing_outside_prefixes()
assert result == 0 assert result == 0
def test_prune_first_flag_triggers_pruning_before_scan( def test_prune_first_flag_triggers_mark_missing_before_scan(
self, fresh_seeder: AssetSeeder self, fresh_seeder: AssetSeeder
): ):
prune_call_order = [] call_order = []
def track_prune(prefixes): def track_mark(prefixes):
prune_call_order.append("prune") call_order.append("mark_missing")
return 3 return 3
def track_sync(root): def track_sync(root):
prune_call_order.append(f"sync_{root}") call_order.append(f"sync_{root}")
return set() return set()
with ( with (
patch("app.assets.seeder.dependencies_available", return_value=True), patch("app.assets.seeder.dependencies_available", return_value=True),
patch("app.assets.seeder.get_all_known_prefixes", return_value=["/models"]), patch("app.assets.seeder.get_all_known_prefixes", return_value=["/models"]),
patch("app.assets.seeder.prune_orphans_safely", side_effect=track_prune), patch("app.assets.seeder.mark_missing_outside_prefixes_safely", side_effect=track_mark),
patch("app.assets.seeder.sync_root_safely", side_effect=track_sync), patch("app.assets.seeder.sync_root_safely", side_effect=track_sync),
patch("app.assets.seeder.collect_paths_for_roots", return_value=[]), patch("app.assets.seeder.collect_paths_for_roots", return_value=[]),
patch("app.assets.seeder.build_asset_specs", return_value=([], set(), 0)), patch("app.assets.seeder.build_asset_specs", return_value=([], set(), 0)),
@ -419,5 +419,5 @@ class TestSeederPruneOrphans:
fresh_seeder.start(roots=("models",), prune_first=True) fresh_seeder.start(roots=("models",), prune_first=True)
fresh_seeder.wait(timeout=5.0) fresh_seeder.wait(timeout=5.0)
assert prune_call_order[0] == "prune" assert call_order[0] == "mark_missing"
assert "sync_models" in prune_call_order assert "sync_models" in call_order