Fix sweep deleting FAILED partials and fix segmented resume path trusting offsets blindly.

This commit is contained in:
Talmaj Marinc 2026-06-30 18:21:18 +02:00
parent 1bbd4a57db
commit abc0b728ab
2 changed files with 56 additions and 7 deletions

View File

@ -222,12 +222,34 @@ class DownloadJob:
pr.total_bytes, pr.accept_ranges, max(1, args.download_segments) pr.total_bytes, pr.accept_ranges, max(1, args.download_segments)
) )
existing = await asyncio.to_thread(queries.list_segments, self.spec.download_id) existing = await asyncio.to_thread(queries.list_segments, self.spec.download_id)
if ( can_resume_segmented = (
seg_count > 1 seg_count > 1
and existing and existing
and pr.total_bytes is not None and pr.total_bytes is not None
and existing[-1].end_offset == pr.total_bytes - 1 and existing[-1].end_offset == pr.total_bytes - 1
): )
if can_resume_segmented and not self._segmented_part_valid(pr.total_bytes):
# The persisted per-segment offsets describe bytes in a preallocated
# .part that is now gone or the wrong size (e.g. the partial of a
# failed download was swept on restart, or removed by a fatal
# error). Trusting them would skip already-"complete" segments and
# leave zero-filled holes. Discard the offsets and re-plan fresh.
logging.info(
"[model_downloader] %s discarding segmented resume offsets "
"(preallocated .part missing or wrong size); restarting",
self.spec.model_id,
)
self._remove_temp()
await asyncio.to_thread(
queries.replace_segments, self.spec.download_id, []
)
await asyncio.to_thread(
queries.update_download, self.spec.download_id, bytes_done=0
)
existing = []
can_resume_segmented = False
if can_resume_segmented:
# Resume an existing segmented plan. # Resume an existing segmented plan.
self.state.segments = [ self.state.segments = [
SegmentRuntime(s.idx, s.start_offset, s.end_offset, s.bytes_done) SegmentRuntime(s.idx, s.start_offset, s.end_offset, s.bytes_done)
@ -537,6 +559,23 @@ class DownloadJob:
logging.debug("[model_downloader] writer close error", exc_info=True) logging.debug("[model_downloader] writer close error", exc_info=True)
self._writer = None self._writer = None
def _segmented_part_valid(self, total_bytes: int) -> bool:
    """Report whether the temp file still matches the preallocated ``.part``.

    A segmented transfer preallocates the ``.part`` to ``total_bytes`` up
    front and records, per segment, how much of each range has landed. Those
    persisted offsets can only be trusted while the file they describe is
    still on disk at its full preallocated size. If the file is missing
    (swept after a failure, removed on a fatal error, deleted by hand) or
    has any other size, the offsets no longer correspond to real bytes;
    resuming over them would skip "complete" segments and leave zero-filled
    holes that pass the size-only verification gate.

    Args:
        total_bytes: Expected size of the fully preallocated temp file.

    Returns:
        ``True`` only when ``self.spec.temp_path`` can be stat'ed and its
        size equals ``total_bytes``; ``False`` on any ``OSError`` or on a
        size mismatch.
    """
    try:
        size_on_disk = os.path.getsize(self.spec.temp_path)
    except OSError:
        # Missing or unreadable file: nothing valid to resume over.
        return False
    return size_on_disk == total_bytes
def _contiguous_prefix_valid(self, prefix_len: int) -> bool: def _contiguous_prefix_valid(self, prefix_len: int) -> bool:
"""True when the temp file is exactly ``prefix_len`` contiguous bytes. """True when the temp file is exactly ``prefix_len`` contiguous bytes.

View File

@ -1,4 +1,4 @@
"""Priority scheduler + lifecycle (PRD sections 4, 6, 12). """Priority scheduler + lifecycle.
Owns the set of running jobs and admits queued downloads up to a global Owns the set of running jobs and admits queued downloads up to a global
concurrency limit (K), highest priority first, FIFO within a priority. Runs concurrency limit (K), highest priority first, FIFO within a priority. Runs
@ -25,7 +25,7 @@ from app.model_downloader.database import queries
from app.model_downloader.engine.job import DownloadJob, JobSpec from app.model_downloader.engine.job import DownloadJob, JobSpec
from app.model_downloader.security import paths from app.model_downloader.security import paths
# Backoff for retryable failures (PRD section 12). # Backoff for retryable failures
_BACKOFF_BASE = 2.0 _BACKOFF_BASE = 2.0
_BACKOFF_CAP = 300.0 _BACKOFF_CAP = 300.0
_MAX_ATTEMPTS = 6 _MAX_ATTEMPTS = 6
@ -70,13 +70,23 @@ class Scheduler:
def _sweep_orphan_temp_files() -> None: def _sweep_orphan_temp_files() -> None:
"""Remove ``.part`` files not referenced by a resumable download row. """Remove ``.part`` files not referenced by a resumable download row.
Resumable partials (queued/paused rows) are preserved; only truly Resumable partials are preserved; only truly orphaned temp files from
orphaned temp files from crashed runs are deleted. crashed runs are deleted. ``FAILED`` is included because
:meth:`DownloadManager.resume` explicitly permits resuming a
retry-exhausted failed row: deleting its partial here while the
per-segment offsets survive in the DB would make the next resume
preallocate a fresh sparse file, skip every "complete" segment, and
leave zero-filled holes that pass the size-only verification gate.
""" """
live = { live = {
row.temp_path row.temp_path
for row in queries.list_downloads() for row in queries.list_downloads()
if row.status in (DownloadStatus.QUEUED, DownloadStatus.PAUSED) if row.status
in (
DownloadStatus.QUEUED,
DownloadStatus.PAUSED,
DownloadStatus.FAILED,
)
} }
for path in paths.iter_all_tmp_paths(): for path in paths.iter_all_tmp_paths():
if path in live: if path in live: