finish custom nodes tests

Benjamin Berman 2023-08-21 12:15:25 -07:00
parent 30ed9fc143
commit 8a3331609e
9 changed files with 0 additions and 952 deletions

View File

@ -1,30 +0,0 @@
from transformers import logging as transformers_logging
from diffusers import logging as diffusers_logging
from warnings import filterwarnings
import logging
from .deep_floyd import *
transformers_logging.set_verbosity_error()
diffusers_logging.set_verbosity_error()
logging.getLogger("xformers").addFilter(lambda r: "A matching Triton is not available" not in r.getMessage())
filterwarnings("ignore", category=FutureWarning, message="The `reduce_labels` parameter is deprecated")
filterwarnings("ignore", category=UserWarning, message="You seem to be using the pipelines sequentially on GPU")
filterwarnings("ignore", category=UserWarning, message="TypedStorage is deprecated")
NODE_CLASS_MAPPINGS = {
# DeepFloyd
"IFLoader": IFLoader,
"IFEncoder": IFEncoder,
"IFStageI": IFStageI,
"IFStageII": IFStageII,
"IFStageIII": IFStageIII,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"IFLoader": "DeepFloyd IF Loader",
"IFEncoder": "DeepFloyd IF Encoder",
"IFStageI": "DeepFloyd IF Stage I",
"IFStageII": "DeepFloyd IF Stage II",
"IFStageIII": "DeepFloyd IF Stage III",
}

View File

@ -1,349 +0,0 @@
import gc
import json
import os.path
import typing
import torch
from diffusers import DiffusionPipeline, IFPipeline, StableDiffusionUpscalePipeline, IFSuperResolutionPipeline
from diffusers.utils import is_accelerate_available, is_accelerate_version
from transformers import T5EncoderModel, BitsAndBytesConfig
from comfy.model_management import throw_exception_if_processing_interrupted, get_torch_device, cpu_state, CPUState
from comfy.nodes.package_typing import CustomNode
from comfy.utils import ProgressBar, get_project_root
# todo: find or download the models automatically from their config JSONs instead of relying on well-known names
_model_base_path = os.path.join(get_project_root(), "models", "deepfloyd")
def _find_files(directory: str, filename: str) -> typing.List[str]:
return [os.path.join(root, file) for root, _, files in os.walk(directory) for file in files if file == filename]
# todo: file a ticket against diffusers so that an omitted unet is handled correctly
def _patched_enable_model_cpu_offload_ifpipeline(self: IFPipeline | IFSuperResolutionPipeline, gpu_id=0):
r"""
Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Compared
to `enable_sequential_cpu_offload`, this method moves one whole model at a time to the GPU when its `forward`
method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with
`enable_sequential_cpu_offload`, but performance is much better due to the iterative execution of the `unet`.
"""
if is_accelerate_available() and is_accelerate_version(">=", "0.17.0.dev0"):
from accelerate import cpu_offload_with_hook
else:
raise ImportError("`enable_model_cpu_offload` requires `accelerate v0.17.0` or higher.")
if cpu_state == CPUState.GPU:
device = torch.device(f"cuda:{gpu_id}")
else:
device = get_torch_device()
if cpu_state == CPUState.CPU or cpu_state == CPUState.MPS:
return
if self.device.type != "cpu":
self.to("cpu", silence_dtype_warnings=True)
torch.cuda.empty_cache() # otherwise we don't see the memory savings (but they probably exist)
hook = None
if self.text_encoder is not None:
_, hook = cpu_offload_with_hook(self.text_encoder, device, prev_module_hook=hook)
# Accelerate will move the next model to the device _before_ calling the offload hook of the
# previous model. This will cause both models to be present on the device at the same time.
# IF uses T5 for its text encoder which is really large. We can manually call the offload
# hook for the text encoder to ensure it's moved to the cpu before the unet is moved to
# the GPU.
self.text_encoder_offload_hook = hook
# patched relative to diffusers: guard against an omitted unet (e.g. when only the T5 text encoder is loaded)
if self.unet is not None:
_, hook = cpu_offload_with_hook(self.unet, device, prev_module_hook=hook)
# if the safety checker isn't called, `unet_offload_hook` will have to be called to manually offload the unet
self.unet_offload_hook = hook
if self.safety_checker is not None:
_, hook = cpu_offload_with_hook(self.safety_checker, device, prev_module_hook=hook)
# We'll offload the last model manually.
self.final_offload_hook = hook
def _cpu_offload(self: DiffusionPipeline, gpu_id=0):
# todo: use sequential for low vram, ordinary cpu offload for normal vram
if isinstance(self, IFPipeline) or isinstance(self, IFSuperResolutionPipeline):
_patched_enable_model_cpu_offload_ifpipeline(self, gpu_id)
# todo: include sequential usage
# elif isinstance(self, StableDiffusionUpscalePipeline):
# self.enable_sequential_cpu_offload(gpu_id)
elif hasattr(self, 'enable_model_cpu_offload'):
self.enable_model_cpu_offload(gpu_id)
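# The todo above asks for sequential offload on low-VRAM machines. A minimal sketch of how the
# dispatch could be extended, assuming the caller supplies a low_vram flag (this helper and the
# flag are illustrative, not part of the node API):
def _cpu_offload_with_budget(pipeline: DiffusionPipeline, gpu_id: int = 0, low_vram: bool = False):
    if low_vram and hasattr(pipeline, "enable_sequential_cpu_offload"):
        # moves individual submodules on and off the GPU per forward call: slower, much smaller peak VRAM
        pipeline.enable_sequential_cpu_offload(gpu_id)
    else:
        # default: whole-model offload, as _cpu_offload does above
        _cpu_offload(pipeline, gpu_id=gpu_id)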
class IFLoader(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"model_name": (IFLoader._MODELS, {"default": "I-M"}),
"quantization": (list(IFLoader._QUANTIZATIONS.keys()), {"default": "16-bit"}),
},
"optional": {
"hugging_face_token": ("STRING", {"default": ""}),
}
}
CATEGORY = "deepfloyd"
FUNCTION = "process"
RETURN_TYPES = ("IF_MODEL",)
_MODELS = ["I-M", "I-L", "I-XL", "II-M", "II-L", "III", "t5"]
_QUANTIZATIONS = {
"4-bit": BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
),
"8-bit": BitsAndBytesConfig(
load_in_8bit=True,
),
"16-bit": None,
}
def process(self, model_name: str, quantization: str, hugging_face_token: str = ""):
assert model_name in IFLoader._MODELS
model_v: DiffusionPipeline
model_path: str
kwargs = {
"variant": "fp16",
"torch_dtype": torch.float16,
"requires_safety_checker": False,
"feature_extractor": None,
"safety_checker": None,
"watermarker": None,
"device_map": None
}
if hugging_face_token is not None and hugging_face_token != "":
kwargs['use_auth_token'] = hugging_face_token
# otherwise huggingface_hub falls back to the HUGGING_FACE_HUB_TOKEN environment variable
if IFLoader._QUANTIZATIONS[quantization] is not None:
kwargs['quantization_config'] = IFLoader._QUANTIZATIONS[quantization]
if model_name == "t5":
# find any valid IF model
try:
model_path = next(os.path.dirname(file) for file in _find_files(_model_base_path, "model_index.json") if
any(x == T5EncoderModel.__name__ for x in
json.load(open(file, 'r'))["text_encoder"]))
except (StopIteration, KeyError, OSError, json.JSONDecodeError):
# no local IF checkpoint with a T5 text encoder was found; fall back to the hub model
model_path = "DeepFloyd/IF-I-M-v1.0"
kwargs["unet"] = None
elif model_name == "III":
model_path = f"{_model_base_path}/stable-diffusion-x4-upscaler"
del kwargs["variant"]
else:
model_path = f"{_model_base_path}/IF-{model_name}-v1.0"
kwargs["text_encoder"] = None
if not os.path.exists(model_path):
kwargs['cache_dir'] = os.path.abspath(_model_base_path)
if model_name == "t5":
model_path = "DeepFloyd/IF-I-M-v1.0"
else:
model_path = f"DeepFloyd/IF-{model_name}-v1.0"
model_v = DiffusionPipeline.from_pretrained(
pretrained_model_name_or_path=model_path,
**kwargs
)
device = get_torch_device()
model_v = model_v.to(device)
_cpu_offload(model_v, gpu_id=model_v.device.index)
return (model_v,)
class IFEncoder(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"model": ("IF_MODEL",),
"positive": ("STRING", {"default": "", "multiline": True}),
"negative": ("STRING", {"default": "", "multiline": True}),
},
}
CATEGORY = "deepfloyd"
FUNCTION = "process"
RETURN_TYPES = ("POSITIVE", "NEGATIVE",)
def process(self, model: IFPipeline, positive, negative):
positive, negative = model.encode_prompt(
prompt=positive,
negative_prompt=negative,
)
return (positive, negative,)
class IFStageI(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"positive": ("POSITIVE",),
"negative": ("NEGATIVE",),
"model": ("IF_MODEL",),
"width": ("INT", {"default": 64, "min": 8, "max": 128, "step": 8}),
"height": ("INT", {"default": 64, "min": 8, "max": 128, "step": 8}),
"batch_size": ("INT", {"default": 1, "min": 1, "max": 100}),
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0})
},
}
CATEGORY = "deepfloyd"
FUNCTION = "process"
RETURN_TYPES = ("IMAGE",)
def process(self, model: IFPipeline, positive, negative, width, height, batch_size, seed, steps, cfg):
progress = ProgressBar(steps)
def callback(step, time_step, latent):
throw_exception_if_processing_interrupted()
progress.update_absolute(step)
gc.collect()
image = model(
prompt_embeds=positive,
negative_prompt_embeds=negative,
width=width,
height=height,
generator=torch.manual_seed(seed),
guidance_scale=cfg,
num_images_per_prompt=batch_size,
num_inference_steps=steps,
callback=callback,
output_type="pt",
).images
image = (image / 2 + 0.5).clamp(0, 1)
image = image.cpu().float().permute(0, 2, 3, 1)
return (image,)
class IFStageII(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"positive": ("POSITIVE",),
"negative": ("NEGATIVE",),
"model": ("IF_MODEL",),
"images": ("IMAGE",),
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
},
}
CATEGORY = "deepfloyd"
FUNCTION = "process"
RETURN_NAMES = ("IMAGES",)
RETURN_TYPES = ("IMAGE",)
def process(self, model, images, positive, negative, seed, steps, cfg):
images = images.permute(0, 3, 1, 2)
progress = ProgressBar(steps)
batch_size = images.shape[0]
if batch_size > 1:
positive = positive.repeat(batch_size, 1, 1)
negative = negative.repeat(batch_size, 1, 1)
def callback(step, time_step, latent):
throw_exception_if_processing_interrupted()
progress.update_absolute(step)
images = model(
image=images,
prompt_embeds=positive,
negative_prompt_embeds=negative,
height=images.shape[2] // 8 * 8 * 4,
width=images.shape[3] // 8 * 8 * 4,
generator=torch.manual_seed(seed),
guidance_scale=cfg,
num_inference_steps=steps,
callback=callback,
output_type="pt",
).images
images = images.clamp(0, 1)
images = images.permute(0, 2, 3, 1)
images = images.to("cpu", torch.float32)
return (images,)
class IFStageIII(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"model": ("IF_MODEL",),
"image": ("IMAGE",),
"tile": ([False, True], {"default": False}),
"tile_size": ("INT", {"default": 512, "min": 64, "max": 1024, "step": 64}),
"noise": ("INT", {"default": 20, "min": 0, "max": 100}),
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
"positive": ("STRING", {"default": "", "multiline": True}),
"negative": ("STRING", {"default": "", "multiline": True}),
},
}
CATEGORY = "deepfloyd"
FUNCTION = "process"
RETURN_TYPES = ("IMAGE",)
def process(self, model: StableDiffusionUpscalePipeline, image, tile, tile_size, noise, seed, steps, cfg, positive,
negative):
image = image.permute(0, 3, 1, 2)
progress = ProgressBar(steps)
batch_size = image.shape[0]
if batch_size > 1:
positive = [positive] * batch_size
negative = [negative] * batch_size
if tile:
model.vae.config.sample_size = tile_size
model.vae.enable_tiling()
def callback(step, time_step, latent):
throw_exception_if_processing_interrupted()
progress.update_absolute(step)
image = model(
image=image,
prompt=positive,
negative_prompt=negative,
noise_level=noise,
generator=torch.manual_seed(seed),
guidance_scale=cfg,
num_inference_steps=steps,
callback=callback,
output_type="pt",
).images.cpu().float().permute(0, 2, 3, 1)
return (image,)
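# Taken together, the three stage nodes mirror DeepFloyd's cascaded pipeline: Stage I renders at
# 64px, Stage II super-resolves 4x, and Stage III applies the x4 upscaler. A rough sketch of how
# the first two stages compose when called directly (prompts and settings are placeholders; in
# ComfyUI the executor makes these calls from the graph):
loader = IFLoader()
t5, = loader.process("t5", "16-bit")
positive, negative = IFEncoder().process(t5, "a photo of a corgi", "blurry, low quality")
stage_i, = loader.process("I-M", "16-bit")
images_64, = IFStageI().process(stage_i, positive, negative, width=64, height=64,
                                batch_size=1, seed=0, steps=20, cfg=8.0)
stage_ii, = loader.process("II-M", "16-bit")
images_256, = IFStageII().process(stage_ii, images_64, positive, negative, seed=0, steps=20, cfg=8.0)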

View File

@ -1,95 +0,0 @@
# Mara Huldra 2023
# SPDX-License-Identifier: MIT
'''
Downscale an image using k-centroid scaling. This is useful for images with a
small number of distinct colors, such as those generated by Astropulse's pixel
art model.
'''
from itertools import product
import numpy as np
from PIL import Image
import torch
from comfy.nodes.package_typing import CustomNode
MAX_RESOLUTION = 1024
AUTO_FACTOR = 8
def k_centroid_downscale(images, width, height, centroids=2):
'''k-centroid scaling, based on: https://github.com/Astropulse/stable-diffusion-aseprite/blob/main/scripts/image_server.py.'''
downscaled = np.zeros((images.shape[0], height, width, 3), dtype=np.uint8)
for ii, image in enumerate(images):
i = 255. * image.cpu().numpy()
image = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
factor = (image.width // width, image.height // height)
for x, y in product(range(width), range(height)):
tile = image.crop((x * factor[0], y * factor[1], (x + 1) * factor[0], (y + 1) * factor[1]))
# quantize tile to fixed number of colors (creates palettized image)
tile = tile.quantize(colors=centroids, method=1, kmeans=centroids)
# get the most common color in the tile
color_counts = tile.getcolors()
most_common_idx = max(color_counts, key=lambda x: x[0])[1]
downscaled[ii, y, x, :] = tile.getpalette()[most_common_idx * 3:(most_common_idx + 1) * 3]
downscaled = downscaled.astype(np.float32) / 255.0
return torch.from_numpy(downscaled)
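# A quick usage sketch, assuming ComfyUI's [batch, height, width, channel] float layout in the
# 0..1 range (the random tensor is only a stand-in for a real render):
images = torch.rand(1, 512, 512, 3)
small = k_centroid_downscale(images, width=64, height=64, centroids=2)
assert small.shape == (1, 64, 64, 3)  # each 8x8 tile collapses to its dominant quantized color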
class ImageKCentroidDownscale(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("IMAGE",),
"width": ("INT", {"default": 64, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
"height": ("INT", {"default": 64, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
"centroids": ("INT", {"default": 2, "min": 1, "max": 256, "step": 1}),
}
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "downscale"
CATEGORY = "image/downscaling"
def downscale(self, image, width, height, centroids):
s = k_centroid_downscale(image, width, height, centroids)
return (s,)
class ImageKCentroidAutoDownscale(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("IMAGE",),
"centroids": ("INT", {"default": 2, "min": 1, "max": 256, "step": 1}),
}
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "downscale"
CATEGORY = "image/downscaling"
def downscale(self, image, centroids):
width = image.shape[2] // AUTO_FACTOR
height = image.shape[1] // AUTO_FACTOR
s = k_centroid_downscale(image, width, height, centroids)
return (s,)
NODE_CLASS_MAPPINGS = {
"ImageKCentroidDownscale": ImageKCentroidDownscale,
"ImageKCentroidAutoDownscale": ImageKCentroidAutoDownscale,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"ImageKCentroidDownscale": "K-Centroid Downscale",
"ImageKCentroidAutoDownscale": "K-Centroid Downscale (autosize)"
}

View File

@ -1,80 +0,0 @@
# Mara Huldra 2023
# SPDX-License-Identifier: MIT
'''
Patches the SD model and VAE to make it possible to generate seamlessly tileable
graphics. The horizontal and vertical directions are configurable separately.
'''
from typing import Optional
import torch
from torch.nn import functional as F
from torch.nn.modules.utils import _pair
from comfy.nodes.package_typing import CustomNode
def flatten_modules(m):
'''Return submodules of module m in flattened form.'''
yield m
for c in m.children():
yield from flatten_modules(c)
# from: https://github.com/Astropulse/stable-diffusion-aseprite/blob/main/scripts/image_server.py
def make_seamless_xy(model, x, y):
for layer in flatten_modules(model):
if type(layer) == torch.nn.Conv2d:
layer.padding_modeX = 'circular' if x else 'constant'
layer.padding_modeY = 'circular' if y else 'constant'
layer.paddingX = (layer._reversed_padding_repeated_twice[0], layer._reversed_padding_repeated_twice[1], 0, 0)
layer.paddingY = (0, 0, layer._reversed_padding_repeated_twice[2], layer._reversed_padding_repeated_twice[3])
layer._conv_forward = __replacementConv2DConvForward.__get__(layer, torch.nn.Conv2d)
def restore_conv2d_methods(model):
for layer in flatten_modules(model):
if type(layer) == torch.nn.Conv2d:
layer._conv_forward = torch.nn.Conv2d._conv_forward.__get__(layer, torch.nn.Conv2d)
def __replacementConv2DConvForward(self, input: torch.Tensor, weight: torch.Tensor, bias: Optional[torch.Tensor]):
working = F.pad(input, self.paddingX, mode=self.padding_modeX)
working = F.pad(working, self.paddingY, mode=self.padding_modeY)
return F.conv2d(working, weight, bias, self.stride, _pair(0), self.dilation, self.groups)
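# A small, self-contained check of what the patch does, assuming a toy Conv2d rather than a full
# SD model: with x=True the left and right edges "see" each other through circular padding, which
# is what makes the decoded images wrap seamlessly.
conv = torch.nn.Conv2d(1, 1, kernel_size=3, padding=1)
sample = torch.randn(1, 1, 8, 8)
make_seamless_xy(conv, x=True, y=False)
wrapped = conv(sample)                 # convolution wraps around the x axis
restore_conv2d_methods(conv)
plain = conv(sample)                   # back to the default zero padding
print(torch.allclose(wrapped[..., 0], plain[..., 0]))  # usually False: the edge columns now differ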
class MakeModelTileable(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"model": ("MODEL",),
"vae": ("VAE",),
"tile_x": (["disabled", "enabled"], { "default": "disabled", }),
"tile_y": (["disabled", "enabled"], { "default": "disabled", }),
}
}
RETURN_TYPES = ("MODEL", "VAE")
FUNCTION = "patch_models"
CATEGORY = "advanced/patchers"
def patch_models(self, model, vae, tile_x, tile_y):
tile_x = (tile_x == 'enabled')
tile_y = (tile_y == 'enabled')
# XXX ideally, we'd return a clone of the model, not patch the model itself
#model = model.clone()
#vae = vae.???()
restore_conv2d_methods(model.model)
restore_conv2d_methods(vae.first_stage_model)
make_seamless_xy(model.model, tile_x, tile_y)
make_seamless_xy(vae.first_stage_model, tile_x, tile_y)
return (model, vae)
NODE_CLASS_MAPPINGS = {
"MakeModelTileable": MakeModelTileable,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"MakeModelTileable": "Patch model tileability"
}

View File

@ -1,69 +0,0 @@
# Mara Huldra 2023
# SPDX-License-Identifier: MIT
'''
Extra mask operations.
'''
import numpy as np
import torch
from comfy.nodes.package_typing import CustomNode
class BinarizeMask(CustomNode):
'''Binarize (threshold) a mask.'''
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"mask": ("MASK",),
"threshold": ("INT", {
"default": 250,
"min": 0,
"max": 255,
"step": 1,
}),
},
}
RETURN_TYPES = ("MASK",)
FUNCTION = "binarize"
CATEGORY = "mask"
def binarize(self, mask, threshold):
t = torch.Tensor([threshold / 255.])
s = (mask >= t).float()
return (s,)
class ImageCutout(CustomNode):
'''Perform basic image cutout (adds alpha channel from mask).'''
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("IMAGE",),
"mask": ("MASK",),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "cutout"
CATEGORY = "image/postprocessing"
def cutout(self, image, mask):
# XXX check compatible dimensions.
o = np.zeros((image.shape[0], image.shape[1], image.shape[2], 4), dtype=np.float32)
o[:, :, :, 0:3] = image.cpu().numpy()
o[:, :, :, 3] = mask.cpu().numpy()
return (torch.from_numpy(o),)
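# A usage sketch combining the two nodes, with shapes following ComfyUI's conventions
# ([batch, height, width, channel] images, [batch, height, width] masks); the tensors are stand-ins:
image = torch.rand(1, 64, 64, 3)
soft_mask = torch.rand(1, 64, 64)
hard_mask, = BinarizeMask().binarize(soft_mask, threshold=128)
rgba, = ImageCutout().cutout(image, hard_mask)
assert rgba.shape == (1, 64, 64, 4)    # RGB from the image, alpha from the binarized mask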
NODE_CLASS_MAPPINGS = {
"BinarizeMask": BinarizeMask,
"ImageCutout": ImageCutout,
}

View File

@ -1,161 +0,0 @@
# Mara Huldra 2023
# SPDX-License-Identifier: MIT
'''
Palettize an image.
'''
import os
import numpy as np
from PIL import Image
import torch
from comfy.nodes.package_typing import CustomNode
PALETTES_PATH = os.path.join(os.path.dirname(__file__), '../../..', 'palettes')
PAL_EXT = '.png'
QUANTIZE_METHODS = {
'median_cut': Image.Quantize.MEDIANCUT,
'max_coverage': Image.Quantize.MAXCOVERAGE,
'fast_octree': Image.Quantize.FASTOCTREE,
}
# Determine the optimal number of colors.
# FROM: astropulse/sd-palettize
#
# Use FASTOCTREE when determining the best k, because:
# - it is faster
# - it does a better job fitting the image to lower color counts than the other options
# MAXCOVERAGE is better at reducing an image's colors accurately, but since for
# best-k we only care about the number of colors, a faster, more predictable
# method is preferable.
# (Astropulse, 2023-06-05)
def determine_best_k(image, max_k, quantize_method=Image.Quantize.FASTOCTREE):
# Convert the image to RGB mode
image = image.convert("RGB")
# Prepare arrays for distortion calculation
pixels = np.array(image)
pixel_indices = np.reshape(pixels, (-1, 3))
# Calculate distortion for different values of k
distortions = []
for k in range(1, max_k + 1):
quantized_image = image.quantize(colors=k, method=quantize_method, kmeans=k, dither=0)
centroids = np.array(quantized_image.getpalette()[:k * 3]).reshape(-1, 3)
# Calculate distortions
distances = np.linalg.norm(pixel_indices[:, np.newaxis] - centroids, axis=2)
min_distances = np.min(distances, axis=1)
distortions.append(np.sum(min_distances ** 2))
# Calculate the rate of change of distortions
rate_of_change = np.diff(distortions) / np.array(distortions[:-1])
# Find the elbow point (best k value)
if len(rate_of_change) == 0:
best_k = 2
else:
elbow_index = np.argmax(rate_of_change) + 1
best_k = elbow_index + 2
return best_k
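# The arithmetic above, on a made-up distortion curve (all numbers hypothetical): the elbow search
# takes the step-to-step relative change in distortion and picks its argmax, i.e. the step where
# the improvement from adding another color flattens out the most.
distortions = np.array([100.0, 40.0, 30.0, 28.0, 27.5])   # pretend sums of squared distances for k = 1..5
rate_of_change = np.diff(distortions) / distortions[:-1]  # [-0.6, -0.25, -0.067, -0.018]
elbow_index = np.argmax(rate_of_change) + 1                # step with the smallest relative drop
print(elbow_index + 2)                                     # the k determine_best_k would report here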
palette_warned = False
def list_palettes():
global palette_warned
palettes = []
try:
for filename in os.listdir(PALETTES_PATH):
if filename.endswith(PAL_EXT):
palettes.append(filename[0:-len(PAL_EXT)])
except FileNotFoundError:
pass
if not palettes and not palette_warned:
palette_warned = True
print(
"ImagePalettize warning: no fixed palettes found. You can put these in the palettes/ directory below the ComfyUI root.")
return palettes
def get_image_colors(pal_img):
palette = []
pal_img = pal_img.convert('RGB')
for i in pal_img.getcolors(16777216):
palette.append(i[1][0])
palette.append(i[1][1])
palette.append(i[1][2])
return palette
def load_palette(name):
return get_image_colors(Image.open(os.path.join(PALETTES_PATH, name + PAL_EXT)))
class ImagePalettize(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("IMAGE",),
"palette": (["auto_best_k", "auto_fixed_k"] + list_palettes(), {
"default": "auto_best_k",
}),
"max_k": ("INT", {
"default": 64,
"min": 1,
"max": 256,
"step": 1,
}),
"method": (list(QUANTIZE_METHODS.keys()), {
"default": "max_coverage",
}),
}
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "palettize"
CATEGORY = "image/postprocessing"
def palettize(self, image, palette, max_k, method):
k = None
pal_img = None
if palette not in {'auto_best_k', 'auto_fixed_k'}:
pal_entries = load_palette(palette)
k = len(pal_entries) // 3
pal_img = Image.new('P', (1, 1)) # image size doesn't matter; it only holds the palette
pal_img.putpalette(pal_entries)
results = []
for i in image:
i = 255. * i.cpu().numpy()
i = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
if palette == 'auto_best_k':
k = determine_best_k(i, max_k)
print(f'Auto number of colors: {k}')
elif palette == 'auto_fixed_k':
k = max_k
i = i.quantize(colors=k, method=QUANTIZE_METHODS[method], kmeans=k, dither=0, palette=pal_img)
i = i.convert('RGB')
results.append(np.array(i))
result = np.array(results).astype(np.float32) / 255.0
return (torch.from_numpy(result),)
NODE_CLASS_MAPPINGS = {
"ImagePalettize": ImagePalettize,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"ImagePalettize": "ImagePalettize"
}

View File

@ -1,47 +0,0 @@
# Mara Huldra 2023
# SPDX-License-Identifier: MIT
'''
Simple image pattern generators.
'''
import torch
from comfy.nodes.package_typing import CustomNode
MAX_RESOLUTION = 8192
class ImageSolidColor(CustomNode):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"width": ("INT", {"default": 64, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
"height": ("INT", {"default": 64, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
"r": ("INT", {"default": 0, "min": 0, "max": 255, "step": 1}),
"g": ("INT", {"default": 0, "min": 0, "max": 255, "step": 1}),
"b": ("INT", {"default": 0, "min": 0, "max": 255, "step": 1}),
}
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "render"
CATEGORY = "image/pattern"
def render(self, width, height, r, g, b):
color = torch.tensor([r, g, b]) / 255.0
result = color.expand(1, height, width, 3)
return (result,)
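# expand() broadcasts the 3-element color into the full canvas as a view, so no per-pixel copy is
# made; a tiny check of the resulting layout (dimensions chosen arbitrarily):
color = torch.tensor([255, 0, 0]) / 255.0     # pure red
canvas = color.expand(1, 32, 48, 3)           # [batch, height, width, channel] broadcast view
assert canvas.shape == (1, 32, 48, 3)
assert torch.all(canvas[0, 10, 20] == torch.tensor([1.0, 0.0, 0.0]))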
NODE_CLASS_MAPPINGS = {
"ImageSolidColor": ImageSolidColor,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"ImageSolidColor": "Solid Color",
}

View File

@ -1,121 +0,0 @@
# Mara Huldra 2023
# SPDX-License-Identifier: MIT
'''
Estimate which pixels belong to the background and perform a cut-out using the 'rembg' models.
'''
import numpy as np
import rembg
import torch
from comfy.nodes.package_typing import CustomNode
MODELS = rembg.sessions.sessions_names
class ImageRemoveBackground(CustomNode):
'''Remove background from image (adds an alpha channel)'''
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("IMAGE",),
"model": (MODELS, {
"default": "u2net",
}),
"alpha_matting": (["disabled", "enabled"], {
"default": "disabled",
}),
"am_foreground_thr": ("INT", {
"default": 240,
"min": 0,
"max": 255,
"step": 1,
}),
"am_background_thr": ("INT", {
"default": 10,
"min": 0,
"max": 255,
"step": 1,
}),
"am_erode_size": ("INT", {
"default": 10,
"min": 0,
"max": 255,
"step": 1,
}),
},
}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "remove_background"
CATEGORY = "image/postprocessing"
def remove_background(self, image, model, alpha_matting, am_foreground_thr, am_background_thr, am_erode_size):
session = rembg.new_session(model)
results = []
for i in image:
i = 255. * i.cpu().numpy()
i = np.clip(i, 0, 255).astype(np.uint8)
i = rembg.remove(i,
alpha_matting=(alpha_matting == "enabled"),
alpha_matting_foreground_threshold=am_foreground_thr,
alpha_matting_background_threshold=am_background_thr,
alpha_matting_erode_size=am_erode_size,
session=session,
)
results.append(i.astype(np.float32) / 255.0)
s = torch.from_numpy(np.array(results))
return (s,)
class ImageEstimateForegroundMask(CustomNode):
'''
Return a mask of which pixels are estimated to belong to the foreground.
Unlike ImageRemoveBackground, this only estimates the mask and does not
perform a cutout.
'''
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("IMAGE",),
"model": (MODELS, {
"default": "u2net",
}),
},
}
RETURN_TYPES = ("MASK",)
FUNCTION = "estimate_background"
CATEGORY = "image/postprocessing"
def estimate_background(self, image, model):
session = rembg.new_session(model)
results = []
for i in image:
i = 255. * i.cpu().numpy()
i = np.clip(i, 0, 255).astype(np.uint8)
i = rembg.remove(i, only_mask=True, session=session)
results.append(i.astype(np.float32) / 255.0)
s = torch.from_numpy(np.array(results))
return (s,)
NODE_CLASS_MAPPINGS = {
"ImageRemoveBackground": ImageRemoveBackground,
"ImageEstimateForegroundMask": ImageEstimateForegroundMask,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"ImageRemoveBackground": "Remove Background (rembg)",
"ImageEstimateForegroundMask": "Estimate Foreground (rembg)",
}
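# A usage sketch for the cutout node, assuming the u2net weights are available to rembg (they are
# downloaded on first use); the random frame is only a stand-in for a real image:
frame = torch.rand(1, 256, 256, 3)            # ComfyUI image layout: [batch, height, width, channel]
cutout, = ImageRemoveBackground().remove_background(
    frame, model="u2net", alpha_matting="disabled",
    am_foreground_thr=240, am_background_thr=10, am_erode_size=10)
print(cutout.shape)                           # expected: torch.Size([1, 256, 256, 4])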