From abc0b728aba2b7bf19bab57b363779af21df632e Mon Sep 17 00:00:00 2001 From: Talmaj Marinc Date: Tue, 30 Jun 2026 18:21:18 +0200 Subject: [PATCH] Fix sweep deleting FAILED partials and fix segmented resume path trusted offsets blindly. --- app/model_downloader/engine/job.py | 43 ++++++++++++++++++++++++++++-- app/model_downloader/scheduler.py | 20 ++++++++++---- 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/app/model_downloader/engine/job.py b/app/model_downloader/engine/job.py index 969d16388..93dcf269c 100644 --- a/app/model_downloader/engine/job.py +++ b/app/model_downloader/engine/job.py @@ -222,12 +222,34 @@ class DownloadJob: pr.total_bytes, pr.accept_ranges, max(1, args.download_segments) ) existing = await asyncio.to_thread(queries.list_segments, self.spec.download_id) - if ( + can_resume_segmented = ( seg_count > 1 and existing and pr.total_bytes is not None and existing[-1].end_offset == pr.total_bytes - 1 - ): + ) + if can_resume_segmented and not self._segmented_part_valid(pr.total_bytes): + # The persisted per-segment offsets describe bytes in a preallocated + # .part that is now gone or the wrong size (e.g. the partial of a + # failed download was swept on restart, or removed by a fatal + # error). Trusting them would skip already-"complete" segments and + # leave zero-filled holes. Discard the offsets and re-plan fresh. + logging.info( + "[model_downloader] %s discarding segmented resume offsets " + "(preallocated .part missing or wrong size); restarting", + self.spec.model_id, + ) + self._remove_temp() + await asyncio.to_thread( + queries.replace_segments, self.spec.download_id, [] + ) + await asyncio.to_thread( + queries.update_download, self.spec.download_id, bytes_done=0 + ) + existing = [] + can_resume_segmented = False + + if can_resume_segmented: # Resume an existing segmented plan. self.state.segments = [ SegmentRuntime(s.idx, s.start_offset, s.end_offset, s.bytes_done) @@ -537,6 +559,23 @@ class DownloadJob: logging.debug("[model_downloader] writer close error", exc_info=True) self._writer = None + def _segmented_part_valid(self, total_bytes: int) -> bool: + """True when the temp file is the preallocated segmented ``.part``. + + A segmented transfer preallocates the .part to ``total_bytes`` up front + and tracks how much of each range landed via per-segment offsets. Those + offsets are only trustworthy when the file they describe is still on + disk at its full preallocated size. A missing file (swept after a + failure, removed on a fatal error, deleted by hand) or a wrong-sized one + means the persisted offsets no longer correspond to real bytes and must + not be resumed over. Doing so would skip "complete" segments and leave + zero-filled holes that pass the size-only verification gate. + """ + try: + return os.path.getsize(self.spec.temp_path) == total_bytes + except OSError: + return False + def _contiguous_prefix_valid(self, prefix_len: int) -> bool: """True when the temp file is exactly ``prefix_len`` contiguous bytes. diff --git a/app/model_downloader/scheduler.py b/app/model_downloader/scheduler.py index 89043c2a9..098bc76ec 100644 --- a/app/model_downloader/scheduler.py +++ b/app/model_downloader/scheduler.py @@ -1,4 +1,4 @@ -"""Priority scheduler + lifecycle (PRD sections 4, 6, 12). +"""Priority scheduler + lifecycle. Owns the set of running jobs and admits queued downloads up to a global concurrency limit (K), highest priority first, FIFO within a priority. Runs @@ -25,7 +25,7 @@ from app.model_downloader.database import queries from app.model_downloader.engine.job import DownloadJob, JobSpec from app.model_downloader.security import paths -# Backoff for retryable failures (PRD section 12). +# Backoff for retryable failures _BACKOFF_BASE = 2.0 _BACKOFF_CAP = 300.0 _MAX_ATTEMPTS = 6 @@ -70,13 +70,23 @@ class Scheduler: def _sweep_orphan_temp_files() -> None: """Remove ``.part`` files not referenced by a resumable download row. - Resumable partials (queued/paused rows) are preserved; only truly - orphaned temp files from crashed runs are deleted. + Resumable partials are preserved; only truly orphaned temp files from + crashed runs are deleted. ``FAILED`` is included because + :meth:`DownloadManager.resume` explicitly permits resuming a + retry-exhausted failed row: deleting its partial here while the + per-segment offsets survive in the DB would make the next resume + preallocate a fresh sparse file, skip every "complete" segment, and + leave zero-filled holes that pass the size-only verification gate. """ live = { row.temp_path for row in queries.list_downloads() - if row.status in (DownloadStatus.QUEUED, DownloadStatus.PAUSED) + if row.status + in ( + DownloadStatus.QUEUED, + DownloadStatus.PAUSED, + DownloadStatus.FAILED, + ) } for path in paths.iter_all_tmp_paths(): if path in live: