From b419fd8399c5eb5be5f65dc6d041e794aa27e09d Mon Sep 17 00:00:00 2001 From: Talmaj Marinc Date: Tue, 30 Jun 2026 18:07:11 +0200 Subject: [PATCH] Redact urls in logging and fix concurrent enqueue issue that could corrupt the downloaded files. --- app/model_downloader/engine/job.py | 6 +- app/model_downloader/manager.py | 99 +++++++++++++++++++++--------- 2 files changed, 72 insertions(+), 33 deletions(-) diff --git a/app/model_downloader/engine/job.py b/app/model_downloader/engine/job.py index 0b1519ce9..969d16388 100644 --- a/app/model_downloader/engine/job.py +++ b/app/model_downloader/engine/job.py @@ -27,7 +27,7 @@ from app.model_downloader.engine.planner import ( plan_segments, ) from app.model_downloader.engine.writer import FileWriter -from app.model_downloader.net.http import open_validated +from app.model_downloader.net.http import open_validated, redact_url from app.model_downloader.net.probe import probe from app.model_downloader.verify import checksum, dedup, structural @@ -192,7 +192,7 @@ class DownloadJob: if not pr.ok: if pr.gated: raise FatalError( - f"{self.spec.url} requires authentication. Add an API key for " + f"{redact_url(self.spec.url)} requires authentication. Add an API key for " f"this host at /api/download/credentials and retry." ) if pr.status == 0 or pr.status in _RETRYABLE_STATUSES: @@ -413,7 +413,7 @@ class DownloadJob: def _raise_for_status(self, status: int) -> None: if status in (401, 403): raise FatalError( - f"{self.spec.url} returned {status}; add/update an API key for " + f"{redact_url(self.spec.url)} returned {status}; add/update an API key for " f"this host at /api/download/credentials." ) if status in _RETRYABLE_STATUSES: diff --git a/app/model_downloader/manager.py b/app/model_downloader/manager.py index 206701b37..d1ff21e16 100644 --- a/app/model_downloader/manager.py +++ b/app/model_downloader/manager.py @@ -15,6 +15,7 @@ from app.model_downloader.constants import DownloadStatus from app.model_downloader.database import queries from app.model_downloader.scheduler import SCHEDULER from app.model_downloader.security import paths +from app.model_downloader.net.http import redact_url from app.model_downloader.security.allowlist import is_url_allowed from app.model_downloader.security.paths import InvalidModelId @@ -41,6 +42,13 @@ class DownloadManager: def __init__(self) -> None: self._scheduler = SCHEDULER self._notify_cb: Optional[Callable[[str], None]] = None + # Serializes the "check for a live download, then write" critical section + # per model_id. ``downloads`` has no uniqueness constraint on model_id + # (history rows are kept), so without this two concurrent enqueue/resume + # calls could both pass the live check and admit two jobs sharing one + # temp/dest path. The manager is a process singleton over a local SQLite + # DB, so an in-process lock is sufficient (and avoids a migration). + self._model_locks: dict[str, asyncio.Lock] = {} def set_notify(self, cb: Optional[Callable[[str], None]]) -> None: self._notify_cb = cb @@ -80,37 +88,55 @@ class DownloadManager: f"Model already exists on disk: {model_id}", status=409, ) - if await self._has_live_download(model_id): - raise DownloadError( - "ALREADY_DOWNLOADING", - f"A download for {model_id} is already in progress.", - status=409, - ) - download_id = str(uuid.uuid4()) - await asyncio.to_thread( - queries.insert_download, - { - "id": download_id, - "url": url, - "model_id": model_id, - "dest_path": dest_path, - "temp_path": temp_path, - "status": DownloadStatus.QUEUED, - "priority": priority, - "expected_sha256": expected_sha256, - "credential_id": credential_id, - "allow_any_extension": allow_any_extension, - }, - ) - logging.info("[model_downloader] enqueued %s -> %s", url, model_id) + # Hold the per-model lock across the live check and the insert so a + # concurrent enqueue/resume for the same model_id cannot interleave + # between them and create a second job against the same temp/dest path. + async with self._model_lock(model_id): + if await self._has_live_download(model_id): + raise DownloadError( + "ALREADY_DOWNLOADING", + f"A download for {model_id} is already in progress.", + status=409, + ) + await asyncio.to_thread( + queries.insert_download, + { + "id": download_id, + "url": url, + "model_id": model_id, + "dest_path": dest_path, + "temp_path": temp_path, + "status": DownloadStatus.QUEUED, + "priority": priority, + "expected_sha256": expected_sha256, + "credential_id": credential_id, + "allow_any_extension": allow_any_extension, + }, + ) + logging.info("[model_downloader] enqueued %s -> %s", redact_url(url), model_id) await self._scheduler.pump() return download_id - async def _has_live_download(self, model_id: str) -> bool: + def _model_lock(self, model_id: str) -> asyncio.Lock: + # Lazily create one lock per model_id. There is no ``await`` between the + # lookup and the insert, so under the single asyncio thread this is + # atomic and cannot hand out two different locks for the same model_id. + lock = self._model_locks.get(model_id) + if lock is None: + lock = asyncio.Lock() + self._model_locks[model_id] = lock + return lock + + async def _has_live_download( + self, model_id: str, *, exclude_id: Optional[str] = None + ) -> bool: rows = await asyncio.to_thread(queries.list_downloads) return any( - r.model_id == model_id and r.status in _LIVE_STATUSES for r in rows + r.model_id == model_id + and r.id != exclude_id + and r.status in _LIVE_STATUSES + for r in rows ) # ----- control ----- @@ -132,14 +158,27 @@ class DownloadManager: row = await asyncio.to_thread(queries.get_download, download_id) if row is None: raise DownloadError("NOT_FOUND", "No such download.", status=404) - if row.status in (DownloadStatus.PAUSED, DownloadStatus.FAILED): + if row.status not in (DownloadStatus.PAUSED, DownloadStatus.FAILED): + return + # Re-queueing a paused/failed row must respect the single-live-per-model + # invariant: another download (e.g. a newer enqueue) may already be live + # for this model_id and would share this row's temp/dest path. Hold the + # per-model lock across the check and the status flip, and exclude this + # row itself (a paused row is already a "live" status). + async with self._model_lock(row.model_id): + if await self._has_live_download(row.model_id, exclude_id=download_id): + raise DownloadError( + "ALREADY_DOWNLOADING", + f"A download for {row.model_id} is already in progress.", + status=409, + ) await asyncio.to_thread( queries.update_download, download_id, status=DownloadStatus.QUEUED, error=None, ) - await self._scheduler.pump() + await self._scheduler.pump() async def cancel(self, download_id: str) -> None: job = self._scheduler.get_job(download_id) @@ -167,7 +206,7 @@ class DownloadManager: await asyncio.to_thread( queries.update_download, download_id, priority=priority ) - # Admission-order only (PRD section 13 default); a higher priority is + # Admission-order only; a higher priority is # picked up the next time a slot frees. Pump in case a slot is free now. await self._scheduler.pump() @@ -196,7 +235,7 @@ class DownloadManager: return { "download_id": row.id, "model_id": row.model_id, - "url": row.url, + "url": redact_url(row.url), "status": row.status, "priority": row.priority, "total_bytes": total, @@ -216,7 +255,7 @@ class DownloadManager: return { "download_id": st.download_id, "model_id": st.model_id, - "url": st.url, + "url": redact_url(st.url), "status": st.status, "priority": st.priority, "total_bytes": st.total_bytes,