import torch import os import json import hashlib import math import random import logging from PIL import Image, ImageOps, ImageSequence from PIL.PngImagePlugin import PngInfo from pkg_resources import resource_filename import numpy as np import safetensors.torch from .. import diffusers_load from .. import samplers from .. import sample from .. import sd from .. import utils from .. import clip_vision as clip_vision_module from .. import model_management from ..cli_args import args from ..cmd import folder_paths, latent_preview from ..model_downloader import get_filename_list_with_downloadable, get_or_download, KNOWN_CHECKPOINTS, \ KNOWN_CLIP_VISION_MODELS, KNOWN_GLIGEN_MODELS, KNOWN_UNCLIP_CHECKPOINTS, KNOWN_LORAS from ..nodes.common import MAX_RESOLUTION from .. import controlnet class CLIPTextEncode: @classmethod def INPUT_TYPES(s): return {"required": {"text": ("STRING", {"multiline": True}), "clip": ("CLIP", )}} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "encode" CATEGORY = "conditioning" def encode(self, clip, text): tokens = clip.tokenize(text) cond, pooled = clip.encode_from_tokens(tokens, return_pooled=True) return ([[cond, {"pooled_output": pooled}]], ) class ConditioningCombine: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "combine" CATEGORY = "conditioning" def combine(self, conditioning_1, conditioning_2): return (conditioning_1 + conditioning_2, ) class ConditioningAverage : @classmethod def INPUT_TYPES(s): return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ), "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}) }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "addWeighted" CATEGORY = "conditioning" def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength): out = [] if len(conditioning_from) > 1: logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") cond_from = conditioning_from[0][0] pooled_output_from = conditioning_from[0][1].get("pooled_output", None) for i in range(len(conditioning_to)): t1 = conditioning_to[i][0] pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from) t0 = cond_from[:,:t1.shape[1]] if t0.shape[1] < t1.shape[1]: t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1) tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength)) t_to = conditioning_to[i][1].copy() if pooled_output_from is not None and pooled_output_to is not None: t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength)) elif pooled_output_from is not None: t_to["pooled_output"] = pooled_output_from n = [tw, t_to] out.append(n) return (out, ) class ConditioningConcat: @classmethod def INPUT_TYPES(s): return {"required": { "conditioning_to": ("CONDITIONING",), "conditioning_from": ("CONDITIONING",), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "concat" CATEGORY = "conditioning" def concat(self, conditioning_to, conditioning_from): out = [] if len(conditioning_from) > 1: logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") cond_from = conditioning_from[0][0] for i in range(len(conditioning_to)): t1 = conditioning_to[i][0] tw = torch.cat((t1, cond_from),1) n = [tw, conditioning_to[i][1].copy()] out.append(n) return (out, ) class ConditioningSetArea: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, width, height, x, y, strength): c = [] for t in conditioning: n = [t[0], t[1].copy()] n[1]['area'] = (height // 8, width // 8, y // 8, x // 8) n[1]['strength'] = strength n[1]['set_area_to_bounds'] = False c.append(n) return (c, ) class ConditioningSetAreaPercentage: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, width, height, x, y, strength): c = [] for t in conditioning: n = [t[0], t[1].copy()] n[1]['area'] = ("percentage", height, width, y, x) n[1]['strength'] = strength n[1]['set_area_to_bounds'] = False c.append(n) return (c, ) class ConditioningSetAreaStrength: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, strength): c = [] for t in conditioning: n = [t[0], t[1].copy()] n[1]['strength'] = strength c.append(n) return (c, ) class ConditioningSetMask: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "mask": ("MASK", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), "set_cond_area": (["default", "mask bounds"],), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, mask, set_cond_area, strength): c = [] set_area_to_bounds = False if set_cond_area != "default": set_area_to_bounds = True if len(mask.shape) < 3: mask = mask.unsqueeze(0) for t in conditioning: n = [t[0], t[1].copy()] _, h, w = mask.shape n[1]['mask'] = mask n[1]['set_area_to_bounds'] = set_area_to_bounds n[1]['mask_strength'] = strength c.append(n) return (c, ) class ConditioningZeroOut: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", )}} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "zero_out" CATEGORY = "advanced/conditioning" def zero_out(self, conditioning): c = [] for t in conditioning: d = t[1].copy() if "pooled_output" in d: d["pooled_output"] = torch.zeros_like(d["pooled_output"]) n = [torch.zeros_like(t[0]), d] c.append(n) return (c, ) class ConditioningSetTimestepRange: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "set_range" CATEGORY = "advanced/conditioning" def set_range(self, conditioning, start, end): c = [] for t in conditioning: d = t[1].copy() d['start_percent'] = start d['end_percent'] = end n = [t[0], d] c.append(n) return (c, ) class VAEDecode: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT", ), "vae": ("VAE", )}} RETURN_TYPES = ("IMAGE",) FUNCTION = "decode" CATEGORY = "latent" def decode(self, vae, samples): return (vae.decode(samples["samples"]), ) class VAEDecodeTiled: @classmethod def INPUT_TYPES(s): return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ), "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}) }} RETURN_TYPES = ("IMAGE",) FUNCTION = "decode" CATEGORY = "_for_testing" def decode(self, vae, samples, tile_size): return (vae.decode_tiled(samples["samples"], tile_x=tile_size // 8, tile_y=tile_size // 8, ), ) class VAEEncode: @classmethod def INPUT_TYPES(s): return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}} RETURN_TYPES = ("LATENT",) FUNCTION = "encode" CATEGORY = "latent" def encode(self, vae, pixels): t = vae.encode(pixels[:,:,:,:3]) return ({"samples":t}, ) class VAEEncodeTiled: @classmethod def INPUT_TYPES(s): return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ), "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}) }} RETURN_TYPES = ("LATENT",) FUNCTION = "encode" CATEGORY = "_for_testing" def encode(self, vae, pixels, tile_size): t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, ) return ({"samples":t}, ) class VAEEncodeForInpaint: @classmethod def INPUT_TYPES(s): return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}} RETURN_TYPES = ("LATENT",) FUNCTION = "encode" CATEGORY = "latent/inpaint" def encode(self, vae, pixels, mask, grow_mask_by=6): x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") pixels = pixels.clone() if pixels.shape[1] != x or pixels.shape[2] != y: x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2 y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2 pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] #grow mask by a few pixels to keep things seamless in latent space if grow_mask_by == 0: mask_erosion = mask else: kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by)) padding = math.ceil((grow_mask_by - 1) / 2) mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1) m = (1.0 - mask.round()).squeeze(1) for i in range(3): pixels[:,:,:,i] -= 0.5 pixels[:,:,:,i] *= m pixels[:,:,:,i] += 0.5 t = vae.encode(pixels) return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, ) class InpaintModelConditioning: @classmethod def INPUT_TYPES(s): return {"required": {"positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "vae": ("VAE", ), "pixels": ("IMAGE", ), "mask": ("MASK", ), }} RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT") RETURN_NAMES = ("positive", "negative", "latent") FUNCTION = "encode" CATEGORY = "conditioning/inpaint" def encode(self, positive, negative, pixels, vae, mask): x = (pixels.shape[1] // 8) * 8 y = (pixels.shape[2] // 8) * 8 mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") orig_pixels = pixels pixels = orig_pixels.clone() if pixels.shape[1] != x or pixels.shape[2] != y: x_offset = (pixels.shape[1] % 8) // 2 y_offset = (pixels.shape[2] % 8) // 2 pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] m = (1.0 - mask.round()).squeeze(1) for i in range(3): pixels[:,:,:,i] -= 0.5 pixels[:,:,:,i] *= m pixels[:,:,:,i] += 0.5 concat_latent = vae.encode(pixels) orig_latent = vae.encode(orig_pixels) out_latent = {} out_latent["samples"] = orig_latent out_latent["noise_mask"] = mask out = [] for conditioning in [positive, negative]: c = [] for t in conditioning: d = t[1].copy() d["concat_latent_image"] = concat_latent d["concat_mask"] = mask n = [t[0], d] c.append(n) out.append(c) return (out[0], out[1], out_latent) class SaveLatent: def __init__(self): self.output_dir = folder_paths.get_output_directory() @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT", ), "filename_prefix": ("STRING", {"default": "latents/ComfyUI"})}, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } RETURN_TYPES = () FUNCTION = "save" OUTPUT_NODE = True CATEGORY = "_for_testing" def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir) # support save metadata for latent sharing prompt_info = "" if prompt is not None: prompt_info = json.dumps(prompt) metadata = None if not args.disable_metadata: metadata = {"prompt": prompt_info} if extra_pnginfo is not None: for x in extra_pnginfo: metadata[x] = json.dumps(extra_pnginfo[x]) file = f"{filename}_{counter:05}_.latent" results = list() results.append({ "filename": file, "subfolder": subfolder, "type": "output" }) file = os.path.join(full_output_folder, file) output = {} output["latent_tensor"] = samples["samples"] output["latent_format_version_0"] = torch.tensor([]) utils.save_torch_file(output, file, metadata=metadata) return { "ui": { "latents": results } } class LoadLatent: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")] return {"required": {"latent": [sorted(files), ]}, } CATEGORY = "_for_testing" RETURN_TYPES = ("LATENT", ) FUNCTION = "load" def load(self, latent): latent_path = folder_paths.get_annotated_filepath(latent) latent = safetensors.torch.load_file(latent_path, device="cpu") multiplier = 1.0 if "latent_format_version_0" not in latent: multiplier = 1.0 / 0.18215 samples = {"samples": latent["latent_tensor"].float() * multiplier} return (samples, ) @classmethod def IS_CHANGED(s, latent): image_path = folder_paths.get_annotated_filepath(latent) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, latent): if not folder_paths.exists_annotated_filepath(latent): return "Invalid latent file: {}".format(latent) return True class CheckpointLoader: @classmethod def INPUT_TYPES(s): return {"required": { "config_name": (folder_paths.get_filename_list("configs"),), "ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_CHECKPOINTS),)}} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "advanced/loaders" def load_checkpoint(self, config_name, ckpt_name, output_vae=True, output_clip=True): config_path = folder_paths.get_full_path("configs", config_name) ckpt_path = get_or_download("checkpoints", ckpt_name, KNOWN_CHECKPOINTS) return sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) class CheckpointLoaderSimple: @classmethod def INPUT_TYPES(s): return {"required": { "ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_CHECKPOINTS),), }} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "loaders" def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True): ckpt_path = get_or_download("checkpoints", ckpt_name, KNOWN_CHECKPOINTS) out = sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) return out[:3] class DiffusersLoader: @classmethod def INPUT_TYPES(cls): paths = [] for search_path in folder_paths.get_folder_paths("diffusers"): if os.path.exists(search_path): for root, subdir, files in os.walk(search_path, followlinks=True): if "model_index.json" in files: paths.append(os.path.relpath(root, start=search_path)) return {"required": {"model_path": (paths,), }} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "advanced/loaders/deprecated" def load_checkpoint(self, model_path, output_vae=True, output_clip=True): for search_path in folder_paths.get_folder_paths("diffusers"): if os.path.exists(search_path): path = os.path.join(search_path, model_path) if os.path.exists(path): model_path = path break return diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings")) class unCLIPCheckpointLoader: @classmethod def INPUT_TYPES(s): return {"required": { "ckpt_name": (get_filename_list_with_downloadable("checkpoints", KNOWN_UNCLIP_CHECKPOINTS),), }} RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION") FUNCTION = "load_checkpoint" CATEGORY = "loaders" def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True): ckpt_path = get_or_download("checkpoints", ckpt_name, KNOWN_UNCLIP_CHECKPOINTS) out = sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) return out class CLIPSetLastLayer: @classmethod def INPUT_TYPES(s): return {"required": { "clip": ("CLIP", ), "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}), }} RETURN_TYPES = ("CLIP",) FUNCTION = "set_last_layer" CATEGORY = "conditioning" def set_last_layer(self, clip, stop_at_clip_layer): clip = clip.clone() clip.clip_layer(stop_at_clip_layer) return (clip,) class LoraLoader: def __init__(self): self.loaded_lora = None @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "clip": ("CLIP", ), "lora_name": (get_filename_list_with_downloadable("loras", KNOWN_LORAS),), "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), }} RETURN_TYPES = ("MODEL", "CLIP") FUNCTION = "load_lora" CATEGORY = "loaders" def load_lora(self, model, clip, lora_name, strength_model, strength_clip): if strength_model == 0 and strength_clip == 0: return (model, clip) lora_path = get_or_download("loras", lora_name, KNOWN_LORAS) lora = None if self.loaded_lora is not None: if self.loaded_lora[0] == lora_path: lora = self.loaded_lora[1] else: temp = self.loaded_lora self.loaded_lora = None del temp if lora is None: lora = utils.load_torch_file(lora_path, safe_load=True) self.loaded_lora = (lora_path, lora) model_lora, clip_lora = sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip) return (model_lora, clip_lora) class LoraLoaderModelOnly(LoraLoader): @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "lora_name": (folder_paths.get_filename_list("loras"), ), "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}), }} RETURN_TYPES = ("MODEL",) FUNCTION = "load_lora_model_only" def load_lora_model_only(self, model, lora_name, strength_model): return (self.load_lora(model, None, lora_name, strength_model, 0)[0],) class VAELoader: @staticmethod def vae_list(): vaes = folder_paths.get_filename_list("vae") approx_vaes = folder_paths.get_filename_list("vae_approx") sdxl_taesd_enc = False sdxl_taesd_dec = False sd1_taesd_enc = False sd1_taesd_dec = False for v in approx_vaes: if v.startswith("taesd_decoder."): sd1_taesd_dec = True elif v.startswith("taesd_encoder."): sd1_taesd_enc = True elif v.startswith("taesdxl_decoder."): sdxl_taesd_dec = True elif v.startswith("taesdxl_encoder."): sdxl_taesd_enc = True if sd1_taesd_dec and sd1_taesd_enc: vaes.append("taesd") if sdxl_taesd_dec and sdxl_taesd_enc: vaes.append("taesdxl") return vaes @staticmethod def load_taesd(name): sd = {} approx_vaes = folder_paths.get_filename_list("vae_approx") encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes)) decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes)) enc = utils.load_torch_file(folder_paths.get_full_path("vae_approx", encoder)) for k in enc: sd["taesd_encoder.{}".format(k)] = enc[k] dec = utils.load_torch_file(folder_paths.get_full_path("vae_approx", decoder)) for k in dec: sd["taesd_decoder.{}".format(k)] = dec[k] if name == "taesd": sd["vae_scale"] = torch.tensor(0.18215) elif name == "taesdxl": sd["vae_scale"] = torch.tensor(0.13025) return sd @classmethod def INPUT_TYPES(s): return {"required": { "vae_name": (s.vae_list(),)}} RETURN_TYPES = ("VAE",) FUNCTION = "load_vae" CATEGORY = "loaders" #TODO: scale factor? def load_vae(self, vae_name): if vae_name in ["taesd", "taesdxl"]: sd = self.load_taesd(vae_name) else: vae_path = folder_paths.get_full_path("vae", vae_name) sd = utils.load_torch_file(vae_path) vae = sd.VAE(sd=sd) return (vae,) class ControlNetLoader: @classmethod def INPUT_TYPES(s): return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"),)}} RETURN_TYPES = ("CONTROL_NET",) FUNCTION = "load_controlnet" CATEGORY = "loaders" def load_controlnet(self, control_net_name): controlnet_path = folder_paths.get_full_path("controlnet", control_net_name) controlnet_ = controlnet.load_controlnet(controlnet_path) return (controlnet_,) class DiffControlNetLoader: @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "control_net_name": (folder_paths.get_filename_list("controlnet"),)}} RETURN_TYPES = ("CONTROL_NET",) FUNCTION = "load_controlnet" CATEGORY = "loaders" def load_controlnet(self, model, control_net_name): controlnet_path = folder_paths.get_full_path("controlnet", control_net_name) controlnet_ = controlnet.load_controlnet(controlnet_path, model) return (controlnet,) class ControlNetApply: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "control_net": ("CONTROL_NET", ), "image": ("IMAGE", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}) }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "apply_controlnet" CATEGORY = "conditioning" def apply_controlnet(self, conditioning, control_net, image, strength): if strength == 0: return (conditioning, ) c = [] control_hint = image.movedim(-1,1) for t in conditioning: n = [t[0], t[1].copy()] c_net = control_net.copy().set_cond_hint(control_hint, strength) if 'control' in t[1]: c_net.set_previous_controlnet(t[1]['control']) n[1]['control'] = c_net n[1]['control_apply_to_uncond'] = True c.append(n) return (c, ) class ControlNetApplyAdvanced: @classmethod def INPUT_TYPES(s): return {"required": {"positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "control_net": ("CONTROL_NET", ), "image": ("IMAGE", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) }} RETURN_TYPES = ("CONDITIONING","CONDITIONING") RETURN_NAMES = ("positive", "negative") FUNCTION = "apply_controlnet" CATEGORY = "conditioning" def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent): if strength == 0: return (positive, negative) control_hint = image.movedim(-1,1) cnets = {} out = [] for conditioning in [positive, negative]: c = [] for t in conditioning: d = t[1].copy() prev_cnet = d.get('control', None) if prev_cnet in cnets: c_net = cnets[prev_cnet] else: c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent)) c_net.set_previous_controlnet(prev_cnet) cnets[prev_cnet] = c_net d['control'] = c_net d['control_apply_to_uncond'] = False n = [t[0], d] c.append(n) out.append(c) return (out[0], out[1]) class UNETLoader: @classmethod def INPUT_TYPES(s): return {"required": { "unet_name": (folder_paths.get_filename_list("unet"),), }} RETURN_TYPES = ("MODEL",) FUNCTION = "load_unet" CATEGORY = "advanced/loaders" def load_unet(self, unet_name): unet_path = folder_paths.get_full_path("unet", unet_name) model = sd.load_unet(unet_path) return (model,) class CLIPLoader: @classmethod def INPUT_TYPES(s): return {"required": { "clip_name": (folder_paths.get_filename_list("clip"),), "type": (["stable_diffusion", "stable_cascade"], ), }} RETURN_TYPES = ("CLIP",) FUNCTION = "load_clip" CATEGORY = "advanced/loaders" def load_clip(self, clip_name, type="stable_diffusion"): clip_type = sd.CLIPType.STABLE_DIFFUSION if type == "stable_cascade": clip_type = sd.CLIPType.STABLE_CASCADE clip_path = folder_paths.get_full_path("clip", clip_name) clip = sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type) return (clip,) class DualCLIPLoader: @classmethod def INPUT_TYPES(s): return {"required": { "clip_name1": (folder_paths.get_filename_list("clip"),), "clip_name2": ( folder_paths.get_filename_list("clip"),), }} RETURN_TYPES = ("CLIP",) FUNCTION = "load_clip" CATEGORY = "advanced/loaders" def load_clip(self, clip_name1, clip_name2): clip_path1 = folder_paths.get_full_path("clip", clip_name1) clip_path2 = folder_paths.get_full_path("clip", clip_name2) clip = sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings")) return (clip,) class CLIPVisionLoader: @classmethod def INPUT_TYPES(s): return {"required": { "clip_name": (get_filename_list_with_downloadable("clip_vision", KNOWN_CLIP_VISION_MODELS),), }} RETURN_TYPES = ("CLIP_VISION",) FUNCTION = "load_clip" CATEGORY = "loaders" def load_clip(self, clip_name): clip_path = get_or_download("clip_vision", clip_name, KNOWN_CLIP_VISION_MODELS) clip_vision = clip_vision_module.load(clip_path) return (clip_vision,) class CLIPVisionEncode: @classmethod def INPUT_TYPES(s): return {"required": { "clip_vision": ("CLIP_VISION",), "image": ("IMAGE",) }} RETURN_TYPES = ("CLIP_VISION_OUTPUT",) FUNCTION = "encode" CATEGORY = "conditioning" def encode(self, clip_vision, image): output = clip_vision.encode_image(image) return (output,) class StyleModelLoader: @classmethod def INPUT_TYPES(s): return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"),)}} RETURN_TYPES = ("STYLE_MODEL",) FUNCTION = "load_style_model" CATEGORY = "loaders" def load_style_model(self, style_model_name): style_model_path = folder_paths.get_full_path("style_models", style_model_name) style_model = sd.load_style_model(style_model_path) return (style_model,) class StyleModelApply: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "style_model": ("STYLE_MODEL", ), "clip_vision_output": ("CLIP_VISION_OUTPUT", ), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "apply_stylemodel" CATEGORY = "conditioning/style_model" def apply_stylemodel(self, clip_vision_output, style_model, conditioning): cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0) c = [] for t in conditioning: n = [torch.cat((t[0], cond), dim=1), t[1].copy()] c.append(n) return (c, ) class unCLIPConditioning: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "clip_vision_output": ("CLIP_VISION_OUTPUT", ), "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}), "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "apply_adm" CATEGORY = "conditioning" def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation): if strength == 0: return (conditioning, ) c = [] for t in conditioning: o = t[1].copy() x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation} if "unclip_conditioning" in o: o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x] else: o["unclip_conditioning"] = [x] n = [t[0], o] c.append(n) return (c, ) class GLIGENLoader: @classmethod def INPUT_TYPES(s): return {"required": { "gligen_name": (get_filename_list_with_downloadable("gligen", KNOWN_GLIGEN_MODELS),)}} RETURN_TYPES = ("GLIGEN",) FUNCTION = "load_gligen" CATEGORY = "loaders" def load_gligen(self, gligen_name): gligen_path = get_or_download("gligen", gligen_name, KNOWN_GLIGEN_MODELS) gligen = sd.load_gligen(gligen_path) return (gligen,) class GLIGENTextBoxApply: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning_to": ("CONDITIONING", ), "clip": ("CLIP", ), "gligen_textbox_model": ("GLIGEN", ), "text": ("STRING", {"multiline": True}), "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning/gligen" def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y): c = [] cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected") for t in conditioning_to: n = [t[0], t[1].copy()] position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)] prev = [] if "gligen" in n[1]: prev = n[1]['gligen'][2] n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params) c.append(n) return (c, ) class EmptyLatentImage: def __init__(self): self.device = model_management.intermediate_device() @classmethod def INPUT_TYPES(s): return {"required": { "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}), "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096})}} RETURN_TYPES = ("LATENT",) FUNCTION = "generate" CATEGORY = "latent" def generate(self, width, height, batch_size=1): latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device) return ({"samples":latent}, ) class LatentFromBatch: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}), "length": ("INT", {"default": 1, "min": 1, "max": 64}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "frombatch" CATEGORY = "latent/batch" def frombatch(self, samples, batch_index, length): s = samples.copy() s_in = samples["samples"] batch_index = min(s_in.shape[0] - 1, batch_index) length = min(s_in.shape[0] - batch_index, length) s["samples"] = s_in[batch_index:batch_index + length].clone() if "noise_mask" in samples: masks = samples["noise_mask"] if masks.shape[0] == 1: s["noise_mask"] = masks.clone() else: if masks.shape[0] < s_in.shape[0]: masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] s["noise_mask"] = masks[batch_index:batch_index + length].clone() if "batch_index" not in s: s["batch_index"] = [x for x in range(batch_index, batch_index+length)] else: s["batch_index"] = samples["batch_index"][batch_index:batch_index + length] return (s,) class RepeatLatentBatch: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "amount": ("INT", {"default": 1, "min": 1, "max": 64}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "repeat" CATEGORY = "latent/batch" def repeat(self, samples, amount): s = samples.copy() s_in = samples["samples"] s["samples"] = s_in.repeat((amount, 1,1,1)) if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1: masks = samples["noise_mask"] if masks.shape[0] < s_in.shape[0]: masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1)) if "batch_index" in s: offset = max(s["batch_index"]) - min(s["batch_index"]) + 1 s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]] return (s,) class LatentUpscale: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] crop_methods = ["disabled", "center"] @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "crop": (s.crop_methods,)}} RETURN_TYPES = ("LATENT",) FUNCTION = "upscale" CATEGORY = "latent" def upscale(self, samples, upscale_method, width, height, crop): if width == 0 and height == 0: s = samples else: s = samples.copy() if width == 0: height = max(64, height) width = max(64, round(samples["samples"].shape[3] * height / samples["samples"].shape[2])) elif height == 0: width = max(64, width) height = max(64, round(samples["samples"].shape[2] * width / samples["samples"].shape[3])) else: width = max(64, width) height = max(64, height) s["samples"] = utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop) return (s,) class LatentUpscaleBy: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}} RETURN_TYPES = ("LATENT",) FUNCTION = "upscale" CATEGORY = "latent" def upscale(self, samples, upscale_method, scale_by): s = samples.copy() width = round(samples["samples"].shape[3] * scale_by) height = round(samples["samples"].shape[2] * scale_by) s["samples"] = utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled") return (s,) class LatentRotate: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],), }} RETURN_TYPES = ("LATENT",) FUNCTION = "rotate" CATEGORY = "latent/transform" def rotate(self, samples, rotation): s = samples.copy() rotate_by = 0 if rotation.startswith("90"): rotate_by = 1 elif rotation.startswith("180"): rotate_by = 2 elif rotation.startswith("270"): rotate_by = 3 s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2]) return (s,) class LatentFlip: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "flip_method": (["x-axis: vertically", "y-axis: horizontally"],), }} RETURN_TYPES = ("LATENT",) FUNCTION = "flip" CATEGORY = "latent/transform" def flip(self, samples, flip_method): s = samples.copy() if flip_method.startswith("x"): s["samples"] = torch.flip(samples["samples"], dims=[2]) elif flip_method.startswith("y"): s["samples"] = torch.flip(samples["samples"], dims=[3]) return (s,) class LatentComposite: @classmethod def INPUT_TYPES(s): return {"required": { "samples_to": ("LATENT",), "samples_from": ("LATENT",), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "composite" CATEGORY = "latent" def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0): x = x // 8 y = y // 8 feather = feather // 8 samples_out = samples_to.copy() s = samples_to["samples"].clone() samples_to = samples_to["samples"] samples_from = samples_from["samples"] if feather == 0: s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] else: samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] mask = torch.ones_like(samples_from) for t in range(feather): if y != 0: mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1)) if y + samples_from.shape[2] < samples_to.shape[2]: mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1)) if x != 0: mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1)) if x + samples_from.shape[3] < samples_to.shape[3]: mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1)) rev_mask = torch.ones_like(mask) - mask s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask samples_out["samples"] = s return (samples_out,) class LatentBlend: @classmethod def INPUT_TYPES(s): return {"required": { "samples1": ("LATENT",), "samples2": ("LATENT",), "blend_factor": ("FLOAT", { "default": 0.5, "min": 0, "max": 1, "step": 0.01 }), }} RETURN_TYPES = ("LATENT",) FUNCTION = "blend" CATEGORY = "_for_testing" def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"): samples_out = samples1.copy() samples1 = samples1["samples"] samples2 = samples2["samples"] if samples1.shape != samples2.shape: samples2.permute(0, 3, 1, 2) samples2 = utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center') samples2.permute(0, 2, 3, 1) samples_blended = self.blend_mode(samples1, samples2, blend_mode) samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor) samples_out["samples"] = samples_blended return (samples_out,) def blend_mode(self, img1, img2, mode): if mode == "normal": return img2 else: raise ValueError(f"Unsupported blend mode: {mode}") class LatentCrop: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "crop" CATEGORY = "latent/transform" def crop(self, samples, width, height, x, y): s = samples.copy() samples = samples['samples'] x = x // 8 y = y // 8 #enfonce minimum size of 64 if x > (samples.shape[3] - 8): x = samples.shape[3] - 8 if y > (samples.shape[2] - 8): y = samples.shape[2] - 8 new_height = height // 8 new_width = width // 8 to_x = new_width + x to_y = new_height + y s['samples'] = samples[:,:,y:to_y, x:to_x] return (s,) class SetLatentNoiseMask: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "mask": ("MASK",), }} RETURN_TYPES = ("LATENT",) FUNCTION = "set_mask" CATEGORY = "latent/inpaint" def set_mask(self, samples, mask): s = samples.copy() s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])) return (s,) def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): latent_image = latent["samples"] if disable_noise: noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") else: batch_inds = latent["batch_index"] if "batch_index" in latent else None noise = sample.prepare_noise(latent_image, seed, batch_inds) noise_mask = None if "noise_mask" in latent: noise_mask = latent["noise_mask"] callback = latent_preview.prepare_callback(model, steps) disable_pbar = not utils.PROGRESS_BAR_ENABLED samples = sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) out = latent.copy() out["samples"] = samples return (out, ) class KSampler: @classmethod def INPUT_TYPES(s): return {"required": {"model": ("MODEL",), "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), "sampler_name": (samplers.KSampler.SAMPLERS, ), "scheduler": (samplers.KSampler.SCHEDULERS, ), "positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "latent_image": ("LATENT", ), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), } } RETURN_TYPES = ("LATENT",) FUNCTION = "sample" CATEGORY = "sampling" def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) class KSamplerAdvanced: @classmethod def INPUT_TYPES(s): return {"required": {"model": ("MODEL",), "add_noise": (["enable", "disable"], ), "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), "sampler_name": (samplers.KSampler.SAMPLERS, ), "scheduler": (samplers.KSampler.SCHEDULERS, ), "positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "latent_image": ("LATENT", ), "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), "return_with_leftover_noise": (["disable", "enable"], ), } } RETURN_TYPES = ("LATENT",) FUNCTION = "sample" CATEGORY = "sampling" def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0): force_full_denoise = True if return_with_leftover_noise == "enable": force_full_denoise = False disable_noise = False if add_noise == "disable": disable_noise = True return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise) class SaveImage: def __init__(self): self.output_dir = folder_paths.get_output_directory() self.type = "output" self.prefix_append = "" self.compress_level = 4 @classmethod def INPUT_TYPES(s): return {"required": {"images": ("IMAGE", ), "filename_prefix": ("STRING", {"default": "ComfyUI"})}, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } RETURN_TYPES = () FUNCTION = "save_images" OUTPUT_NODE = True CATEGORY = "image" def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): filename_prefix += self.prefix_append full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]) results = list() for (batch_number, image) in enumerate(images): i = 255. * image.cpu().numpy() img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) metadata = None if not args.disable_metadata: metadata = PngInfo() if prompt is not None: metadata.add_text("prompt", json.dumps(prompt)) if extra_pnginfo is not None: for x in extra_pnginfo: metadata.add_text(x, json.dumps(extra_pnginfo[x])) filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) file = f"{filename_with_batch_num}_{counter:05}_.png" abs_path = os.path.join(full_output_folder, file) img.save(abs_path, pnginfo=metadata, compress_level=self.compress_level) results.append({ "abs_path": os.path.abspath(abs_path), "filename": file, "subfolder": subfolder, "type": self.type }) counter += 1 return { "ui": { "images": results } } class PreviewImage(SaveImage): def __init__(self): self.output_dir = folder_paths.get_temp_directory() self.type = "temp" self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) self.compress_level = 1 @classmethod def INPUT_TYPES(s): return {"required": {"images": ("IMAGE", ), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } class LoadImage: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] return {"required": {"image": (sorted(files), {"image_upload": True})}, } CATEGORY = "image" RETURN_TYPES = ("IMAGE", "MASK") FUNCTION = "load_image" def load_image(self, image): image_path = folder_paths.get_annotated_filepath(image) img = Image.open(image_path) output_images = [] output_masks = [] for i in ImageSequence.Iterator(img): i = ImageOps.exif_transpose(i) if i.mode == 'I': i = i.point(lambda i: i * (1 / 255)) image = i.convert("RGB") image = np.array(image).astype(np.float32) / 255.0 image = torch.from_numpy(image)[None,] if 'A' in i.getbands(): mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0 mask = 1. - torch.from_numpy(mask) else: mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") output_images.append(image) output_masks.append(mask.unsqueeze(0)) if len(output_images) > 1: output_image = torch.cat(output_images, dim=0) output_mask = torch.cat(output_masks, dim=0) else: output_image = output_images[0] output_mask = output_masks[0] return (output_image, output_mask) @classmethod def IS_CHANGED(s, image): image_path = folder_paths.get_annotated_filepath(image) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, image): if not folder_paths.exists_annotated_filepath(image): return "Invalid image file: {}".format(image) return True class LoadImageMask: _color_channels = ["alpha", "red", "green", "blue"] @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] return {"required": {"image": (sorted(files), {"image_upload": True}), "channel": (s._color_channels, ), } } CATEGORY = "mask" RETURN_TYPES = ("MASK",) FUNCTION = "load_image" def load_image(self, image, channel): image_path = folder_paths.get_annotated_filepath(image) i = Image.open(image_path) i = ImageOps.exif_transpose(i) if i.getbands() != ("R", "G", "B", "A"): if i.mode == 'I': i = i.point(lambda i: i * (1 / 255)) i = i.convert("RGBA") mask = None c = channel[0].upper() if c in i.getbands(): mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0 mask = torch.from_numpy(mask) if c == 'A': mask = 1. - mask else: mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") return (mask.unsqueeze(0),) @classmethod def IS_CHANGED(s, image, channel): image_path = folder_paths.get_annotated_filepath(image) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, image): if not folder_paths.exists_annotated_filepath(image): return "Invalid image file: {}".format(image) return True class ImageScale: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] crop_methods = ["disabled", "center"] @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), "crop": (s.crop_methods,)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "upscale" CATEGORY = "image/upscaling" def upscale(self, image, upscale_method, width, height, crop): if width == 0 and height == 0: s = image else: samples = image.movedim(-1,1) if width == 0: width = max(1, round(samples.shape[3] * height / samples.shape[2])) elif height == 0: height = max(1, round(samples.shape[2] * width / samples.shape[3])) s = utils.common_upscale(samples, width, height, upscale_method, crop) s = s.movedim(1,-1) return (s,) class ImageScaleBy: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}} RETURN_TYPES = ("IMAGE",) FUNCTION = "upscale" CATEGORY = "image/upscaling" def upscale(self, image, upscale_method, scale_by): samples = image.movedim(-1,1) width = round(samples.shape[3] * scale_by) height = round(samples.shape[2] * scale_by) s = utils.common_upscale(samples, width, height, upscale_method, "disabled") s = s.movedim(1,-1) return (s,) class ImageInvert: @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "invert" CATEGORY = "image" def invert(self, image): s = 1.0 - image return (s,) class ImageBatch: @classmethod def INPUT_TYPES(s): return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "batch" CATEGORY = "image" def batch(self, image1, image2): if image1.shape[1:] != image2.shape[1:]: image2 = utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1) s = torch.cat((image1, image2), dim=0) return (s,) class EmptyImage: def __init__(self, device="cpu"): self.device = device @classmethod def INPUT_TYPES(s): return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}), "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}), }} RETURN_TYPES = ("IMAGE",) FUNCTION = "generate" CATEGORY = "image" def generate(self, width, height, batch_size=1, color=0): r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF) g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF) b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF) return (torch.cat((r, g, b), dim=-1), ) class ImagePadForOutpaint: @classmethod def INPUT_TYPES(s): return { "required": { "image": ("IMAGE",), "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}), } } RETURN_TYPES = ("IMAGE", "MASK") FUNCTION = "expand_image" CATEGORY = "image" def expand_image(self, image, left, top, right, bottom, feathering): d1, d2, d3, d4 = image.size() new_image = torch.ones( (d1, d2 + top + bottom, d3 + left + right, d4), dtype=torch.float32, ) * 0.5 new_image[:, top:top + d2, left:left + d3, :] = image mask = torch.ones( (d2 + top + bottom, d3 + left + right), dtype=torch.float32, ) t = torch.zeros( (d2, d3), dtype=torch.float32 ) if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3: for i in range(d2): for j in range(d3): dt = i if top != 0 else d2 db = d2 - i if bottom != 0 else d2 dl = j if left != 0 else d3 dr = d3 - j if right != 0 else d3 d = min(dt, db, dl, dr) if d >= feathering: continue v = (feathering - d) / feathering t[i, j] = v * v mask[top:top + d2, left:left + d3] = t return (new_image, mask) NODE_CLASS_MAPPINGS = { "KSampler": KSampler, "CheckpointLoaderSimple": CheckpointLoaderSimple, "CLIPTextEncode": CLIPTextEncode, "CLIPSetLastLayer": CLIPSetLastLayer, "VAEDecode": VAEDecode, "VAEEncode": VAEEncode, "VAEEncodeForInpaint": VAEEncodeForInpaint, "VAELoader": VAELoader, "EmptyLatentImage": EmptyLatentImage, "LatentUpscale": LatentUpscale, "LatentUpscaleBy": LatentUpscaleBy, "LatentFromBatch": LatentFromBatch, "RepeatLatentBatch": RepeatLatentBatch, "SaveImage": SaveImage, "PreviewImage": PreviewImage, "LoadImage": LoadImage, "LoadImageMask": LoadImageMask, "ImageScale": ImageScale, "ImageScaleBy": ImageScaleBy, "ImageInvert": ImageInvert, "ImageBatch": ImageBatch, "ImagePadForOutpaint": ImagePadForOutpaint, "EmptyImage": EmptyImage, "ConditioningAverage": ConditioningAverage , "ConditioningCombine": ConditioningCombine, "ConditioningConcat": ConditioningConcat, "ConditioningSetArea": ConditioningSetArea, "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage, "ConditioningSetAreaStrength": ConditioningSetAreaStrength, "ConditioningSetMask": ConditioningSetMask, "KSamplerAdvanced": KSamplerAdvanced, "SetLatentNoiseMask": SetLatentNoiseMask, "LatentComposite": LatentComposite, "LatentBlend": LatentBlend, "LatentRotate": LatentRotate, "LatentFlip": LatentFlip, "LatentCrop": LatentCrop, "LoraLoader": LoraLoader, "CLIPLoader": CLIPLoader, "UNETLoader": UNETLoader, "DualCLIPLoader": DualCLIPLoader, "CLIPVisionEncode": CLIPVisionEncode, "StyleModelApply": StyleModelApply, "unCLIPConditioning": unCLIPConditioning, "ControlNetApply": ControlNetApply, "ControlNetApplyAdvanced": ControlNetApplyAdvanced, "ControlNetLoader": ControlNetLoader, "DiffControlNetLoader": DiffControlNetLoader, "StyleModelLoader": StyleModelLoader, "CLIPVisionLoader": CLIPVisionLoader, "VAEDecodeTiled": VAEDecodeTiled, "VAEEncodeTiled": VAEEncodeTiled, "unCLIPCheckpointLoader": unCLIPCheckpointLoader, "GLIGENLoader": GLIGENLoader, "GLIGENTextBoxApply": GLIGENTextBoxApply, "InpaintModelConditioning": InpaintModelConditioning, "CheckpointLoader": CheckpointLoader, "DiffusersLoader": DiffusersLoader, "LoadLatent": LoadLatent, "SaveLatent": SaveLatent, "ConditioningZeroOut": ConditioningZeroOut, "ConditioningSetTimestepRange": ConditioningSetTimestepRange, "LoraLoaderModelOnly": LoraLoaderModelOnly, } NODE_DISPLAY_NAME_MAPPINGS = { # Sampling "KSampler": "KSampler", "KSamplerAdvanced": "KSampler (Advanced)", # Loaders "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)", "CheckpointLoaderSimple": "Load Checkpoint", "VAELoader": "Load VAE", "LoraLoader": "Load LoRA", "CLIPLoader": "Load CLIP", "ControlNetLoader": "Load ControlNet Model", "DiffControlNetLoader": "Load ControlNet Model (diff)", "StyleModelLoader": "Load Style Model", "CLIPVisionLoader": "Load CLIP Vision", "UpscaleModelLoader": "Load Upscale Model", # Conditioning "CLIPVisionEncode": "CLIP Vision Encode", "StyleModelApply": "Apply Style Model", "CLIPTextEncode": "CLIP Text Encode (Prompt)", "CLIPSetLastLayer": "CLIP Set Last Layer", "ConditioningCombine": "Conditioning (Combine)", "ConditioningAverage ": "Conditioning (Average)", "ConditioningConcat": "Conditioning (Concat)", "ConditioningSetArea": "Conditioning (Set Area)", "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)", "ConditioningSetMask": "Conditioning (Set Mask)", "ControlNetApply": "Apply ControlNet", "ControlNetApplyAdvanced": "Apply ControlNet (Advanced)", # Latent "VAEEncodeForInpaint": "VAE Encode (for Inpainting)", "SetLatentNoiseMask": "Set Latent Noise Mask", "VAEDecode": "VAE Decode", "VAEEncode": "VAE Encode", "LatentRotate": "Rotate Latent", "LatentFlip": "Flip Latent", "LatentCrop": "Crop Latent", "EmptyLatentImage": "Empty Latent Image", "LatentUpscale": "Upscale Latent", "LatentUpscaleBy": "Upscale Latent By", "LatentComposite": "Latent Composite", "LatentBlend": "Latent Blend", "LatentFromBatch" : "Latent From Batch", "RepeatLatentBatch": "Repeat Latent Batch", # Image "SaveImage": "Save Image", "PreviewImage": "Preview Image", "LoadImage": "Load Image", "LoadImageMask": "Load Image (as Mask)", "ImageScale": "Upscale Image", "ImageScaleBy": "Upscale Image By", "ImageUpscaleWithModel": "Upscale Image (using Model)", "ImageInvert": "Invert Image", "ImagePadForOutpaint": "Pad Image for Outpainting", "ImageBatch": "Batch Images", # _for_testing "VAEDecodeTiled": "VAE Decode (Tiled)", "VAEEncodeTiled": "VAE Encode (Tiled)", }