fix(api-nodes): add separate retry budget for 429 rate limit responses (#12421)

Alexander Piskun 2026-02-12 11:38:51 +02:00 committed by GitHub
parent 66c18522fb
commit 4a93a62371
4 changed files with 177 additions and 181 deletions

View File

@@ -57,6 +57,7 @@ class _RequestConfig:
     files: dict[str, Any] | list[tuple[str, Any]] | None
     multipart_parser: Callable | None
     max_retries: int
+    max_retries_on_rate_limit: int
     retry_delay: float
     retry_backoff: float
     wait_label: str = "Waiting"
@@ -65,6 +66,7 @@ class _RequestConfig:
     final_label_on_success: str | None = "Completed"
     progress_origin_ts: float | None = None
     price_extractor: Callable[[dict[str, Any]], float | None] | None = None
+    is_rate_limited: Callable[[int, Any], bool] | None = None


 @dataclass
@@ -78,7 +80,7 @@ class _PollUIState:
     active_since: float | None = None  # start time of current active interval (None if queued)


-_RETRY_STATUS = {408, 429, 500, 502, 503, 504}
+_RETRY_STATUS = {408, 500, 502, 503, 504}  # status 429 is handled separately
 COMPLETED_STATUSES = ["succeeded", "succeed", "success", "completed", "finished", "done", "complete"]
 FAILED_STATUSES = ["cancelled", "canceled", "canceling", "fail", "failed", "error"]
 QUEUED_STATUSES = ["created", "queued", "queueing", "submitted", "initializing"]
@@ -103,6 +105,8 @@ async def sync_op(
     final_label_on_success: str | None = "Completed",
     progress_origin_ts: float | None = None,
     monitor_progress: bool = True,
+    max_retries_on_rate_limit: int = 16,
+    is_rate_limited: Callable[[int, Any], bool] | None = None,
 ) -> M:
     raw = await sync_op_raw(
         cls,
@@ -122,6 +126,8 @@ async def sync_op(
         final_label_on_success=final_label_on_success,
         progress_origin_ts=progress_origin_ts,
         monitor_progress=monitor_progress,
+        max_retries_on_rate_limit=max_retries_on_rate_limit,
+        is_rate_limited=is_rate_limited,
     )
     if not isinstance(raw, dict):
         raise Exception("Expected JSON response to validate into a Pydantic model, got non-JSON (binary or text).")
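How a call site might opt in, as a hedged sketch: only `max_retries_on_rate_limit` and `is_rate_limited` come from this diff, while `cls`, `endpoint`, and the rest of the argument list stand in for whatever the caller already passes to `sync_op`.

```python
# Hedged usage sketch: the two keyword arguments are the ones added in this
# diff; everything else here is an illustrative assumption.
result = await sync_op(
    cls,
    endpoint,
    max_retries_on_rate_limit=16,  # retry budget reserved for rate limiting
    # Optional hook for providers that throttle without returning 429;
    # it is consulted in addition to `resp.status == 429`:
    is_rate_limited=lambda status, body: (
        isinstance(body, dict) and body.get("code") == "throttled"  # hypothetical body shape
    ),
)
```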
@@ -194,6 +200,8 @@ async def sync_op_raw(
     final_label_on_success: str | None = "Completed",
     progress_origin_ts: float | None = None,
     monitor_progress: bool = True,
+    max_retries_on_rate_limit: int = 16,
+    is_rate_limited: Callable[[int, Any], bool] | None = None,
 ) -> dict[str, Any] | bytes:
     """
     Make a single network request.
@@ -222,6 +230,8 @@ async def sync_op_raw(
         final_label_on_success=final_label_on_success,
         progress_origin_ts=progress_origin_ts,
         price_extractor=price_extractor,
+        max_retries_on_rate_limit=max_retries_on_rate_limit,
+        is_rate_limited=is_rate_limited,
     )
     return await _request_base(cfg, expect_binary=as_binary)
@@ -506,7 +516,7 @@ def _friendly_http_message(status: int, body: Any) -> str:
     if status == 409:
         return "There is a problem with your account. Please contact support@comfy.org."
     if status == 429:
-        return "Rate Limit Exceeded: Please try again later."
+        return "Rate Limit Exceeded: The server returned 429 after all retry attempts. Please wait and try again."
     try:
         if isinstance(body, dict):
             err = body.get("error")
@@ -586,6 +596,8 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
     start_time = cfg.progress_origin_ts if cfg.progress_origin_ts is not None else time.monotonic()
     attempt = 0
     delay = cfg.retry_delay
+    rate_limit_attempts = 0
+    rate_limit_delay = cfg.retry_delay
     operation_succeeded: bool = False
     final_elapsed_seconds: int | None = None
     extracted_price: float | None = None
@@ -653,7 +665,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                 payload_headers["Content-Type"] = "application/json"
                 payload_kw["json"] = cfg.data or {}

-            try:
             request_logger.log_request_response(
                 operation_id=operation_id,
                 request_method=method,
@@ -662,8 +673,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                 request_params=dict(params) if params else None,
                 request_data=request_body_log,
             )
-            except Exception as _log_e:
-                logging.debug("[DEBUG] request logging failed: %s", _log_e)

             req_coro = sess.request(method, url, params=params, **payload_kw)
             req_task = asyncio.create_task(req_coro)
@@ -688,17 +697,33 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                     body = await resp.json()
                 except (ContentTypeError, json.JSONDecodeError):
                     body = await resp.text()
-                if resp.status in _RETRY_STATUS and attempt <= cfg.max_retries:
+                should_retry = False
+                wait_time = 0.0
+                retry_label = ""
+                is_rl = resp.status == 429 or (
+                    cfg.is_rate_limited is not None and cfg.is_rate_limited(resp.status, body)
+                )
+                if is_rl and rate_limit_attempts < cfg.max_retries_on_rate_limit:
+                    rate_limit_attempts += 1
+                    wait_time = min(rate_limit_delay, 30.0)
+                    rate_limit_delay *= cfg.retry_backoff
+                    retry_label = f"rate-limit retry {rate_limit_attempts} of {cfg.max_retries_on_rate_limit}"
+                    should_retry = True
+                elif resp.status in _RETRY_STATUS and (attempt - rate_limit_attempts) <= cfg.max_retries:
+                    wait_time = delay
+                    delay *= cfg.retry_backoff
+                    retry_label = f"retry {attempt - rate_limit_attempts} of {cfg.max_retries}"
+                    should_retry = True
+                if should_retry:
                     logging.warning(
-                        "HTTP %s %s -> %s. Retrying in %.2fs (retry %d of %d).",
+                        "HTTP %s %s -> %s. Waiting %.2fs (%s).",
                         method,
                         url,
                         resp.status,
-                        delay,
-                        attempt,
-                        cfg.max_retries,
+                        wait_time,
+                        retry_label,
                     )
-                    try:
                     request_logger.log_request_response(
                         operation_id=operation_id,
                         request_method=method,
@ -706,23 +731,18 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
response_status_code=resp.status, response_status_code=resp.status,
response_headers=dict(resp.headers), response_headers=dict(resp.headers),
response_content=body, response_content=body,
error_message=_friendly_http_message(resp.status, body), error_message=f"HTTP {resp.status} ({retry_label}, will retry in {wait_time:.1f}s)",
) )
except Exception as _log_e:
logging.debug("[DEBUG] response logging failed: %s", _log_e)
await sleep_with_interrupt( await sleep_with_interrupt(
delay, wait_time,
cfg.node_cls, cfg.node_cls,
cfg.wait_label if cfg.monitor_progress else None, cfg.wait_label if cfg.monitor_progress else None,
start_time if cfg.monitor_progress else None, start_time if cfg.monitor_progress else None,
cfg.estimated_total, cfg.estimated_total,
display_callback=_display_time_progress if cfg.monitor_progress else None, display_callback=_display_time_progress if cfg.monitor_progress else None,
) )
delay *= cfg.retry_backoff
continue continue
msg = _friendly_http_message(resp.status, body) msg = _friendly_http_message(resp.status, body)
try:
request_logger.log_request_response( request_logger.log_request_response(
operation_id=operation_id, operation_id=operation_id,
request_method=method, request_method=method,
@ -732,8 +752,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
response_content=body, response_content=body,
error_message=msg, error_message=msg,
) )
except Exception as _log_e:
logging.debug("[DEBUG] response logging failed: %s", _log_e)
raise Exception(msg) raise Exception(msg)
if expect_binary: if expect_binary:
@@ -753,7 +771,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                     bytes_payload = bytes(buff)
                     operation_succeeded = True
                     final_elapsed_seconds = int(time.monotonic() - start_time)
-                    try:
                     request_logger.log_request_response(
                         operation_id=operation_id,
                         request_method=method,
@@ -762,8 +779,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                         response_headers=dict(resp.headers),
                         response_content=bytes_payload,
                     )
-                    except Exception as _log_e:
-                        logging.debug("[DEBUG] response logging failed: %s", _log_e)
                     return bytes_payload
                 else:
                     try:
@@ -780,7 +795,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                     extracted_price = cfg.price_extractor(payload) if cfg.price_extractor else None
                     operation_succeeded = True
                     final_elapsed_seconds = int(time.monotonic() - start_time)
-                    try:
                     request_logger.log_request_response(
                         operation_id=operation_id,
                         request_method=method,
@@ -789,25 +803,22 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                         response_headers=dict(resp.headers),
                         response_content=response_content_to_log,
                     )
-                    except Exception as _log_e:
-                        logging.debug("[DEBUG] response logging failed: %s", _log_e)
                     return payload
         except ProcessingInterrupted:
             logging.debug("Polling was interrupted by user")
             raise
         except (ClientError, OSError) as e:
-            if attempt <= cfg.max_retries:
+            if (attempt - rate_limit_attempts) <= cfg.max_retries:
                 logging.warning(
                     "Connection error calling %s %s. Retrying in %.2fs (%d/%d): %s",
                     method,
                     url,
                     delay,
-                    attempt,
+                    attempt - rate_limit_attempts,
                     cfg.max_retries,
                     str(e),
                 )
-                try:
                 request_logger.log_request_response(
                     operation_id=operation_id,
                     request_method=method,
@@ -817,8 +828,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                     request_data=request_body_log,
                     error_message=f"{type(e).__name__}: {str(e)} (will retry)",
                 )
-                except Exception as _log_e:
-                    logging.debug("[DEBUG] request error logging failed: %s", _log_e)
                 await sleep_with_interrupt(
                     delay,
                     cfg.node_cls,
@@ -831,7 +840,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                 continue
             diag = await _diagnose_connectivity()
             if not diag["internet_accessible"]:
-                try:
                 request_logger.log_request_response(
                     operation_id=operation_id,
                     request_method=method,
@@ -841,13 +849,10 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                     request_data=request_body_log,
                     error_message=f"LocalNetworkError: {str(e)}",
                 )
-                except Exception as _log_e:
-                    logging.debug("[DEBUG] final error logging failed: %s", _log_e)
                 raise LocalNetworkError(
                     "Unable to connect to the API server due to local network issues. "
                     "Please check your internet connection and try again."
                 ) from e
-            try:
             request_logger.log_request_response(
                 operation_id=operation_id,
                 request_method=method,
@@ -857,8 +862,6 @@ async def _request_base(cfg: _RequestConfig, expect_binary: bool):
                 request_data=request_body_log,
                 error_message=f"ApiServerError: {str(e)}",
            )
-            except Exception as _log_e:
-                logging.debug("[DEBUG] final error logging failed: %s", _log_e)
            raise ApiServerError(
                f"The API server at {default_base_url()} is currently unreachable. "
                f"The service may be experiencing issues."

View File

@@ -167,7 +167,6 @@ async def download_url_to_bytesio(
             with contextlib.suppress(Exception):
                 dest.seek(0)
-            with contextlib.suppress(Exception):
             request_logger.log_request_response(
                 operation_id=op_id,
                 request_method="GET",
@@ -181,7 +180,6 @@ async def download_url_to_bytesio(
             raise ProcessingInterrupted("Task cancelled") from None
         except (ClientError, OSError) as e:
             if attempt <= max_retries:
-                with contextlib.suppress(Exception):
                 request_logger.log_request_response(
                     operation_id=op_id,
                     request_method="GET",

View File

@@ -8,7 +8,6 @@ from typing import Any
 import folder_paths

-# Get the logger instance
 logger = logging.getLogger(__name__)
@@ -91,6 +90,7 @@ def log_request_response(
     Filenames are sanitized and length-limited for cross-platform safety.
     If we still fail to write, we fall back to appending into api.log.
     """
+    try:
     log_dir = get_log_directory()
     filepath = _build_log_filepath(log_dir, operation_id, request_url)
@@ -123,6 +123,8 @@ def log_request_response(
         logger.debug("API log saved to: %s", filepath)
     except Exception as e:
         logger.error("Error writing API log to %s: %s", filepath, str(e))
+    except Exception as _log_e:
+        logging.debug("[DEBUG] log_request_response failed: %s", _log_e)


 if __name__ == '__main__':
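This file is the reason every other call site can shed its `try`/`except` and `contextlib.suppress` wrappers: the logger now guards itself. The shape of the pattern, as a runnable sketch in which the `_write_log_file` helper is hypothetical and stands in for the real file-writing body:

```python
import logging

logger = logging.getLogger(__name__)

def _write_log_file(fields: dict) -> None:
    # Stand-in for the real body (log-directory lookup, filename sanitizing,
    # writing the request/response pair); may raise OSError and friends.
    raise OSError("simulated write failure")

def log_request_response(**fields) -> None:
    # The guard lives here instead of at every call site: logging must
    # never break the request it is describing.
    try:
        _write_log_file(fields)
    except Exception as _log_e:
        logging.debug("[DEBUG] log_request_response failed: %s", _log_e)

log_request_response(operation_id="op-1", request_method="GET")  # no exception escapes
```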

View File

@@ -254,7 +254,6 @@ async def upload_file(
     monitor_task = asyncio.create_task(_monitor())
     sess: aiohttp.ClientSession | None = None
-    try:
     try:
         request_logger.log_request_response(
             operation_id=operation_id,
@@ -264,8 +263,6 @@ async def upload_file(
             request_params=None,
             request_data=f"[File data {len(data)} bytes]",
         )
-    except Exception as e:
-        logging.debug("[DEBUG] upload request logging failed: %s", e)

         sess = aiohttp.ClientSession(timeout=timeout)
         req = sess.put(upload_url, data=data, headers=headers, skip_auto_headers=skip_auto_headers)
@@ -311,7 +308,6 @@ async def upload_file(
                     delay *= retry_backoff
                     continue
                 raise Exception(f"Failed to upload (HTTP {resp.status}).")
-            try:
             request_logger.log_request_response(
                 operation_id=operation_id,
                 request_method="PUT",
@@ -320,14 +316,11 @@ async def upload_file(
                 response_headers=dict(resp.headers),
                 response_content="File uploaded successfully.",
             )
-            except Exception as e:
-                logging.debug("[DEBUG] upload response logging failed: %s", e)
             return
         except asyncio.CancelledError:
             raise ProcessingInterrupted("Task cancelled") from None
         except (aiohttp.ClientError, OSError) as e:
             if attempt <= max_retries:
-                with contextlib.suppress(Exception):
                 request_logger.log_request_response(
                     operation_id=operation_id,
                     request_method="PUT",