Fix potential concurrency and db integrity error.

This commit is contained in:
Talmaj Marinc 2026-06-30 10:01:35 +02:00
parent ca1daf3c27
commit bbee25a3e6

View File

@ -11,6 +11,7 @@ import time
from typing import Optional from typing import Optional
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from app.database.db import create_session from app.database.db import create_session
from app.model_downloader.constants import DownloadStatus from app.model_downloader.constants import DownloadStatus
@ -199,30 +200,48 @@ def list_subdomain_credentials() -> list[HostCredential]:
def upsert_credential(values: dict) -> HostCredential: def upsert_credential(values: dict) -> HostCredential:
"""Insert or update a credential keyed by ``host``.""" """Insert or update a credential keyed by ``host``.
Callers can target the same host concurrently (each runs in its own
short-lived session on a separate connection), so the read-then-write here
can race: two callers both see no existing row and both attempt an insert.
The ``host`` column is uniquely indexed, so the loser's insert raises
``IntegrityError``. We recover by rolling back and retrying, at which point
the now-committed row is found and updated in place, letting concurrent
calls converge instead of failing or creating duplicates.
"""
host = values["host"] host = values["host"]
now = int(time.time()) now = int(time.time())
with create_session() as session: last_error: IntegrityError | None = None
row = ( for _ in range(2):
session.execute( with create_session() as session:
select(HostCredential).where(HostCredential.host == host).limit(1) row = (
session.execute(
select(HostCredential).where(HostCredential.host == host).limit(1)
)
.scalars()
.first()
) )
.scalars() if row is None:
.first() row = HostCredential(**values)
) row.created_at = now
if row is None: row.updated_at = now
row = HostCredential(**values) session.add(row)
row.created_at = now else:
row.updated_at = now for key, value in values.items():
session.add(row) setattr(row, key, value)
else: row.updated_at = now
for key, value in values.items(): try:
setattr(row, key, value) session.commit()
row.updated_at = now except IntegrityError as exc:
session.commit() session.rollback()
session.refresh(row) last_error = exc
session.expunge(row) continue
return row session.refresh(row)
session.expunge(row)
return row
assert last_error is not None
raise last_error
def delete_credential(credential_id: str) -> bool: def delete_credential(credential_id: str) -> bool: