Compare commits

...

5 Commits

Author SHA1 Message Date
Miklos Nagy
531ce3d2c7
Merge 09c250184d into c4a14df9a3 2026-01-21 11:18:53 +08:00
Mylo
c4a14df9a3
Dynamically detect chroma radiance patch size (#11991)
Some checks are pending
Python Linting / Run Ruff (push) Waiting to run
Python Linting / Run Pylint (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Waiting to run
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Waiting to run
Execution Tests / test (macos-latest) (push) Waiting to run
Execution Tests / test (ubuntu-latest) (push) Waiting to run
Execution Tests / test (windows-latest) (push) Waiting to run
Test server launches without errors / test (push) Waiting to run
Unit Tests / test (macos-latest) (push) Waiting to run
Unit Tests / test (ubuntu-latest) (push) Waiting to run
Unit Tests / test (windows-2022) (push) Waiting to run
2026-01-20 18:46:11 -05:00
Ivan Zorin
965d0ed509
fix: remove normalization of audio in LTX Mel spectrogram creation (#11990)
For LTX Audio VAE, remove normalization of audio during MEL spectrogram creation.
This aligs inference with training and prevents loud audio from being attenuated.
2026-01-20 18:44:28 -05:00
Alexander Piskun
ddc541ffda
feat(api-nodes): add WaveSpeed nodes (#11945) 2026-01-20 13:05:40 -08:00
Miklos Nagy
09c250184d
Update graph.py 2025-11-12 15:14:45 +01:00
5 changed files with 332 additions and 60 deletions

View File

@ -103,20 +103,10 @@ class AudioPreprocessor:
return waveform
return torchaudio.functional.resample(waveform, source_rate, self.target_sample_rate)
@staticmethod
def normalize_amplitude(
waveform: torch.Tensor, max_amplitude: float = 0.5, eps: float = 1e-5
) -> torch.Tensor:
waveform = waveform - waveform.mean(dim=2, keepdim=True)
peak = torch.max(torch.abs(waveform)) + eps
scale = peak.clamp(max=max_amplitude) / peak
return waveform * scale
def waveform_to_mel(
self, waveform: torch.Tensor, waveform_sample_rate: int, device
) -> torch.Tensor:
waveform = self.resample(waveform, waveform_sample_rate)
waveform = self.normalize_amplitude(waveform)
mel_transform = torchaudio.transforms.MelSpectrogram(
sample_rate=self.target_sample_rate,

View File

@ -253,7 +253,7 @@ def detect_unet_config(state_dict, key_prefix, metadata=None):
dit_config["image_model"] = "chroma_radiance"
dit_config["in_channels"] = 3
dit_config["out_channels"] = 3
dit_config["patch_size"] = 16
dit_config["patch_size"] = state_dict.get('{}img_in_patch.weight'.format(key_prefix)).size(dim=-1)
dit_config["nerf_hidden_size"] = 64
dit_config["nerf_mlp_ratio"] = 4
dit_config["nerf_depth"] = 4

View File

@ -0,0 +1,35 @@
from pydantic import BaseModel, Field
class SeedVR2ImageRequest(BaseModel):
image: str = Field(...)
target_resolution: str = Field(...)
output_format: str = Field("png")
enable_sync_mode: bool = Field(False)
class FlashVSRRequest(BaseModel):
target_resolution: str = Field(...)
video: str = Field(...)
duration: float = Field(...)
class TaskCreatedDataResponse(BaseModel):
id: str = Field(...)
class TaskCreatedResponse(BaseModel):
code: int = Field(...)
message: str = Field(...)
data: TaskCreatedDataResponse | None = Field(None)
class TaskResultDataResponse(BaseModel):
status: str = Field(...)
outputs: list[str] = Field([])
class TaskResultResponse(BaseModel):
code: int = Field(...)
message: str = Field(...)
data: TaskResultDataResponse | None = Field(None)

View File

@ -0,0 +1,178 @@
from typing_extensions import override
from comfy_api.latest import IO, ComfyExtension, Input
from comfy_api_nodes.apis.wavespeed import (
FlashVSRRequest,
TaskCreatedResponse,
TaskResultResponse,
SeedVR2ImageRequest,
)
from comfy_api_nodes.util import (
ApiEndpoint,
download_url_to_video_output,
poll_op,
sync_op,
upload_video_to_comfyapi,
validate_container_format_is_mp4,
validate_video_duration,
upload_images_to_comfyapi,
get_number_of_images,
download_url_to_image_tensor,
)
class WavespeedFlashVSRNode(IO.ComfyNode):
@classmethod
def define_schema(cls):
return IO.Schema(
node_id="WavespeedFlashVSRNode",
display_name="FlashVSR Video Upscale",
category="api node/video/WaveSpeed",
description="Fast, high-quality video upscaler that "
"boosts resolution and restores clarity for low-resolution or blurry footage.",
inputs=[
IO.Video.Input("video"),
IO.Combo.Input("target_resolution", options=["720p", "1080p", "2K", "4K"]),
],
outputs=[
IO.Video.Output(),
],
hidden=[
IO.Hidden.auth_token_comfy_org,
IO.Hidden.api_key_comfy_org,
IO.Hidden.unique_id,
],
is_api_node=True,
price_badge=IO.PriceBadge(
depends_on=IO.PriceBadgeDepends(widgets=["target_resolution"]),
expr="""
(
$price_for_1sec := {"720p": 0.012, "1080p": 0.018, "2k": 0.024, "4k": 0.032};
{
"type":"usd",
"usd": $lookup($price_for_1sec, widgets.target_resolution),
"format":{"suffix": "/second", "approximate": true}
}
)
""",
),
)
@classmethod
async def execute(
cls,
video: Input.Video,
target_resolution: str,
) -> IO.NodeOutput:
validate_container_format_is_mp4(video)
validate_video_duration(video, min_duration=5, max_duration=60 * 10)
initial_res = await sync_op(
cls,
ApiEndpoint(path="/proxy/wavespeed/api/v3/wavespeed-ai/flashvsr", method="POST"),
response_model=TaskCreatedResponse,
data=FlashVSRRequest(
target_resolution=target_resolution.lower(),
video=await upload_video_to_comfyapi(cls, video),
duration=video.get_duration(),
),
)
if initial_res.code != 200:
raise ValueError(f"Task creation fails with code={initial_res.code} and message={initial_res.message}")
final_response = await poll_op(
cls,
ApiEndpoint(path=f"/proxy/wavespeed/api/v3/predictions/{initial_res.data.id}/result"),
response_model=TaskResultResponse,
status_extractor=lambda x: "failed" if x.data is None else x.data.status,
poll_interval=10.0,
max_poll_attempts=480,
)
if final_response.code != 200:
raise ValueError(
f"Task processing failed with code={final_response.code} and message={final_response.message}"
)
return IO.NodeOutput(await download_url_to_video_output(final_response.data.outputs[0]))
class WavespeedImageUpscaleNode(IO.ComfyNode):
@classmethod
def define_schema(cls):
return IO.Schema(
node_id="WavespeedImageUpscaleNode",
display_name="WaveSpeed Image Upscale",
category="api node/image/WaveSpeed",
description="Boost image resolution and quality, upscaling photos to 4K or 8K for sharp, detailed results.",
inputs=[
IO.Combo.Input("model", options=["SeedVR2", "Ultimate"]),
IO.Image.Input("image"),
IO.Combo.Input("target_resolution", options=["2K", "4K", "8K"]),
],
outputs=[
IO.Image.Output(),
],
hidden=[
IO.Hidden.auth_token_comfy_org,
IO.Hidden.api_key_comfy_org,
IO.Hidden.unique_id,
],
is_api_node=True,
price_badge=IO.PriceBadge(
depends_on=IO.PriceBadgeDepends(widgets=["model"]),
expr="""
(
$prices := {"seedvr2": 0.01, "ultimate": 0.06};
{"type":"usd", "usd": $lookup($prices, widgets.model)}
)
""",
),
)
@classmethod
async def execute(
cls,
model: str,
image: Input.Image,
target_resolution: str,
) -> IO.NodeOutput:
if get_number_of_images(image) != 1:
raise ValueError("Exactly one input image is required.")
if model == "SeedVR2":
model_path = "seedvr2/image"
else:
model_path = "ultimate-image-upscaler"
initial_res = await sync_op(
cls,
ApiEndpoint(path=f"/proxy/wavespeed/api/v3/wavespeed-ai/{model_path}", method="POST"),
response_model=TaskCreatedResponse,
data=SeedVR2ImageRequest(
target_resolution=target_resolution.lower(),
image=(await upload_images_to_comfyapi(cls, image, max_images=1))[0],
),
)
if initial_res.code != 200:
raise ValueError(f"Task creation fails with code={initial_res.code} and message={initial_res.message}")
final_response = await poll_op(
cls,
ApiEndpoint(path=f"/proxy/wavespeed/api/v3/predictions/{initial_res.data.id}/result"),
response_model=TaskResultResponse,
status_extractor=lambda x: "failed" if x.data is None else x.data.status,
poll_interval=10.0,
max_poll_attempts=480,
)
if final_response.code != 200:
raise ValueError(
f"Task processing failed with code={final_response.code} and message={final_response.message}"
)
return IO.NodeOutput(await download_url_to_image_tensor(final_response.data.outputs[0]))
class WavespeedExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[IO.ComfyNode]]:
return [
WavespeedFlashVSRNode,
WavespeedImageUpscaleNode,
]
async def comfy_entrypoint() -> WavespeedExtension:
return WavespeedExtension()

View File

@ -1,6 +1,12 @@
from __future__ import annotations
from typing import Type, Literal
# graph.py — grouped/batched scheduler on top of the updated ExecutionList
# Implements model-class batching to reduce device/context swaps while preserving
# the new execution_cache behavior added upstream.
from __future__ import annotations
from typing import Type, Literal, Optional
import os
import nodes
import asyncio
import inspect
@ -10,15 +16,19 @@ from comfy.comfy_types.node_typing import ComfyNodeABC, InputTypeDict, InputType
# NOTE: ExecutionBlocker code got moved to graph_utils.py to prevent torch being imported too soon during unit tests
ExecutionBlocker = ExecutionBlocker
class DependencyCycleError(Exception):
pass
class NodeInputError(Exception):
pass
class NodeNotFoundError(Exception):
pass
class DynamicPrompt:
def __init__(self, original_prompt):
# The original prompt provided by the user
@ -62,6 +72,7 @@ class DynamicPrompt:
def get_original_prompt(self):
return self.original_prompt
def get_input_info(
class_def: Type[ComfyNodeABC],
input_name: str,
@ -104,12 +115,13 @@ def get_input_info(
# input_type = IO.Combo.io_type
return input_type, input_category, extra_info
class TopologicalSort:
def __init__(self, dynprompt):
self.dynprompt = dynprompt
self.pendingNodes = {}
self.blockCount = {} # Number of nodes this node is directly blocked by
self.blocking = {} # Which nodes are blocked by this node
self.blockCount = {} # Number of nodes this node is directly blocked by
self.blocking = {} # Which nodes are blocked by this node
self.externalBlocks = 0
self.unblockedEvent = asyncio.Event()
@ -170,6 +182,7 @@ class TopologicalSort:
assert node_id in self.blockCount, "Can't add external block to a node that isn't pending"
self.externalBlocks += 1
self.blockCount[node_id] += 1
def unblock():
self.externalBlocks -= 1
self.blockCount[node_id] -= 1
@ -191,18 +204,31 @@ class TopologicalSort:
def is_empty(self):
return len(self.pendingNodes) == 0
class ExecutionList(TopologicalSort):
"""
ExecutionList implements a topological dissolve of the graph. After a node is staged for execution,
it can still be returned to the graph after having further dependencies added.
ExecutionList implements a topological dissolve of the graph with batching.
After a node is staged for execution, it can still be returned to the graph
after having further dependencies added.
Batching: we favor running nodes of the same class_type back-to-back
to reduce device/context thrash (e.g., model swaps). Within a batch we still
apply UX-friendly priorities (output/async early, VAEDecodepreview, etc.).
"""
def __init__(self, dynprompt, output_cache):
super().__init__(dynprompt)
self.output_cache = output_cache
self.staged_node_id = None
self.staged_node_id: Optional[str] = None
# Upstream execution cache (kept intact)
self.execution_cache = {}
self.execution_cache_listeners = {}
# Batching state
self._current_group_class: Optional[str] = None
# ----------------------------- cache ---------------------------------
def is_cached(self, node_id):
return self.output_cache.get(node_id) is not None
@ -220,7 +246,7 @@ class ExecutionList(TopologicalSort):
value = self.execution_cache[to_node_id].get(from_node_id)
if value is None:
return None
#Write back to the main cache on touch.
# Write back to the main cache on touch.
self.output_cache.set(from_node_id, value)
return value
@ -234,16 +260,93 @@ class ExecutionList(TopologicalSort):
super().add_strong_link(from_node_id, from_socket, to_node_id)
self.cache_link(from_node_id, to_node_id)
# --------------------------- group utils ------------------------------
def _pick_largest_group(self, node_list):
"""Return the class_type with the most representatives in node_list.
Ties are resolved deterministically by class name."""
counts = {}
for nid in node_list:
ctype = self.dynprompt.get_node(nid)["class_type"]
counts[ctype] = counts.get(ctype, 0) + 1
# max by (count, class_name) for deterministic tie-break
return max(counts.items(), key=lambda kv: (kv[1], kv[0]))[0]
def _filter_by_group(self, node_list, group_cls):
"""Keep only nodes that belong to the given class."""
return [nid for nid in node_list if self.dynprompt.get_node(nid)["class_type"] == group_cls]
# ------------------------- node classification ------------------------
def _is_output(self, node_id):
class_type = self.dynprompt.get_node(node_id)["class_type"]
class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
return getattr(class_def, 'OUTPUT_NODE', False) is True
def _is_async(self, node_id):
class_type = self.dynprompt.get_node(node_id)["class_type"]
class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
return inspect.iscoroutinefunction(getattr(class_def, class_def.FUNCTION))
# ------------------------- UX within a batch --------------------------
def _pick_in_batch_with_ux(self, candidates):
"""
Original UX heuristics, but applied *within* the current batch.
"""
# 1) Output nodes ASAP
for nid in candidates:
if self._is_output(nid):
return nid
# 1b) Async nodes early to overlap
for nid in candidates:
if self._is_async(nid):
return nid
# 2) decoder-before-preview pattern (within the batch)
for nid in candidates:
for blocked in self.blocking[nid]:
if self._is_output(blocked):
return nid
# 3) VAELoader -> VAEDecode -> preview (within the batch)
for nid in candidates:
for blocked in self.blocking[nid]:
for blocked2 in self.blocking[blocked]:
if self._is_output(blocked2):
return nid
# 4) Otherwise, first candidate
return candidates[0]
# ------------------------- batch-aware picking ------------------------
def ux_friendly_pick_node(self, available):
"""
Choose which ready node to execute next, honoring the current batch.
When the current batch runs dry, switch to the largest ready group.
"""
# Ensure current batch is still present; otherwise pick a new largest group.
has_current = (
self._current_group_class is not None and
any(self.dynprompt.get_node(nid)["class_type"] == self._current_group_class for nid in available)
)
if not has_current:
new_group = self._pick_largest_group(available)
self._current_group_class = new_group
# Restrict to nodes of the current batch
candidates = self._filter_by_group(available, self._current_group_class)
return self._pick_in_batch_with_ux(candidates)
# --------------------------- staging / run ----------------------------
async def stage_node_execution(self):
assert self.staged_node_id is None
if self.is_empty():
return None, None, None
available = self.get_ready_nodes()
# If nothing ready but there are external blockers, wait for unblocks.
while len(available) == 0 and self.externalBlocks > 0:
# Wait for an external block to be released
await self.unblockedEvent.wait()
self.unblockedEvent.clear()
available = self.get_ready_nodes()
if len(available) == 0:
cycled_nodes = self.get_nodes_in_cycle()
# Because cycles composed entirely of static nodes are caught during initial validation,
@ -264,64 +367,30 @@ class ExecutionList(TopologicalSort):
}
return None, error_details, ex
# Batch-aware pick
self.staged_node_id = self.ux_friendly_pick_node(available)
return self.staged_node_id, None, None
def ux_friendly_pick_node(self, node_list):
# If an output node is available, do that first.
# Technically this has no effect on the overall length of execution, but it feels better as a user
# for a PreviewImage to display a result as soon as it can
# Some other heuristics could probably be used here to improve the UX further.
def is_output(node_id):
class_type = self.dynprompt.get_node(node_id)["class_type"]
class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
if hasattr(class_def, 'OUTPUT_NODE') and class_def.OUTPUT_NODE == True:
return True
return False
# If an available node is async, do that first.
# This will execute the asynchronous function earlier, reducing the overall time.
def is_async(node_id):
class_type = self.dynprompt.get_node(node_id)["class_type"]
class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
return inspect.iscoroutinefunction(getattr(class_def, class_def.FUNCTION))
for node_id in node_list:
if is_output(node_id) or is_async(node_id):
return node_id
#This should handle the VAEDecode -> preview case
for node_id in node_list:
for blocked_node_id in self.blocking[node_id]:
if is_output(blocked_node_id):
return node_id
#This should handle the VAELoader -> VAEDecode -> preview case
for node_id in node_list:
for blocked_node_id in self.blocking[node_id]:
for blocked_node_id1 in self.blocking[blocked_node_id]:
if is_output(blocked_node_id1):
return node_id
#TODO: this function should be improved
return node_list[0]
def unstage_node_execution(self):
# If a node execution resolves to PENDING, return it to the pool
# but keep the current batch so we continue batching next time.
assert self.staged_node_id is not None
self.staged_node_id = None
def complete_node_execution(self):
node_id = self.staged_node_id
self.pop_node(node_id)
# Maintain current batch; it will switch automatically when empty.
self.execution_cache.pop(node_id, None)
self.execution_cache_listeners.pop(node_id, None)
self.staged_node_id = None
# ------------------------- cycle detection ----------------------------
def get_nodes_in_cycle(self):
# We'll dissolve the graph in reverse topological order to leave only the nodes in the cycle.
# We're skipping some of the performance optimizations from the original TopologicalSort to keep
# the code simple (and because having a cycle in the first place is a catastrophic error)
blocked_by = { node_id: {} for node_id in self.pendingNodes }
blocked_by = {node_id: {} for node_id in self.pendingNodes}
for from_node_id in self.blocking:
for to_node_id in self.blocking[from_node_id]:
if True in self.blocking[from_node_id][to_node_id].values():