feat(api-nodes): implement new API client for V3 nodes
This commit is contained in:
parent 55ac7d333c
commit 462ce028be
@@ -1,7 +1,7 @@
 import logging
 import math
 from enum import Enum
-from typing import Literal, Optional, Type, Union
+from typing import Literal, Optional, Union
 from typing_extensions import override

 import torch
@@ -13,18 +13,15 @@ from comfy_api_nodes.util.validation_utils import (
     get_number_of_images,
     validate_image_dimensions,
 )
-from comfy_api_nodes.apis.client import (
+from comfy_api_nodes.util import (
     ApiEndpoint,
-    EmptyRequest,
-    HttpMethod,
-    SynchronousOperation,
-    PollingOperation,
-    T,
+    sync_op_pydantic,
+    poll_op_pydantic,
+    upload_images_to_comfyapi,
 )
 from comfy_api_nodes.apinode_utils import (
     download_url_to_image_tensor,
     download_url_to_video_output,
-    upload_images_to_comfyapi,
     validate_string,
     image_tensor_pair_to_batch,
 )
@@ -208,35 +205,6 @@ def get_video_url_from_task_status(response: TaskStatusResponse) -> Union[str, None]:
     return None


-async def poll_until_finished(
-    auth_kwargs: dict[str, str],
-    task_id: str,
-    estimated_duration: Optional[int] = None,
-    node_id: Optional[str] = None,
-) -> TaskStatusResponse:
-    """Polls the ByteDance API endpoint until the task reaches a terminal state, then returns the response."""
-    return await PollingOperation(
-        poll_endpoint=ApiEndpoint(
-            path=f"{BYTEPLUS_TASK_STATUS_ENDPOINT}/{task_id}",
-            method=HttpMethod.GET,
-            request_model=EmptyRequest,
-            response_model=TaskStatusResponse,
-        ),
-        completed_statuses=[
-            "succeeded",
-        ],
-        failed_statuses=[
-            "cancelled",
-            "failed",
-        ],
-        status_extractor=lambda response: response.status,
-        auth_kwargs=auth_kwargs,
-        result_url_extractor=get_video_url_from_task_status,
-        estimated_duration=estimated_duration,
-        node_id=node_id,
-    ).execute()
-
-
 class ByteDanceImageNode(IO.ComfyNode):

     @classmethod
@@ -353,20 +321,12 @@ class ByteDanceImageNode(IO.ComfyNode):
             guidance_scale=guidance_scale,
             watermark=watermark,
         )
-        auth_kwargs = {
-            "auth_token": cls.hidden.auth_token_comfy_org,
-            "comfy_api_key": cls.hidden.api_key_comfy_org,
-        }
-        response = await SynchronousOperation(
-            endpoint=ApiEndpoint(
-                path=BYTEPLUS_IMAGE_ENDPOINT,
-                method=HttpMethod.POST,
-                request_model=Text2ImageTaskCreationRequest,
-                response_model=ImageTaskCreationResponse,
-            ),
-            request=payload,
-            auth_kwargs=auth_kwargs,
-        ).execute()
+        response = await sync_op_pydantic(
+            cls,
+            endpoint=ApiEndpoint(path=BYTEPLUS_IMAGE_ENDPOINT, method="POST"),
+            data=payload,
+            response_model=ImageTaskCreationResponse,
+        )
         return IO.NodeOutput(await download_url_to_image_tensor(get_image_url_from_response(response)))

@@ -449,16 +409,7 @@ class ByteDanceImageEditNode(IO.ComfyNode):
         if get_number_of_images(image) != 1:
             raise ValueError("Exactly one input image is required.")
         validate_image_aspect_ratio_range(image, (1, 3), (3, 1))
-        auth_kwargs = {
-            "auth_token": cls.hidden.auth_token_comfy_org,
-            "comfy_api_key": cls.hidden.api_key_comfy_org,
-        }
-        source_url = (await upload_images_to_comfyapi(
-            image,
-            max_images=1,
-            mime_type="image/png",
-            auth_kwargs=auth_kwargs,
-        ))[0]
+        source_url = (await upload_images_to_comfyapi(cls, image, max_images=1, mime_type="image/png"))[0]
         payload = Image2ImageTaskCreationRequest(
             model=model,
             prompt=prompt,
@@ -467,16 +418,12 @@ class ByteDanceImageEditNode(IO.ComfyNode):
             guidance_scale=guidance_scale,
             watermark=watermark,
         )
-        response = await SynchronousOperation(
-            endpoint=ApiEndpoint(
-                path=BYTEPLUS_IMAGE_ENDPOINT,
-                method=HttpMethod.POST,
-                request_model=Image2ImageTaskCreationRequest,
-                response_model=ImageTaskCreationResponse,
-            ),
-            request=payload,
-            auth_kwargs=auth_kwargs,
-        ).execute()
+        response = await sync_op_pydantic(
+            cls,
+            endpoint=ApiEndpoint(path=BYTEPLUS_IMAGE_ENDPOINT, method="POST"),
+            data=payload,
+            response_model=ImageTaskCreationResponse,
+        )
         return IO.NodeOutput(await download_url_to_image_tensor(get_image_url_from_response(response)))

@@ -621,41 +568,31 @@ class ByteDanceSeedreamNode(IO.ComfyNode):
             raise ValueError(
                 "The maximum number of generated images plus the number of reference images cannot exceed 15."
             )
-        auth_kwargs = {
-            "auth_token": cls.hidden.auth_token_comfy_org,
-            "comfy_api_key": cls.hidden.api_key_comfy_org,
-        }
         reference_images_urls = []
         if n_input_images:
             for i in image:
                 validate_image_aspect_ratio_range(i, (1, 3), (3, 1))
-            reference_images_urls = (await upload_images_to_comfyapi(
+            reference_images_urls = await upload_images_to_comfyapi(
+                cls,
                 image,
                 max_images=n_input_images,
                 mime_type="image/png",
-                auth_kwargs=auth_kwargs,
-            ))
-        payload = Seedream4TaskCreationRequest(
-            model=model,
-            prompt=prompt,
-            image=reference_images_urls,
-            size=f"{w}x{h}",
-            seed=seed,
-            sequential_image_generation=sequential_image_generation,
-            sequential_image_generation_options=Seedream4Options(max_images=max_images),
-            watermark=watermark,
-        )
-        response = await SynchronousOperation(
-            endpoint=ApiEndpoint(
-                path=BYTEPLUS_IMAGE_ENDPOINT,
-                method=HttpMethod.POST,
-                request_model=Seedream4TaskCreationRequest,
-                response_model=ImageTaskCreationResponse,
-            ),
-            request=payload,
-            auth_kwargs=auth_kwargs,
-        ).execute()
-
+            )
+        response = await sync_op_pydantic(
+            cls,
+            endpoint=ApiEndpoint(path=BYTEPLUS_IMAGE_ENDPOINT, method="POST"),
+            response_model=ImageTaskCreationResponse,
+            data=Seedream4TaskCreationRequest(
+                model=model,
+                prompt=prompt,
+                image=reference_images_urls,
+                size=f"{w}x{h}",
+                seed=seed,
+                sequential_image_generation=sequential_image_generation,
+                sequential_image_generation_options=Seedream4Options(max_images=max_images),
+                watermark=watermark,
+            ),
+        )
         if len(response.data) == 1:
             return IO.NodeOutput(await download_url_to_image_tensor(get_image_url_from_response(response)))
         urls = [str(d["url"]) for d in response.data if isinstance(d, dict) and "url" in d]
@@ -764,19 +701,9 @@ class ByteDanceTextToVideoNode(IO.ComfyNode):
             f"--camerafixed {str(camera_fixed).lower()} "
             f"--watermark {str(watermark).lower()}"
         )

-        auth_kwargs = {
-            "auth_token": cls.hidden.auth_token_comfy_org,
-            "comfy_api_key": cls.hidden.api_key_comfy_org,
-        }
         return await process_video_task(
-            request_model=Text2VideoTaskCreationRequest,
-            payload=Text2VideoTaskCreationRequest(
-                model=model,
-                content=[TaskTextContent(text=prompt)],
-            ),
-            auth_kwargs=auth_kwargs,
-            node_id=cls.hidden.unique_id,
+            cls,
+            payload=Text2VideoTaskCreationRequest(model=model, content=[TaskTextContent(text=prompt)]),
             estimated_duration=max(1, math.ceil(VIDEO_TASKS_EXECUTION_TIME[model][resolution] * (duration / 10.0))),
         )

@@ -879,13 +806,7 @@ class ByteDanceImageToVideoNode(IO.ComfyNode):
         validate_image_dimensions(image, min_width=300, min_height=300, max_width=6000, max_height=6000)
         validate_image_aspect_ratio_range(image, (2, 5), (5, 2), strict=False)  # 0.4 to 2.5

-        auth_kwargs = {
-            "auth_token": cls.hidden.auth_token_comfy_org,
-            "comfy_api_key": cls.hidden.api_key_comfy_org,
-        }
-
-        image_url = (await upload_images_to_comfyapi(image, max_images=1, auth_kwargs=auth_kwargs))[0]
-
+        image_url = (await upload_images_to_comfyapi(cls, image, max_images=1))[0]
         prompt = (
             f"{prompt} "
             f"--resolution {resolution} "
@@ -897,13 +818,11 @@ class ByteDanceImageToVideoNode(IO.ComfyNode):
         )

         return await process_video_task(
-            request_model=Image2VideoTaskCreationRequest,
+            cls,
             payload=Image2VideoTaskCreationRequest(
                 model=model,
                 content=[TaskTextContent(text=prompt), TaskImageContent(image_url=TaskImageContentUrl(url=image_url))],
             ),
-            auth_kwargs=auth_kwargs,
-            node_id=cls.hidden.unique_id,
             estimated_duration=max(1, math.ceil(VIDEO_TASKS_EXECUTION_TIME[model][resolution] * (duration / 10.0))),
         )

@@ -1012,16 +931,11 @@ class ByteDanceFirstLastFrameNode(IO.ComfyNode):
             validate_image_dimensions(i, min_width=300, min_height=300, max_width=6000, max_height=6000)
             validate_image_aspect_ratio_range(i, (2, 5), (5, 2), strict=False)  # 0.4 to 2.5

-        auth_kwargs = {
-            "auth_token": cls.hidden.auth_token_comfy_org,
-            "comfy_api_key": cls.hidden.api_key_comfy_org,
-        }
-
         download_urls = await upload_images_to_comfyapi(
+            cls,
             image_tensor_pair_to_batch(first_frame, last_frame),
             max_images=2,
             mime_type="image/png",
-            auth_kwargs=auth_kwargs,
         )

         prompt = (
@@ -1035,7 +949,7 @@ class ByteDanceFirstLastFrameNode(IO.ComfyNode):
         )

         return await process_video_task(
-            request_model=Image2VideoTaskCreationRequest,
+            cls,
             payload=Image2VideoTaskCreationRequest(
                 model=model,
                 content=[
@@ -1044,8 +958,6 @@ class ByteDanceFirstLastFrameNode(IO.ComfyNode):
                     TaskImageContent(image_url=TaskImageContentUrl(url=str(download_urls[1])), role="last_frame"),
                 ],
             ),
-            auth_kwargs=auth_kwargs,
-            node_id=cls.hidden.unique_id,
             estimated_duration=max(1, math.ceil(VIDEO_TASKS_EXECUTION_TIME[model][resolution] * (duration / 10.0))),
         )

@@ -1141,15 +1053,7 @@ class ByteDanceImageReferenceNode(IO.ComfyNode):
         validate_image_dimensions(image, min_width=300, min_height=300, max_width=6000, max_height=6000)
         validate_image_aspect_ratio_range(image, (2, 5), (5, 2), strict=False)  # 0.4 to 2.5

-        auth_kwargs = {
-            "auth_token": cls.hidden.auth_token_comfy_org,
-            "comfy_api_key": cls.hidden.api_key_comfy_org,
-        }
-
-        image_urls = await upload_images_to_comfyapi(
-            images, max_images=4, mime_type="image/png", auth_kwargs=auth_kwargs
-        )
-
+        image_urls = await upload_images_to_comfyapi(cls, images, max_images=4, mime_type="image/png")
         prompt = (
             f"{prompt} "
             f"--resolution {resolution} "
@@ -1163,39 +1067,32 @@ class ByteDanceImageReferenceNode(IO.ComfyNode):
             *[TaskImageContent(image_url=TaskImageContentUrl(url=str(i)), role="reference_image") for i in image_urls]
         ]
         return await process_video_task(
-            request_model=Image2VideoTaskCreationRequest,
-            payload=Image2VideoTaskCreationRequest(
-                model=model,
-                content=x,
-            ),
-            auth_kwargs=auth_kwargs,
-            node_id=cls.hidden.unique_id,
+            cls,
+            payload=Image2VideoTaskCreationRequest(model=model, content=x),
             estimated_duration=max(1, math.ceil(VIDEO_TASKS_EXECUTION_TIME[model][resolution] * (duration / 10.0))),
         )


 async def process_video_task(
-    request_model: Type[T],
+    cls: type[IO.ComfyNode],
     payload: Union[Text2VideoTaskCreationRequest, Image2VideoTaskCreationRequest],
-    auth_kwargs: dict,
-    node_id: str,
     estimated_duration: Optional[int],
 ) -> IO.NodeOutput:
-    initial_response = await SynchronousOperation(
-        endpoint=ApiEndpoint(
-            path=BYTEPLUS_TASK_ENDPOINT,
-            method=HttpMethod.POST,
-            request_model=request_model,
-            response_model=TaskCreationResponse,
-        ),
-        request=payload,
-        auth_kwargs=auth_kwargs,
-    ).execute()
-    response = await poll_until_finished(
-        auth_kwargs,
-        initial_response.id,
+    initial_response = await sync_op_pydantic(
+        cls,
+        endpoint=ApiEndpoint(path=BYTEPLUS_TASK_ENDPOINT, method="POST"),
+        data=payload,
+        response_model=TaskCreationResponse,
+    )
+    response = await poll_op_pydantic(
+        cls,
+        poll_endpoint=ApiEndpoint(path=f"{BYTEPLUS_TASK_STATUS_ENDPOINT}/{initial_response.id}"),
+        completed_statuses=["succeeded"],
+        failed_statuses=["cancelled", "failed"],
+        queued_states=["queued"],
+        status_extractor=lambda r: r.status,
         estimated_duration=estimated_duration,
-        node_id=node_id,
+        response_model=TaskStatusResponse,
     )
     return IO.NodeOutput(await download_url_to_video_output(get_video_url_from_task_status(response)))

comfy_api_nodes/util/__init__.py (new file)
@@ -0,0 +1,23 @@
from .api_client import ApiEndpoint, sync_op_pydantic, poll_op_pydantic, sync_op, poll_op
from .download_helpers import (
    download_url_to_bytesio,
    download_url_to_image_tensor,
    bytesio_to_image_tensor,
)
from .upload_helpers import (
    upload_file_to_comfyapi,
    upload_images_to_comfyapi,
)

__all__ = [
    "ApiEndpoint",
    "poll_op",
    "sync_op",
    "poll_op_pydantic",
    "sync_op_pydantic",
    "upload_file_to_comfyapi",
    "upload_images_to_comfyapi",
    "download_url_to_bytesio",
    "download_url_to_image_tensor",
    "bytesio_to_image_tensor",
]
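The package re-exports the whole client surface, so node code imports everything from `comfy_api_nodes.util`, as the ByteDance changes above do. A minimal sketch of the intended call pattern (the endpoint path and response model here are hypothetical, for illustration only):

from pydantic import BaseModel

from comfy_api_nodes.util import ApiEndpoint, sync_op_pydantic


class ExampleResponse(BaseModel):  # hypothetical response model
    id: str


async def create_task(cls, payload):
    # `cls` is the calling IO.ComfyNode subclass; auth headers and the
    # node id are derived from its hidden fields by the client.
    return await sync_op_pydantic(
        cls,
        endpoint=ApiEndpoint(path="/example/task", method="POST"),  # hypothetical path
        data=payload,
        response_model=ExampleResponse,
    )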
comfy_api_nodes/util/_helpers.py (new file, 58 lines)
@@ -0,0 +1,58 @@
import asyncio
import contextlib
import time
from typing import Optional, Callable

from comfy_api.latest import IO
from comfy.cli_args import args
from comfy.model_management import processing_interrupted

from .common_exceptions import ProcessingInterrupted


def _is_processing_interrupted() -> bool:
    """Return True if user/runtime requested interruption."""
    return processing_interrupted()


def _get_node_id(node_cls: type[IO.ComfyNode]) -> str:
    return node_cls.hidden.unique_id


def _get_auth_header(node_cls: type[IO.ComfyNode]) -> dict[str, str]:
    if node_cls.hidden.auth_token_comfy_org:
        return {"Authorization": f"Bearer {node_cls.hidden.auth_token_comfy_org}"}
    if node_cls.hidden.api_key_comfy_org:
        return {"X-API-KEY": node_cls.hidden.api_key_comfy_org}
    return {}


def _default_base_url() -> str:
    return getattr(args, "comfy_api_base", "https://api.comfy.org")


async def _sleep_with_interrupt(
    seconds: float,
    node_cls: type[IO.ComfyNode],
    label: Optional[str] = None,
    start_ts: Optional[float] = None,
    estimated_total: Optional[int] = None,
    *,
    display_callback: Optional[Callable[[type[IO.ComfyNode], str, int, Optional[int]], None]] = None,
):
    """
    Sleep in 1s slices while:
      - Checking for interruption (raises ProcessingInterrupted).
      - Optionally emitting time progress via display_callback (if provided).
    """
    end = time.monotonic() + seconds
    while True:
        if _is_processing_interrupted():
            raise ProcessingInterrupted("Task cancelled")
        now = time.monotonic()
        if start_ts is not None and label and display_callback:
            with contextlib.suppress(Exception):
                display_callback(node_cls, label, int(now - start_ts), estimated_total)
        if now >= end:
            break
        await asyncio.sleep(min(1.0, end - now))
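The sliced sleep is what makes user cancellation responsive even during long retry delays. A small sketch of how the client code drives it (variable names here are illustrative, not from the commit):

# Wait 30s between polls; raise ProcessingInterrupted as soon as the user
# cancels, repainting the elapsed-time line once per second via the callback.
await _sleep_with_interrupt(
    30.0,
    node_cls,                               # the IO.ComfyNode subclass
    label="Waiting",
    start_ts=started,                       # time.monotonic() at operation start
    estimated_total=60,
    display_callback=_display_time_progress,  # defined in api_client.py
)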
comfy_api_nodes/util/api_client.py (new file, 915 lines)
@@ -0,0 +1,915 @@
import asyncio
import contextlib
import json
import logging
import socket
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from typing import Any, Callable, Optional, Union, Type, TypeVar, Literal

import aiohttp
from aiohttp.client_exceptions import ClientError, ContentTypeError
from comfy_api.latest import IO
from comfy import utils
from pydantic import BaseModel
from server import PromptServer
from urllib.parse import urljoin, urlparse

from comfy_api_nodes.apis import request_logger
from .common_exceptions import ProcessingInterrupted, LocalNetworkError, ApiServerError
from ._helpers import (
    _is_processing_interrupted,
    _get_node_id,
    _get_auth_header,
    _default_base_url,
    _sleep_with_interrupt,
)


M = TypeVar("M", bound=BaseModel)


class ApiEndpoint:
    def __init__(
        self,
        path: str,
        method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"] = "GET",
        *,
        query_params: Optional[dict[str, Any]] = None,
        headers: Optional[dict[str, str]] = None,
    ):
        self.path = path
        self.method = method
        self.query_params = query_params or {}
        self.headers = headers or {}


@dataclass
class _RequestConfig:
    node_cls: type[IO.ComfyNode]
    endpoint: ApiEndpoint
    timeout: float
    content_type: str
    data: Optional[dict[str, Any]]
    files: Optional[Union[dict[str, Any], list[tuple[str, Any]]]]
    multipart_parser: Optional[Callable]
    max_retries: int
    retry_delay: float
    retry_backoff: float
    wait_label: str = "Waiting"
    monitor_progress: bool = True
    estimated_total: Optional[int] = None
    final_label_on_success: Optional[str] = "Completed"
    progress_origin_ts: Optional[float] = None


@dataclass
class _PollUIState:
    started: float
    status_label: str = "Queued"
    is_queued: bool = True
    price: Optional[float] = None
    estimated_duration: Optional[int] = None
    base_processing_elapsed: float = 0.0  # sum of completed active intervals
    active_since: Optional[float] = None  # start time of current active interval (None if queued)


_RETRY_STATUS = {408, 429, 500, 502, 503, 504}
async def sync_op_pydantic(
    cls: type[IO.ComfyNode],
    endpoint: ApiEndpoint,
    *,
    response_model: Type[M],
    data: Optional[BaseModel] = None,
    files: Optional[Union[dict[str, Any], list[tuple[str, Any]]]] = None,
    content_type: str = "application/json",
    timeout: float = 3600.0,
    multipart_parser: Optional[Callable] = None,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    retry_backoff: float = 2.0,
    wait_label: str = "Waiting for server",
    estimated_total: Optional[int] = None,
    final_label_on_success: Optional[str] = "Completed",
    progress_origin_ts: Optional[float] = None,
    monitor_progress: bool = True,
) -> M:
    raw = await sync_op(
        cls,
        endpoint,
        data=data,
        files=files,
        content_type=content_type,
        timeout=timeout,
        multipart_parser=multipart_parser,
        max_retries=max_retries,
        retry_delay=retry_delay,
        retry_backoff=retry_backoff,
        wait_label=wait_label,
        estimated_total=estimated_total,
        as_binary=False,
        final_label_on_success=final_label_on_success,
        progress_origin_ts=progress_origin_ts,
        monitor_progress=monitor_progress,
    )
    if not isinstance(raw, dict):
        raise Exception("Expected JSON response to validate into a Pydantic model, got non-JSON (binary or text).")
    return _validate_or_raise(response_model, raw)


async def poll_op_pydantic(
    cls: type[IO.ComfyNode],
    *,
    poll_endpoint: ApiEndpoint,
    response_model: Type[M],
    status_extractor: Callable[[M], Optional[str]],
    progress_extractor: Optional[Callable[[M], Optional[int]]] = None,
    price_extractor: Optional[Callable[[M], Optional[float]]] = None,
    completed_statuses: list[str],
    failed_statuses: list[str],
    queued_states: Optional[list[str]] = None,
    poll_interval: float = 5.0,
    max_poll_attempts: int = 120,
    timeout_per_poll: float = 120.0,
    max_retries_per_poll: int = 3,
    retry_delay_per_poll: float = 1.0,
    retry_backoff_per_poll: float = 2.0,
    estimated_duration: Optional[int] = None,
    cancel_endpoint: Optional[ApiEndpoint] = None,
    cancel_timeout: float = 10.0,
) -> M:
    raw = await poll_op(
        cls,
        poll_endpoint=poll_endpoint,
        status_extractor=_wrap_model_extractor(response_model, status_extractor),
        progress_extractor=_wrap_model_extractor(response_model, progress_extractor),
        price_extractor=_wrap_model_extractor(response_model, price_extractor),
        completed_statuses=completed_statuses,
        failed_statuses=failed_statuses,
        queued_states=queued_states,
        poll_interval=poll_interval,
        max_poll_attempts=max_poll_attempts,
        timeout_per_poll=timeout_per_poll,
        max_retries_per_poll=max_retries_per_poll,
        retry_delay_per_poll=retry_delay_per_poll,
        retry_backoff_per_poll=retry_backoff_per_poll,
        estimated_duration=estimated_duration,
        cancel_endpoint=cancel_endpoint,
        cancel_timeout=cancel_timeout,
    )
    if not isinstance(raw, dict):
        raise Exception("Expected JSON response to validate into a Pydantic model, got non-JSON (binary or text).")
    return _validate_or_raise(response_model, raw)
async def sync_op(
    cls: type[IO.ComfyNode],
    endpoint: ApiEndpoint,
    *,
    data: Optional[Union[dict[str, Any], BaseModel]] = None,
    files: Optional[Union[dict[str, Any], list[tuple[str, Any]]]] = None,
    content_type: str = "application/json",
    timeout: float = 3600.0,
    multipart_parser: Optional[Callable] = None,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    retry_backoff: float = 2.0,
    wait_label: str = "Waiting for server",
    estimated_total: Optional[int] = None,
    as_binary: bool = False,
    final_label_on_success: Optional[str] = "Completed",
    progress_origin_ts: Optional[float] = None,
    monitor_progress: bool = True,
) -> Union[dict[str, Any], bytes]:
    """
    Make a single network request.
      - If as_binary=False (default): returns JSON dict (or {'_raw': '<text>'} if non-JSON).
      - If as_binary=True: returns bytes.
    """
    if isinstance(data, BaseModel):
        data = data.model_dump(exclude_none=True)
        for k, v in list(data.items()):
            if isinstance(v, Enum):
                data[k] = v.value
    cfg = _RequestConfig(
        node_cls=cls,
        endpoint=endpoint,
        timeout=timeout,
        content_type=content_type,
        data=data,
        files=files,
        multipart_parser=multipart_parser,
        max_retries=max_retries,
        retry_delay=retry_delay,
        retry_backoff=retry_backoff,
        wait_label=wait_label,
        monitor_progress=monitor_progress,
        estimated_total=estimated_total,
        final_label_on_success=final_label_on_success,
        progress_origin_ts=progress_origin_ts,
    )
    return await _request_base(cfg, expect_binary=as_binary)
async def poll_op(
    cls: type[IO.ComfyNode],
    *,
    poll_endpoint: ApiEndpoint,
    status_extractor: Callable[[dict[str, Any]], Optional[str]],
    progress_extractor: Optional[Callable[[dict[str, Any]], Optional[int]]] = None,
    price_extractor: Optional[Callable[[dict[str, Any]], Optional[float]]] = None,
    completed_statuses: list[str],
    failed_statuses: list[str],
    queued_states: Optional[list[str]] = None,
    poll_interval: float = 5.0,
    max_poll_attempts: int = 120,
    timeout_per_poll: float = 120.0,
    max_retries_per_poll: int = 3,
    retry_delay_per_poll: float = 1.0,
    retry_backoff_per_poll: float = 2.0,
    estimated_duration: Optional[int] = None,
    cancel_endpoint: Optional[ApiEndpoint] = None,
    cancel_timeout: float = 10.0,
) -> dict[str, Any]:
    """
    Polls an endpoint until the task reaches a terminal state. Displays time while queued/processing,
    checks interruption every second, and calls the cancel endpoint (if provided) on interruption.
    Returns the final JSON response from the poll endpoint.
    """
    queued_states = queued_states or []
    started = time.monotonic()
    consumed_attempts = 0  # counts only non-queued polls

    progress_bar = utils.ProgressBar(100) if progress_extractor else None
    last_progress: Optional[int] = None

    state = _PollUIState(started=started, estimated_duration=estimated_duration)
    stop_ticker = asyncio.Event()

    async def _ticker():
        """Emit a UI update every second while polling is in progress."""
        try:
            while not stop_ticker.is_set():
                if _is_processing_interrupted():
                    break
                now = time.monotonic()
                proc_elapsed = state.base_processing_elapsed + (
                    (now - state.active_since) if state.active_since is not None else 0.0
                )
                _display_time_progress(
                    cls,
                    label=state.status_label,
                    elapsed_seconds=int(now - state.started),
                    estimated_total=state.estimated_duration,
                    price=state.price,
                    is_queued=state.is_queued,
                    processing_elapsed_seconds=int(proc_elapsed),
                )
                await asyncio.sleep(1.0)
        except Exception as exc:
            logging.debug("Polling ticker exited: %s", exc)

    ticker_task = asyncio.create_task(_ticker())
    try:
        while consumed_attempts < max_poll_attempts:
            try:
                resp_json = await sync_op(
                    cls,
                    poll_endpoint,
                    timeout=timeout_per_poll,
                    max_retries=max_retries_per_poll,
                    retry_delay=retry_delay_per_poll,
                    retry_backoff=retry_backoff_per_poll,
                    wait_label="Checking",
                    estimated_total=None,
                    as_binary=False,
                    final_label_on_success=None,
                    monitor_progress=False,
                )
                if not isinstance(resp_json, dict):
                    raise Exception("Polling endpoint returned non-JSON response.")
            except ProcessingInterrupted:
                if cancel_endpoint:
                    with contextlib.suppress(Exception):
                        await sync_op(
                            cls,
                            cancel_endpoint,
                            timeout=cancel_timeout,
                            max_retries=0,
                            wait_label="Cancelling task",
                            estimated_total=None,
                            as_binary=False,
                            final_label_on_success=None,
                            monitor_progress=False,
                        )
                raise

            try:
                status = status_extractor(resp_json)
            except Exception as e:
                logging.error("Status extraction failed: %s", e)
                status = None

            if price_extractor:
                new_price = price_extractor(resp_json)
                if new_price is not None:
                    state.price = new_price

            if progress_extractor:
                new_progress = progress_extractor(resp_json)
                if new_progress is not None and last_progress != new_progress:
                    progress_bar.update_absolute(new_progress, total=100)
                    last_progress = new_progress

            now_ts = time.monotonic()
            is_queued = status in queued_states

            if is_queued:
                if state.active_since is not None:  # just moved from active -> queued, close the active interval
                    state.base_processing_elapsed += (now_ts - state.active_since)
                    state.active_since = None
            else:
                if state.active_since is None:  # just moved from queued -> active, open a new active interval
                    state.active_since = now_ts

            state.is_queued = is_queued
            state.status_label = status or ("Queued" if is_queued else "Processing")
            if status in completed_statuses:
                if state.active_since is not None:
                    state.base_processing_elapsed += (now_ts - state.active_since)
                    state.active_since = None
                stop_ticker.set()
                with contextlib.suppress(Exception):
                    await ticker_task

                if progress_bar and last_progress != 100:
                    progress_bar.update_absolute(100, total=100)

                _display_time_progress(
                    cls,
                    label=status if status else "Completed",
                    elapsed_seconds=int(now_ts - started),
                    estimated_total=estimated_duration,
                    price=state.price,
                    is_queued=False,
                    processing_elapsed_seconds=int(state.base_processing_elapsed),
                )
                return resp_json

            if status in failed_statuses:
                msg = f"Task failed: {json.dumps(resp_json)}"
                logging.error(msg)
                raise Exception(msg)

            try:
                await _sleep_with_interrupt(poll_interval, cls, None, None, None)
            except ProcessingInterrupted:
                if cancel_endpoint:
                    with contextlib.suppress(Exception):
                        await sync_op(
                            cls,
                            cancel_endpoint,
                            timeout=cancel_timeout,
                            max_retries=0,
                            wait_label="Cancelling task",
                            estimated_total=None,
                            as_binary=False,
                            final_label_on_success=None,
                            monitor_progress=False,
                        )
                raise
            if not is_queued:
                consumed_attempts += 1

        raise Exception(
            f"Polling timed out after {max_poll_attempts} non-queued attempts "
            f"(~{int(max_poll_attempts * poll_interval)}s of active polling)."
        )
    except ProcessingInterrupted:
        raise
    except (LocalNetworkError, ApiServerError):
        raise
    except Exception as e:
        raise Exception(f"Polling aborted due to error: {e}") from e
    finally:
        stop_ticker.set()
        with contextlib.suppress(Exception):
            await ticker_task
def _display_text(
    node_cls: type[IO.ComfyNode],
    text: Optional[str],
    *,
    status: Optional[str] = None,
    price: Optional[float] = None,
) -> None:
    display_lines: list[str] = []
    if status:
        display_lines.append(f"Status: {status.capitalize()}")
    if price is not None:
        display_lines.append(f"Price: ${float(price):,.4f}")
    if text is not None:
        display_lines.append(text)
    if display_lines:
        PromptServer.instance.send_progress_text("\n".join(display_lines), _get_node_id(node_cls))


def _display_time_progress(
    node_cls: type[IO.ComfyNode],
    label: str,
    elapsed_seconds: int,
    estimated_total: Optional[int] = None,
    *,
    price: Optional[float] = None,
    is_queued: Optional[bool] = None,
    processing_elapsed_seconds: Optional[int] = None,
) -> None:
    if estimated_total is not None and estimated_total > 0 and is_queued is False:
        pe = processing_elapsed_seconds if processing_elapsed_seconds is not None else elapsed_seconds
        remaining = max(0, int(estimated_total) - int(pe))
        time_line = f"Time elapsed: {int(elapsed_seconds)}s (~{remaining}s remaining)"
    else:
        time_line = f"Time elapsed: {int(elapsed_seconds)}s"
    _display_text(node_cls, time_line, status=label, price=price)


async def _diagnose_connectivity() -> dict[str, bool]:
    """Best-effort connectivity diagnostics to distinguish local vs. server issues."""
    results = {
        "internet_accessible": False,
        "api_accessible": False,
        "is_local_issue": False,
        "is_api_issue": False,
    }
    timeout = aiohttp.ClientTimeout(total=5.0)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get("https://www.google.com") as resp:
                results["internet_accessible"] = resp.status < 500
        except (ClientError, asyncio.TimeoutError, socket.gaierror):
            results["is_local_issue"] = True
            return results

        parsed = urlparse(_default_base_url())
        health_url = f"{parsed.scheme}://{parsed.netloc}/health"
        with contextlib.suppress(ClientError, asyncio.TimeoutError):
            async with session.get(health_url) as resp:
                results["api_accessible"] = resp.status < 500
    results["is_api_issue"] = results["internet_accessible"] and not results["api_accessible"]
    return results


def _unpack_tuple(t: tuple) -> tuple[str, Any, str]:
    """Normalize (filename, value, content_type)."""
    if len(t) == 2:
        return t[0], t[1], "application/octet-stream"
    if len(t) == 3:
        return t[0], t[1], t[2]
    raise ValueError("files tuple must be (filename, file[, content_type])")


def _join_url(base_url: str, path: str) -> str:
    return urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))


def _merge_headers(node_cls: type[IO.ComfyNode], endpoint_headers: dict[str, str]) -> dict[str, str]:
    headers = {"Accept": "*/*"}
    headers.update(_get_auth_header(node_cls))
    if endpoint_headers:
        headers.update(endpoint_headers)
    return headers


def _merge_params(endpoint_params: dict[str, Any], method: str, data: Optional[dict[str, Any]]) -> dict[str, Any]:
    params = dict(endpoint_params or {})
    if method.upper() == "GET" and data:
        for k, v in data.items():
            if v is not None:
                params[k] = v
    return params


def _friendly_http_message(status: int, body: Any) -> str:
    if status == 401:
        return "Unauthorized: Please login first to use this node."
    if status == 402:
        return "Payment Required: Please add credits to your account to use this node."
    if status == 409:
        return "There is a problem with your account. Please contact support@comfy.org."
    if status == 429:
        return "Rate Limit Exceeded: Please try again later."
    try:
        if isinstance(body, dict):
            err = body.get("error")
            if isinstance(err, dict):
                msg = err.get("message")
                typ = err.get("type")
                if msg and typ:
                    return f"API Error: {msg} (Type: {typ})"
                if msg:
                    return f"API Error: {msg}"
            return f"API Error: {json.dumps(body)}"
        else:
            txt = str(body)
            if len(txt) <= 200:
                return f"API Error (raw): {txt}"
            return f"API Error (status {status})"
    except Exception:
        return f"HTTP {status}: Unknown error"


def _generate_operation_id(method: str, path: str, attempt: int) -> str:
    slug = path.strip("/").replace("/", "_") or "op"
    return f"{method}_{slug}_try{attempt}_{uuid.uuid4().hex[:8]}"


def _snapshot_request_body_for_logging(
    content_type: str,
    method: str,
    data: Optional[dict[str, Any]],
    files: Optional[Union[dict[str, Any], list[tuple[str, Any]]]],
) -> Optional[Union[dict[str, Any], str]]:
    if method.upper() == "GET":
        return None
    if content_type == "multipart/form-data":
        form_fields = sorted([k for k, v in (data or {}).items() if v is not None])
        file_fields: list[dict[str, str]] = []
        if files:
            file_iter = files if isinstance(files, list) else list(files.items())
            for field_name, file_obj in file_iter:
                if file_obj is None:
                    continue
                if isinstance(file_obj, tuple):
                    filename = file_obj[0]
                else:
                    filename = getattr(file_obj, "name", field_name)
                file_fields.append({"field": field_name, "filename": str(filename or "")})
        return {"_multipart": True, "form_fields": form_fields, "file_fields": file_fields}
    if content_type == "application/x-www-form-urlencoded":
        return data or {}
    return data or {}
async def _request_base(cfg: _RequestConfig, expect_binary: bool):
    """Core request with retries, per-second interruption monitoring, true cancellation, and friendly errors."""
    url = _join_url(_default_base_url(), cfg.endpoint.path)
    method = cfg.endpoint.method
    params = _merge_params(cfg.endpoint.query_params, method, cfg.data if method == "GET" else None)

    async def _monitor(stop_evt: asyncio.Event, start_ts: float):
        """Every second: update elapsed time and signal interruption."""
        try:
            while not stop_evt.is_set():
                if _is_processing_interrupted():
                    return
                if cfg.monitor_progress:
                    _display_time_progress(
                        cfg.node_cls, cfg.wait_label, int(time.monotonic() - start_ts), cfg.estimated_total
                    )
                await asyncio.sleep(1.0)
        except asyncio.CancelledError:
            return  # normal shutdown

    start_time = cfg.progress_origin_ts if cfg.progress_origin_ts is not None else time.monotonic()
    attempt = 0
    delay = cfg.retry_delay
    operation_succeeded: bool = False
    final_elapsed_seconds: Optional[int] = None
    while True:
        attempt += 1
        stop_event = asyncio.Event()
        monitor_task: Optional[asyncio.Task] = None
        sess: Optional[aiohttp.ClientSession] = None

        operation_id = _generate_operation_id(method, cfg.endpoint.path, attempt)
        logging.debug("[DEBUG] HTTP %s %s (attempt %d)", method, url, attempt)

        payload_headers = _merge_headers(cfg.node_cls, cfg.endpoint.headers)
        payload_kw: dict[str, Any] = {"headers": payload_headers}
        if method == "GET":
            payload_headers.pop("Content-Type", None)
        request_body_log = _snapshot_request_body_for_logging(cfg.content_type, method, cfg.data, cfg.files)
        try:
            if cfg.monitor_progress:
                monitor_task = asyncio.create_task(_monitor(stop_event, start_time))

            timeout = aiohttp.ClientTimeout(total=cfg.timeout)
            sess = aiohttp.ClientSession(timeout=timeout)

            if cfg.content_type == "multipart/form-data" and method != "GET":
                # aiohttp will set the Content-Type boundary; remove any fixed Content-Type
                payload_headers.pop("Content-Type", None)
                if cfg.multipart_parser and cfg.data:
                    form = cfg.multipart_parser(cfg.data)
                    if not isinstance(form, aiohttp.FormData):
                        raise ValueError("multipart_parser must return aiohttp.FormData")
                else:
                    form = aiohttp.FormData(default_to_multipart=True)
                    if cfg.data:
                        for k, v in cfg.data.items():
                            if v is None:
                                continue
                            form.add_field(k, str(v) if not isinstance(v, (bytes, bytearray)) else v)
                    if cfg.files:
                        file_iter = cfg.files if isinstance(cfg.files, list) else cfg.files.items()
                        for field_name, file_obj in file_iter:
                            if file_obj is None:
                                continue
                            if isinstance(file_obj, tuple):
                                filename, file_value, content_type = _unpack_tuple(file_obj)
                            else:
                                filename = getattr(file_obj, "name", field_name)
                                file_value = file_obj
                                content_type = "application/octet-stream"
                            # Attempt to rewind BytesIO for retries
                            if isinstance(file_value, BytesIO):
                                with contextlib.suppress(Exception):
                                    file_value.seek(0)
                            form.add_field(field_name, file_value, filename=filename, content_type=content_type)
                payload_kw["data"] = form  # do not send body on GET
            elif cfg.content_type == "application/x-www-form-urlencoded" and method != "GET":
                payload_headers["Content-Type"] = "application/x-www-form-urlencoded"
                payload_kw["data"] = cfg.data or {}
            elif method != "GET":
                payload_headers["Content-Type"] = "application/json"
                payload_kw["json"] = cfg.data or {}

            try:
                request_logger.log_request_response(
                    operation_id=operation_id,
                    request_method=method,
                    request_url=url,
                    request_headers=dict(payload_headers) if payload_headers else None,
                    request_params=dict(params) if params else None,
                    request_data=request_body_log,
                )
            except Exception as _log_e:
                logging.debug("[DEBUG] request logging failed: %s", _log_e)

            # Compose the HTTP request coroutine
            req_coro = sess.request(method, url, params=params, **payload_kw)
            req_task = asyncio.create_task(req_coro)

            # Race: request vs. monitor (interruption)
            tasks = {req_task}
            if monitor_task:
                tasks.add(monitor_task)
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            if monitor_task and monitor_task in done:
                # Interrupted: cancel the request and abort
                if req_task in pending:
                    req_task.cancel()
                raise ProcessingInterrupted("Task cancelled")

            # Otherwise, the request finished
            resp = await req_task
            async with resp:
                if resp.status >= 400:
                    try:
                        body = await resp.json()
                    except (ContentTypeError, json.JSONDecodeError):
                        body = await resp.text()
                    # Retryable?
                    if resp.status in _RETRY_STATUS and attempt <= cfg.max_retries:
                        logging.warning(
                            "HTTP %s %s -> %s. Retrying in %.2fs (retry %d of %d).",
                            method,
                            url,
                            resp.status,
                            delay,
                            attempt,
                            cfg.max_retries,
                        )
                        try:
                            request_logger.log_request_response(
                                operation_id=operation_id,
                                request_method=method,
                                request_url=url,
                                response_status_code=resp.status,
                                response_headers=dict(resp.headers),
                                response_content=body,
                                error_message=_friendly_http_message(resp.status, body),
                            )
                        except Exception as _log_e:
                            logging.debug("[DEBUG] response logging failed: %s", _log_e)

                        await _sleep_with_interrupt(
                            delay,
                            cfg.node_cls,
                            cfg.wait_label if cfg.monitor_progress else None,
                            start_time if cfg.monitor_progress else None,
                            cfg.estimated_total,
                            display_callback=_display_time_progress if cfg.monitor_progress else None,
                        )
                        delay *= cfg.retry_backoff
                        continue
                    msg = _friendly_http_message(resp.status, body)
                    try:
                        request_logger.log_request_response(
                            operation_id=operation_id,
                            request_method=method,
                            request_url=url,
                            response_status_code=resp.status,
                            response_headers=dict(resp.headers),
                            response_content=body,
                            error_message=msg,
                        )
                    except Exception as _log_e:
                        logging.debug("[DEBUG] response logging failed: %s", _log_e)
                    raise Exception(msg)

                # Success
                if expect_binary:
                    # Read the stream in chunks so that cancellation is fast when the user interrupts
                    buff = bytearray()
                    last_tick = time.monotonic()
                    async for chunk in resp.content.iter_chunked(64 * 1024):
                        buff.extend(chunk)
                        now = time.monotonic()
                        if now - last_tick >= 1.0:
                            last_tick = now
                            if _is_processing_interrupted():
                                raise ProcessingInterrupted("Task cancelled")
                            if cfg.monitor_progress:
                                _display_time_progress(
                                    cfg.node_cls, cfg.wait_label, int(now - start_time), cfg.estimated_total
                                )
                    bytes_payload = bytes(buff)
                    operation_succeeded = True
                    final_elapsed_seconds = int(time.monotonic() - start_time)
                    try:
                        request_logger.log_request_response(
                            operation_id=operation_id,
                            request_method=method,
                            request_url=url,
                            response_status_code=resp.status,
                            response_headers=dict(resp.headers),
                            response_content=bytes_payload,
                        )
                    except Exception as _log_e:
                        logging.debug("[DEBUG] response logging failed: %s", _log_e)
                    return bytes_payload
                else:
                    try:
                        payload = await resp.json()
                        response_content_to_log: Any = payload
                    except (ContentTypeError, json.JSONDecodeError):
                        text = await resp.text()
                        try:
                            payload = json.loads(text) if text else {}
                        except json.JSONDecodeError:
                            payload = {"_raw": text}
                        response_content_to_log = payload if isinstance(payload, dict) else text
                    operation_succeeded = True
                    final_elapsed_seconds = int(time.monotonic() - start_time)
                    try:
                        request_logger.log_request_response(
                            operation_id=operation_id,
                            request_method=method,
                            request_url=url,
                            response_status_code=resp.status,
                            response_headers=dict(resp.headers),
                            response_content=response_content_to_log,
                        )
                    except Exception as _log_e:
                        logging.debug("[DEBUG] response logging failed: %s", _log_e)
                    return payload

        except ProcessingInterrupted:
            logging.debug("Polling was interrupted by user")
            raise
        except (ClientError, asyncio.TimeoutError, socket.gaierror) as e:
            # Retry transient connection issues
            if attempt <= cfg.max_retries:
                logging.warning(
                    "Connection error calling %s %s. Retrying in %.2fs (%d/%d): %s",
                    method, url, delay, attempt, cfg.max_retries, str(e)
                )
                try:
                    request_logger.log_request_response(
                        operation_id=operation_id,
                        request_method=method,
                        request_url=url,
                        request_headers=dict(payload_headers) if payload_headers else None,
                        request_params=dict(params) if params else None,
                        request_data=request_body_log,
                        error_message=f"{type(e).__name__}: {str(e)} (will retry)",
                    )
                except Exception as _log_e:
                    logging.debug("[DEBUG] request error logging failed: %s", _log_e)
                await _sleep_with_interrupt(
                    delay,
                    cfg.node_cls,
                    cfg.wait_label if cfg.monitor_progress else None,
                    start_time if cfg.monitor_progress else None,
                    cfg.estimated_total,
                    display_callback=_display_time_progress if cfg.monitor_progress else None,
                )
                delay *= cfg.retry_backoff
                continue
            diag = await _diagnose_connectivity()
            if diag.get("is_local_issue"):
                try:
                    request_logger.log_request_response(
                        operation_id=operation_id,
                        request_method=method,
                        request_url=url,
                        request_headers=dict(payload_headers) if payload_headers else None,
                        request_params=dict(params) if params else None,
                        request_data=request_body_log,
                        error_message=f"LocalNetworkError: {str(e)}",
                    )
                except Exception as _log_e:
                    logging.debug("[DEBUG] final error logging failed: %s", _log_e)
                raise LocalNetworkError(
                    "Unable to connect to the API server due to local network issues. "
                    "Please check your internet connection and try again."
                ) from e
            try:
                request_logger.log_request_response(
                    operation_id=operation_id,
                    request_method=method,
                    request_url=url,
                    request_headers=dict(payload_headers) if payload_headers else None,
                    request_params=dict(params) if params else None,
                    request_data=request_body_log,
                    error_message=f"ApiServerError: {str(e)}",
                )
            except Exception as _log_e:
                logging.debug("[DEBUG] final error logging failed: %s", _log_e)
            raise ApiServerError(
                f"The API server at {_default_base_url()} is currently unreachable. "
                f"The service may be experiencing issues."
            ) from e
        finally:
            stop_event.set()
            if monitor_task:
                monitor_task.cancel()
                with contextlib.suppress(Exception):
                    await monitor_task
            if sess:
                with contextlib.suppress(Exception):
                    await sess.close()
            if operation_succeeded and cfg.monitor_progress and cfg.final_label_on_success:
                _display_time_progress(
                    cfg.node_cls,
                    label=cfg.final_label_on_success,
                    elapsed_seconds=(
                        final_elapsed_seconds
                        if final_elapsed_seconds is not None
                        else int(time.monotonic() - start_time)
                    ),
                    estimated_total=cfg.estimated_total,
                    price=None,
                    is_queued=False,
                    processing_elapsed_seconds=final_elapsed_seconds,
                )
def _validate_or_raise(response_model: Type[M], payload: Any) -> M:
    try:
        return response_model.model_validate(payload)
    except Exception as e:
        logging.error(
            "Response validation failed for %s: %s",
            getattr(response_model, "__name__", response_model),
            e,
        )
        raise Exception(
            f"Response validation failed for {getattr(response_model, '__name__', response_model)}: {e}"
        ) from e


def _wrap_model_extractor(
    response_model: Type[M],
    extractor: Optional[Callable[[M], Any]],
) -> Optional[Callable[[dict[str, Any]], Any]]:
    """Wrap a typed extractor so it can be used by the dict-based poller.

    Validates the dict into `response_model` before invoking `extractor`.
    Uses a small per-wrapper cache keyed by `id(dict)` to avoid re-validating
    the same response for multiple extractors in a single poll attempt.
    """
    if extractor is None:
        return None
    _cache: dict[int, M] = {}

    def _wrapped(d: dict[str, Any]) -> Any:
        try:
            key = id(d)
            model = _cache.get(key)
            if model is None:
                model = response_model.model_validate(d)
                _cache[key] = model
            return extractor(model)
        except Exception as e:
            logging.error("Extractor failed (typed -> dict wrapper): %s", e)
            raise

    return _wrapped
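Taken together, a task-based node first creates the task with `sync_op_pydantic`, then drives `poll_op_pydantic` until a terminal status, exactly as the rewritten `process_video_task` above does. A condensed sketch (the paths and status strings are illustrative; `TaskCreationResponse` and `TaskStatusResponse` are the ByteDance models from this diff):

initial = await sync_op_pydantic(
    cls,
    endpoint=ApiEndpoint(path="/proxy/example/tasks", method="POST"),  # hypothetical path
    data=payload,
    response_model=TaskCreationResponse,
)
final = await poll_op_pydantic(
    cls,
    poll_endpoint=ApiEndpoint(path=f"/proxy/example/tasks/{initial.id}"),  # GET by default
    response_model=TaskStatusResponse,
    status_extractor=lambda r: r.status,
    completed_statuses=["succeeded"],
    failed_statuses=["failed", "cancelled"],
    queued_states=["queued"],   # queued polls do not consume max_poll_attempts
    estimated_duration=45,      # drives the "~Ns remaining" line in the UI
)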
comfy_api_nodes/util/common_exceptions.py (new file, 14 lines)
@@ -0,0 +1,14 @@
class NetworkError(Exception):
    """Base exception for network-related errors with diagnostic information."""


class LocalNetworkError(NetworkError):
    """Exception raised when local network connectivity issues are detected."""


class ApiServerError(NetworkError):
    """Exception raised when the API server is unreachable but internet is working."""


class ProcessingInterrupted(Exception):
    """Operation was interrupted by user/runtime via processing_interrupted()."""
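Callers can branch on the hierarchy: `LocalNetworkError` and `ApiServerError` both derive from `NetworkError`, while `ProcessingInterrupted` is deliberately a plain `Exception` so a user cancel is never swallowed by a generic network handler. A hedged sketch of the intended handling:

try:
    result = await sync_op(cls, endpoint)
except ProcessingInterrupted:
    raise              # propagate user cancellation untouched
except LocalNetworkError:
    ...                # e.g. advise checking the local connection
except NetworkError:
    ...                # server-side/unreachable cases (ApiServerError)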
comfy_api_nodes/util/conversions.py (new file, 25 lines)
@@ -0,0 +1,25 @@
from io import BytesIO

import numpy as np
from PIL import Image
import torch


def bytesio_to_image_tensor(image_bytesio: BytesIO, mode: str = "RGBA") -> torch.Tensor:
    """Converts image data from BytesIO to a torch.Tensor.

    Args:
        image_bytesio: BytesIO object containing the image data.
        mode: The PIL mode to convert the image to (e.g., "RGB", "RGBA").

    Returns:
        A torch.Tensor representing the image (1, H, W, C).

    Raises:
        PIL.UnidentifiedImageError: If the image data cannot be identified.
        ValueError: If the specified mode is invalid.
    """
    image = Image.open(image_bytesio)
    image = image.convert(mode)
    image_array = np.array(image).astype(np.float32) / 255.0
    return torch.from_numpy(image_array).unsqueeze(0)
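The conversion matches ComfyUI's IMAGE convention: a float32 tensor in [0, 1] with shape (1, H, W, C). A quick round-trip sketch using Pillow to generate the bytes:

from io import BytesIO
from PIL import Image

buf = BytesIO()
Image.new("RGB", (64, 64), "red").save(buf, format="PNG")
buf.seek(0)
tensor = bytesio_to_image_tensor(buf, mode="RGB")
print(tensor.shape, tensor.dtype)  # torch.Size([1, 64, 64, 3]) torch.float32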
comfy_api_nodes/util/download_helpers.py (new file, 246 lines)
@@ -0,0 +1,246 @@
import asyncio
|
||||
import contextlib
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from io import BytesIO
|
||||
from typing import Optional, Union, IO
|
||||
from pathlib import Path
|
||||
|
||||
import aiohttp
|
||||
import torch
|
||||
from aiohttp.client_exceptions import ClientError, ContentTypeError
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from comfy_api_nodes.apis import request_logger
|
||||
|
||||
from ._helpers import _is_processing_interrupted
|
||||
from .common_exceptions import ProcessingInterrupted, LocalNetworkError, ApiServerError
|
||||
from .api_client import _diagnose_connectivity
|
||||
from .conversions import bytesio_to_image_tensor
|
||||
|
||||
|
||||
_RETRY_STATUS = {408, 429, 500, 502, 503, 504}
|
||||
|
||||
|
||||
async def download_url_to_bytesio(
|
||||
url: str,
|
||||
timeout: Optional[float] = None,
|
||||
*,
|
||||
dest: Optional[Union[BytesIO, IO[bytes], str, Path]] = None,
|
||||
max_retries: int = 3,
|
||||
retry_delay: float = 1.0,
|
||||
retry_backoff: float = 2.0,
|
||||
) -> None:
|
||||
"""Stream-download a URL into memory or to a provided destination.
|
||||
|
||||
Raises:
|
||||
ProcessingInterrupted, LocalNetworkError, ApiServerError, Exception (HTTP and other errors)
|
||||
"""
|
||||
attempt = 0
|
||||
delay = retry_delay
|
||||
|
||||
while True:
|
||||
attempt += 1
|
||||
op_id = _generate_operation_id("GET", url, attempt)
|
||||
timeout_cfg = aiohttp.ClientTimeout(total=timeout)
|
||||
stop_evt = asyncio.Event()
|
||||
|
||||
async def _monitor():
|
||||
try:
|
||||
while not stop_evt.is_set():
|
||||
if _is_processing_interrupted():
|
||||
return
|
||||
await asyncio.sleep(1.0)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
|
||||
monitor_task: Optional[asyncio.Task] = None
|
||||
sess: Optional[aiohttp.ClientSession] = None
|
||||
|
||||
# Open file path if a path was provided
|
||||
is_path_sink = isinstance(dest, (str, Path))
|
||||
fhandle = None
|
||||
try:
|
||||
try:
|
||||
request_logger.log_request_response(
|
||||
operation_id=op_id,
|
||||
request_method="GET",
|
||||
request_url=url,
|
||||
)
|
||||
except Exception as e:
|
||||
logging.debug("[DEBUG] download request logging failed: %s", e)
|
||||
|
||||
monitor_task = asyncio.create_task(_monitor())
|
||||
sess = aiohttp.ClientSession(timeout=timeout_cfg)
|
||||
req_task = asyncio.create_task(sess.get(url))
|
||||
|
||||
done, pending = await asyncio.wait({req_task, monitor_task}, return_when=asyncio.FIRST_COMPLETED)
|
||||
|
||||
# Interruption wins the race
|
||||
if monitor_task in done and req_task in pending:
|
||||
req_task.cancel()
|
||||
raise ProcessingInterrupted("Task cancelled")
|
||||
|
||||
resp = await req_task
|
||||
async with resp:
|
||||
if resp.status >= 400:
|
||||
# Attempt to capture body for logging (do not log huge binaries)
|
||||
with contextlib.suppress(Exception):
|
||||
try:
|
||||
body = await resp.json()
|
||||
except (ContentTypeError, ValueError):
|
||||
text = await resp.text()
|
||||
body = text if len(text) <= 4096 else f"[text {len(text)} bytes]"
|
||||
request_logger.log_request_response(
|
||||
operation_id=op_id,
|
||||
request_method="GET",
|
||||
request_url=url,
|
||||
response_status_code=resp.status,
|
||||
response_headers=dict(resp.headers),
|
||||
response_content=body,
|
||||
error_message=f"HTTP {resp.status}",
|
||||
)
|
||||
|
||||
if resp.status in _RETRY_STATUS and attempt <= max_retries:
|
||||
await _sleep_with_cancel(delay)
|
||||
delay *= retry_backoff
|
||||
continue
|
||||
raise Exception(f"Failed to download (HTTP {resp.status}).")
|
||||
|
||||
# Prepare path sink if needed
|
||||
if is_path_sink:
|
||||
p = Path(str(dest))
|
||||
with contextlib.suppress(Exception):
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
fhandle = open(p, "wb")
|
||||
sink = fhandle
|
||||
else:
|
||||
sink = dest # BytesIO or file-like
|
||||
|
||||
# Stream body in chunks to sink with cancellation checks
|
||||
written = 0
|
||||
last_tick = time.monotonic()
|
||||
async for chunk in resp.content.iter_chunked(1024 * 1024):
|
||||
sink.write(chunk)
|
||||
written += len(chunk)
|
||||
now = time.monotonic()
|
||||
if now - last_tick >= 1.0:
|
||||
last_tick = now
|
||||
if _is_processing_interrupted():
|
||||
raise ProcessingInterrupted("Task cancelled")
|
||||
|
||||
if isinstance(dest, BytesIO):
|
||||
dest.seek(0)
|
||||
|
||||
try:
|
||||
request_logger.log_request_response(
|
||||
operation_id=op_id,
|
||||
request_method="GET",
|
||||
request_url=url,
|
||||
response_status_code=resp.status,
|
||||
response_headers=dict(resp.headers),
|
||||
response_content=f"[streamed {written} bytes to dest]",
|
||||
)
|
||||
except Exception as e:
|
||||
logging.debug("[DEBUG] download response logging failed: %s", e)
|
||||
return
|
||||
except ProcessingInterrupted:
|
||||
logging.debug("Download was interrupted by user")
|
||||
raise
|
||||
except (ClientError, asyncio.TimeoutError) as e:
|
||||
if attempt <= max_retries:
|
||||
with contextlib.suppress(Exception):
|
||||
request_logger.log_request_response(
|
||||
operation_id=op_id,
|
||||
request_method="GET",
|
||||
request_url=url,
|
||||
error_message=f"{type(e).__name__}: {str(e)} (will retry)",
|
||||
)
|
||||
await _sleep_with_cancel(delay)
|
||||
delay *= retry_backoff
|
||||
continue
|
||||
|
||||
diag = await _diagnose_connectivity()
|
||||
if diag.get("is_local_issue"):
|
||||
raise LocalNetworkError(
|
||||
"Unable to connect to the network. Please check your internet connection and try again."
|
||||
) from e
|
||||
raise ApiServerError("The remote service appears unreachable at this time.") from e
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
if fhandle:
|
||||
fhandle.flush()
|
||||
fhandle.close()
|
||||
stop_evt.set()
|
||||
if monitor_task:
|
||||
monitor_task.cancel()
|
||||
with contextlib.suppress(Exception):
|
||||
await monitor_task
|
||||
if sess:
|
||||
with contextlib.suppress(Exception):
|
||||
await sess.close()
|
||||
|
||||
|
||||
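# Usage sketch (illustrative, not part of the module; URLs and paths are placeholders):
#   buf = BytesIO()
#   await download_url_to_bytesio("https://example.com/image.png", dest=buf)            # into memory; rewound to 0
#   await download_url_to_bytesio("https://example.com/clip.mp4", dest="out/clip.mp4")  # streamed to disk
# Transient HTTP failures (statuses in _RETRY_STATUS) retry with exponential backoff:
# retry_delay, then retry_delay * retry_backoff, and so on, up to max_retries attempts.
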
async def download_url_to_image_tensor(
    url: str,
    timeout: Optional[int] = None,
    auth_kwargs: Optional[dict[str, str]] = None,
    *,
    dest: Optional[Union[BytesIO, IO[bytes], str, Path]] = None,
    mode: str = "RGBA",
) -> torch.Tensor:
    """
    Download an image and decode it to a tensor. Supports the same streaming `dest` options as download_url_to_bytesio.
    """
    if dest is None:
        bio = await download_url_to_bytesio(url, timeout, auth_kwargs, dest=None)
        return bytesio_to_image_tensor(bio, mode=mode)  # type: ignore[arg-type]

    await download_url_to_bytesio(url, timeout, auth_kwargs, dest=dest)

    if isinstance(dest, BytesIO):
        with contextlib.suppress(Exception):
            dest.seek(0)
        return bytesio_to_image_tensor(dest, mode=mode)

    if hasattr(dest, "read") and hasattr(dest, "seek"):
        try:
            with contextlib.suppress(Exception):
                dest.flush()
            dest.seek(0)
            data = dest.read()
            return bytesio_to_image_tensor(BytesIO(data), mode=mode)
        except Exception:
            pass

    if isinstance(dest, (str, Path)) or getattr(dest, "name", None):
        path_str = str(dest if isinstance(dest, (str, Path)) else getattr(dest, "name"))
        with open(path_str, "rb") as f:
            return bytesio_to_image_tensor(BytesIO(f.read()), mode=mode)

    raise ValueError(
        "Destination is not readable and no path is available to decode the image. "
        "Pass dest=None to decode from memory, or provide a readable handle / path."
    )

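# Example (illustrative; the URL and cache path are placeholders):
#   img = await download_url_to_image_tensor("https://example.com/image.png")                          # decode from memory
#   img = await download_url_to_image_tensor("https://example.com/image.png", dest="cache/image.png")  # stream to disk, then decode
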
def _generate_operation_id(method: str, url: str, attempt: int) -> str:
    try:
        parsed = urlparse(url)
        slug = (parsed.path.rsplit("/", 1)[-1] or parsed.netloc or "download").strip("/").replace("/", "_")
    except Exception:
        slug = "download"
    return f"{method}_{slug}_try{attempt}_{uuid.uuid4().hex[:8]}"

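# Example: _generate_operation_id("GET", "https://example.com/files/image.png", 2)
# returns something like "GET_image.png_try2_1a2b3c4d" (the trailing hex is random).
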
async def _sleep_with_cancel(seconds: float) -> None:
    """Sleep in 1s slices while checking for interruption."""
    end = time.monotonic() + seconds
    while True:
        if _is_processing_interrupted():
            raise ProcessingInterrupted("Task cancelled")
        now = time.monotonic()
        if now >= end:
            return
        await asyncio.sleep(min(1.0, end - now))
272 comfy_api_nodes/util/upload_helpers.py Normal file
@ -0,0 +1,272 @@
import uuid
import asyncio
import contextlib
from io import BytesIO
import logging
import time
from typing import Optional, Union

import aiohttp
import torch
from pydantic import BaseModel, Field

from comfy_api.latest import IO
from urllib.parse import urlparse
from .api_client import (
    ApiEndpoint,
    sync_op_pydantic,
    _display_time_progress,
    _diagnose_connectivity,
)

from comfy_api_nodes.apis import request_logger
from comfy_api_nodes.apinode_utils import tensor_to_bytesio
from ._helpers import _sleep_with_interrupt, _is_processing_interrupted
from .common_exceptions import ProcessingInterrupted, LocalNetworkError, ApiServerError

class UploadRequest(BaseModel):
    file_name: str = Field(..., description="Filename to upload")
    content_type: Optional[str] = Field(
        None,
        description="Mime type of the file. For example: image/png, image/jpeg, video/mp4, etc.",
    )


class UploadResponse(BaseModel):
    download_url: str = Field(..., description="URL to GET uploaded file")
    upload_url: str = Field(..., description="URL to PUT file to upload")

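# A /customers/storage response these models describe looks roughly like this
# (field names from UploadResponse above; the values are illustrative):
#   {"download_url": "https://storage.example.com/f/abc123",
#    "upload_url": "https://storage.example.com/f/abc123?X-Signature=..."}
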
async def upload_images_to_comfyapi(
    cls: type[IO.ComfyNode],
    image: torch.Tensor,
    *,
    max_images: int = 8,
    mime_type: Optional[str] = None,
    wait_label: Optional[str] = "Uploading",
) -> list[str]:
    """
    Uploads images to the ComfyUI API and returns download URLs.
    To upload multiple images, stack them in the batch dimension first.
    """
    # For a batched tensor, upload each image separately, up to max_images files.
    download_urls: list[str] = []
    is_batch = len(image.shape) > 3
    batch_len = image.shape[0] if is_batch else 1

    for idx in range(min(batch_len, max_images)):
        tensor = image[idx] if is_batch else image
        img_io = tensor_to_bytesio(tensor, mime_type=mime_type)
        url = await upload_file_to_comfyapi(cls, img_io, img_io.name, mime_type, wait_label)
        download_urls.append(url)
    return download_urls

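# Usage sketch (illustrative): a [H, W, C] tensor uploads as a single image; to
# upload several, stack them along the batch dimension first (img_a/img_b are placeholders):
#   urls = await upload_images_to_comfyapi(cls, torch.stack([img_a, img_b]), mime_type="image/png")
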
async def upload_file_to_comfyapi(
    cls: type[IO.ComfyNode],
    file_bytes_io: BytesIO,
    filename: str,
    upload_mime_type: Optional[str],
    wait_label: Optional[str] = "Uploading",
) -> str:
    """Uploads a single file to the ComfyUI API and returns its download URL."""
    if upload_mime_type is None:
        request_object = UploadRequest(file_name=filename)
    else:
        request_object = UploadRequest(file_name=filename, content_type=upload_mime_type)
    create_resp = await sync_op_pydantic(
        cls,
        endpoint=ApiEndpoint(path="/customers/storage", method="POST"),
        data=request_object,
        response_model=UploadResponse,
        final_label_on_success=None,
        monitor_progress=False,
    )
    await upload_file(
        cls,
        create_resp.upload_url,
        file_bytes_io,
        content_type=upload_mime_type,
        wait_label=wait_label,
    )
    return create_resp.download_url

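# Usage sketch (illustrative; the payload is a placeholder):
#   bio = BytesIO(encoded_png_bytes)
#   url = await upload_file_to_comfyapi(cls, bio, "frame.png", "image/png")
# The returned value is the download URL of the stored file.
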
async def upload_file(
    cls: type[IO.ComfyNode],
    upload_url: str,
    file: Union[BytesIO, str],
    *,
    content_type: Optional[str] = None,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    retry_backoff: float = 2.0,
    wait_label: Optional[str] = None,
) -> None:
    """
    Upload a file to a signed URL (e.g., an S3 pre-signed PUT) with retries, Comfy progress display, and interruption support.

    Args:
        cls: Node class (provides auth context + UI progress hooks).
        upload_url: Pre-signed PUT URL.
        file: BytesIO or path string.
        content_type: Explicit MIME type. If None, the Content-Type header is suppressed entirely.
        max_retries: Maximum retry attempts.
        retry_delay: Initial delay in seconds.
        retry_backoff: Exponential backoff factor.
        wait_label: Progress label shown in the Comfy UI.

    Raises:
        ProcessingInterrupted, LocalNetworkError, ApiServerError, Exception
    """
    if isinstance(file, BytesIO):
        with contextlib.suppress(Exception):
            file.seek(0)
        data = file.read()
    elif isinstance(file, str):
        with open(file, "rb") as f:
            data = f.read()
    else:
        raise ValueError("file must be a BytesIO or a filesystem path string")

    headers: dict[str, str] = {}
    skip_auto_headers: set[str] = set()
    if content_type:
        headers["Content-Type"] = content_type
    else:
        skip_auto_headers.add("Content-Type")  # Don't let aiohttp add Content-Type; it can break the signed request

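    # With the defaults (retry_delay=1.0, retry_backoff=2.0, max_retries=3), a failing
    # upload is attempted up to 4 times in total, sleeping ~1s, 2s, then 4s between attempts.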
    attempt = 0
    delay = retry_delay
    start_ts = time.monotonic()
    op_uuid = uuid.uuid4().hex[:8]
    while True:
        attempt += 1
        operation_id = _generate_operation_id("PUT", upload_url, attempt, op_uuid)
        timeout = aiohttp.ClientTimeout(total=None)
        stop_evt = asyncio.Event()

        async def _monitor():
            try:
                while not stop_evt.is_set():
                    if _is_processing_interrupted():
                        return
                    if wait_label:
                        _display_time_progress(cls, wait_label, int(time.monotonic() - start_ts), None)
                    await asyncio.sleep(1.0)
            except asyncio.CancelledError:
                return

        monitor_task = asyncio.create_task(_monitor())
        sess: Optional[aiohttp.ClientSession] = None
        try:
            try:
                request_logger.log_request_response(
                    operation_id=operation_id,
                    request_method="PUT",
                    request_url=upload_url,
                    request_headers=headers or None,
                    request_params=None,
                    request_data=f"[File data {len(data)} bytes]",
                )
            except Exception as e:
                logging.debug("[DEBUG] upload request logging failed: %s", e)

            sess = aiohttp.ClientSession(timeout=timeout)
            req = sess.put(upload_url, data=data, headers=headers, skip_auto_headers=skip_auto_headers)
            req_task = asyncio.create_task(req)

            done, pending = await asyncio.wait({req_task, monitor_task}, return_when=asyncio.FIRST_COMPLETED)

            if monitor_task in done and req_task in pending:
                req_task.cancel()
                raise ProcessingInterrupted("Upload cancelled")

            resp = await req_task
            async with resp:
                if resp.status >= 400:
                    with contextlib.suppress(Exception):
                        try:
                            body = await resp.json()
                        except Exception:
                            body = await resp.text()
                        msg = f"Upload failed with status {resp.status}"
                        request_logger.log_request_response(
                            operation_id=operation_id,
                            request_method="PUT",
                            request_url=upload_url,
                            response_status_code=resp.status,
                            response_headers=dict(resp.headers),
                            response_content=body,
                            error_message=msg,
                        )
                    if resp.status in {408, 429, 500, 502, 503, 504} and attempt <= max_retries:
                        await _sleep_with_interrupt(
                            delay,
                            cls,
                            wait_label,
                            start_ts,
                            None,
                            display_callback=_display_time_progress if wait_label else None,
                        )
                        delay *= retry_backoff
                        continue
                    raise Exception(f"Failed to upload (HTTP {resp.status}).")
                try:
                    request_logger.log_request_response(
                        operation_id=operation_id,
                        request_method="PUT",
                        request_url=upload_url,
                        response_status_code=resp.status,
                        response_headers=dict(resp.headers),
                        response_content="File uploaded successfully.",
                    )
                except Exception as e:
                    logging.debug("[DEBUG] upload response logging failed: %s", e)
                return
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt <= max_retries:
                with contextlib.suppress(Exception):
                    request_logger.log_request_response(
                        operation_id=operation_id,
                        request_method="PUT",
                        request_url=upload_url,
                        request_headers=headers or None,
                        request_data=f"[File data {len(data)} bytes]",
                        error_message=f"{type(e).__name__}: {str(e)} (will retry)",
                    )
                await _sleep_with_interrupt(
                    delay,
                    cls,
                    wait_label,
                    start_ts,
                    None,
                    display_callback=_display_time_progress if wait_label else None,
                )
                delay *= retry_backoff
                continue

            diag = await _diagnose_connectivity()
            if diag.get("is_local_issue"):
                raise LocalNetworkError(
                    "Unable to connect to the network. Please check your internet connection and try again."
                ) from e
            raise ApiServerError("The API service appears unreachable at this time.") from e
        finally:
            stop_evt.set()
            if monitor_task:
                monitor_task.cancel()
                with contextlib.suppress(Exception):
                    await monitor_task
            if sess:
                with contextlib.suppress(Exception):
                    await sess.close()

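# Usage sketch (illustrative; the pre-signed URL is a placeholder):
#   await upload_file(cls, "https://bucket.s3.example.com/key?X-Amz-Signature=...",
#                     BytesIO(b"..."), content_type="video/mp4", wait_label="Uploading")
# When content_type is None the header is suppressed entirely, which matters when
# the URL was signed without a Content-Type.
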
def _generate_operation_id(method: str, url: str, attempt: int, op_uuid: str) -> str:
    try:
        parsed = urlparse(url)
        slug = (parsed.path.rsplit("/", 1)[-1] or parsed.netloc or "upload").strip("/").replace("/", "_")
    except Exception:
        slug = "upload"
    return f"{method}_{slug}_{op_uuid}_try{attempt}"
@ -50,6 +50,8 @@ messages_control.disable = [
    "too-many-branches",
    "too-many-locals",
    "too-many-arguments",
    "too-many-return-statements",
    "too-many-nested-blocks",
    "duplicate-code",
    "abstract-method",
    "superfluous-parens",