mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-09 05:40:49 +08:00
2251 lines
88 KiB
Python
2251 lines
88 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import math
|
|
import os
|
|
import random
|
|
|
|
import numpy as np
|
|
import safetensors.torch
|
|
import torch
|
|
from PIL import Image, ImageOps, ImageSequence
|
|
from PIL.PngImagePlugin import PngInfo
|
|
from huggingface_hub import snapshot_download
|
|
from natsort import natsorted
|
|
|
|
from comfy_api.latest import io
|
|
from .. import clip_vision as clip_vision_module
|
|
from .. import controlnet
|
|
from .. import diffusers_load
|
|
from .. import model_management
|
|
from .. import node_helpers
|
|
from .. import sample
|
|
from .. import samplers
|
|
from .. import sd
|
|
from .. import utils
|
|
from ..cli_args import args
|
|
from ..cmd import folder_paths, latent_preview
|
|
from ..comfy_types import IO, ComfyNodeABC, InputTypeDict, FileLocator
|
|
from ..component_model.deprecation import _deprecate_method
|
|
from ..component_model.images_types import ImageMaskTuple
|
|
from ..component_model.tensor_types import RGBImage, RGBImageBatch, MaskBatch, RGBAImageBatch, Latent
|
|
from ..execution_context import current_execution_context
|
|
from ..images import open_image
|
|
from ..interruption import interrupt_current_processing
|
|
from ..ldm.flux.weight_dtypes import FLUX_WEIGHT_DTYPES
|
|
from ..model_downloader import get_filename_list_with_downloadable, get_full_path_or_raise, KNOWN_CHECKPOINTS, \
|
|
KNOWN_CLIP_VISION_MODELS, KNOWN_GLIGEN_MODELS, KNOWN_UNCLIP_CHECKPOINTS, KNOWN_LORAS, KNOWN_CONTROLNETS, \
|
|
KNOWN_DIFF_CONTROLNETS, KNOWN_VAES, KNOWN_APPROX_VAES, get_huggingface_repo_list, KNOWN_CLIP_MODELS, \
|
|
KNOWN_UNET_MODELS
|
|
from ..nodes.common import MAX_RESOLUTION
|
|
from ..open_exr import load_exr
|
|
from ..sd import VAE
|
|
from ..utils import comfy_tqdm
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@_deprecate_method(version="0.2.3", message="Use interrupt_current_processing from comfy.interruption")
|
|
def interrupt_processing(value=True):
|
|
interrupt_current_processing(value)
|
|
|
|
|
|
class CLIPTextEncode(ComfyNodeABC):
|
|
@classmethod
|
|
def INPUT_TYPES(s) -> InputTypeDict:
|
|
return {
|
|
"required": {
|
|
"text": (IO.STRING, {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}),
|
|
"clip": (IO.CLIP, {"tooltip": "The CLIP model used for encoding the text."})
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = (IO.CONDITIONING,)
|
|
OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning"
|
|
DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images."
|
|
|
|
def encode(self, clip, text):
|
|
if clip is None:
|
|
raise RuntimeError("ERROR: clip input is invalid: None\n\nIf the clip is from a checkpoint loader node your checkpoint does not contain a valid clip or text encoder model.")
|
|
tokens = clip.tokenize(text)
|
|
return (clip.encode_from_tokens_scheduled(tokens),)
|
|
|
|
|
|
class ConditioningCombine:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_1": ("CONDITIONING",), "conditioning_2": ("CONDITIONING",)}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "combine"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def combine(self, conditioning_1, conditioning_2):
|
|
return (conditioning_1 + conditioning_2,)
|
|
|
|
|
|
class ConditioningAverage:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_to": ("CONDITIONING",), "conditioning_from": ("CONDITIONING",),
|
|
"conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "addWeighted"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength):
|
|
out = []
|
|
|
|
if len(conditioning_from) > 1:
|
|
logger.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
|
|
|
|
cond_from = conditioning_from[0][0]
|
|
pooled_output_from = conditioning_from[0][1].get("pooled_output", None)
|
|
|
|
for i in range(len(conditioning_to)):
|
|
t1 = conditioning_to[i][0]
|
|
pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from)
|
|
t0 = cond_from[:, :t1.shape[1]]
|
|
if t0.shape[1] < t1.shape[1]:
|
|
t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1)
|
|
|
|
tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength))
|
|
t_to = conditioning_to[i][1].copy()
|
|
if pooled_output_from is not None and pooled_output_to is not None:
|
|
t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength))
|
|
elif pooled_output_from is not None:
|
|
t_to["pooled_output"] = pooled_output_from
|
|
|
|
n = [tw, t_to]
|
|
out.append(n)
|
|
return (out,)
|
|
|
|
|
|
class ConditioningConcat:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {
|
|
"conditioning_to": ("CONDITIONING",),
|
|
"conditioning_from": ("CONDITIONING",),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "concat"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def concat(self, conditioning_to, conditioning_from):
|
|
out = []
|
|
|
|
if len(conditioning_from) > 1:
|
|
logger.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
|
|
|
|
cond_from = conditioning_from[0][0]
|
|
|
|
for i in range(len(conditioning_to)):
|
|
t1 = conditioning_to[i][0]
|
|
tw = torch.cat((t1, cond_from), 1)
|
|
n = [tw, conditioning_to[i][1].copy()]
|
|
out.append(n)
|
|
|
|
return (out,)
|
|
|
|
|
|
class ConditioningSetArea:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, width, height, x, y, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8),
|
|
"strength": strength,
|
|
"set_area_to_bounds": False})
|
|
return (c,)
|
|
|
|
|
|
class ConditioningSetAreaPercentage:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, width, height, x, y, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x),
|
|
"strength": strength,
|
|
"set_area_to_bounds": False})
|
|
return (c,)
|
|
|
|
|
|
class ConditioningSetAreaStrength:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"strength": strength})
|
|
return (c,)
|
|
|
|
|
|
class ConditioningSetMask:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"mask": ("MASK",),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
"set_cond_area": (["default", "mask bounds"],),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, mask, set_cond_area, strength):
|
|
set_area_to_bounds = False
|
|
if set_cond_area != "default":
|
|
set_area_to_bounds = True
|
|
if len(mask.shape) < 3:
|
|
mask = mask.unsqueeze(0)
|
|
|
|
c = node_helpers.conditioning_set_values(conditioning, {"mask": mask,
|
|
"set_area_to_bounds": set_area_to_bounds,
|
|
"mask_strength": strength})
|
|
return (c,)
|
|
|
|
|
|
class ConditioningZeroOut:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",)}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "zero_out"
|
|
|
|
CATEGORY = "advanced/conditioning"
|
|
|
|
def zero_out(self, conditioning):
|
|
c = []
|
|
for t in conditioning:
|
|
d = t[1].copy()
|
|
pooled_output = d.get("pooled_output", None)
|
|
if pooled_output is not None:
|
|
d["pooled_output"] = torch.zeros_like(pooled_output)
|
|
conditioning_lyrics = d.get("conditioning_lyrics", None)
|
|
if conditioning_lyrics is not None:
|
|
d["conditioning_lyrics"] = torch.zeros_like(conditioning_lyrics)
|
|
n = [torch.zeros_like(t[0]), d]
|
|
c.append(n)
|
|
return (c,)
|
|
|
|
|
|
class ConditioningSetTimestepRange:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
|
|
"end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "set_range"
|
|
|
|
CATEGORY = "advanced/conditioning"
|
|
|
|
def set_range(self, conditioning, start, end):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start,
|
|
"end_percent": end})
|
|
return (c,)
|
|
|
|
|
|
class VAEDecode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"samples": ("LATENT", {"tooltip": "The latent to be decoded."}),
|
|
"vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."})
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("IMAGE",)
|
|
OUTPUT_TOOLTIPS = ("The decoded image.",)
|
|
FUNCTION = "decode"
|
|
|
|
CATEGORY = "latent"
|
|
DESCRIPTION = "Decodes latent images back into pixel space images."
|
|
|
|
def decode(self, vae, samples):
|
|
if samples is None:
|
|
return None,
|
|
images = vae.decode(samples["samples"])
|
|
if len(images.shape) == 5: # Combine batches
|
|
images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
|
|
return (images,)
|
|
|
|
|
|
class VAEDecodeTiled:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",), "vae": ("VAE",),
|
|
"tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 32}),
|
|
"overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}),
|
|
}, "optional": {
|
|
"temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to decode at a time."}),
|
|
"temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "decode"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def decode(self, vae, samples, tile_size, overlap=64, temporal_size=64, temporal_overlap=8):
|
|
if samples is None:
|
|
return None,
|
|
if tile_size < overlap * 4:
|
|
overlap = tile_size // 4
|
|
if temporal_size < temporal_overlap * 2:
|
|
temporal_overlap = temporal_overlap // 2
|
|
temporal_compression = vae.temporal_compression_decode()
|
|
if temporal_compression is not None:
|
|
temporal_size = max(2, temporal_size // temporal_compression)
|
|
temporal_overlap = max(1, min(temporal_size // 2, temporal_overlap // temporal_compression))
|
|
else:
|
|
temporal_size = None
|
|
temporal_overlap = None
|
|
|
|
compression = vae.spacial_compression_decode()
|
|
images = vae.decode_tiled(samples["samples"], tile_x=tile_size // compression, tile_y=tile_size // compression, overlap=overlap // compression, tile_t=temporal_size, overlap_t=temporal_overlap)
|
|
if len(images.shape) == 5: # Combine batches
|
|
images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
|
|
return (images,)
|
|
|
|
|
|
class VAEEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"pixels": ("IMAGE",), "vae": ("VAE",)}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def encode(self, vae: VAE, pixels) -> tuple[Optional[Latent]]:
|
|
if pixels is None:
|
|
return None,
|
|
t = vae.encode(pixels)
|
|
return (Latent(**{"samples": t}),)
|
|
|
|
class VAEEncodeTiled:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"pixels": ("IMAGE",), "vae": ("VAE",),
|
|
"tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}),
|
|
"overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}),
|
|
"temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to encode at a time."}),
|
|
"temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def encode(self, vae, pixels, tile_size, overlap, temporal_size=64, temporal_overlap=8) -> tuple[Optional[Latent]]:
|
|
if pixels is None:
|
|
return None,
|
|
t = vae.encode_tiled(pixels, tile_x=tile_size, tile_y=tile_size, overlap=overlap, tile_t=temporal_size, overlap_t=temporal_overlap)
|
|
return (Latent(**{"samples": t}),)
|
|
|
|
class VAEEncodeForInpaint:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"pixels": ("IMAGE",), "vae": ("VAE",), "mask": ("MASK",), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}), }}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "latent/inpaint"
|
|
|
|
def encode(self, vae, pixels, mask, grow_mask_by=6) -> tuple[Optional[Latent]]:
|
|
if pixels is None:
|
|
return None,
|
|
x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio
|
|
y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio
|
|
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")
|
|
|
|
pixels = pixels.clone()
|
|
if pixels.shape[1] != x or pixels.shape[2] != y:
|
|
x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2
|
|
y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2
|
|
pixels = pixels[:, x_offset:x + x_offset, y_offset:y + y_offset, :]
|
|
mask = mask[:, :, x_offset:x + x_offset, y_offset:y + y_offset]
|
|
|
|
# grow mask by a few pixels to keep things seamless in latent space
|
|
if grow_mask_by == 0:
|
|
mask_erosion = mask
|
|
else:
|
|
kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
|
|
padding = math.ceil((grow_mask_by - 1) / 2)
|
|
|
|
mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1)
|
|
|
|
m = (1.0 - mask.round()).squeeze(1)
|
|
for i in range(3):
|
|
pixels[:, :, :, i] -= 0.5
|
|
pixels[:, :, :, i] *= m
|
|
pixels[:, :, :, i] += 0.5
|
|
t = vae.encode(pixels)
|
|
|
|
return (Latent(**{"samples": t, "noise_mask": (mask_erosion[:, :, :x, :y].round())}),)
|
|
|
|
|
|
class InpaintModelConditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"positive": ("CONDITIONING",),
|
|
"negative": ("CONDITIONING",),
|
|
"vae": ("VAE",),
|
|
"pixels": ("IMAGE",),
|
|
"mask": ("MASK",),
|
|
"noise_mask": ("BOOLEAN", {"default": True, "tooltip": "Add a noise mask to the latent so sampling will only happen within the mask. Might improve results or completely break things depending on the model."}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING", "CONDITIONING", "LATENT")
|
|
RETURN_NAMES = ("positive", "negative", "latent")
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning/inpaint"
|
|
|
|
def encode(self, positive: io.Conditioning.CondList, negative: io.Conditioning.CondList, pixels, vae, mask, noise_mask=True) -> tuple[io.Conditioning.CondList, io.Conditioning.CondList, Optional[Latent]]:
|
|
if pixels is None:
|
|
return positive, negative, None
|
|
x = (pixels.shape[1] // 8) * 8
|
|
y = (pixels.shape[2] // 8) * 8
|
|
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")
|
|
|
|
orig_pixels = pixels
|
|
pixels = orig_pixels.clone()
|
|
if pixels.shape[1] != x or pixels.shape[2] != y:
|
|
x_offset = (pixels.shape[1] % 8) // 2
|
|
y_offset = (pixels.shape[2] % 8) // 2
|
|
pixels = pixels[:, x_offset:x + x_offset, y_offset:y + y_offset, :]
|
|
mask = mask[:, :, x_offset:x + x_offset, y_offset:y + y_offset]
|
|
|
|
m = (1.0 - mask.round()).squeeze(1)
|
|
for i in range(3):
|
|
pixels[:, :, :, i] -= 0.5
|
|
pixels[:, :, :, i] *= m
|
|
pixels[:, :, :, i] += 0.5
|
|
concat_latent = vae.encode(pixels)
|
|
orig_latent = vae.encode(orig_pixels)
|
|
|
|
out_latent: Latent = {"samples": orig_latent}
|
|
|
|
if noise_mask:
|
|
out_latent["noise_mask"] = mask
|
|
|
|
out: list[io.Conditioning.CondList] = []
|
|
for conditioning in [positive, negative]:
|
|
c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent,
|
|
"concat_mask": mask})
|
|
out.append(c)
|
|
return (out[0], out[1], out_latent)
|
|
|
|
|
|
class SaveLatent:
|
|
def __init__(self):
|
|
self.output_dir = folder_paths.get_output_directory()
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",),
|
|
"filename_prefix": ("STRING", {"default": "latents/ComfyUI"})},
|
|
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
|
|
}
|
|
|
|
RETURN_TYPES = ()
|
|
FUNCTION = "save"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
|
|
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)
|
|
|
|
# support save metadata for latent sharing
|
|
prompt_info = ""
|
|
if prompt is not None:
|
|
prompt_info = json.dumps(prompt)
|
|
|
|
metadata = None
|
|
if not args.disable_metadata:
|
|
metadata = {"prompt": prompt_info}
|
|
if extra_pnginfo is not None:
|
|
for x in extra_pnginfo:
|
|
metadata[x] = json.dumps(extra_pnginfo[x])
|
|
|
|
file = f"{filename}_{counter:05}_.latent"
|
|
|
|
results: list[FileLocator] = []
|
|
results.append({
|
|
"filename": file,
|
|
"subfolder": subfolder,
|
|
"type": "output"
|
|
})
|
|
|
|
file = os.path.join(full_output_folder, file)
|
|
|
|
output = {}
|
|
output["latent_tensor"] = samples["samples"].contiguous()
|
|
output["latent_format_version_0"] = torch.tensor([])
|
|
|
|
utils.save_torch_file(output, file, metadata=metadata)
|
|
return {"ui": {"latents": results}}
|
|
|
|
|
|
class LoadLatent:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
input_dir = folder_paths.get_input_directory()
|
|
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")]
|
|
return {"required": {"latent": [sorted(files), ]}, }
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "load"
|
|
|
|
def load(self, latent):
|
|
latent_path = folder_paths.get_annotated_filepath(latent)
|
|
latent = safetensors.torch.load_file(latent_path, device="cpu")
|
|
multiplier = 1.0
|
|
if "latent_format_version_0" not in latent:
|
|
multiplier = 1.0 / 0.18215
|
|
samples = {"samples": latent["latent_tensor"].float() * multiplier}
|
|
return (samples,)
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(s, latent):
|
|
if not folder_paths.exists_annotated_filepath(latent):
|
|
return "Invalid latent file: {}".format(latent)
|
|
return True
|
|
|
|
|
|
class CheckpointLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"config_name": (folder_paths.get_filename_list("configs"),),
|
|
"ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_CHECKPOINTS),)}}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
DEPRECATED = True
|
|
|
|
def load_checkpoint(self, config_name, ckpt_name):
|
|
config_path = folder_paths.get_full_path("configs", config_name)
|
|
ckpt_path = get_full_path_or_raise("checkpoints", ckpt_name, KNOWN_CHECKPOINTS)
|
|
return sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
|
|
|
|
class CheckpointLoaderSimple:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_CHECKPOINTS), {"tooltip": "The name of the checkpoint (model) to load."}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
OUTPUT_TOOLTIPS = ("The model used for denoising latents.",
|
|
"The CLIP model used for encoding text prompts.",
|
|
"The VAE model used for encoding and decoding images to and from latent space.")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "loaders"
|
|
DESCRIPTION = "Loads a diffusion model checkpoint, diffusion models are used to denoise latents."
|
|
|
|
def load_checkpoint(self, ckpt_name):
|
|
ckpt_path = get_full_path_or_raise("checkpoints", ckpt_name, KNOWN_CHECKPOINTS)
|
|
out = sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
return out[:3]
|
|
|
|
|
|
class DiffusersLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
paths = []
|
|
for search_path in folder_paths.get_folder_paths("diffusers"):
|
|
if os.path.exists(search_path):
|
|
for root, subdir, files in os.walk(search_path, followlinks=True):
|
|
if "model_index.json" in files:
|
|
paths.append(os.path.relpath(root, start=search_path))
|
|
|
|
paths += get_huggingface_repo_list()
|
|
paths = list(frozenset(paths))
|
|
return {"required": {"model_path": (paths,),
|
|
"weight_dtype": (FLUX_WEIGHT_DTYPES,)
|
|
}}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
def load_checkpoint(self, model_path, output_vae=True, output_clip=True, weight_dtype: str = "default"):
|
|
for search_path in folder_paths.get_folder_paths("diffusers"):
|
|
if os.path.exists(search_path):
|
|
path = os.path.join(search_path, model_path)
|
|
if os.path.exists(path):
|
|
model_path = path
|
|
break
|
|
if not os.path.exists(model_path):
|
|
with comfy_tqdm():
|
|
model_path = snapshot_download(model_path)
|
|
|
|
model_options = get_model_options_for_dtype(weight_dtype)
|
|
return diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"), model_options=model_options)
|
|
|
|
|
|
class unCLIPCheckpointLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_UNCLIP_CHECKPOINTS),),
|
|
}}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
|
|
ckpt_path = get_full_path_or_raise("checkpoints", ckpt_name, KNOWN_UNCLIP_CHECKPOINTS)
|
|
out = sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
return out
|
|
|
|
|
|
class CLIPSetLastLayer:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"clip": ("CLIP",),
|
|
"stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "set_last_layer"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def set_last_layer(self, clip, stop_at_clip_layer):
|
|
clip = clip.clone()
|
|
clip.clip_layer(stop_at_clip_layer)
|
|
return (clip,)
|
|
|
|
|
|
class LoraLoader:
|
|
def __init__(self):
|
|
self.loaded_lora = None
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}),
|
|
"clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}),
|
|
"lora_name": (get_filename_list_with_downloadable("loras", KNOWN_LORAS), {"tooltip": "The name of the LoRA."}),
|
|
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}),
|
|
"strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP")
|
|
OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.")
|
|
FUNCTION = "load_lora"
|
|
|
|
CATEGORY = "loaders"
|
|
DESCRIPTION = "LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together."
|
|
|
|
def load_lora(self, model, clip, lora_name, strength_model, strength_clip):
|
|
if strength_model == 0 and strength_clip == 0:
|
|
return (model, clip)
|
|
|
|
lora_path = get_full_path_or_raise("loras", lora_name, KNOWN_LORAS)
|
|
lora = None
|
|
if self.loaded_lora is not None:
|
|
if self.loaded_lora[0] == lora_path:
|
|
lora = self.loaded_lora[1]
|
|
else:
|
|
self.loaded_lora = None
|
|
|
|
if lora is None:
|
|
lora = utils.load_torch_file(lora_path, safe_load=True)
|
|
self.loaded_lora = (lora_path, lora)
|
|
|
|
model_lora, clip_lora = sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip, lora_name=lora_name)
|
|
return (model_lora, clip_lora)
|
|
|
|
|
|
class LoraLoaderModelOnly(LoraLoader):
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"model": ("MODEL",),
|
|
"lora_name": (get_filename_list_with_downloadable("loras"),),
|
|
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("MODEL",)
|
|
FUNCTION = "load_lora_model_only"
|
|
|
|
def load_lora_model_only(self, model, lora_name, strength_model):
|
|
return (self.load_lora(model, None, lora_name, strength_model, 0)[0],)
|
|
|
|
|
|
class VAELoader:
|
|
video_taes = ["taehv", "lighttaew2_2", "lighttaew2_1", "lighttaehy1_5"]
|
|
image_taes = ["taesd", "taesdxl", "taesd3", "taef1"]
|
|
@staticmethod
|
|
def vae_list(s=None):
|
|
vaes = get_filename_list_with_downloadable("vae", KNOWN_VAES)
|
|
approx_vaes = get_filename_list_with_downloadable("vae_approx", KNOWN_APPROX_VAES)
|
|
sdxl_taesd_enc = False
|
|
sdxl_taesd_dec = False
|
|
sd1_taesd_enc = False
|
|
sd1_taesd_dec = False
|
|
sd3_taesd_enc = False
|
|
sd3_taesd_dec = False
|
|
f1_taesd_enc = False
|
|
f1_taesd_dec = False
|
|
|
|
for v in approx_vaes:
|
|
if v.startswith("taesd_decoder."):
|
|
sd1_taesd_dec = True
|
|
elif v.startswith("taesd_encoder."):
|
|
sd1_taesd_enc = True
|
|
elif v.startswith("taesdxl_decoder."):
|
|
sdxl_taesd_dec = True
|
|
elif v.startswith("taesdxl_encoder."):
|
|
sdxl_taesd_enc = True
|
|
elif v.startswith("taesd3_decoder."):
|
|
sd3_taesd_dec = True
|
|
elif v.startswith("taesd3_encoder."):
|
|
sd3_taesd_enc = True
|
|
elif v.startswith("taef1_encoder."):
|
|
f1_taesd_dec = True
|
|
elif v.startswith("taef1_decoder."):
|
|
f1_taesd_enc = True
|
|
else:
|
|
for tae in VAELoader.video_taes:
|
|
if v.startswith(tae):
|
|
vaes.append(v)
|
|
|
|
if sd1_taesd_dec and sd1_taesd_enc:
|
|
vaes.append("taesd")
|
|
if sdxl_taesd_dec and sdxl_taesd_enc:
|
|
vaes.append("taesdxl")
|
|
if sd3_taesd_dec and sd3_taesd_enc:
|
|
vaes.append("taesd3")
|
|
if f1_taesd_dec and f1_taesd_enc:
|
|
vaes.append("taef1")
|
|
vaes.append("pixel_space")
|
|
return vaes
|
|
|
|
@staticmethod
|
|
def load_taesd(name: str):
|
|
sd_ = {}
|
|
approx_vaes = folder_paths.get_filename_list("vae_approx")
|
|
|
|
encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes))
|
|
decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes))
|
|
|
|
enc = utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder))
|
|
for k in enc:
|
|
sd_["taesd_encoder.{}".format(k)] = enc[k]
|
|
|
|
dec = utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder))
|
|
for k in dec:
|
|
sd_["taesd_decoder.{}".format(k)] = dec[k]
|
|
|
|
if name == "taesd":
|
|
sd_["vae_scale"] = torch.tensor(0.18215)
|
|
sd_["vae_shift"] = torch.tensor(0.0)
|
|
elif name == "taesdxl":
|
|
sd_["vae_scale"] = torch.tensor(0.13025)
|
|
sd_["vae_shift"] = torch.tensor(0.0)
|
|
elif name == "taesd3":
|
|
sd_["vae_scale"] = torch.tensor(1.5305)
|
|
sd_["vae_shift"] = torch.tensor(0.0609)
|
|
elif name == "taef1":
|
|
sd_["vae_scale"] = torch.tensor(0.3611)
|
|
sd_["vae_shift"] = torch.tensor(0.1159)
|
|
return sd_
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"vae_name": (s.vae_list(s),)}}
|
|
RETURN_TYPES = ("VAE",)
|
|
FUNCTION = "load_vae"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
# TODO: scale factor?
|
|
def load_vae(self, vae_name):
|
|
metadata = {}
|
|
if vae_name == "pixel_space":
|
|
sd_ = {}
|
|
sd_["pixel_space_vae"] = torch.tensor(1.0)
|
|
elif vae_name in self.image_taes:
|
|
sd_ = self.load_taesd(vae_name)
|
|
else:
|
|
if os.path.splitext(vae_name)[0] in self.video_taes:
|
|
vae_path = folder_paths.get_full_path_or_raise("vae_approx", vae_name)
|
|
else:
|
|
vae_path = get_full_path_or_raise("vae", vae_name, KNOWN_VAES)
|
|
sd_, metadata = utils.load_torch_file(vae_path, return_metadata=True)
|
|
vae = sd.VAE(sd=sd_, metadata=metadata, ckpt_name=vae_name)
|
|
vae.throw_exception_if_invalid()
|
|
return (vae,)
|
|
|
|
|
|
class ControlNetLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"control_net_name": (get_filename_list_with_downloadable("controlnet", KNOWN_CONTROLNETS),)}}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, control_net_name):
|
|
controlnet_path = get_full_path_or_raise("controlnet", control_net_name, KNOWN_CONTROLNETS)
|
|
controlnet_ = controlnet.load_controlnet(controlnet_path)
|
|
if controlnet is None:
|
|
raise RuntimeError("ERROR: controlnet file is invalid and does not contain a valid controlnet model.")
|
|
return (controlnet_,)
|
|
|
|
|
|
class ControlNetLoaderWeights:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"control_net_name": (get_filename_list_with_downloadable("controlnet", KNOWN_CONTROLNETS),),
|
|
"weight_dtype": (FLUX_WEIGHT_DTYPES,)
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, control_net_name, weight_dtype):
|
|
controlnet_path = get_full_path_or_raise("controlnet", control_net_name, KNOWN_CONTROLNETS)
|
|
model_options = get_model_options_for_dtype(weight_dtype)
|
|
|
|
controlnet_ = controlnet.load_controlnet(controlnet_path, model_options=model_options)
|
|
return (controlnet_,)
|
|
|
|
|
|
class DiffControlNetLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"model": ("MODEL",),
|
|
"control_net_name": (get_filename_list_with_downloadable("controlnet", KNOWN_DIFF_CONTROLNETS),)}}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, model, control_net_name):
|
|
controlnet_path = get_full_path_or_raise("controlnet", control_net_name, KNOWN_DIFF_CONTROLNETS)
|
|
controlnet_ = controlnet.load_controlnet(controlnet_path, model)
|
|
return (controlnet_,)
|
|
|
|
|
|
class ControlNetApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"control_net": ("CONTROL_NET",),
|
|
"image": ("IMAGE",),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01})
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_controlnet"
|
|
|
|
DEPRECATED = True
|
|
CATEGORY = "conditioning/controlnet"
|
|
|
|
def apply_controlnet(self, conditioning, control_net, image: RGBImageBatch, strength):
|
|
if strength == 0:
|
|
return (conditioning,)
|
|
|
|
c = []
|
|
control_hint = image.movedim(-1, 1)
|
|
for t in conditioning:
|
|
n = [t[0], t[1].copy()]
|
|
c_net = control_net.copy().set_cond_hint(control_hint, strength)
|
|
if 'control' in t[1]:
|
|
c_net.set_previous_controlnet(t[1]['control'])
|
|
n[1]['control'] = c_net
|
|
n[1]['control_apply_to_uncond'] = True
|
|
c.append(n)
|
|
return (c,)
|
|
|
|
|
|
class ControlNetApplyAdvanced:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"positive": ("CONDITIONING",),
|
|
"negative": ("CONDITIONING",),
|
|
"control_net": ("CONTROL_NET",),
|
|
"image": ("IMAGE",),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
"start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
|
|
"end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
|
|
},
|
|
"optional": {"vae": ("VAE",),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("CONDITIONING", "CONDITIONING")
|
|
RETURN_NAMES = ("positive", "negative")
|
|
FUNCTION = "apply_controlnet"
|
|
|
|
CATEGORY = "conditioning/controlnet"
|
|
|
|
def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]):
|
|
if strength == 0:
|
|
return (positive, negative)
|
|
|
|
control_hint = image.movedim(-1, 1)
|
|
cnets = {}
|
|
|
|
out = []
|
|
for conditioning in [positive, negative]:
|
|
c = []
|
|
for t in conditioning:
|
|
d = t[1].copy()
|
|
|
|
prev_cnet = d.get('control', None)
|
|
if prev_cnet in cnets:
|
|
c_net = cnets[prev_cnet]
|
|
else:
|
|
c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat)
|
|
c_net.set_previous_controlnet(prev_cnet)
|
|
cnets[prev_cnet] = c_net
|
|
|
|
d['control'] = c_net
|
|
d['control_apply_to_uncond'] = False
|
|
n = [t[0], d]
|
|
c.append(n)
|
|
out.append(c)
|
|
return (out[0], out[1])
|
|
|
|
|
|
def get_model_options_for_dtype(weight_dtype):
|
|
model_options = {}
|
|
if weight_dtype == "fp8_e4m3fn":
|
|
model_options["dtype"] = torch.float8_e4m3fn
|
|
elif weight_dtype == "fp8_e5m2":
|
|
model_options["dtype"] = torch.float8_e5m2
|
|
elif weight_dtype == "fp8_e4m3fn_fast":
|
|
model_options["dtype"] = torch.float8_e4m3fn
|
|
model_options["fp8_optimizations"] = True
|
|
return model_options
|
|
|
|
|
|
class UNETLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"unet_name": (get_filename_list_with_downloadable("diffusion_models", KNOWN_UNET_MODELS),),
|
|
"weight_dtype": (FLUX_WEIGHT_DTYPES,)
|
|
}}
|
|
|
|
RETURN_TYPES = ("MODEL",)
|
|
FUNCTION = "load_unet"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
def load_unet(self, unet_name, weight_dtype="default"):
|
|
model_options = get_model_options_for_dtype(weight_dtype)
|
|
unet_path = get_full_path_or_raise("diffusion_models", unet_name, KNOWN_UNET_MODELS)
|
|
model = sd.load_diffusion_model(unet_path, model_options=model_options)
|
|
return (model,)
|
|
|
|
|
|
class CLIPLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"clip_name": (get_filename_list_with_downloadable("text_encoders", KNOWN_CLIP_MODELS),),
|
|
"type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv", "pixart", "cosmos", "lumina2", "wan", "hidream", "chroma", "ace", "omnigen2", "qwen_image", "hunyuan_image", "flux2", "ovis"],),
|
|
},
|
|
"optional": {
|
|
"device": (["default", "cpu"], {"advanced": True}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
DESCRIPTION = "[Recipes]\n\nstable_diffusion: clip-l\nstable_cascade: clip-g\nsd3: t5 xxl/ clip-g / clip-l\nstable_audio: t5 base\nmochi: t5 xxl\ncosmos: old t5 xxl\nlumina2: gemma 2 2B\nwan: umt5 xxl\n hidream: llama-3.1 (Recommend) or t5\nomnigen2: qwen vl 2.5 3B"
|
|
|
|
def load_clip(self, clip_name, type="stable_diffusion", device="default"):
|
|
clip_type = getattr(sd.CLIPType, type.upper(), sd.CLIPType.STABLE_DIFFUSION)
|
|
|
|
model_options = {}
|
|
if device == "cpu":
|
|
model_options["load_device"] = model_options["offload_device"] = torch.device("cpu")
|
|
|
|
clip_path = get_full_path_or_raise("text_encoders", clip_name, KNOWN_CLIP_MODELS)
|
|
clip = sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options)
|
|
return (clip,)
|
|
|
|
|
|
class DualCLIPLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"clip_name1": (get_filename_list_with_downloadable("text_encoders"),), "clip_name2": (
|
|
get_filename_list_with_downloadable("text_encoders"),),
|
|
"type": (["sdxl", "sd3", "flux", "hunyuan_video", "hidream", "hunyuan_image", "hunyuan_video_15", "kandinsky5", "kandinsky5_image", "newbie"],),
|
|
},
|
|
"optional": {
|
|
"device": (["default", "cpu"], {"advanced": True}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
DESCRIPTION = "[Recipes]\n\nsdxl: clip-l, clip-g\nsd3: clip-l, clip-g / clip-l, t5 / clip-g, t5\nflux: clip-l, t5\nhidream: at least one of t5 or llama, recommended t5 and llama\nhunyuan_image: qwen2.5vl 7b and byt5 small\nnewbie: gemma-3-4b-it, jina clip v2"
|
|
|
|
def load_clip(self, clip_name1, clip_name2, type, device="default"):
|
|
clip_type = getattr(sd.CLIPType, type.upper(), sd.CLIPType.STABLE_DIFFUSION)
|
|
clip_path1 = get_full_path_or_raise("text_encoders", clip_name1)
|
|
clip_path2 = get_full_path_or_raise("text_encoders", clip_name2)
|
|
|
|
model_options = {}
|
|
if device == "cpu":
|
|
model_options["load_device"] = model_options["offload_device"] = torch.device("cpu")
|
|
|
|
clip = sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options)
|
|
return (clip,)
|
|
|
|
|
|
class CLIPVisionLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"clip_name": (get_filename_list_with_downloadable("clip_vision", KNOWN_CLIP_VISION_MODELS),),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CLIP_VISION",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_clip(self, clip_name):
|
|
clip_path = get_full_path_or_raise("clip_vision", clip_name, KNOWN_CLIP_VISION_MODELS)
|
|
clip_vision = clip_vision_module.load(clip_path)
|
|
if clip_vision is None:
|
|
raise RuntimeError("ERROR: clip vision file is invalid and does not contain a valid vision model.")
|
|
return (clip_vision,)
|
|
|
|
|
|
class CLIPVisionEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"clip_vision": ("CLIP_VISION",),
|
|
"image": ("IMAGE",),
|
|
},
|
|
"optional": {
|
|
"crop": (["center", "none"], {"default": "center"})
|
|
}}
|
|
|
|
RETURN_TYPES = ("CLIP_VISION_OUTPUT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def encode(self, clip_vision, image, crop="center"):
|
|
crop_image = True
|
|
if crop != "center":
|
|
crop_image = False
|
|
output = clip_vision.encode_image(image, crop=crop_image)
|
|
return (output,)
|
|
|
|
|
|
class StyleModelLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"style_model_name": (get_filename_list_with_downloadable("style_models"),)}}
|
|
|
|
RETURN_TYPES = ("STYLE_MODEL",)
|
|
FUNCTION = "load_style_model"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_style_model(self, style_model_name):
|
|
style_model_path = get_full_path_or_raise("style_models", style_model_name)
|
|
style_model = sd.load_style_model(style_model_path)
|
|
return (style_model,)
|
|
|
|
|
|
class StyleModelApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"style_model": ("STYLE_MODEL",),
|
|
"clip_vision_output": ("CLIP_VISION_OUTPUT",),
|
|
},
|
|
"optional": {
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.001}),
|
|
"strength_type": (["multiply", "attn_bias"], {"default": "multiply"}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_stylemodel"
|
|
|
|
CATEGORY = "conditioning/style_model"
|
|
|
|
def apply_stylemodel(self, conditioning, style_model, clip_vision_output, strength=1.0, strength_type="multiply"):
|
|
cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0)
|
|
if strength_type == "multiply":
|
|
cond *= strength
|
|
|
|
n = cond.shape[1]
|
|
c_out = []
|
|
for t in conditioning:
|
|
(txt, keys) = t
|
|
keys = keys.copy()
|
|
# even if the strength is 1.0 (i.e, no change), if there's already a mask, we have to add to it
|
|
if "attention_mask" in keys or (strength_type == "attn_bias" and strength != 1.0):
|
|
# math.log raises an error if the argument is zero
|
|
# torch.log returns -inf, which is what we want
|
|
attn_bias = torch.log(torch.Tensor([strength if strength_type == "attn_bias" else 1.0]))
|
|
# get the size of the mask image
|
|
mask_ref_size = keys.get("attention_mask_img_shape", (1, 1))
|
|
n_ref = mask_ref_size[0] * mask_ref_size[1]
|
|
n_txt = txt.shape[1]
|
|
# grab the existing mask
|
|
mask = keys.get("attention_mask", None)
|
|
# create a default mask if it doesn't exist
|
|
if mask is None:
|
|
mask = torch.zeros((txt.shape[0], n_txt + n_ref, n_txt + n_ref), dtype=torch.float16)
|
|
# convert the mask dtype, because it might be boolean
|
|
# we want it to be interpreted as a bias
|
|
if mask.dtype == torch.bool:
|
|
# log(True) = log(1) = 0
|
|
# log(False) = log(0) = -inf
|
|
mask = torch.log(mask.to(dtype=torch.float16))
|
|
# now we make the mask bigger to add space for our new tokens
|
|
new_mask = torch.zeros((txt.shape[0], n_txt + n + n_ref, n_txt + n + n_ref), dtype=torch.float16)
|
|
# copy over the old mask, in quandrants
|
|
new_mask[:, :n_txt, :n_txt] = mask[:, :n_txt, :n_txt]
|
|
new_mask[:, :n_txt, n_txt + n:] = mask[:, :n_txt, n_txt:]
|
|
new_mask[:, n_txt + n:, :n_txt] = mask[:, n_txt:, :n_txt]
|
|
new_mask[:, n_txt + n:, n_txt + n:] = mask[:, n_txt:, n_txt:]
|
|
# now fill in the attention bias to our redux tokens
|
|
new_mask[:, :n_txt, n_txt:n_txt + n] = attn_bias
|
|
new_mask[:, n_txt + n:, n_txt:n_txt + n] = attn_bias
|
|
keys["attention_mask"] = new_mask.to(txt.device)
|
|
keys["attention_mask_img_shape"] = mask_ref_size
|
|
|
|
c_out.append([torch.cat((txt, cond), dim=1), keys])
|
|
|
|
return (c_out,)
|
|
|
|
|
|
class unCLIPConditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING",),
|
|
"clip_vision_output": ("CLIP_VISION_OUTPUT",),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
|
|
"noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_adm"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation):
|
|
if strength == 0:
|
|
return (conditioning,)
|
|
|
|
c = node_helpers.conditioning_set_values(conditioning, {"unclip_conditioning": [{"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}]}, append=True)
|
|
return (c,)
|
|
|
|
|
|
class GLIGENLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"gligen_name": (get_filename_list_with_downloadable("gligen", KNOWN_GLIGEN_MODELS),)}}
|
|
|
|
RETURN_TYPES = ("GLIGEN",)
|
|
FUNCTION = "load_gligen"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_gligen(self, gligen_name):
|
|
gligen_path = get_full_path_or_raise("gligen", gligen_name, KNOWN_GLIGEN_MODELS)
|
|
gligen = sd.load_gligen(gligen_path)
|
|
return (gligen,)
|
|
|
|
|
|
class GLIGENTextBoxApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_to": ("CONDITIONING",),
|
|
"clip": ("CLIP",),
|
|
"gligen_textbox_model": ("GLIGEN",),
|
|
"text": ("STRING", {"multiline": True, "dynamicPrompts": True}),
|
|
"width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning/gligen"
|
|
|
|
def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y):
|
|
c = []
|
|
cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected")
|
|
for t in conditioning_to:
|
|
n = [t[0], t[1].copy()]
|
|
position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)]
|
|
prev = []
|
|
if "gligen" in n[1]:
|
|
prev = n[1]['gligen'][2]
|
|
|
|
n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params)
|
|
c.append(n)
|
|
return (c,)
|
|
|
|
|
|
class EmptyLatentImage:
|
|
def __init__(self):
|
|
self.device = model_management.intermediate_device()
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}),
|
|
"height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}),
|
|
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."})
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
OUTPUT_TOOLTIPS = ("The empty latent image batch.",)
|
|
FUNCTION = "generate"
|
|
|
|
CATEGORY = "latent"
|
|
DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling."
|
|
|
|
def generate(self, width, height, batch_size=1):
|
|
latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device)
|
|
return ({"samples": latent},)
|
|
|
|
|
|
class LatentFromBatch:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",),
|
|
"batch_index": ("INT", {"default": 0, "min": 0, "max": 63}),
|
|
"length": ("INT", {"default": 1, "min": 1, "max": 64}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "frombatch"
|
|
|
|
CATEGORY = "latent/batch"
|
|
|
|
def frombatch(self, samples, batch_index, length):
|
|
if samples is None:
|
|
return None,
|
|
s = samples.copy()
|
|
s_in = samples["samples"]
|
|
batch_index = min(s_in.shape[0] - 1, batch_index)
|
|
length = min(s_in.shape[0] - batch_index, length)
|
|
s["samples"] = s_in[batch_index:batch_index + length].clone()
|
|
if "noise_mask" in samples:
|
|
masks = samples["noise_mask"]
|
|
if masks.shape[0] == 1:
|
|
s["noise_mask"] = masks.clone()
|
|
else:
|
|
if masks.shape[0] < s_in.shape[0]:
|
|
masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
|
|
s["noise_mask"] = masks[batch_index:batch_index + length].clone()
|
|
if "batch_index" not in s:
|
|
s["batch_index"] = [x for x in range(batch_index, batch_index + length)]
|
|
else:
|
|
s["batch_index"] = samples["batch_index"][batch_index:batch_index + length]
|
|
return (s,)
|
|
|
|
|
|
class RepeatLatentBatch:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",),
|
|
"amount": ("INT", {"default": 1, "min": 1, "max": 64}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "repeat"
|
|
|
|
CATEGORY = "latent/batch"
|
|
|
|
def repeat(self, samples, amount):
|
|
if samples is None:
|
|
return None,
|
|
s = samples.copy()
|
|
s_in = samples["samples"]
|
|
|
|
s["samples"] = s_in.repeat((amount,) + ((1,) * (s_in.ndim - 1)))
|
|
if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1:
|
|
masks = samples["noise_mask"]
|
|
if masks.shape[0] < s_in.shape[0]:
|
|
masks = masks.repeat((math.ceil(s_in.shape[0] / masks.shape[0]),) + ((1,) * (masks.ndim - 1)))[:s_in.shape[0]]
|
|
s["noise_mask"] = samples["noise_mask"].repeat((amount,) + ((1,) * (samples["noise_mask"].ndim - 1)))
|
|
if "batch_index" in s:
|
|
offset = max(s["batch_index"]) - min(s["batch_index"]) + 1
|
|
s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]]
|
|
return (s,)
|
|
|
|
|
|
class LatentUpscale:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
|
|
crop_methods = ["disabled", "center"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
|
|
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"crop": (s.crop_methods,)}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def upscale(self, samples, upscale_method, width, height, crop):
|
|
if samples is None:
|
|
return None,
|
|
if width == 0 and height == 0:
|
|
s = samples
|
|
else:
|
|
s = samples.copy()
|
|
|
|
if width == 0:
|
|
height = max(64, height)
|
|
width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2]))
|
|
elif height == 0:
|
|
width = max(64, width)
|
|
height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1]))
|
|
else:
|
|
width = max(64, width)
|
|
height = max(64, height)
|
|
|
|
s["samples"] = utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop)
|
|
return (s,)
|
|
|
|
|
|
class LatentUpscaleBy:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
|
|
"scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}), }}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def upscale(self, samples, upscale_method, scale_by):
|
|
if samples is None:
|
|
return None,
|
|
s = samples.copy()
|
|
width = round(samples["samples"].shape[-1] * scale_by)
|
|
height = round(samples["samples"].shape[-2] * scale_by)
|
|
s["samples"] = utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled")
|
|
return (s,)
|
|
|
|
|
|
class LatentRotate:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",),
|
|
"rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "rotate"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def rotate(self, samples, rotation):
|
|
if samples is None:
|
|
return None,
|
|
s = samples.copy()
|
|
rotate_by = 0
|
|
if rotation.startswith("90"):
|
|
rotate_by = 1
|
|
elif rotation.startswith("180"):
|
|
rotate_by = 2
|
|
elif rotation.startswith("270"):
|
|
rotate_by = 3
|
|
|
|
s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2])
|
|
return (s,)
|
|
|
|
|
|
class LatentFlip:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT",),
|
|
"flip_method": (["x-axis: vertically", "y-axis: horizontally"],),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "flip"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def flip(self, samples, flip_method):
|
|
if samples is None:
|
|
return None,
|
|
s = samples.copy()
|
|
if flip_method.startswith("x"):
|
|
s["samples"] = torch.flip(samples["samples"], dims=[2])
|
|
elif flip_method.startswith("y"):
|
|
s["samples"] = torch.flip(samples["samples"], dims=[3])
|
|
|
|
return (s,)
|
|
|
|
|
|
class LatentComposite:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples_to": ("LATENT",),
|
|
"samples_from": ("LATENT",),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "composite"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0):
|
|
x = x // 8
|
|
y = y // 8
|
|
feather = feather // 8
|
|
samples_out = samples_to.copy()
|
|
s = samples_to["samples"].clone()
|
|
samples_to = samples_to["samples"]
|
|
samples_from = samples_from["samples"]
|
|
if feather == 0:
|
|
s[:, :, y:y + samples_from.shape[2], x:x + samples_from.shape[3]] = samples_from[:, :, :samples_to.shape[2] - y, :samples_to.shape[3] - x]
|
|
else:
|
|
samples_from = samples_from[:, :, :samples_to.shape[2] - y, :samples_to.shape[3] - x]
|
|
mask = torch.ones_like(samples_from)
|
|
for t in range(feather):
|
|
if y != 0:
|
|
mask[:, :, t:1 + t, :] *= ((1.0 / feather) * (t + 1))
|
|
|
|
if y + samples_from.shape[2] < samples_to.shape[2]:
|
|
mask[:, :, mask.shape[2] - 1 - t: mask.shape[2] - t, :] *= ((1.0 / feather) * (t + 1))
|
|
if x != 0:
|
|
mask[:, :, :, t:1 + t] *= ((1.0 / feather) * (t + 1))
|
|
if x + samples_from.shape[3] < samples_to.shape[3]:
|
|
mask[:, :, :, mask.shape[3] - 1 - t: mask.shape[3] - t] *= ((1.0 / feather) * (t + 1))
|
|
rev_mask = torch.ones_like(mask) - mask
|
|
s[:, :, y:y + samples_from.shape[2], x:x + samples_from.shape[3]] = samples_from[:, :, :samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:, :, y:y + samples_from.shape[2], x:x + samples_from.shape[3]] * rev_mask
|
|
samples_out["samples"] = s
|
|
return (samples_out,)
|
|
|
|
|
|
class LatentBlend:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {
|
|
"samples1": ("LATENT",),
|
|
"samples2": ("LATENT",),
|
|
"blend_factor": ("FLOAT", {
|
|
"default": 0.5,
|
|
"min": 0,
|
|
"max": 1,
|
|
"step": 0.01
|
|
}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "blend"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def blend(self, samples1, samples2, blend_factor: float, blend_mode: str = "normal"):
|
|
|
|
samples_out = samples1.copy()
|
|
samples1 = samples1["samples"]
|
|
samples2 = samples2["samples"]
|
|
|
|
if samples1.shape != samples2.shape:
|
|
samples2.permute(0, 3, 1, 2)
|
|
samples2 = utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center')
|
|
samples2.permute(0, 2, 3, 1)
|
|
|
|
samples_blended = self.blend_mode(samples1, samples2, blend_mode)
|
|
samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor)
|
|
samples_out["samples"] = samples_blended
|
|
return (samples_out,)
|
|
|
|
def blend_mode(self, img1, img2, mode):
|
|
if mode == "normal":
|
|
return img2
|
|
else:
|
|
raise ValueError(f"Unsupported blend mode: {mode}")
|
|
|
|
|
|
class LatentCrop:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"samples": ("LATENT",),
                             "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                             "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                             "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             }}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "crop"

    CATEGORY = "latent/transform"

    def crop(self, samples, width, height, x, y):
        if samples is None:
            return (None,)
        s = samples.copy()
        samples = samples['samples']
        x = x // 8
        y = y // 8

        # enforce a minimum crop size of 64 pixels (8 in latent space)
        if x > (samples.shape[3] - 8):
            x = samples.shape[3] - 8
        if y > (samples.shape[2] - 8):
            y = samples.shape[2] - 8

        new_height = height // 8
        new_width = width // 8
        to_x = new_width + x
        to_y = new_height + y
        s['samples'] = samples[:, :, y:to_y, x:to_x]
        return (s,)

class SetLatentNoiseMask:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"samples": ("LATENT",),
                             "mask": ("MASK",),
                             }}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "set_mask"

    CATEGORY = "latent/inpaint"

    def set_mask(self, samples, mask):
        if samples is None:
            return (None,)
        s = samples.copy()
        s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1]))
        return (s,)

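# Illustrative sketch (hypothetical shapes, never called): SetLatentNoiseMask stores the
# mask on the latent dict under "noise_mask", reshaped to (batch, 1, H, W). A value of
# 1.0 marks the region the sampler is allowed to change; samplers rescale the mask to the
# latent's spatial size as needed.
def _example_set_latent_noise_mask():
    latent = {"samples": torch.zeros((1, 4, 64, 64))}
    mask = torch.ones((64, 64))
    (masked_latent,) = SetLatentNoiseMask().set_mask(latent, mask)
    return masked_latent["noise_mask"].shape  # torch.Size([1, 1, 64, 64])
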
def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False):
    latent_image = latent["samples"]
    latent_image = sample.fix_empty_latent_channels(model, latent_image)

    if disable_noise:
        noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu")
    else:
        batch_inds = latent["batch_index"] if "batch_index" in latent else None
        noise = sample.prepare_noise(latent_image, seed, batch_inds)

    noise_mask = None
    if "noise_mask" in latent:
        noise_mask = latent["noise_mask"]

    callback = latent_preview.prepare_callback(model, steps)
    disable_pbar = not current_execution_context().server.receive_all_progress_notifications
    samples = sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
                            denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step,
                            force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed)
    out = latent.copy()
    out["samples"] = samples
    return (out,)

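# Illustrative sketch (never called): every KSampler-style node funnels into
# common_ksampler(), which prepares noise from the seed, honours an optional "noise_mask"
# entry on the latent dict, and returns a copy of the latent dict with the denoised
# "samples". The model and conditioning arguments are assumed to come from the usual
# loader/encoder nodes; the latent shape is hypothetical.
def _example_common_ksampler_call(model, positive_cond, negative_cond):
    latent = {"samples": torch.zeros((1, 4, 64, 64))}
    (result,) = common_ksampler(model, seed=42, steps=20, cfg=8.0,
                                sampler_name="euler", scheduler="normal",
                                positive=positive_cond, negative=negative_cond,
                                latent=latent, denoise=1.0)
    return result
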
class KSampler:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}),
                "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True, "tooltip": "The random seed used for creating the noise."}),
                "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}),
                "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. Higher values result in images more closely matching the prompt, but values that are too high will negatively impact quality."}),
                "sampler_name": (samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling, this can affect the quality, speed, and style of the generated output."}),
                "scheduler": (samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}),
                "positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}),
                "negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}),
                "latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}),
                "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied; lower values maintain the structure of the initial image, allowing for image-to-image sampling."}),
            }
        }

    RETURN_TYPES = ("LATENT",)
    OUTPUT_TOOLTIPS = ("The denoised latent.",)
    FUNCTION = "sample"

    CATEGORY = "sampling"
    DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image."

    def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
        return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise)

class KSamplerAdvanced:
    @classmethod
    def INPUT_TYPES(s):
        return {"required":
                    {"model": ("MODEL",),
                     "add_noise": (["enable", "disable"],),
                     "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True}),
                     "steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
                     "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}),
                     "sampler_name": (samplers.KSampler.SAMPLERS,),
                     "scheduler": (samplers.KSampler.SCHEDULERS,),
                     "positive": ("CONDITIONING",),
                     "negative": ("CONDITIONING",),
                     "latent_image": ("LATENT",),
                     "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
                     "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
                     "return_with_leftover_noise": (["disable", "enable"],),
                     }
                }

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "sample"

    CATEGORY = "sampling"

    def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0):
        force_full_denoise = True
        if return_with_leftover_noise == "enable":
            force_full_denoise = False
        disable_noise = False
        if add_noise == "disable":
            disable_noise = True
        return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise)

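# Illustrative sketch (hypothetical models and conditioning, never called): the
# start_at_step / end_at_step and return_with_leftover_noise inputs let two
# KSamplerAdvanced nodes split one schedule, e.g. a base model handling steps 0-15 and a
# second model finishing steps 15-25 on the partially denoised latent.
def _example_split_sampling(base_model, refiner_model, positive, negative, latent):
    node = KSamplerAdvanced()
    (partial,) = node.sample(base_model, "enable", 0, 25, 8.0, "euler", "normal",
                             positive, negative, latent,
                             start_at_step=0, end_at_step=15,
                             return_with_leftover_noise="enable")
    (final,) = node.sample(refiner_model, "disable", 0, 25, 8.0, "euler", "normal",
                           positive, negative, partial,
                           start_at_step=15, end_at_step=25,
                           return_with_leftover_noise="disable")
    return final
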
class SaveImage:
    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()
        self.type = "output"
        self.prefix_append = ""
        self.compress_level = 4

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "images": ("IMAGE", {"tooltip": "The images to save."}),
                "filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."})
            },
            "hidden": {
                "prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"
            },
        }

    RETURN_TYPES = ()
    FUNCTION = "save_images"

    OUTPUT_NODE = True

    CATEGORY = "image"
    DESCRIPTION = "Saves the input images to your ComfyUI output directory."

    def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
        if images is None:
            return {"ui": {"images": []}}
        filename_prefix += self.prefix_append
        full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0])
        results = list()
        image: RGBImage
        for (batch_number, image) in enumerate(images):
            i = 255. * image.float().cpu().numpy()
            img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
            metadata = None
            if not args.disable_metadata:
                metadata = PngInfo()
                if prompt is not None:
                    metadata.add_text("prompt", json.dumps(prompt))
                if extra_pnginfo is not None:
                    for x in extra_pnginfo:
                        metadata.add_text(x, json.dumps(extra_pnginfo[x]))

            filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
            file = f"{filename_with_batch_num}_{counter:05}_.png"
            abs_path = os.path.join(full_output_folder, file)
            img.save(abs_path, pnginfo=metadata, compress_level=self.compress_level)
            results.append({
                "abs_path": os.path.abspath(abs_path),
                "filename": file,
                "subfolder": subfolder,
                "type": self.type
            })
            counter += 1

        return {"ui": {"images": results}}

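# Illustrative sketch (never called; calling it would write a PNG to the output folder):
# SaveImage has no node outputs; it reports the written files back to the UI as a dict.
# Each entry carries the filename, the subfolder below the output directory, and the save
# "type" ("output" here, "temp" for PreviewImage below).
def _example_save_image_result():
    images = torch.zeros((1, 64, 64, 3))  # one black 64x64 RGB image, values in [0, 1]
    result = SaveImage().save_images(images, filename_prefix="ComfyUI_example")
    # result looks roughly like:
    # {"ui": {"images": [{"abs_path": ..., "filename": "ComfyUI_example_00001_.png",
    #                     "subfolder": "", "type": "output"}]}}
    return result
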
class PreviewImage(SaveImage):
    def __init__(self):
        self.output_dir = folder_paths.get_temp_directory()
        self.type = "temp"
        self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstuvwxyz") for x in range(5))
        self.compress_level = 1

    @classmethod
    def INPUT_TYPES(s):
        return {"required":
                    {"images": ("IMAGE",), },
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }

class LoadImage:
    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
        files = folder_paths.filter_files_content_types(files, ["image"])
        return {
            "required": {
                "image": (natsorted(files), {"image_upload": True}),
            },
        }

    CATEGORY = "image"

    RETURN_TYPES = ("IMAGE", "MASK")
    FUNCTION = "load_image"

    def load_image(self, image: str) -> ImageMaskTuple:
        image_path = folder_paths.get_annotated_filepath(image)
        output_images = []
        output_masks = []
        w, h = None, None

        excluded_formats = ['MPO']

        # maintain the legacy path
        # this will ultimately return a tensor, so we'd rather have the tensors directly
        # from cv2 rather than get them out of a PIL image
        _, ext = os.path.splitext(image)
        if ext == ".exr":
            return load_exr(image_path, srgb=False)
        with open_image(image_path) as img:
            for i in ImageSequence.Iterator(img):
                i = node_helpers.pillow(ImageOps.exif_transpose, i)

                if i.mode == 'I':
                    i = i.point(lambda i: i * (1 / 255))
                image = i.convert("RGB")

                if len(output_images) == 0:
                    w = image.size[0]
                    h = image.size[1]

                if image.size[0] != w or image.size[1] != h:
                    continue

                image = np.array(image).astype(np.float32) / 255.0
                image = torch.from_numpy(image)[None,]
                if 'A' in i.getbands():
                    mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
                    mask = 1. - torch.from_numpy(mask)
                elif i.mode == 'P' and 'transparency' in i.info:
                    mask = np.array(i.convert('RGBA').getchannel('A')).astype(np.float32) / 255.0
                    mask = 1. - torch.from_numpy(mask)
                else:
                    # unlike upstream, the mask is now the size of the original image, even when there was no alpha channel
                    # this is opaque
                    mask = torch.zeros((h, w), dtype=torch.float32, device="cpu")
                output_images.append(image)
                output_masks.append(mask.unsqueeze(0))

        if len(output_images) > 1 and img.format not in excluded_formats:
            output_image = torch.cat(output_images, dim=0)
            output_mask = torch.cat(output_masks, dim=0)
        else:
            output_image = output_images[0]
            output_mask = output_masks[0]

        return ImageMaskTuple(output_image, output_mask)

    @classmethod
    def VALIDATE_INPUTS(s, image):
        if not folder_paths.exists_annotated_filepath(image):
            return "Invalid image file: {}".format(image)

        return True

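# Illustrative sketch (hypothetical filename, never called): LoadImage returns an
# ImageMaskTuple of an IMAGE batch shaped (frames, H, W, 3) with values in [0, 1] and a
# MASK batch shaped (frames, H, W). The mask is the inverted alpha channel when one
# exists and is all zeros (fully opaque) otherwise.
def _example_load_image_shapes(filename="example.png"):
    image, mask = LoadImage().load_image(filename)
    return image.shape, mask.shape  # e.g. (1, 512, 512, 3), (1, 512, 512)
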
class LoadImageMask:
    _color_channels = ["alpha", "red", "green", "blue"]

    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
        return {"required":
                    {"image": (natsorted(files), {"image_upload": True}),
                     "channel": (s._color_channels,), }
                }

    CATEGORY = "mask"

    RETURN_TYPES = ("MASK",)
    FUNCTION = "load_image"

    def load_image(self, image, channel):
        image_path = folder_paths.get_annotated_filepath(image)
        i = node_helpers.pillow(Image.open, image_path)
        i = node_helpers.pillow(ImageOps.exif_transpose, i)
        if i.getbands() != ("R", "G", "B", "A"):
            if i.mode == 'I':
                i = i.point(lambda i: i * (1 / 255))
            i = i.convert("RGBA")
        mask = None
        c = channel[0].upper()
        if c in i.getbands():
            mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0
            mask = torch.from_numpy(mask)
            if c == 'A':
                mask = 1. - mask
        else:
            mask = torch.zeros((64, 64), dtype=torch.float32, device="cpu")
        return (mask.unsqueeze(0),)

    @classmethod
    def VALIDATE_INPUTS(s, image):
        if not folder_paths.exists_annotated_filepath(image):
            return "Invalid image file: {}".format(image)

        return True

class LoadImageOutput(LoadImage):
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "image": ("COMBO", {
                    "image_upload": True,
                    "image_folder": "output",
                    "remote": {
                        "route": "/internal/files/output",
                        "refresh_button": True,
                        "control_after_refresh": "first",
                    },
                }),
            }
        }

    DESCRIPTION = "Load an image from the output folder. When the refresh button is clicked, the node will update the image list and automatically select the first image, allowing for easy iteration."
    EXPERIMENTAL = True
    FUNCTION = "load_image"

class ImageScale:
    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
    crop_methods = ["disabled", "center"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
                             "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
                             "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
                             "crop": (s.crop_methods,)}}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "upscale"

    CATEGORY = "image/upscaling"

    def upscale(self, image: RGBImageBatch, upscale_method, width, height, crop) -> tuple[RGBImageBatch]:
        if width == 0 and height == 0:
            s = image
        else:
            samples = image.movedim(-1, 1)

            if width == 0:
                width = max(1, round(samples.shape[3] * height / samples.shape[2]))
            elif height == 0:
                height = max(1, round(samples.shape[2] * width / samples.shape[3]))

            s = utils.common_upscale(samples, width, height, upscale_method, crop)
            s = s.movedim(1, -1)
        return (s,)

class ImageScaleBy:
    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
                             "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}), }}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "upscale"

    CATEGORY = "image/upscaling"

    def upscale(self, image, upscale_method, scale_by):
        samples = image.movedim(-1, 1)
        width = round(samples.shape[3] * scale_by)
        height = round(samples.shape[2] * scale_by)
        s = utils.common_upscale(samples, width, height, upscale_method, "disabled")
        s = s.movedim(1, -1)
        return (s,)

class ImageInvert:

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"image": ("IMAGE",)}}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "invert"

    CATEGORY = "image"

    def invert(self, image):
        s = 1.0 - image
        return (s,)

class ImageBatch:

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"image1": ("IMAGE",), "image2": ("IMAGE",)}}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "batch"

    CATEGORY = "image"

    def batch(self, image1, image2):
        if image1.shape[-1] != image2.shape[-1]:
            if image1.shape[-1] > image2.shape[-1]:
                image2 = torch.nn.functional.pad(image2, (0, 1), mode='constant', value=1.0)
            else:
                image1 = torch.nn.functional.pad(image1, (0, 1), mode='constant', value=1.0)
        if image1.shape[1:] != image2.shape[1:]:
            image2 = utils.common_upscale(image2.movedim(-1, 1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1, -1)
        s = torch.cat((image1, image2), dim=0)
        return (s,)

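# Illustrative sketch (never called): ImageBatch concatenates two IMAGE tensors along the
# batch dimension, padding an RGB input up to RGBA with an opaque alpha channel and
# bilinearly resizing the second image when the spatial sizes differ.
def _example_image_batch():
    rgba = torch.zeros((2, 512, 512, 4))
    rgb = torch.zeros((1, 256, 256, 3))
    (batched,) = ImageBatch().batch(rgba, rgb)
    return batched.shape  # torch.Size([3, 512, 512, 4])
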
class EmptyImage:
    def __init__(self, device="cpu"):
        self.device = device

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
                             "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
                             "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}),
                             "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}),
                             }}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "generate"

    CATEGORY = "image"

    def generate(self, width, height, batch_size=1, color=0):
        r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF)
        g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF)
        b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF)
        return (torch.cat((r, g, b), dim=-1),)

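# Illustrative sketch (never called): the "color" input is a packed 0xRRGGBB integer;
# generate() unpacks it into per-channel planes scaled to [0, 1] and concatenates them
# into a (B, H, W, 3) IMAGE tensor.
def _example_empty_image_color():
    (img,) = EmptyImage().generate(width=4, height=4, batch_size=1, color=0xFF8000)  # orange
    return img[0, 0, 0]  # tensor([1.0000, 0.5020, 0.0000])
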
class ImagePadForOutpaint:

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "image": ("IMAGE",),
                "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
            }
        }

    RETURN_TYPES = ("IMAGE", "MASK")
    FUNCTION = "expand_image"

    CATEGORY = "image"

    def expand_image(self, image: RGBImageBatch | RGBAImageBatch, left, top, right, bottom, feathering) -> tuple[RGBImageBatch | RGBAImageBatch, MaskBatch]:
        batch, height, width, channels = image.size()

        new_image = torch.ones(
            (batch, height + top + bottom, width + left + right, channels),
            dtype=torch.float32,
        ) * 0.5

        new_image[:, top:top + height, left:left + width, :] = image

        mask = torch.ones(
            (batch, height + top + bottom, width + left + right),
            dtype=torch.float32,
        )

        t = torch.zeros(
            (height, width),
            dtype=torch.float32
        )

        if feathering > 0 and feathering * 2 < height and feathering * 2 < width:

            for i in range(height):
                for j in range(width):
                    dt = i if top != 0 else height
                    db = height - i if bottom != 0 else height

                    dl = j if left != 0 else width
                    dr = width - j if right != 0 else width

                    d = min(dt, db, dl, dr)

                    if d >= feathering:
                        continue

                    v = (feathering - d) / feathering

                    t[i, j] = v * v

        mask[:, top:top + height, left:left + width] = t

        # mask is already in batch shape
        return new_image, mask

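# Illustrative sketch (never called): inside the original image area the outpaint mask is
# 0 except within "feathering" pixels of a padded edge, where it ramps up quadratically as
# v = ((feathering - d) / feathering) ** 2 for distance d to the nearest padded edge; the
# padded border itself stays at 1.0.
def _example_outpaint_mask_profile():
    image = torch.zeros((1, 128, 128, 3))
    _, mask = ImagePadForOutpaint().expand_image(image, left=0, top=0, right=64, bottom=0, feathering=8)
    # Just inside the right edge of the original image the mask falls off:
    # d=1 -> (7/8)**2, d=2 -> (6/8)**2, ..., d>=8 -> 0.
    return mask[0, 64, 120:128]
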
NODE_CLASS_MAPPINGS = {
    "KSampler": KSampler,
    "CheckpointLoaderSimple": CheckpointLoaderSimple,
    "CLIPTextEncode": CLIPTextEncode,
    "CLIPSetLastLayer": CLIPSetLastLayer,
    "VAEDecode": VAEDecode,
    "VAEEncode": VAEEncode,
    "VAEEncodeForInpaint": VAEEncodeForInpaint,
    "VAELoader": VAELoader,
    "EmptyLatentImage": EmptyLatentImage,
    "LatentUpscale": LatentUpscale,
    "LatentUpscaleBy": LatentUpscaleBy,
    "LatentFromBatch": LatentFromBatch,
    "RepeatLatentBatch": RepeatLatentBatch,
    "SaveImage": SaveImage,
    "PreviewImage": PreviewImage,
    "LoadImage": LoadImage,
    "LoadImageMask": LoadImageMask,
    "LoadImageOutput": LoadImageOutput,
    "ImageScale": ImageScale,
    "ImageScaleBy": ImageScaleBy,
    "ImageInvert": ImageInvert,
    "ImageBatch": ImageBatch,
    "ImagePadForOutpaint": ImagePadForOutpaint,
    "EmptyImage": EmptyImage,
    "ConditioningAverage": ConditioningAverage,
    "ConditioningCombine": ConditioningCombine,
    "ConditioningConcat": ConditioningConcat,
    "ConditioningSetArea": ConditioningSetArea,
    "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage,
    "ConditioningSetAreaStrength": ConditioningSetAreaStrength,
    "ConditioningSetMask": ConditioningSetMask,
    "KSamplerAdvanced": KSamplerAdvanced,
    "SetLatentNoiseMask": SetLatentNoiseMask,
    "LatentComposite": LatentComposite,
    "LatentBlend": LatentBlend,
    "LatentRotate": LatentRotate,
    "LatentFlip": LatentFlip,
    "LatentCrop": LatentCrop,
    "LoraLoader": LoraLoader,
    "CLIPLoader": CLIPLoader,
    "UNETLoader": UNETLoader,
    "DualCLIPLoader": DualCLIPLoader,
    "CLIPVisionEncode": CLIPVisionEncode,
    "StyleModelApply": StyleModelApply,
    "unCLIPConditioning": unCLIPConditioning,
    "ControlNetApply": ControlNetApply,
    "ControlNetApplyAdvanced": ControlNetApplyAdvanced,
    "ControlNetLoader": ControlNetLoader,
    "ControlNetLoaderWeights": ControlNetLoaderWeights,
    "DiffControlNetLoader": DiffControlNetLoader,
    "StyleModelLoader": StyleModelLoader,
    "CLIPVisionLoader": CLIPVisionLoader,
    "VAEDecodeTiled": VAEDecodeTiled,
    "VAEEncodeTiled": VAEEncodeTiled,
    "unCLIPCheckpointLoader": unCLIPCheckpointLoader,
    "GLIGENLoader": GLIGENLoader,
    "GLIGENTextBoxApply": GLIGENTextBoxApply,
    "InpaintModelConditioning": InpaintModelConditioning,

    "CheckpointLoader": CheckpointLoader,
    "DiffusersLoader": DiffusersLoader,

    "LoadLatent": LoadLatent,
    "SaveLatent": SaveLatent,

    "ConditioningZeroOut": ConditioningZeroOut,
    "ConditioningSetTimestepRange": ConditioningSetTimestepRange,
    "LoraLoaderModelOnly": LoraLoaderModelOnly,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    # Sampling
    "KSampler": "KSampler",
    "KSamplerAdvanced": "KSampler (Advanced)",
    # Loaders
    "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)",
    "CheckpointLoaderSimple": "Load Checkpoint",
    "VAELoader": "Load VAE",
    "LoraLoader": "Load LoRA",
    "CLIPLoader": "Load CLIP",
    "ControlNetLoader": "Load ControlNet Model",
    "ControlNetLoaderWeights": "Load ControlNet Model (Weights)",
    "DiffControlNetLoader": "Load ControlNet Model (diff)",
    "StyleModelLoader": "Load Style Model",
    "CLIPVisionLoader": "Load CLIP Vision",
    "UNETLoader": "Load Diffusion Model",
    # Conditioning
    "CLIPVisionEncode": "CLIP Vision Encode",
    "StyleModelApply": "Apply Style Model",
    "CLIPTextEncode": "CLIP Text Encode (Prompt)",
    "CLIPSetLastLayer": "CLIP Set Last Layer",
    "ConditioningCombine": "Conditioning (Combine)",
    "ConditioningAverage": "Conditioning (Average)",
    "ConditioningConcat": "Conditioning (Concat)",
    "ConditioningSetArea": "Conditioning (Set Area)",
    "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)",
    "ConditioningSetMask": "Conditioning (Set Mask)",
    "ControlNetApply": "Apply ControlNet (OLD)",
    "ControlNetApplyAdvanced": "Apply ControlNet",
    # Latent
    "VAEEncodeForInpaint": "VAE Encode (for Inpainting)",
    "SetLatentNoiseMask": "Set Latent Noise Mask",
    "VAEDecode": "VAE Decode",
    "VAEEncode": "VAE Encode",
    "LatentRotate": "Rotate Latent",
    "LatentFlip": "Flip Latent",
    "LatentCrop": "Crop Latent",
    "EmptyLatentImage": "Empty Latent Image",
    "LatentUpscale": "Upscale Latent",
    "LatentUpscaleBy": "Upscale Latent By",
    "LatentComposite": "Latent Composite",
    "LatentBlend": "Latent Blend",
    "LatentFromBatch": "Latent From Batch",
    "RepeatLatentBatch": "Repeat Latent Batch",
    # Image
    "SaveImage": "Save Image",
    "PreviewImage": "Preview Image",
    "LoadImage": "Load Image",
    "LoadImageMask": "Load Image (as Mask)",
    "LoadImageOutput": "Load Image (from Outputs)",
    "ImageScale": "Upscale Image",
    "ImageScaleBy": "Upscale Image By",
    "ImageInvert": "Invert Image",
    "ImagePadForOutpaint": "Pad Image for Outpainting",
    "ImageBatch": "Batch Images",
    "ImageCrop": "Image Crop",
    "ImageStitch": "Image Stitch",
    "ImageBlend": "Image Blend",
    "ImageBlur": "Image Blur",
    "ImageQuantize": "Image Quantize",
    "ImageSharpen": "Image Sharpen",
    "ImageScaleToTotalPixels": "Scale Image to Total Pixels",
    "GetImageSize": "Get Image Size",
    # _for_testing
    "VAEDecodeTiled": "VAE Decode (Tiled)",
    "VAEEncodeTiled": "VAE Encode (Tiled)",
}
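# Illustrative note: NODE_CLASS_MAPPINGS keys are the canonical node type names used in
# workflow JSON, and NODE_DISPLAY_NAME_MAPPINGS maps those same keys to the labels shown
# in the UI; custom node packs typically export dicts of the same shape from their own
# modules. A minimal (hypothetical) pair of entries would look like:
#
#     NODE_CLASS_MAPPINGS = {"ImageInvert": ImageInvert}
#     NODE_DISPLAY_NAME_MAPPINGS = {"ImageInvert": "Invert Image"}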