Update existing asset references on overwritten output files

ingest_existing_file now detects when a reference already exists for the
path and updates mtime_ns, job_id, size_bytes, resets enrichment_level
and clears the asset hash so the enricher re-hashes the new content.

Only brand-new paths fall through to batch_insert_seed_assets.
register_output_files only increments its counter on actual insert or
update.

Amp-Thread-ID: https://ampcode.com/threads/T-019cfdf4-c52c-771c-a920-57bac15c68be
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr 2026-03-17 15:45:09 -07:00
parent f9c6076619
commit 05f0052c73

View File

@ -23,7 +23,7 @@ from app.assets.database.queries import (
upsert_reference,
validate_tags_exist,
)
from app.assets.helpers import normalize_tags
from app.assets.helpers import get_utc_now, normalize_tags
from app.assets.services.bulk_ingest import batch_insert_seed_assets
from app.assets.services.file_utils import get_size_and_mtime_ns
from app.assets.services.path_utils import (
@ -145,8 +145,8 @@ def register_output_files(
if not os.path.isfile(abs_path):
continue
try:
ingest_existing_file(abs_path, user_metadata=user_metadata, job_id=job_id)
registered += 1
if ingest_existing_file(abs_path, user_metadata=user_metadata, job_id=job_id):
registered += 1
except Exception:
logging.exception("Failed to register output: %s", abs_path)
return registered
@ -158,33 +158,59 @@ def ingest_existing_file(
extra_tags: Sequence[str] = (),
owner_id: str = "",
job_id: str | None = None,
) -> None:
) -> bool:
"""Register an existing on-disk file as an asset stub.
Inserts a stub record (hash=NULL) for immediate UX visibility.
The caller is responsible for triggering background enrichment
(hash computation, metadata extraction) via the asset seeder.
If a reference already exists for this path, updates mtime_ns, job_id,
size_bytes, and resets enrichment so the enricher will re-hash it.
For brand-new paths, inserts a stub record (hash=NULL) for immediate
UX visibility.
Returns True if a row was inserted or updated, False otherwise.
"""
locator = os.path.abspath(abs_path)
size_bytes, mtime_ns = get_size_and_mtime_ns(abs_path)
mime_type = mimetypes.guess_type(abs_path, strict=False)[0]
name, path_tags = get_name_and_tags_from_asset_path(abs_path)
tags = list(dict.fromkeys(path_tags + list(extra_tags)))
spec = {
"abs_path": abs_path,
"size_bytes": size_bytes,
"mtime_ns": mtime_ns,
"info_name": name,
"tags": tags,
"fname": os.path.basename(abs_path),
"metadata": user_metadata,
"hash": None,
"mime_type": mime_type,
"job_id": job_id,
}
with create_session() as session:
batch_insert_seed_assets(session, [spec], owner_id=owner_id)
existing_ref = get_reference_by_file_path(session, locator)
if existing_ref is not None:
now = get_utc_now()
existing_ref.mtime_ns = mtime_ns
existing_ref.job_id = job_id
existing_ref.is_missing = False
existing_ref.deleted_at = None
existing_ref.updated_at = now
# Reset enrichment so the enricher re-hashes
existing_ref.enrichment_level = 0
# Clear the asset hash so enrich recomputes it
asset = existing_ref.asset
if asset:
asset.hash = None
asset.size_bytes = size_bytes
if mime_type:
asset.mime_type = mime_type
session.commit()
return True
spec = {
"abs_path": abs_path,
"size_bytes": size_bytes,
"mtime_ns": mtime_ns,
"info_name": name,
"tags": tags,
"fname": os.path.basename(abs_path),
"metadata": user_metadata,
"hash": None,
"mime_type": mime_type,
"job_id": job_id,
}
result = batch_insert_seed_assets(session, [spec], owner_id=owner_id)
session.commit()
return result.won_paths > 0
def _register_existing_asset(