ComfyUI/comfy_extras/nodes_hunyuan.py

673 lines
32 KiB
Python

import nodes
import node_helpers
import torch
import comfy.model_management
import comfy.utils
import comfy.clip_vision
from typing_extensions import override
from comfy_api.latest import ComfyExtension, io
from comfy.ldm.hunyuan_video.upsampler import HunyuanVideo15SRModel
from comfy.ldm.lightricks.latent_upsampler import LatentUpsampler
import folder_paths
import json
class CLIPTextEncodeHunyuanDiT(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="CLIPTextEncodeHunyuanDiT",
category="advanced/conditioning",
inputs=[
io.Clip.Input("clip"),
io.String.Input("bert", multiline=True, dynamic_prompts=True),
io.String.Input("mt5xl", multiline=True, dynamic_prompts=True),
],
outputs=[
io.Conditioning.Output(),
],
)
@classmethod
def execute(cls, clip, bert, mt5xl) -> io.NodeOutput:
tokens = clip.tokenize(bert)
tokens["mt5xl"] = clip.tokenize(mt5xl)["mt5xl"]
return io.NodeOutput(clip.encode_from_tokens_scheduled(tokens))
encode = execute # TODO: remove
class EmptyHunyuanLatentVideo(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="EmptyHunyuanLatentVideo",
display_name="Empty HunyuanVideo 1.0 Latent",
category="latent/video",
inputs=[
io.Int.Input("width", default=848, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("height", default=480, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("length", default=25, min=1, max=nodes.MAX_RESOLUTION, step=4),
io.Int.Input("batch_size", default=1, min=1, max=4096),
],
outputs=[
io.Latent.Output(),
],
)
@classmethod
def execute(cls, width, height, length, batch_size=1) -> io.NodeOutput:
latent = torch.zeros([batch_size, 16, ((length - 1) // 4) + 1, height // 8, width // 8], device=comfy.model_management.intermediate_device())
return io.NodeOutput({"samples": latent, "downscale_ratio_spacial": 8})
generate = execute # TODO: remove
class EmptyHunyuanVideo15Latent(EmptyHunyuanLatentVideo):
@classmethod
def define_schema(cls):
schema = super().define_schema()
schema.node_id = "EmptyHunyuanVideo15Latent"
schema.display_name = "Empty HunyuanVideo 1.5 Latent"
return schema
@classmethod
def execute(cls, width, height, length, batch_size=1) -> io.NodeOutput:
# Using scale factor of 16 instead of 8
latent = torch.zeros([batch_size, 32, ((length - 1) // 4) + 1, height // 16, width // 16], device=comfy.model_management.intermediate_device())
return io.NodeOutput({"samples": latent, "downscale_ratio_spacial": 16})
class HunyuanVideo15ImageToVideo(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="HunyuanVideo15ImageToVideo",
category="conditioning/video_models",
inputs=[
io.Conditioning.Input("positive"),
io.Conditioning.Input("negative"),
io.Vae.Input("vae"),
io.Int.Input("width", default=848, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("height", default=480, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("length", default=33, min=1, max=nodes.MAX_RESOLUTION, step=4),
io.Int.Input("batch_size", default=1, min=1, max=4096),
io.Image.Input("start_image", optional=True),
io.ClipVisionOutput.Input("clip_vision_output", optional=True),
],
outputs=[
io.Conditioning.Output(display_name="positive"),
io.Conditioning.Output(display_name="negative"),
io.Latent.Output(display_name="latent"),
],
)
@classmethod
def execute(cls, positive, negative, vae, width, height, length, batch_size, start_image=None, clip_vision_output=None) -> io.NodeOutput:
latent = torch.zeros([batch_size, 32, ((length - 1) // 4) + 1, height // 16, width // 16], device=comfy.model_management.intermediate_device())
if start_image is not None:
start_image = comfy.utils.common_upscale(start_image[:length].movedim(-1, 1), width, height, "bilinear", "center").movedim(1, -1)
encoded = vae.encode(start_image[:, :, :, :3])
concat_latent_image = torch.zeros((latent.shape[0], 32, latent.shape[2], latent.shape[3], latent.shape[4]), device=comfy.model_management.intermediate_device())
concat_latent_image[:, :, :encoded.shape[2], :, :] = encoded
mask = torch.ones((1, 1, latent.shape[2], concat_latent_image.shape[-2], concat_latent_image.shape[-1]), device=start_image.device, dtype=start_image.dtype)
mask[:, :, :((start_image.shape[0] - 1) // 4) + 1] = 0.0
positive = node_helpers.conditioning_set_values(positive, {"concat_latent_image": concat_latent_image, "concat_mask": mask})
negative = node_helpers.conditioning_set_values(negative, {"concat_latent_image": concat_latent_image, "concat_mask": mask})
if clip_vision_output is not None:
positive = node_helpers.conditioning_set_values(positive, {"clip_vision_output": clip_vision_output})
negative = node_helpers.conditioning_set_values(negative, {"clip_vision_output": clip_vision_output})
out_latent = {}
out_latent["samples"] = latent
return io.NodeOutput(positive, negative, out_latent)
class HunyuanVideo15SuperResolution(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="HunyuanVideo15SuperResolution",
inputs=[
io.Conditioning.Input("positive"),
io.Conditioning.Input("negative"),
io.Vae.Input("vae", optional=True),
io.Image.Input("start_image", optional=True),
io.ClipVisionOutput.Input("clip_vision_output", optional=True),
io.Latent.Input("latent"),
io.Float.Input("noise_augmentation", default=0.70, min=0.0, max=1.0, step=0.01, advanced=True),
],
outputs=[
io.Conditioning.Output(display_name="positive"),
io.Conditioning.Output(display_name="negative"),
io.Latent.Output(display_name="latent"),
],
)
@classmethod
def execute(cls, positive, negative, latent, noise_augmentation, vae=None, start_image=None, clip_vision_output=None) -> io.NodeOutput:
in_latent = latent["samples"]
in_channels = in_latent.shape[1]
cond_latent = torch.zeros([in_latent.shape[0], in_channels * 2 + 2, in_latent.shape[-3], in_latent.shape[-2], in_latent.shape[-1]], device=comfy.model_management.intermediate_device())
cond_latent[:, in_channels + 1 : 2 * in_channels + 1] = in_latent
cond_latent[:, 2 * in_channels + 1] = 1
if start_image is not None:
start_image = comfy.utils.common_upscale(start_image.movedim(-1, 1), in_latent.shape[-1] * 16, in_latent.shape[-2] * 16, "bilinear", "center").movedim(1, -1)
encoded = vae.encode(start_image[:, :, :, :3])
cond_latent[:, :in_channels, :encoded.shape[2], :, :] = encoded
cond_latent[:, in_channels + 1, 0] = 1
positive = node_helpers.conditioning_set_values(positive, {"concat_latent_image": cond_latent, "noise_augmentation": noise_augmentation})
negative = node_helpers.conditioning_set_values(negative, {"concat_latent_image": cond_latent, "noise_augmentation": noise_augmentation})
if clip_vision_output is not None:
positive = node_helpers.conditioning_set_values(positive, {"clip_vision_output": clip_vision_output})
negative = node_helpers.conditioning_set_values(negative, {"clip_vision_output": clip_vision_output})
return io.NodeOutput(positive, negative, latent)
class LatentUpscaleModelLoader(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="LatentUpscaleModelLoader",
display_name="Load Latent Upscale Model",
category="loaders",
inputs=[
io.Combo.Input("model_name", options=folder_paths.get_filename_list("latent_upscale_models")),
],
outputs=[
io.LatentUpscaleModel.Output(),
],
)
@classmethod
def execute(cls, model_name) -> io.NodeOutput:
model_path = folder_paths.get_full_path_or_raise("latent_upscale_models", model_name)
sd, metadata = comfy.utils.load_torch_file(model_path, safe_load=True, return_metadata=True)
if "blocks.0.block.0.conv.weight" in sd:
config = {
"in_channels": sd["in_conv.conv.weight"].shape[1],
"out_channels": sd["out_conv.conv.weight"].shape[0],
"hidden_channels": sd["in_conv.conv.weight"].shape[0],
"num_blocks": len([k for k in sd.keys() if k.startswith("blocks.") and k.endswith(".block.0.conv.weight")]),
"global_residual": False,
}
model_type = "720p"
model = HunyuanVideo15SRModel(model_type, config)
model.load_sd(sd)
elif "up.0.block.0.conv1.conv.weight" in sd:
sd = {key.replace("nin_shortcut", "nin_shortcut.conv", 1): value for key, value in sd.items()}
config = {
"z_channels": sd["conv_in.conv.weight"].shape[1],
"out_channels": sd["conv_out.conv.weight"].shape[0],
"block_out_channels": tuple(sd[f"up.{i}.block.0.conv1.conv.weight"].shape[0] for i in range(len([k for k in sd.keys() if k.startswith("up.") and k.endswith(".block.0.conv1.conv.weight")]))),
}
model_type = "1080p"
model = HunyuanVideo15SRModel(model_type, config)
model.load_sd(sd)
elif "post_upsample_res_blocks.0.conv2.bias" in sd:
config = json.loads(metadata["config"])
model = LatentUpsampler.from_config(config).to(dtype=comfy.model_management.vae_dtype(allowed_dtypes=[torch.bfloat16, torch.float32]))
model.load_state_dict(sd)
return io.NodeOutput(model)
class HunyuanVideo15LatentUpscaleWithModel(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="HunyuanVideo15LatentUpscaleWithModel",
display_name="Hunyuan Video 15 Latent Upscale With Model",
category="latent",
inputs=[
io.LatentUpscaleModel.Input("model"),
io.Latent.Input("samples"),
io.Combo.Input("upscale_method", options=["nearest-exact", "bilinear", "area", "bicubic", "bislerp"], default="bilinear"),
io.Int.Input("width", default=1280, min=0, max=16384, step=8),
io.Int.Input("height", default=720, min=0, max=16384, step=8),
io.Combo.Input("crop", options=["disabled", "center"]),
],
outputs=[
io.Latent.Output(),
],
)
@classmethod
def execute(cls, model, samples, upscale_method, width, height, crop) -> io.NodeOutput:
if width == 0 and height == 0:
return io.NodeOutput(samples)
else:
if width == 0:
height = max(64, height)
width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2]))
elif height == 0:
width = max(64, width)
height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1]))
else:
width = max(64, width)
height = max(64, height)
s = comfy.utils.common_upscale(samples["samples"], width // 16, height // 16, upscale_method, crop)
s = model.resample_latent(s)
return io.NodeOutput({"samples": s.cpu().float()})
PROMPT_TEMPLATE_ENCODE_VIDEO_I2V = (
"<|start_header_id|>system<|end_header_id|>\n\n<image>\nDescribe the video by detailing the following aspects according to the reference image: "
"1. The main content and theme of the video."
"2. The color, shape, size, texture, quantity, text, and spatial relationships of the objects."
"3. Actions, events, behaviors temporal relationships, physical movement changes of the objects."
"4. background environment, light, style and atmosphere."
"5. camera angles, movements, and transitions used in the video:<|eot_id|>\n\n"
"<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|>"
"<|start_header_id|>assistant<|end_header_id|>\n\n"
)
class TextEncodeHunyuanVideo_ImageToVideo(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="TextEncodeHunyuanVideo_ImageToVideo",
category="advanced/conditioning",
inputs=[
io.Clip.Input("clip"),
io.ClipVisionOutput.Input("clip_vision_output"),
io.String.Input("prompt", multiline=True, dynamic_prompts=True),
io.Int.Input(
"image_interleave",
default=2,
min=1,
max=512,
tooltip="How much the image influences things vs the text prompt. Higher number means more influence from the text prompt.",
advanced=True,
),
],
outputs=[
io.Conditioning.Output(),
],
)
@classmethod
def execute(cls, clip, clip_vision_output, prompt, image_interleave) -> io.NodeOutput:
tokens = clip.tokenize(prompt, llama_template=PROMPT_TEMPLATE_ENCODE_VIDEO_I2V, image_embeds=clip_vision_output.mm_projected, image_interleave=image_interleave)
return io.NodeOutput(clip.encode_from_tokens_scheduled(tokens))
encode = execute # TODO: remove
class TextEncodeHunyuanVideo15Omni(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="TextEncodeHunyuanVideo15Omni",
display_name="Text Encode HunyuanVideo 15 Omni",
category="advanced/conditioning",
inputs=[
io.Clip.Input("clip"),
io.String.Input("prompt", multiline=True, dynamic_prompts=True),
io.Combo.Input("task", options=["t2v", "i2v", "interpolation", "reference2v", "editing", "tiv2v"], default="t2v"),
io.Boolean.Input("use_visual_inputs", default=True, advanced=True),
io.Int.Input("max_visual_inputs", default=8, min=1, max=64, advanced=True),
io.ClipVisionOutput.Input("clip_vision_output", optional=True),
],
outputs=[
io.Conditioning.Output(),
],
)
@staticmethod
def _task_system_prompt(task: str) -> str:
prompts = {
"t2v": "Describe a high-quality target video from the user's request with concrete scene details, motion, camera behavior, and style.",
"i2v": "Describe a target video that should stay consistent with the provided reference image while following the user's request.",
"interpolation": "Describe a target video that smoothly transitions between the provided keyframe images while following the user's request.",
"reference2v": "Describe a target video that composes the provided reference subjects into a coherent scene following the user's request.",
"editing": "Describe an edited output video that follows the user's instruction while preserving relevant source video content.",
"tiv2v": "Describe an edited output video using both the provided source video and reference image guidance according to the user's instruction.",
}
return prompts.get(task, prompts["t2v"])
@classmethod
def _build_template(cls, task: str, image_count: int) -> str:
system_prompt = cls._task_system_prompt(task)
visual_tokens = "<|vision_start|><|image_pad|><|vision_end|>\n" * image_count
return (
"<|im_start|>system\n"
f"{system_prompt}"
"<|im_end|>\n"
"<|im_start|>user\n"
f"{visual_tokens}" + "{}<|im_end|>\n"
"<|im_start|>assistant\n"
)
@staticmethod
def _extract_image_embeds(clip_vision_output, max_visual_inputs: int):
if clip_vision_output is None:
return []
mm_projected = getattr(clip_vision_output, "mm_projected", None)
if mm_projected is None:
return []
if mm_projected.ndim == 2:
return [mm_projected]
count = min(mm_projected.shape[0], max_visual_inputs)
return [mm_projected[i] for i in range(count)]
@classmethod
def execute(cls, clip, prompt, task, use_visual_inputs, max_visual_inputs, clip_vision_output=None) -> io.NodeOutput:
image_embeds = cls._extract_image_embeds(clip_vision_output, max_visual_inputs) if use_visual_inputs else []
template = cls._build_template(task, len(image_embeds))
# HunyuanVideo 1.5 tokenizers use `images=...`; HunyuanVideo 1.0 uses `image_embeds=...`.
try:
tokens = clip.tokenize(prompt, llama_template=template, images=image_embeds)
except TypeError:
embeds = None
if len(image_embeds) > 0:
embeds = torch.stack(image_embeds, dim=0)
tokens = clip.tokenize(prompt, llama_template=template, image_embeds=embeds, image_interleave=1)
return io.NodeOutput(clip.encode_from_tokens_scheduled(tokens))
encode = execute # TODO: remove
class HunyuanClipVisionOutputConcat(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="HunyuanClipVisionOutputConcat",
display_name="Hunyuan CLIP Vision Output Concat",
category="conditioning/video_models",
inputs=[
io.ClipVisionOutput.Input("clip_vision_output_1"),
io.ClipVisionOutput.Input("clip_vision_output_2", optional=True),
io.ClipVisionOutput.Input("clip_vision_output_3", optional=True),
io.ClipVisionOutput.Input("clip_vision_output_4", optional=True),
],
outputs=[
io.ClipVisionOutput.Output(),
],
)
@classmethod
def execute(cls, clip_vision_output_1, clip_vision_output_2=None, clip_vision_output_3=None, clip_vision_output_4=None) -> io.NodeOutput:
outputs = [o for o in (clip_vision_output_1, clip_vision_output_2, clip_vision_output_3, clip_vision_output_4) if o is not None]
merged = comfy.clip_vision.Output()
tensor_attrs = ["last_hidden_state", "image_embeds", "penultimate_hidden_states", "all_hidden_states", "mm_projected"]
for attr in tensor_attrs:
values = [getattr(o, attr) for o in outputs if hasattr(o, attr)]
if len(values) > 0 and torch.is_tensor(values[0]):
setattr(merged, attr, torch.cat(values, dim=0))
image_sizes = []
for o in outputs:
if hasattr(o, "image_sizes"):
image_sizes.extend(getattr(o, "image_sizes"))
if len(image_sizes) > 0:
merged.image_sizes = image_sizes
return io.NodeOutput(merged)
class HunyuanVideo15OmniConditioning(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="HunyuanVideo15OmniConditioning",
display_name="HunyuanVideo 15 Omni Conditioning",
category="conditioning/video_models",
inputs=[
io.Conditioning.Input("positive"),
io.Conditioning.Input("negative"),
io.Vae.Input("vae"),
io.Combo.Input("task", options=["t2v", "i2v", "interpolation", "reference2v", "editing", "tiv2v"], default="t2v"),
io.Int.Input("width", default=848, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("height", default=480, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("length", default=81, min=1, max=nodes.MAX_RESOLUTION, step=4),
io.Int.Input("batch_size", default=1, min=1, max=4096),
io.Image.Input("reference_images", optional=True, tooltip="For i2v/interpolation/reference2v/tiv2v."),
io.Image.Input("condition_video", optional=True, tooltip="For editing/tiv2v."),
io.ClipVisionOutput.Input("clip_vision_output", optional=True),
],
outputs=[
io.Conditioning.Output(display_name="positive"),
io.Conditioning.Output(display_name="negative"),
io.Latent.Output(display_name="latent"),
],
)
@staticmethod
def _latent_length(length: int) -> int:
return ((length - 1) // 4) + 1
@staticmethod
def _upscale_frames(frames: torch.Tensor, width: int, height: int):
return comfy.utils.common_upscale(frames.movedim(-1, 1), width, height, "bilinear", "center").movedim(1, -1)
@classmethod
def _encode_single_image(cls, vae, image: torch.Tensor, width: int, height: int):
upscaled = cls._upscale_frames(image[:1], width, height)
return vae.encode(upscaled[:, :, :, :3])
@classmethod
def _encode_video(cls, vae, video: torch.Tensor, width: int, height: int, length: int):
upscaled = cls._upscale_frames(video[:length], width, height)
return vae.encode(upscaled[:, :, :, :3])
@staticmethod
def _assign_frame(target: torch.Tensor, source: torch.Tensor, frame_idx: int):
if frame_idx < 0 or frame_idx >= target.shape[2]:
return
target[:, :, frame_idx:frame_idx + 1] = source[:, :, :1]
@classmethod
def execute(cls, positive, negative, vae, task, width, height, length, batch_size, reference_images=None, condition_video=None, clip_vision_output=None) -> io.NodeOutput:
latent_length = cls._latent_length(length)
latent = torch.zeros([batch_size, 32, latent_length, height // 16, width // 16], device=comfy.model_management.intermediate_device())
if task == "t2v":
if clip_vision_output is not None:
positive = node_helpers.conditioning_set_values(positive, {"clip_vision_output": clip_vision_output})
negative = node_helpers.conditioning_set_values(negative, {"clip_vision_output": clip_vision_output})
return io.NodeOutput(positive, negative, {"samples": latent})
cond_latent = torch.zeros_like(latent[:1])
omni_mask = torch.zeros((latent_length,), device=cond_latent.device, dtype=cond_latent.dtype)
if task == "i2v":
if reference_images is None or reference_images.shape[0] < 1:
raise ValueError("Task i2v requires at least one reference image.")
encoded = cls._encode_single_image(vae, reference_images, width, height)
cls._assign_frame(cond_latent, encoded, 0)
omni_mask[0] = 1.0
elif task == "interpolation":
if reference_images is None or reference_images.shape[0] < 2:
raise ValueError("Task interpolation requires at least two reference images.")
encoded_first = cls._encode_single_image(vae, reference_images[:1], width, height)
encoded_last = cls._encode_single_image(vae, reference_images[-1:], width, height)
cls._assign_frame(cond_latent, encoded_first, 0)
cls._assign_frame(cond_latent, encoded_last, latent_length - 1)
omni_mask[0] = 1.0
omni_mask[-1] = 1.0
elif task == "reference2v":
if reference_images is None or reference_images.shape[0] < 1:
raise ValueError("Task reference2v requires at least one reference image.")
num_refs = min(reference_images.shape[0], max(1, latent_length - 1))
for idx in range(num_refs):
encoded = cls._encode_single_image(vae, reference_images[idx:idx + 1], width, height)
frame_idx = min(idx + 1, latent_length - 1)
cls._assign_frame(cond_latent, encoded, frame_idx)
omni_mask[frame_idx] = 1.0
elif task == "editing":
if condition_video is None or condition_video.shape[0] < 1:
raise ValueError("Task editing requires condition_video.")
encoded = cls._encode_video(vae, condition_video, width, height, length)
valid_frames = min(latent_length, encoded.shape[2])
cond_latent[:, :, :valid_frames] = encoded[:, :, :valid_frames]
omni_mask[:valid_frames] = 1.0
elif task == "tiv2v":
if condition_video is None or condition_video.shape[0] < 1:
raise ValueError("Task tiv2v requires condition_video.")
if reference_images is None or reference_images.shape[0] < 1:
raise ValueError("Task tiv2v requires at least one reference image.")
encoded_video = cls._encode_video(vae, condition_video, width, height, length)
valid_frames = min(latent_length, encoded_video.shape[2])
cond_latent[:, :, :valid_frames] = encoded_video[:, :, :valid_frames]
omni_mask[:valid_frames] = 1.0
encoded_ref = cls._encode_single_image(vae, reference_images[:1], width, height)
ref_idx = 1 if latent_length > 1 else 0
cond_latent[:, :, ref_idx:ref_idx + 1] += encoded_ref[:, :, :1]
omni_mask[ref_idx] += 1.0
cond_latent = comfy.utils.resize_to_batch_size(cond_latent, batch_size)
# BaseModel/HunyuanVideo15 inverts concat_mask (mask = 1 - concat_mask), so pass the pre-inverted mask.
concat_mask = (1.0 - omni_mask).view(1, 1, latent_length, 1, 1).expand(cond_latent.shape[0], 1, latent_length, cond_latent.shape[-2], cond_latent.shape[-1]).to(cond_latent.dtype)
positive = node_helpers.conditioning_set_values(positive, {"concat_latent_image": cond_latent, "concat_mask": concat_mask})
negative = node_helpers.conditioning_set_values(negative, {"concat_latent_image": cond_latent, "concat_mask": concat_mask})
if clip_vision_output is not None:
positive = node_helpers.conditioning_set_values(positive, {"clip_vision_output": clip_vision_output})
negative = node_helpers.conditioning_set_values(negative, {"clip_vision_output": clip_vision_output})
return io.NodeOutput(positive, negative, {"samples": latent})
class HunyuanImageToVideo(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="HunyuanImageToVideo",
category="conditioning/video_models",
inputs=[
io.Conditioning.Input("positive"),
io.Vae.Input("vae"),
io.Int.Input("width", default=848, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("height", default=480, min=16, max=nodes.MAX_RESOLUTION, step=16),
io.Int.Input("length", default=53, min=1, max=nodes.MAX_RESOLUTION, step=4),
io.Int.Input("batch_size", default=1, min=1, max=4096),
io.Combo.Input("guidance_type", options=["v1 (concat)", "v2 (replace)", "custom"], advanced=True),
io.Image.Input("start_image", optional=True),
],
outputs=[
io.Conditioning.Output(display_name="positive"),
io.Latent.Output(display_name="latent"),
],
)
@classmethod
def execute(cls, positive, vae, width, height, length, batch_size, guidance_type, start_image=None) -> io.NodeOutput:
latent = torch.zeros([batch_size, 16, ((length - 1) // 4) + 1, height // 8, width // 8], device=comfy.model_management.intermediate_device())
out_latent = {}
if start_image is not None:
start_image = comfy.utils.common_upscale(start_image[:length, :, :, :3].movedim(-1, 1), width, height, "bilinear", "center").movedim(1, -1)
concat_latent_image = vae.encode(start_image)
mask = torch.ones((1, 1, latent.shape[2], concat_latent_image.shape[-2], concat_latent_image.shape[-1]), device=start_image.device, dtype=start_image.dtype)
mask[:, :, :((start_image.shape[0] - 1) // 4) + 1] = 0.0
if guidance_type == "v1 (concat)":
cond = {"concat_latent_image": concat_latent_image, "concat_mask": mask}
elif guidance_type == "v2 (replace)":
cond = {'guiding_frame_index': 0}
latent[:, :, :concat_latent_image.shape[2]] = concat_latent_image
out_latent["noise_mask"] = mask
elif guidance_type == "custom":
cond = {"ref_latent": concat_latent_image}
positive = node_helpers.conditioning_set_values(positive, cond)
out_latent["samples"] = latent
return io.NodeOutput(positive, out_latent)
encode = execute # TODO: remove
class EmptyHunyuanImageLatent(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="EmptyHunyuanImageLatent",
category="latent",
inputs=[
io.Int.Input("width", default=2048, min=64, max=nodes.MAX_RESOLUTION, step=32),
io.Int.Input("height", default=2048, min=64, max=nodes.MAX_RESOLUTION, step=32),
io.Int.Input("batch_size", default=1, min=1, max=4096),
],
outputs=[
io.Latent.Output(),
],
)
@classmethod
def execute(cls, width, height, batch_size=1) -> io.NodeOutput:
latent = torch.zeros([batch_size, 64, height // 32, width // 32], device=comfy.model_management.intermediate_device())
return io.NodeOutput({"samples":latent})
generate = execute # TODO: remove
class HunyuanRefinerLatent(io.ComfyNode):
@classmethod
def define_schema(cls):
return io.Schema(
node_id="HunyuanRefinerLatent",
inputs=[
io.Conditioning.Input("positive"),
io.Conditioning.Input("negative"),
io.Latent.Input("latent"),
io.Float.Input("noise_augmentation", default=0.10, min=0.0, max=1.0, step=0.01, advanced=True),
],
outputs=[
io.Conditioning.Output(display_name="positive"),
io.Conditioning.Output(display_name="negative"),
io.Latent.Output(display_name="latent"),
],
)
@classmethod
def execute(cls, positive, negative, latent, noise_augmentation) -> io.NodeOutput:
latent = latent["samples"]
positive = node_helpers.conditioning_set_values(positive, {"concat_latent_image": latent, "noise_augmentation": noise_augmentation})
negative = node_helpers.conditioning_set_values(negative, {"concat_latent_image": latent, "noise_augmentation": noise_augmentation})
out_latent = {}
out_latent["samples"] = torch.zeros([latent.shape[0], 32, latent.shape[-3], latent.shape[-2], latent.shape[-1]], device=comfy.model_management.intermediate_device())
return io.NodeOutput(positive, negative, out_latent)
class HunyuanExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[io.ComfyNode]]:
return [
CLIPTextEncodeHunyuanDiT,
TextEncodeHunyuanVideo_ImageToVideo,
TextEncodeHunyuanVideo15Omni,
HunyuanClipVisionOutputConcat,
EmptyHunyuanLatentVideo,
EmptyHunyuanVideo15Latent,
HunyuanVideo15ImageToVideo,
HunyuanVideo15OmniConditioning,
HunyuanVideo15SuperResolution,
HunyuanVideo15LatentUpscaleWithModel,
LatentUpscaleModelLoader,
HunyuanImageToVideo,
EmptyHunyuanImageLatent,
HunyuanRefinerLatent,
]
async def comfy_entrypoint() -> HunyuanExtension:
return HunyuanExtension()