mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-11 23:00:51 +08:00
- Experimental support for sage attention on Linux - Diffusers loader now supports model indices - Transformers model management now aligns with updates to ComfyUI - Flux layers correctly use unbind - Add float8 support for model loading in more places - Experimental quantization approaches from Quanto and torchao - Model upscaling interacts with memory management better This update also disables ROCm testing because it isn't reliable enough on consumer hardware. ROCm is not really supported by the 7600.
1984 lines
76 KiB
Python
1984 lines
76 KiB
Python
import torch
|
|
|
|
import os
|
|
import json
|
|
import hashlib
|
|
import math
|
|
import random
|
|
import logging
|
|
|
|
from PIL import Image, ImageOps, ImageSequence
|
|
from PIL.PngImagePlugin import PngInfo
|
|
from huggingface_hub import snapshot_download
|
|
from natsort import natsorted
|
|
import numpy as np
|
|
import safetensors.torch
|
|
|
|
from .. import diffusers_load
|
|
from .. import samplers
|
|
from .. import sample
|
|
from .. import sd
|
|
from .. import utils
|
|
from .. import clip_vision as clip_vision_module
|
|
from .. import model_management
|
|
from ..cli_args import args
|
|
|
|
from ..cmd import folder_paths, latent_preview
|
|
from ..component_model.tensor_types import RGBImage, RGBImageBatch, MaskBatch
|
|
from ..execution_context import current_execution_context
|
|
from ..images import open_image
|
|
from ..ldm.flux.weight_dtypes import FLUX_WEIGHT_DTYPES
|
|
from ..model_downloader import get_filename_list_with_downloadable, get_or_download, KNOWN_CHECKPOINTS, KNOWN_CLIP_VISION_MODELS, KNOWN_GLIGEN_MODELS, KNOWN_UNCLIP_CHECKPOINTS, KNOWN_LORAS, KNOWN_CONTROLNETS, KNOWN_DIFF_CONTROLNETS, KNOWN_VAES, KNOWN_APPROX_VAES, get_huggingface_repo_list, KNOWN_CLIP_MODELS, KNOWN_UNET_MODELS
|
|
from ..nodes.common import MAX_RESOLUTION
|
|
from .. import controlnet
|
|
from ..open_exr import load_exr
|
|
from .. import node_helpers
|
|
from ..sd import VAE
|
|
from ..utils import comfy_tqdm
|
|
|
|
|
|
class CLIPTextEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"text": ("STRING", {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}),
|
|
"clip": ("CLIP", {"tooltip": "The CLIP model used for encoding the text."})
|
|
}
|
|
}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning"
|
|
DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images."
|
|
|
|
def encode(self, clip, text):
|
|
tokens = clip.tokenize(text)
|
|
output = clip.encode_from_tokens(tokens, return_pooled=True, return_dict=True)
|
|
cond = output.pop("cond")
|
|
return ([[cond, output]], )
|
|
|
|
class ConditioningCombine:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "combine"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def combine(self, conditioning_1, conditioning_2):
|
|
return (conditioning_1 + conditioning_2, )
|
|
|
|
class ConditioningAverage :
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ),
|
|
"conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "addWeighted"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength):
|
|
out = []
|
|
|
|
if len(conditioning_from) > 1:
|
|
logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
|
|
|
|
cond_from = conditioning_from[0][0]
|
|
pooled_output_from = conditioning_from[0][1].get("pooled_output", None)
|
|
|
|
for i in range(len(conditioning_to)):
|
|
t1 = conditioning_to[i][0]
|
|
pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from)
|
|
t0 = cond_from[:,:t1.shape[1]]
|
|
if t0.shape[1] < t1.shape[1]:
|
|
t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1)
|
|
|
|
tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength))
|
|
t_to = conditioning_to[i][1].copy()
|
|
if pooled_output_from is not None and pooled_output_to is not None:
|
|
t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength))
|
|
elif pooled_output_from is not None:
|
|
t_to["pooled_output"] = pooled_output_from
|
|
|
|
n = [tw, t_to]
|
|
out.append(n)
|
|
return (out, )
|
|
|
|
class ConditioningConcat:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {
|
|
"conditioning_to": ("CONDITIONING",),
|
|
"conditioning_from": ("CONDITIONING",),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "concat"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def concat(self, conditioning_to, conditioning_from):
|
|
out = []
|
|
|
|
if len(conditioning_from) > 1:
|
|
logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
|
|
|
|
cond_from = conditioning_from[0][0]
|
|
|
|
for i in range(len(conditioning_to)):
|
|
t1 = conditioning_to[i][0]
|
|
tw = torch.cat((t1, cond_from),1)
|
|
n = [tw, conditioning_to[i][1].copy()]
|
|
out.append(n)
|
|
|
|
return (out, )
|
|
|
|
class ConditioningSetArea:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, width, height, x, y, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8),
|
|
"strength": strength,
|
|
"set_area_to_bounds": False})
|
|
return (c, )
|
|
|
|
class ConditioningSetAreaPercentage:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, width, height, x, y, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x),
|
|
"strength": strength,
|
|
"set_area_to_bounds": False})
|
|
return (c, )
|
|
|
|
class ConditioningSetAreaStrength:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"strength": strength})
|
|
return (c, )
|
|
|
|
|
|
class ConditioningSetMask:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"mask": ("MASK", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
"set_cond_area": (["default", "mask bounds"],),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, mask, set_cond_area, strength):
|
|
set_area_to_bounds = False
|
|
if set_cond_area != "default":
|
|
set_area_to_bounds = True
|
|
if len(mask.shape) < 3:
|
|
mask = mask.unsqueeze(0)
|
|
|
|
c = node_helpers.conditioning_set_values(conditioning, {"mask": mask,
|
|
"set_area_to_bounds": set_area_to_bounds,
|
|
"mask_strength": strength})
|
|
return (c, )
|
|
|
|
class ConditioningZeroOut:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", )}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "zero_out"
|
|
|
|
CATEGORY = "advanced/conditioning"
|
|
|
|
def zero_out(self, conditioning):
|
|
c = []
|
|
for t in conditioning:
|
|
d = t[1].copy()
|
|
pooled_output = d.get("pooled_output", None)
|
|
if pooled_output is not None:
|
|
d["pooled_output"] = torch.zeros_like(pooled_output)
|
|
n = [torch.zeros_like(t[0]), d]
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class ConditioningSetTimestepRange:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
|
|
"end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "set_range"
|
|
|
|
CATEGORY = "advanced/conditioning"
|
|
|
|
def set_range(self, conditioning, start, end):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start,
|
|
"end_percent": end})
|
|
return (c, )
|
|
|
|
class VAEDecode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"samples": ("LATENT", {"tooltip": "The latent to be decoded."}),
|
|
"vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."})
|
|
}
|
|
}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
OUTPUT_TOOLTIPS = ("The decoded image.",)
|
|
FUNCTION = "decode"
|
|
|
|
CATEGORY = "latent"
|
|
DESCRIPTION = "Decodes latent images back into pixel space images."
|
|
|
|
def decode(self, vae, samples):
|
|
return (vae.decode(samples["samples"]), )
|
|
|
|
class VAEDecodeTiled:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ),
|
|
"tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64})
|
|
}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "decode"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def decode(self, vae, samples, tile_size):
|
|
return (vae.decode_tiled(samples["samples"], tile_x=tile_size // 8, tile_y=tile_size // 8, ), )
|
|
|
|
class VAEEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def encode(self, vae: VAE, pixels):
|
|
t = vae.encode(pixels[:,:,:,:3])
|
|
return ({"samples":t}, )
|
|
|
|
class VAEEncodeTiled:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ),
|
|
"tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64})
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def encode(self, vae, pixels, tile_size):
|
|
t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, )
|
|
return ({"samples":t}, )
|
|
|
|
class VAEEncodeForInpaint:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "latent/inpaint"
|
|
|
|
def encode(self, vae, pixels, mask, grow_mask_by=6):
|
|
x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio
|
|
y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio
|
|
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")
|
|
|
|
pixels = pixels.clone()
|
|
if pixels.shape[1] != x or pixels.shape[2] != y:
|
|
x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2
|
|
y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2
|
|
pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
|
|
mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]
|
|
|
|
#grow mask by a few pixels to keep things seamless in latent space
|
|
if grow_mask_by == 0:
|
|
mask_erosion = mask
|
|
else:
|
|
kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
|
|
padding = math.ceil((grow_mask_by - 1) / 2)
|
|
|
|
mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1)
|
|
|
|
m = (1.0 - mask.round()).squeeze(1)
|
|
for i in range(3):
|
|
pixels[:,:,:,i] -= 0.5
|
|
pixels[:,:,:,i] *= m
|
|
pixels[:,:,:,i] += 0.5
|
|
t = vae.encode(pixels)
|
|
|
|
return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, )
|
|
|
|
|
|
class InpaintModelConditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"positive": ("CONDITIONING", ),
|
|
"negative": ("CONDITIONING", ),
|
|
"vae": ("VAE", ),
|
|
"pixels": ("IMAGE", ),
|
|
"mask": ("MASK", ),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT")
|
|
RETURN_NAMES = ("positive", "negative", "latent")
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning/inpaint"
|
|
|
|
def encode(self, positive, negative, pixels, vae, mask):
|
|
x = (pixels.shape[1] // 8) * 8
|
|
y = (pixels.shape[2] // 8) * 8
|
|
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")
|
|
|
|
orig_pixels = pixels
|
|
pixels = orig_pixels.clone()
|
|
if pixels.shape[1] != x or pixels.shape[2] != y:
|
|
x_offset = (pixels.shape[1] % 8) // 2
|
|
y_offset = (pixels.shape[2] % 8) // 2
|
|
pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
|
|
mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]
|
|
|
|
m = (1.0 - mask.round()).squeeze(1)
|
|
for i in range(3):
|
|
pixels[:,:,:,i] -= 0.5
|
|
pixels[:,:,:,i] *= m
|
|
pixels[:,:,:,i] += 0.5
|
|
concat_latent = vae.encode(pixels)
|
|
orig_latent = vae.encode(orig_pixels)
|
|
|
|
out_latent = {}
|
|
|
|
out_latent["samples"] = orig_latent
|
|
out_latent["noise_mask"] = mask
|
|
|
|
out = []
|
|
for conditioning in [positive, negative]:
|
|
c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent,
|
|
"concat_mask": mask})
|
|
out.append(c)
|
|
return (out[0], out[1], out_latent)
|
|
|
|
|
|
class SaveLatent:
|
|
def __init__(self):
|
|
self.output_dir = folder_paths.get_output_directory()
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT", ),
|
|
"filename_prefix": ("STRING", {"default": "latents/ComfyUI"})},
|
|
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
|
|
}
|
|
RETURN_TYPES = ()
|
|
FUNCTION = "save"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
|
|
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)
|
|
|
|
# support save metadata for latent sharing
|
|
prompt_info = ""
|
|
if prompt is not None:
|
|
prompt_info = json.dumps(prompt)
|
|
|
|
metadata = None
|
|
if not args.disable_metadata:
|
|
metadata = {"prompt": prompt_info}
|
|
if extra_pnginfo is not None:
|
|
for x in extra_pnginfo:
|
|
metadata[x] = json.dumps(extra_pnginfo[x])
|
|
|
|
file = f"{filename}_{counter:05}_.latent"
|
|
|
|
results = list()
|
|
results.append({
|
|
"filename": file,
|
|
"subfolder": subfolder,
|
|
"type": "output"
|
|
})
|
|
|
|
file = os.path.join(full_output_folder, file)
|
|
|
|
output = {}
|
|
output["latent_tensor"] = samples["samples"]
|
|
output["latent_format_version_0"] = torch.tensor([])
|
|
|
|
utils.save_torch_file(output, file, metadata=metadata)
|
|
return { "ui": { "latents": results } }
|
|
|
|
|
|
class LoadLatent:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
input_dir = folder_paths.get_input_directory()
|
|
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")]
|
|
return {"required": {"latent": [sorted(files), ]}, }
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
RETURN_TYPES = ("LATENT", )
|
|
FUNCTION = "load"
|
|
|
|
def load(self, latent):
|
|
latent_path = folder_paths.get_annotated_filepath(latent)
|
|
latent = safetensors.torch.load_file(latent_path, device="cpu")
|
|
multiplier = 1.0
|
|
if "latent_format_version_0" not in latent:
|
|
multiplier = 1.0 / 0.18215
|
|
samples = {"samples": latent["latent_tensor"].float() * multiplier}
|
|
return (samples, )
|
|
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(s, latent):
|
|
if not folder_paths.exists_annotated_filepath(latent):
|
|
return "Invalid latent file: {}".format(latent)
|
|
return True
|
|
|
|
|
|
class CheckpointLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "config_name": (folder_paths.get_filename_list("configs"),),
|
|
"ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_CHECKPOINTS),)}}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
DEPRECATED = True
|
|
|
|
def load_checkpoint(self, config_name, ckpt_name):
|
|
config_path = folder_paths.get_full_path("configs", config_name)
|
|
ckpt_path = get_or_download("checkpoints", ckpt_name, KNOWN_CHECKPOINTS)
|
|
return sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
|
|
class CheckpointLoaderSimple:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_CHECKPOINTS),{"tooltip": "The name of the checkpoint (model) to load."}),
|
|
}
|
|
}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
OUTPUT_TOOLTIPS = ("The model used for denoising latents.",
|
|
"The CLIP model used for encoding text prompts.",
|
|
"The VAE model used for encoding and decoding images to and from latent space.")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "loaders"
|
|
DESCRIPTION = "Loads a diffusion model checkpoint, diffusion models are used to denoise latents."
|
|
|
|
def load_checkpoint(self, ckpt_name):
|
|
ckpt_path = get_or_download("checkpoints", ckpt_name, KNOWN_CHECKPOINTS)
|
|
out = sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
return out[:3]
|
|
|
|
class DiffusersLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
paths = []
|
|
for search_path in folder_paths.get_folder_paths("diffusers"):
|
|
if os.path.exists(search_path):
|
|
for root, subdir, files in os.walk(search_path, followlinks=True):
|
|
if "model_index.json" in files:
|
|
paths.append(os.path.relpath(root, start=search_path))
|
|
|
|
paths += get_huggingface_repo_list()
|
|
paths = list(frozenset(paths))
|
|
return {"required": {"model_path": (paths,),
|
|
"weight_dtype": (FLUX_WEIGHT_DTYPES,)
|
|
}}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
def load_checkpoint(self, model_path, output_vae=True, output_clip=True,weight_dtype:str="default"):
|
|
for search_path in folder_paths.get_folder_paths("diffusers"):
|
|
if os.path.exists(search_path):
|
|
path = os.path.join(search_path, model_path)
|
|
if os.path.exists(path):
|
|
model_path = path
|
|
break
|
|
if not os.path.exists(model_path):
|
|
with comfy_tqdm():
|
|
model_path = snapshot_download(model_path)
|
|
|
|
model_options = get_model_options_for_dtype(weight_dtype)
|
|
return diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"), model_options=model_options)
|
|
|
|
|
|
class unCLIPCheckpointLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_UNCLIP_CHECKPOINTS),),
|
|
}}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
|
|
ckpt_path = get_or_download("checkpoints", ckpt_name, KNOWN_UNCLIP_CHECKPOINTS)
|
|
out = sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
return out
|
|
|
|
class CLIPSetLastLayer:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip": ("CLIP", ),
|
|
"stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
|
|
}}
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "set_last_layer"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def set_last_layer(self, clip, stop_at_clip_layer):
|
|
clip = clip.clone()
|
|
clip.clip_layer(stop_at_clip_layer)
|
|
return (clip,)
|
|
|
|
class LoraLoader:
|
|
def __init__(self):
|
|
self.loaded_lora = None
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}),
|
|
"clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}),
|
|
"lora_name": (get_filename_list_with_downloadable("loras", KNOWN_LORAS),{"tooltip": "The name of the LoRA."}),
|
|
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}),
|
|
"strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP")
|
|
OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.")
|
|
FUNCTION = "load_lora"
|
|
|
|
CATEGORY = "loaders"
|
|
DESCRIPTION = "LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together."
|
|
|
|
def load_lora(self, model, clip, lora_name, strength_model, strength_clip):
|
|
if strength_model == 0 and strength_clip == 0:
|
|
return (model, clip)
|
|
|
|
lora_path = get_or_download("loras", lora_name, KNOWN_LORAS)
|
|
lora = None
|
|
if self.loaded_lora is not None:
|
|
if self.loaded_lora[0] == lora_path:
|
|
lora = self.loaded_lora[1]
|
|
else:
|
|
temp = self.loaded_lora
|
|
self.loaded_lora = None
|
|
del temp
|
|
|
|
if lora is None:
|
|
lora = utils.load_torch_file(lora_path, safe_load=True)
|
|
self.loaded_lora = (lora_path, lora)
|
|
|
|
model_lora, clip_lora = sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip)
|
|
return (model_lora, clip_lora)
|
|
|
|
class LoraLoaderModelOnly(LoraLoader):
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "model": ("MODEL",),
|
|
"lora_name": (get_filename_list_with_downloadable("loras"), ),
|
|
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("MODEL",)
|
|
FUNCTION = "load_lora_model_only"
|
|
|
|
def load_lora_model_only(self, model, lora_name, strength_model):
|
|
return (self.load_lora(model, None, lora_name, strength_model, 0)[0],)
|
|
|
|
class VAELoader:
|
|
@staticmethod
|
|
def vae_list():
|
|
vaes = get_filename_list_with_downloadable("vae", KNOWN_VAES)
|
|
approx_vaes = get_filename_list_with_downloadable("vae_approx", KNOWN_APPROX_VAES)
|
|
sdxl_taesd_enc = False
|
|
sdxl_taesd_dec = False
|
|
sd1_taesd_enc = False
|
|
sd1_taesd_dec = False
|
|
sd3_taesd_enc = False
|
|
sd3_taesd_dec = False
|
|
f1_taesd_enc = False
|
|
f1_taesd_dec = False
|
|
|
|
for v in approx_vaes:
|
|
if v.startswith("taesd_decoder."):
|
|
sd1_taesd_dec = True
|
|
elif v.startswith("taesd_encoder."):
|
|
sd1_taesd_enc = True
|
|
elif v.startswith("taesdxl_decoder."):
|
|
sdxl_taesd_dec = True
|
|
elif v.startswith("taesdxl_encoder."):
|
|
sdxl_taesd_enc = True
|
|
elif v.startswith("taesd3_decoder."):
|
|
sd3_taesd_dec = True
|
|
elif v.startswith("taesd3_encoder."):
|
|
sd3_taesd_enc = True
|
|
elif v.startswith("taef1_encoder."):
|
|
f1_taesd_dec = True
|
|
elif v.startswith("taef1_decoder."):
|
|
f1_taesd_enc = True
|
|
if sd1_taesd_dec and sd1_taesd_enc:
|
|
vaes.append("taesd")
|
|
if sdxl_taesd_dec and sdxl_taesd_enc:
|
|
vaes.append("taesdxl")
|
|
if sd3_taesd_dec and sd3_taesd_enc:
|
|
vaes.append("taesd3")
|
|
if f1_taesd_dec and f1_taesd_enc:
|
|
vaes.append("taef1")
|
|
return vaes
|
|
|
|
@staticmethod
|
|
def load_taesd(name):
|
|
sd_ = {}
|
|
approx_vaes = folder_paths.get_filename_list("vae_approx")
|
|
|
|
encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes))
|
|
decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes))
|
|
|
|
enc = utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder))
|
|
for k in enc:
|
|
sd_["taesd_encoder.{}".format(k)] = enc[k]
|
|
|
|
dec = utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder))
|
|
for k in dec:
|
|
sd_["taesd_decoder.{}".format(k)] = dec[k]
|
|
|
|
if name == "taesd":
|
|
sd_["vae_scale"] = torch.tensor(0.18215)
|
|
sd_["vae_shift"] = torch.tensor(0.0)
|
|
elif name == "taesdxl":
|
|
sd_["vae_scale"] = torch.tensor(0.13025)
|
|
sd_["vae_shift"] = torch.tensor(0.0)
|
|
elif name == "taesd3":
|
|
sd_["vae_scale"] = torch.tensor(1.5305)
|
|
sd_["vae_shift"] = torch.tensor(0.0609)
|
|
elif name == "taef1":
|
|
sd_["vae_scale"] = torch.tensor(0.3611)
|
|
sd_["vae_shift"] = torch.tensor(0.1159)
|
|
return sd_
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "vae_name": (s.vae_list(),)}}
|
|
RETURN_TYPES = ("VAE",)
|
|
FUNCTION = "load_vae"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
#TODO: scale factor?
|
|
def load_vae(self, vae_name):
|
|
if vae_name in ["taesd", "taesdxl", "taesd3", "taef1"]:
|
|
sd_ = self.load_taesd(vae_name)
|
|
else:
|
|
vae_path = get_or_download("vae", vae_name, KNOWN_VAES)
|
|
sd_ = utils.load_torch_file(vae_path)
|
|
vae = sd.VAE(sd=sd_)
|
|
return (vae,)
|
|
|
|
class ControlNetLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "control_net_name": (get_filename_list_with_downloadable("controlnet", KNOWN_CONTROLNETS),)}}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, control_net_name):
|
|
controlnet_path = get_or_download("controlnet", control_net_name, KNOWN_CONTROLNETS)
|
|
controlnet_ = controlnet.load_controlnet(controlnet_path)
|
|
return (controlnet_,)
|
|
|
|
|
|
class ControlNetLoaderWeights:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"control_net_name": (get_filename_list_with_downloadable("controlnet", KNOWN_CONTROLNETS),),
|
|
"weight_dtype": (FLUX_WEIGHT_DTYPES,)
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, control_net_name, weight_dtype):
|
|
controlnet_path = get_or_download("controlnet", control_net_name, KNOWN_CONTROLNETS)
|
|
model_options = {}
|
|
if weight_dtype == "float8_e5m2":
|
|
model_options["dtype"] = torch.float8_e5m2
|
|
elif weight_dtype == "float8_e4m3fn":
|
|
model_options["dtype"] = torch.float8_e4m3fn
|
|
|
|
controlnet_ = controlnet.load_controlnet(controlnet_path, model_options=model_options)
|
|
return (controlnet_,)
|
|
|
|
class DiffControlNetLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "model": ("MODEL",),
|
|
"control_net_name": (get_filename_list_with_downloadable("controlnet", KNOWN_DIFF_CONTROLNETS),)}}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, model, control_net_name):
|
|
controlnet_path = get_or_download("controlnet", control_net_name, KNOWN_DIFF_CONTROLNETS)
|
|
controlnet_ = controlnet.load_controlnet(controlnet_path, model)
|
|
return (controlnet_,)
|
|
|
|
|
|
class ControlNetApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"control_net": ("CONTROL_NET", ),
|
|
"image": ("IMAGE", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01})
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_controlnet"
|
|
|
|
DEPRECATED = True
|
|
CATEGORY = "conditioning/controlnet"
|
|
|
|
def apply_controlnet(self, conditioning, control_net, image: RGBImageBatch, strength):
|
|
if strength == 0:
|
|
return (conditioning, )
|
|
|
|
c = []
|
|
control_hint = image.movedim(-1,1)
|
|
for t in conditioning:
|
|
n = [t[0], t[1].copy()]
|
|
c_net = control_net.copy().set_cond_hint(control_hint, strength)
|
|
if 'control' in t[1]:
|
|
c_net.set_previous_controlnet(t[1]['control'])
|
|
n[1]['control'] = c_net
|
|
n[1]['control_apply_to_uncond'] = True
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
|
|
class ControlNetApplyAdvanced:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"positive": ("CONDITIONING", ),
|
|
"negative": ("CONDITIONING", ),
|
|
"control_net": ("CONTROL_NET", ),
|
|
"image": ("IMAGE", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
"start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
|
|
"end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
|
|
},
|
|
"optional": {"vae": ("VAE", ),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("CONDITIONING","CONDITIONING")
|
|
RETURN_NAMES = ("positive", "negative")
|
|
FUNCTION = "apply_controlnet"
|
|
|
|
CATEGORY = "conditioning/controlnet"
|
|
|
|
def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]):
|
|
if strength == 0:
|
|
return (positive, negative)
|
|
|
|
control_hint = image.movedim(-1,1)
|
|
cnets = {}
|
|
|
|
out = []
|
|
for conditioning in [positive, negative]:
|
|
c = []
|
|
for t in conditioning:
|
|
d = t[1].copy()
|
|
|
|
prev_cnet = d.get('control', None)
|
|
if prev_cnet in cnets:
|
|
c_net = cnets[prev_cnet]
|
|
else:
|
|
c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat)
|
|
c_net.set_previous_controlnet(prev_cnet)
|
|
cnets[prev_cnet] = c_net
|
|
|
|
d['control'] = c_net
|
|
d['control_apply_to_uncond'] = False
|
|
n = [t[0], d]
|
|
c.append(n)
|
|
out.append(c)
|
|
return (out[0], out[1])
|
|
|
|
def get_model_options_for_dtype(weight_dtype):
|
|
model_options = {}
|
|
if weight_dtype == "fp8_e4m3fn":
|
|
model_options["dtype"] = torch.float8_e4m3fn
|
|
elif weight_dtype == "fp8_e5m2":
|
|
model_options["dtype"] = torch.float8_e5m2
|
|
return model_options
|
|
|
|
|
|
class UNETLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "unet_name": (get_filename_list_with_downloadable("diffusion_models", KNOWN_UNET_MODELS),),
|
|
"weight_dtype": (FLUX_WEIGHT_DTYPES,)
|
|
}}
|
|
RETURN_TYPES = ("MODEL",)
|
|
FUNCTION = "load_unet"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
def load_unet(self, unet_name, weight_dtype):
|
|
model_options = get_model_options_for_dtype(weight_dtype)
|
|
|
|
unet_path = get_or_download("diffusion_models", unet_name, KNOWN_UNET_MODELS)
|
|
model = sd.load_diffusion_model(unet_path, model_options=model_options)
|
|
return (model,)
|
|
|
|
|
|
|
|
class CLIPLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_name": (get_filename_list_with_downloadable("clip", KNOWN_CLIP_MODELS),),
|
|
"type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio"], ),
|
|
}}
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
def load_clip(self, clip_name, type="stable_diffusion"):
|
|
clip_type = sd.CLIPType.STABLE_DIFFUSION
|
|
if type == "stable_cascade":
|
|
clip_type = sd.CLIPType.STABLE_CASCADE
|
|
elif type == "sd3":
|
|
clip_type = sd.CLIPType.SD3
|
|
elif type == "stable_audio":
|
|
clip_type = sd.CLIPType.STABLE_AUDIO
|
|
else:
|
|
logging.warning(f"Unknown clip type argument passed: {type} for model {clip_name}")
|
|
|
|
clip_path = get_or_download("clip", clip_name, KNOWN_CLIP_MODELS)
|
|
clip = sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type)
|
|
return (clip,)
|
|
|
|
class DualCLIPLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_name1": (get_filename_list_with_downloadable("clip"),), "clip_name2": (
|
|
get_filename_list_with_downloadable("clip"),),
|
|
"type": (["sdxl", "sd3", "flux"], ),
|
|
}}
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
def load_clip(self, clip_name1, clip_name2, type):
|
|
clip_path1 = get_or_download("clip", clip_name1)
|
|
clip_path2 = get_or_download("clip", clip_name2)
|
|
if type == "sdxl":
|
|
clip_type = sd.CLIPType.STABLE_DIFFUSION
|
|
elif type == "sd3":
|
|
clip_type = sd.CLIPType.SD3
|
|
elif type == "flux":
|
|
clip_type = sd.CLIPType.FLUX
|
|
else:
|
|
raise ValueError(f"Unknown clip type argument passed: {type} for model {clip_name1} and {clip_name2}")
|
|
|
|
clip = sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type)
|
|
return (clip,)
|
|
|
|
class CLIPVisionLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_name": (get_filename_list_with_downloadable("clip_vision", KNOWN_CLIP_VISION_MODELS),),
|
|
}}
|
|
RETURN_TYPES = ("CLIP_VISION",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_clip(self, clip_name):
|
|
clip_path = get_or_download("clip_vision", clip_name, KNOWN_CLIP_VISION_MODELS)
|
|
clip_vision = clip_vision_module.load(clip_path)
|
|
return (clip_vision,)
|
|
|
|
class CLIPVisionEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_vision": ("CLIP_VISION",),
|
|
"image": ("IMAGE",)
|
|
}}
|
|
RETURN_TYPES = ("CLIP_VISION_OUTPUT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def encode(self, clip_vision, image):
|
|
output = clip_vision.encode_image(image)
|
|
return (output,)
|
|
|
|
class StyleModelLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "style_model_name": (get_filename_list_with_downloadable("style_models"),)}}
|
|
|
|
RETURN_TYPES = ("STYLE_MODEL",)
|
|
FUNCTION = "load_style_model"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_style_model(self, style_model_name):
|
|
style_model_path = get_or_download("style_models", style_model_name)
|
|
style_model = sd.load_style_model(style_model_path)
|
|
return (style_model,)
|
|
|
|
|
|
class StyleModelApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"style_model": ("STYLE_MODEL", ),
|
|
"clip_vision_output": ("CLIP_VISION_OUTPUT", ),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_stylemodel"
|
|
|
|
CATEGORY = "conditioning/style_model"
|
|
|
|
def apply_stylemodel(self, clip_vision_output, style_model, conditioning):
|
|
cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0)
|
|
c = []
|
|
for t in conditioning:
|
|
n = [torch.cat((t[0], cond), dim=1), t[1].copy()]
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class unCLIPConditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"clip_vision_output": ("CLIP_VISION_OUTPUT", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
|
|
"noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_adm"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation):
|
|
if strength == 0:
|
|
return (conditioning, )
|
|
|
|
c = []
|
|
for t in conditioning:
|
|
o = t[1].copy()
|
|
x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}
|
|
if "unclip_conditioning" in o:
|
|
o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x]
|
|
else:
|
|
o["unclip_conditioning"] = [x]
|
|
n = [t[0], o]
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class GLIGENLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "gligen_name": (get_filename_list_with_downloadable("gligen", KNOWN_GLIGEN_MODELS),)}}
|
|
|
|
RETURN_TYPES = ("GLIGEN",)
|
|
FUNCTION = "load_gligen"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_gligen(self, gligen_name):
|
|
gligen_path = get_or_download("gligen", gligen_name, KNOWN_GLIGEN_MODELS)
|
|
gligen = sd.load_gligen(gligen_path)
|
|
return (gligen,)
|
|
|
|
class GLIGENTextBoxApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_to": ("CONDITIONING", ),
|
|
"clip": ("CLIP", ),
|
|
"gligen_textbox_model": ("GLIGEN", ),
|
|
"text": ("STRING", {"multiline": True, "dynamicPrompts": True}),
|
|
"width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning/gligen"
|
|
|
|
def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y):
|
|
c = []
|
|
cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected")
|
|
for t in conditioning_to:
|
|
n = [t[0], t[1].copy()]
|
|
position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)]
|
|
prev = []
|
|
if "gligen" in n[1]:
|
|
prev = n[1]['gligen'][2]
|
|
|
|
n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params)
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class EmptyLatentImage:
|
|
def __init__(self):
|
|
self.device = model_management.intermediate_device()
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}),
|
|
"height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}),
|
|
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."})
|
|
}
|
|
}
|
|
RETURN_TYPES = ("LATENT",)
|
|
OUTPUT_TOOLTIPS = ("The empty latent image batch.",)
|
|
FUNCTION = "generate"
|
|
|
|
CATEGORY = "latent"
|
|
DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling."
|
|
|
|
def generate(self, width, height, batch_size=1):
|
|
latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device)
|
|
return ({"samples":latent}, )
|
|
|
|
|
|
class LatentFromBatch:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"batch_index": ("INT", {"default": 0, "min": 0, "max": 63}),
|
|
"length": ("INT", {"default": 1, "min": 1, "max": 64}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "frombatch"
|
|
|
|
CATEGORY = "latent/batch"
|
|
|
|
def frombatch(self, samples, batch_index, length):
|
|
s = samples.copy()
|
|
s_in = samples["samples"]
|
|
batch_index = min(s_in.shape[0] - 1, batch_index)
|
|
length = min(s_in.shape[0] - batch_index, length)
|
|
s["samples"] = s_in[batch_index:batch_index + length].clone()
|
|
if "noise_mask" in samples:
|
|
masks = samples["noise_mask"]
|
|
if masks.shape[0] == 1:
|
|
s["noise_mask"] = masks.clone()
|
|
else:
|
|
if masks.shape[0] < s_in.shape[0]:
|
|
masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
|
|
s["noise_mask"] = masks[batch_index:batch_index + length].clone()
|
|
if "batch_index" not in s:
|
|
s["batch_index"] = [x for x in range(batch_index, batch_index+length)]
|
|
else:
|
|
s["batch_index"] = samples["batch_index"][batch_index:batch_index + length]
|
|
return (s,)
|
|
|
|
class RepeatLatentBatch:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"amount": ("INT", {"default": 1, "min": 1, "max": 64}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "repeat"
|
|
|
|
CATEGORY = "latent/batch"
|
|
|
|
def repeat(self, samples, amount):
|
|
s = samples.copy()
|
|
s_in = samples["samples"]
|
|
|
|
s["samples"] = s_in.repeat((amount, 1,1,1))
|
|
if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1:
|
|
masks = samples["noise_mask"]
|
|
if masks.shape[0] < s_in.shape[0]:
|
|
masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
|
|
s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1))
|
|
if "batch_index" in s:
|
|
offset = max(s["batch_index"]) - min(s["batch_index"]) + 1
|
|
s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]]
|
|
return (s,)
|
|
|
|
class LatentUpscale:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
|
|
crop_methods = ["disabled", "center"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
|
|
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"crop": (s.crop_methods,)}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def upscale(self, samples, upscale_method, width, height, crop):
|
|
if width == 0 and height == 0:
|
|
s = samples
|
|
else:
|
|
s = samples.copy()
|
|
|
|
if width == 0:
|
|
height = max(64, height)
|
|
width = max(64, round(samples["samples"].shape[3] * height / samples["samples"].shape[2]))
|
|
elif height == 0:
|
|
width = max(64, width)
|
|
height = max(64, round(samples["samples"].shape[2] * width / samples["samples"].shape[3]))
|
|
else:
|
|
width = max(64, width)
|
|
height = max(64, height)
|
|
|
|
s["samples"] = utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop)
|
|
return (s,)
|
|
|
|
class LatentUpscaleBy:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
|
|
"scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def upscale(self, samples, upscale_method, scale_by):
|
|
s = samples.copy()
|
|
width = round(samples["samples"].shape[3] * scale_by)
|
|
height = round(samples["samples"].shape[2] * scale_by)
|
|
s["samples"] = utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled")
|
|
return (s,)
|
|
|
|
class LatentRotate:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "rotate"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def rotate(self, samples, rotation):
|
|
s = samples.copy()
|
|
rotate_by = 0
|
|
if rotation.startswith("90"):
|
|
rotate_by = 1
|
|
elif rotation.startswith("180"):
|
|
rotate_by = 2
|
|
elif rotation.startswith("270"):
|
|
rotate_by = 3
|
|
|
|
s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2])
|
|
return (s,)
|
|
|
|
class LatentFlip:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"flip_method": (["x-axis: vertically", "y-axis: horizontally"],),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "flip"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def flip(self, samples, flip_method):
|
|
s = samples.copy()
|
|
if flip_method.startswith("x"):
|
|
s["samples"] = torch.flip(samples["samples"], dims=[2])
|
|
elif flip_method.startswith("y"):
|
|
s["samples"] = torch.flip(samples["samples"], dims=[3])
|
|
|
|
return (s,)
|
|
|
|
class LatentComposite:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples_to": ("LATENT",),
|
|
"samples_from": ("LATENT",),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "composite"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0):
|
|
x = x // 8
|
|
y = y // 8
|
|
feather = feather // 8
|
|
samples_out = samples_to.copy()
|
|
s = samples_to["samples"].clone()
|
|
samples_to = samples_to["samples"]
|
|
samples_from = samples_from["samples"]
|
|
if feather == 0:
|
|
s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
|
|
else:
|
|
samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
|
|
mask = torch.ones_like(samples_from)
|
|
for t in range(feather):
|
|
if y != 0:
|
|
mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1))
|
|
|
|
if y + samples_from.shape[2] < samples_to.shape[2]:
|
|
mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1))
|
|
if x != 0:
|
|
mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1))
|
|
if x + samples_from.shape[3] < samples_to.shape[3]:
|
|
mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1))
|
|
rev_mask = torch.ones_like(mask) - mask
|
|
s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask
|
|
samples_out["samples"] = s
|
|
return (samples_out,)
|
|
|
|
class LatentBlend:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {
|
|
"samples1": ("LATENT",),
|
|
"samples2": ("LATENT",),
|
|
"blend_factor": ("FLOAT", {
|
|
"default": 0.5,
|
|
"min": 0,
|
|
"max": 1,
|
|
"step": 0.01
|
|
}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "blend"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"):
|
|
|
|
samples_out = samples1.copy()
|
|
samples1 = samples1["samples"]
|
|
samples2 = samples2["samples"]
|
|
|
|
if samples1.shape != samples2.shape:
|
|
samples2.permute(0, 3, 1, 2)
|
|
samples2 = utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center')
|
|
samples2.permute(0, 2, 3, 1)
|
|
|
|
samples_blended = self.blend_mode(samples1, samples2, blend_mode)
|
|
samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor)
|
|
samples_out["samples"] = samples_blended
|
|
return (samples_out,)
|
|
|
|
def blend_mode(self, img1, img2, mode):
|
|
if mode == "normal":
|
|
return img2
|
|
else:
|
|
raise ValueError(f"Unsupported blend mode: {mode}")
|
|
|
|
class LatentCrop:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "crop"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def crop(self, samples, width, height, x, y):
|
|
s = samples.copy()
|
|
samples = samples['samples']
|
|
x = x // 8
|
|
y = y // 8
|
|
|
|
#enfonce minimum size of 64
|
|
if x > (samples.shape[3] - 8):
|
|
x = samples.shape[3] - 8
|
|
if y > (samples.shape[2] - 8):
|
|
y = samples.shape[2] - 8
|
|
|
|
new_height = height // 8
|
|
new_width = width // 8
|
|
to_x = new_width + x
|
|
to_y = new_height + y
|
|
s['samples'] = samples[:,:,y:to_y, x:to_x]
|
|
return (s,)
|
|
|
|
class SetLatentNoiseMask:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"mask": ("MASK",),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "set_mask"
|
|
|
|
CATEGORY = "latent/inpaint"
|
|
|
|
def set_mask(self, samples, mask):
|
|
s = samples.copy()
|
|
s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1]))
|
|
return (s,)
|
|
|
|
def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False):
|
|
latent_image = latent["samples"]
|
|
latent_image = sample.fix_empty_latent_channels(model, latent_image)
|
|
|
|
if disable_noise:
|
|
noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu")
|
|
else:
|
|
batch_inds = latent["batch_index"] if "batch_index" in latent else None
|
|
noise = sample.prepare_noise(latent_image, seed, batch_inds)
|
|
|
|
noise_mask = None
|
|
if "noise_mask" in latent:
|
|
noise_mask = latent["noise_mask"]
|
|
|
|
callback = latent_preview.prepare_callback(model, steps)
|
|
disable_pbar = not current_execution_context().server.receive_all_progress_notifications
|
|
samples = sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
|
|
denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step,
|
|
force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed)
|
|
out = latent.copy()
|
|
out["samples"] = samples
|
|
return (out, )
|
|
|
|
class KSampler:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}),
|
|
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "The random seed used for creating the noise."}),
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}),
|
|
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. Higher values result in images more closely matching the prompt however too high values will negatively impact quality."}),
|
|
"sampler_name": (samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling, this can affect the quality, speed, and style of the generated output."}),
|
|
"scheduler": (samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}),
|
|
"positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}),
|
|
"negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}),
|
|
"latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}),
|
|
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied, lower values will maintain the structure of the initial image allowing for image to image sampling."}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
OUTPUT_TOOLTIPS = ("The denoised latent.",)
|
|
FUNCTION = "sample"
|
|
|
|
CATEGORY = "sampling"
|
|
DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image."
|
|
|
|
def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
|
|
return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise)
|
|
|
|
class KSamplerAdvanced:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":
|
|
{"model": ("MODEL",),
|
|
"add_noise": (["enable", "disable"], ),
|
|
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
|
|
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}),
|
|
"sampler_name": (samplers.KSampler.SAMPLERS, ),
|
|
"scheduler": (samplers.KSampler.SCHEDULERS, ),
|
|
"positive": ("CONDITIONING", ),
|
|
"negative": ("CONDITIONING", ),
|
|
"latent_image": ("LATENT", ),
|
|
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
|
|
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
|
|
"return_with_leftover_noise": (["disable", "enable"], ),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "sample"
|
|
|
|
CATEGORY = "sampling"
|
|
|
|
def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0):
|
|
force_full_denoise = True
|
|
if return_with_leftover_noise == "enable":
|
|
force_full_denoise = False
|
|
disable_noise = False
|
|
if add_noise == "disable":
|
|
disable_noise = True
|
|
return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise)
|
|
|
|
class SaveImage:
|
|
def __init__(self):
|
|
self.output_dir = folder_paths.get_output_directory()
|
|
self.type = "output"
|
|
self.prefix_append = ""
|
|
self.compress_level = 4
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":{
|
|
"images": ("IMAGE", {"tooltip": "The images to save."}),
|
|
"filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."})
|
|
},
|
|
"hidden": {
|
|
"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ()
|
|
FUNCTION = "save_images"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "image"
|
|
DESCRIPTION = "Saves the input images to your ComfyUI output directory."
|
|
|
|
def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
|
|
filename_prefix += self.prefix_append
|
|
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0])
|
|
results = list()
|
|
image: RGBImage
|
|
for (batch_number, image) in enumerate(images):
|
|
i = 255. * image.float().cpu().numpy()
|
|
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
|
|
metadata = None
|
|
if not args.disable_metadata:
|
|
metadata = PngInfo()
|
|
if prompt is not None:
|
|
metadata.add_text("prompt", json.dumps(prompt))
|
|
if extra_pnginfo is not None:
|
|
for x in extra_pnginfo:
|
|
metadata.add_text(x, json.dumps(extra_pnginfo[x]))
|
|
|
|
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
|
|
file = f"{filename_with_batch_num}_{counter:05}_.png"
|
|
abs_path = os.path.join(full_output_folder, file)
|
|
img.save(abs_path, pnginfo=metadata, compress_level=self.compress_level)
|
|
results.append({
|
|
"abs_path": os.path.abspath(abs_path),
|
|
"filename": file,
|
|
"subfolder": subfolder,
|
|
"type": self.type
|
|
})
|
|
counter += 1
|
|
|
|
return { "ui": { "images": results } }
|
|
|
|
class PreviewImage(SaveImage):
|
|
def __init__(self):
|
|
self.output_dir = folder_paths.get_temp_directory()
|
|
self.type = "temp"
|
|
self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5))
|
|
self.compress_level = 1
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":
|
|
{"images": ("IMAGE", ), },
|
|
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
|
|
}
|
|
|
|
|
|
class LoadImage:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
input_dir = folder_paths.get_input_directory()
|
|
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
|
|
return {
|
|
"required": {
|
|
"image": (natsorted(files), {"image_upload": True}),
|
|
},
|
|
}
|
|
|
|
CATEGORY = "image"
|
|
|
|
RETURN_TYPES = ("IMAGE", "MASK")
|
|
FUNCTION = "load_image"
|
|
|
|
def load_image(self, image: str) -> tuple[RGBImageBatch, MaskBatch]:
|
|
image_path = folder_paths.get_annotated_filepath(image)
|
|
|
|
output_images = []
|
|
output_masks = []
|
|
w, h = None, None
|
|
|
|
excluded_formats = ['MPO']
|
|
|
|
# maintain the legacy path
|
|
# this will ultimately return a tensor, so we'd rather have the tensors directly
|
|
# from cv2 rather than get them out of a PIL image
|
|
_, ext = os.path.splitext(image)
|
|
if ext == ".exr":
|
|
return load_exr(image_path, srgb=False)
|
|
with open_image(image_path) as img:
|
|
for i in ImageSequence.Iterator(img):
|
|
i = node_helpers.pillow(ImageOps.exif_transpose, i)
|
|
|
|
if i.mode == 'I':
|
|
i = i.point(lambda i: i * (1 / 255))
|
|
image = i.convert("RGB")
|
|
|
|
if len(output_images) == 0:
|
|
w = image.size[0]
|
|
h = image.size[1]
|
|
|
|
if image.size[0] != w or image.size[1] != h:
|
|
continue
|
|
|
|
image = np.array(image).astype(np.float32) / 255.0
|
|
image = torch.from_numpy(image)[None,]
|
|
if 'A' in i.getbands():
|
|
mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
|
|
mask = 1. - torch.from_numpy(mask)
|
|
else:
|
|
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
|
|
output_images.append(image)
|
|
output_masks.append(mask.unsqueeze(0))
|
|
|
|
if len(output_images) > 1 and img.format not in excluded_formats:
|
|
output_image = torch.cat(output_images, dim=0)
|
|
output_mask = torch.cat(output_masks, dim=0)
|
|
else:
|
|
output_image = output_images[0]
|
|
output_mask = output_masks[0]
|
|
|
|
return (output_image, output_mask)
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(s, image):
|
|
if not folder_paths.exists_annotated_filepath(image):
|
|
return "Invalid image file: {}".format(image)
|
|
|
|
return True
|
|
|
|
class LoadImageMask:
|
|
_color_channels = ["alpha", "red", "green", "blue"]
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
input_dir = folder_paths.get_input_directory()
|
|
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
|
|
return {"required":
|
|
{"image": (sorted(files), {"image_upload": True}),
|
|
"channel": (s._color_channels, ), }
|
|
}
|
|
|
|
CATEGORY = "mask"
|
|
|
|
RETURN_TYPES = ("MASK",)
|
|
FUNCTION = "load_image"
|
|
def load_image(self, image, channel):
|
|
image_path = folder_paths.get_annotated_filepath(image)
|
|
i = node_helpers.pillow(Image.open, image_path)
|
|
i = node_helpers.pillow(ImageOps.exif_transpose, i)
|
|
if i.getbands() != ("R", "G", "B", "A"):
|
|
if i.mode == 'I':
|
|
i = i.point(lambda i: i * (1 / 255))
|
|
i = i.convert("RGBA")
|
|
mask = None
|
|
c = channel[0].upper()
|
|
if c in i.getbands():
|
|
mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0
|
|
mask = torch.from_numpy(mask)
|
|
if c == 'A':
|
|
mask = 1. - mask
|
|
else:
|
|
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
|
|
return (mask.unsqueeze(0),)
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(s, image):
|
|
if not folder_paths.exists_annotated_filepath(image):
|
|
return "Invalid image file: {}".format(image)
|
|
|
|
return True
|
|
|
|
class ImageScale:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
|
|
crop_methods = ["disabled", "center"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
|
|
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
|
|
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
|
|
"crop": (s.crop_methods,)}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "image/upscaling"
|
|
|
|
def upscale(self, image: RGBImageBatch, upscale_method, width, height, crop) -> tuple[RGBImageBatch]:
|
|
if width == 0 and height == 0:
|
|
s = image
|
|
else:
|
|
samples = image.movedim(-1,1)
|
|
|
|
if width == 0:
|
|
width = max(1, round(samples.shape[3] * height / samples.shape[2]))
|
|
elif height == 0:
|
|
height = max(1, round(samples.shape[2] * width / samples.shape[3]))
|
|
|
|
s = utils.common_upscale(samples, width, height, upscale_method, crop)
|
|
s = s.movedim(1,-1)
|
|
return (s,)
|
|
|
|
class ImageScaleBy:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
|
|
"scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "image/upscaling"
|
|
|
|
def upscale(self, image, upscale_method, scale_by):
|
|
samples = image.movedim(-1,1)
|
|
width = round(samples.shape[3] * scale_by)
|
|
height = round(samples.shape[2] * scale_by)
|
|
s = utils.common_upscale(samples, width, height, upscale_method, "disabled")
|
|
s = s.movedim(1,-1)
|
|
return (s,)
|
|
|
|
class ImageInvert:
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image": ("IMAGE",)}}
|
|
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "invert"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def invert(self, image):
|
|
s = 1.0 - image
|
|
return (s,)
|
|
|
|
class ImageBatch:
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}}
|
|
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "batch"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def batch(self, image1, image2):
|
|
if image1.shape[1:] != image2.shape[1:]:
|
|
image2 = utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1)
|
|
s = torch.cat((image1, image2), dim=0)
|
|
return (s,)
|
|
|
|
class EmptyImage:
|
|
def __init__(self, device="cpu"):
|
|
self.device = device
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
|
|
"height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
|
|
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}),
|
|
"color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}),
|
|
}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "generate"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def generate(self, width, height, batch_size=1, color=0):
|
|
r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF)
|
|
g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF)
|
|
b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF)
|
|
return (torch.cat((r, g, b), dim=-1), )
|
|
|
|
class ImagePadForOutpaint:
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"image": ("IMAGE",),
|
|
"left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("IMAGE", "MASK")
|
|
FUNCTION = "expand_image"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def expand_image(self, image, left, top, right, bottom, feathering):
|
|
d1, d2, d3, d4 = image.size()
|
|
|
|
new_image = torch.ones(
|
|
(d1, d2 + top + bottom, d3 + left + right, d4),
|
|
dtype=torch.float32,
|
|
) * 0.5
|
|
|
|
new_image[:, top:top + d2, left:left + d3, :] = image
|
|
|
|
mask = torch.ones(
|
|
(d2 + top + bottom, d3 + left + right),
|
|
dtype=torch.float32,
|
|
)
|
|
|
|
t = torch.zeros(
|
|
(d2, d3),
|
|
dtype=torch.float32
|
|
)
|
|
|
|
if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3:
|
|
|
|
for i in range(d2):
|
|
for j in range(d3):
|
|
dt = i if top != 0 else d2
|
|
db = d2 - i if bottom != 0 else d2
|
|
|
|
dl = j if left != 0 else d3
|
|
dr = d3 - j if right != 0 else d3
|
|
|
|
d = min(dt, db, dl, dr)
|
|
|
|
if d >= feathering:
|
|
continue
|
|
|
|
v = (feathering - d) / feathering
|
|
|
|
t[i, j] = v * v
|
|
|
|
mask[top:top + d2, left:left + d3] = t
|
|
|
|
return (new_image, mask)
|
|
|
|
|
|
NODE_CLASS_MAPPINGS = {
|
|
"KSampler": KSampler,
|
|
"CheckpointLoaderSimple": CheckpointLoaderSimple,
|
|
"CLIPTextEncode": CLIPTextEncode,
|
|
"CLIPSetLastLayer": CLIPSetLastLayer,
|
|
"VAEDecode": VAEDecode,
|
|
"VAEEncode": VAEEncode,
|
|
"VAEEncodeForInpaint": VAEEncodeForInpaint,
|
|
"VAELoader": VAELoader,
|
|
"EmptyLatentImage": EmptyLatentImage,
|
|
"LatentUpscale": LatentUpscale,
|
|
"LatentUpscaleBy": LatentUpscaleBy,
|
|
"LatentFromBatch": LatentFromBatch,
|
|
"RepeatLatentBatch": RepeatLatentBatch,
|
|
"SaveImage": SaveImage,
|
|
"PreviewImage": PreviewImage,
|
|
"LoadImage": LoadImage,
|
|
"LoadImageMask": LoadImageMask,
|
|
"ImageScale": ImageScale,
|
|
"ImageScaleBy": ImageScaleBy,
|
|
"ImageInvert": ImageInvert,
|
|
"ImageBatch": ImageBatch,
|
|
"ImagePadForOutpaint": ImagePadForOutpaint,
|
|
"EmptyImage": EmptyImage,
|
|
"ConditioningAverage": ConditioningAverage ,
|
|
"ConditioningCombine": ConditioningCombine,
|
|
"ConditioningConcat": ConditioningConcat,
|
|
"ConditioningSetArea": ConditioningSetArea,
|
|
"ConditioningSetAreaPercentage": ConditioningSetAreaPercentage,
|
|
"ConditioningSetAreaStrength": ConditioningSetAreaStrength,
|
|
"ConditioningSetMask": ConditioningSetMask,
|
|
"KSamplerAdvanced": KSamplerAdvanced,
|
|
"SetLatentNoiseMask": SetLatentNoiseMask,
|
|
"LatentComposite": LatentComposite,
|
|
"LatentBlend": LatentBlend,
|
|
"LatentRotate": LatentRotate,
|
|
"LatentFlip": LatentFlip,
|
|
"LatentCrop": LatentCrop,
|
|
"LoraLoader": LoraLoader,
|
|
"CLIPLoader": CLIPLoader,
|
|
"UNETLoader": UNETLoader,
|
|
"DualCLIPLoader": DualCLIPLoader,
|
|
"CLIPVisionEncode": CLIPVisionEncode,
|
|
"StyleModelApply": StyleModelApply,
|
|
"unCLIPConditioning": unCLIPConditioning,
|
|
"ControlNetApply": ControlNetApply,
|
|
"ControlNetApplyAdvanced": ControlNetApplyAdvanced,
|
|
"ControlNetLoader": ControlNetLoader,
|
|
"ControlNetLoaderWeights": ControlNetLoaderWeights,
|
|
"DiffControlNetLoader": DiffControlNetLoader,
|
|
"StyleModelLoader": StyleModelLoader,
|
|
"CLIPVisionLoader": CLIPVisionLoader,
|
|
"VAEDecodeTiled": VAEDecodeTiled,
|
|
"VAEEncodeTiled": VAEEncodeTiled,
|
|
"unCLIPCheckpointLoader": unCLIPCheckpointLoader,
|
|
"GLIGENLoader": GLIGENLoader,
|
|
"GLIGENTextBoxApply": GLIGENTextBoxApply,
|
|
"InpaintModelConditioning": InpaintModelConditioning,
|
|
|
|
"CheckpointLoader": CheckpointLoader,
|
|
"DiffusersLoader": DiffusersLoader,
|
|
|
|
"LoadLatent": LoadLatent,
|
|
"SaveLatent": SaveLatent,
|
|
|
|
"ConditioningZeroOut": ConditioningZeroOut,
|
|
"ConditioningSetTimestepRange": ConditioningSetTimestepRange,
|
|
"LoraLoaderModelOnly": LoraLoaderModelOnly,
|
|
}
|
|
|
|
NODE_DISPLAY_NAME_MAPPINGS = {
|
|
# Sampling
|
|
"KSampler": "KSampler",
|
|
"KSamplerAdvanced": "KSampler (Advanced)",
|
|
# Loaders
|
|
"CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)",
|
|
"CheckpointLoaderSimple": "Load Checkpoint",
|
|
"VAELoader": "Load VAE",
|
|
"LoraLoader": "Load LoRA",
|
|
"CLIPLoader": "Load CLIP",
|
|
"ControlNetLoader": "Load ControlNet Model",
|
|
"ControlNetLoaderWeights": "Load ControlNet Model (Weights)",
|
|
"DiffControlNetLoader": "Load ControlNet Model (diff)",
|
|
"StyleModelLoader": "Load Style Model",
|
|
"CLIPVisionLoader": "Load CLIP Vision",
|
|
"UpscaleModelLoader": "Load Upscale Model",
|
|
"UNETLoader": "Load Diffusion Model",
|
|
# Conditioning
|
|
"CLIPVisionEncode": "CLIP Vision Encode",
|
|
"StyleModelApply": "Apply Style Model",
|
|
"CLIPTextEncode": "CLIP Text Encode (Prompt)",
|
|
"CLIPSetLastLayer": "CLIP Set Last Layer",
|
|
"ConditioningCombine": "Conditioning (Combine)",
|
|
"ConditioningAverage ": "Conditioning (Average)",
|
|
"ConditioningConcat": "Conditioning (Concat)",
|
|
"ConditioningSetArea": "Conditioning (Set Area)",
|
|
"ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)",
|
|
"ConditioningSetMask": "Conditioning (Set Mask)",
|
|
"ControlNetApply": "Apply ControlNet (OLD)",
|
|
"ControlNetApplyAdvanced": "Apply ControlNet",
|
|
# Latent
|
|
"VAEEncodeForInpaint": "VAE Encode (for Inpainting)",
|
|
"SetLatentNoiseMask": "Set Latent Noise Mask",
|
|
"VAEDecode": "VAE Decode",
|
|
"VAEEncode": "VAE Encode",
|
|
"LatentRotate": "Rotate Latent",
|
|
"LatentFlip": "Flip Latent",
|
|
"LatentCrop": "Crop Latent",
|
|
"EmptyLatentImage": "Empty Latent Image",
|
|
"LatentUpscale": "Upscale Latent",
|
|
"LatentUpscaleBy": "Upscale Latent By",
|
|
"LatentComposite": "Latent Composite",
|
|
"LatentBlend": "Latent Blend",
|
|
"LatentFromBatch" : "Latent From Batch",
|
|
"RepeatLatentBatch": "Repeat Latent Batch",
|
|
# Image
|
|
"SaveImage": "Save Image",
|
|
"PreviewImage": "Preview Image",
|
|
"LoadImage": "Load Image",
|
|
"LoadImageMask": "Load Image (as Mask)",
|
|
"ImageScale": "Upscale Image",
|
|
"ImageScaleBy": "Upscale Image By",
|
|
"ImageUpscaleWithModel": "Upscale Image (using Model)",
|
|
"ImageInvert": "Invert Image",
|
|
"ImagePadForOutpaint": "Pad Image for Outpainting",
|
|
"ImageBatch": "Batch Images",
|
|
# _for_testing
|
|
"VAEDecodeTiled": "VAE Decode (Tiled)",
|
|
"VAEEncodeTiled": "VAE Encode (Tiled)",
|
|
}
|