mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-11 23:00:51 +08:00
304 lines
12 KiB
Python
304 lines
12 KiB
Python
import os
|
|
import re
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
import numpy as np
|
|
import pytest
|
|
import torch
|
|
from PIL import Image
|
|
from freezegun import freeze_time
|
|
|
|
from comfy.cmd import folder_paths
|
|
from comfy.component_model.executor_types import ValidateInputsTuple
|
|
from comfy_extras.nodes.nodes_open_api import SaveImagesResponse, IntRequestParameter, FloatRequestParameter, \
|
|
StringRequestParameter, HashImage, StringPosixPathJoin, LegacyOutputURIs, DevNullUris, StringJoin, StringToUri, \
|
|
UriFormat, ImageExifMerge, ImageExifCreationDateAndBatchNumber, ImageExif, ImageExifUncommon, \
|
|
StringEnumRequestParameter, ExifContainer, BooleanRequestParameter, ImageRequestParameter
|
|
|
|
# Zero-filled float32 CPU tensor of shape (1, 1, 3), used as the input image by the tests in this module.
# NOTE(review): presumably laid out as (height, width, channels) — confirm against the nodes under test.
_image_1x1 = torch.zeros((1, 1, 3), dtype=torch.float32, device="cpu")
|
|
|
|
|
|
def test_save_image_response():
    """Save one image to a relative URI and inspect the node's response.

    Asserts that the file exists under the output directory and that the
    single result entry reports the expected filename, subfolder and name.
    """
    relative_uri = "with_prefix/1.png"
    assert SaveImagesResponse.INPUT_TYPES() is not None

    node = SaveImagesResponse()
    response = node.execute(images=[_image_1x1], uris=[relative_uri], name="test")

    saved_path = os.path.join(folder_paths.get_output_directory(), relative_uri)
    assert os.path.isfile(saved_path)
    assert len(response["result"]) == 1
    assert len(response["ui"]["images"]) == 1

    (saved_images,) = response["result"]
    assert saved_images[0]["filename"] == "1.png"
    assert saved_images[0]["subfolder"] == "with_prefix"
    assert saved_images[0]["name"] == "test"
|
|
|
|
|
|
def test_save_image_response_abs_local_uris():
    """Save one image to an absolute path inside the output directory.

    Asserts that the file exists and that the result entry still reports the
    filename and subfolder relative to the output directory.
    """
    assert SaveImagesResponse.INPUT_TYPES() is not None

    node = SaveImagesResponse()
    absolute_uri = os.path.join(folder_paths.get_output_directory(), "with_prefix/1.png")
    response = node.execute(images=[_image_1x1], uris=[absolute_uri], name="test")

    # The output directory is looked up again after execution, as in the original flow.
    expected_path = os.path.join(folder_paths.get_output_directory(), "with_prefix/1.png")
    assert os.path.isfile(expected_path)
    assert len(response["result"]) == 1
    assert len(response["ui"]["images"]) == 1

    (saved_images,) = response["result"]
    assert saved_images[0]["filename"] == "1.png"
    assert saved_images[0]["subfolder"] == "with_prefix"
    assert saved_images[0]["name"] == "test"
|
|
|
|
|
|
def test_save_image_response_remote_uris():
    """Save one image to a ``memory://`` URI and inspect the local copy.

    Asserts that the reported filename is a non-empty UUID-based ``.png`` name
    that exists in the output directory, that ``abs_path`` equals the
    requested URI, and that ``subfolder`` is empty.
    """
    node = SaveImagesResponse()
    uri = "memory://some_folder/1.png"
    response = node.execute(images=[_image_1x1], uris=[uri])

    assert len(response["result"]) == 1
    assert len(response["ui"]["images"]) == 1

    (saved_images,) = response["result"]
    local_name = saved_images[0]["filename"]
    assert local_name != "1.png"
    assert local_name != ""
    # uuid.UUID raises ValueError when the stem is not a valid UUID string.
    assert uuid.UUID(local_name.replace(".png", "")) is not None

    local_path = os.path.join(folder_paths.get_output_directory(), local_name)
    assert os.path.isfile(local_path)
    assert saved_images[0]["abs_path"] == uri
    assert saved_images[0]["subfolder"] == ""
|
|
|
|
|
|
def test_save_exif():
    """Save an image with a ``Title`` EXIF entry and read it back with PIL."""
    node = SaveImagesResponse()
    filename = "with_prefix/2.png"
    exif_entries = [ExifContainer({
        "Title": "test title"
    })]
    node.execute(images=[_image_1x1], uris=[filename], name="test", exif=exif_entries)

    filepath = os.path.join(folder_paths.get_output_directory(), filename)
    assert os.path.isfile(filepath)

    with Image.open(filepath) as saved:
        assert saved.info['Title'] == "test title"
|
|
|
|
|
|
def test_no_local_file():
    """Save to a ``memory://`` URI while passing ``/dev/null`` as the local URI.

    Asserts that the result entry has an empty filename and subfolder and that
    ``abs_path`` equals the requested URI.
    """
    node = SaveImagesResponse()
    uri = "memory://some_folder/2.png"
    response = node.execute(images=[_image_1x1], uris=[uri], local_uris=["/dev/null"])

    assert len(response["result"]) == 1
    assert len(response["ui"]["images"]) == 1

    (saved_images,) = response["result"]
    assert saved_images[0]["filename"] == ""

    local_path = os.path.join(folder_paths.get_output_directory(), saved_images[0]["filename"])
    assert not os.path.isfile(local_path)
    assert saved_images[0]["abs_path"] == uri
    assert saved_images[0]["subfolder"] == ""
|
|
|
|
|
|
def test_int_request_parameter():
    """Check that IntRequestParameter returns the integer it was given."""
    input_types = IntRequestParameter.INPUT_TYPES()
    assert input_types is not None

    node = IntRequestParameter()
    (result,) = node.execute(value=1, name="test")
    assert result == 1
|
|
|
|
|
|
def test_float_request_parameter():
    """Check that FloatRequestParameter returns the float it was given."""
    input_types = FloatRequestParameter.INPUT_TYPES()
    assert input_types is not None

    node = FloatRequestParameter()
    (result,) = node.execute(value=3.5, name="test", description="")
    assert result == 3.5
|
|
|
|
|
|
def test_string_request_parameter():
    """Check that StringRequestParameter returns the string it was given."""
    input_types = StringRequestParameter.INPUT_TYPES()
    assert input_types is not None

    node = StringRequestParameter()
    (result,) = node.execute(value="test", name="test")
    assert result == "test"
|
|
|
|
|
|
def test_bool_request_parameter():
    """Check that BooleanRequestParameter returns a value equal to True."""
    input_types = BooleanRequestParameter.INPUT_TYPES()
    assert input_types is not None

    node = BooleanRequestParameter()
    (result,) = node.execute(value=True, name="test")
    # Equality (not identity) is kept on purpose to preserve the original comparison.
    assert result == True
|
|
|
|
|
|
def test_string_enum_request_parameter():
    """Exercise StringEnumRequestParameter directly and through input validation.

    First the node is executed on its own and must return the given string.
    Then a two-node prompt is built in which the parameter's output feeds the
    ``sampler_name`` input of a ``KSamplerSelect`` node, and the test asserts
    that validation of the downstream node succeeds.
    """
    input_types = StringEnumRequestParameter.INPUT_TYPES()
    assert input_types is not None

    node = StringEnumRequestParameter()
    (result,) = node.execute(value="test", name="test")
    assert result == "test"

    sampler_description = "This allows users to select a sampler for generating images with Latent Diffusion Models, including Stable Diffusion, ComfyUI, and SDXL. \n\nChange this only if explicitly requested by the user.\n\nList of sampler choice (this parameter): valid choices for scheduler (value for scheduler parameter).\n\n- euler: normal, karras, exponential, sgm_uniform, simple, ddim_uniform\n- euler_ancestral: normal, karras\n- heun: normal, karras\n- heunpp2: normal, karras\n- dpm_2: normal, karras\n- dpm_2_ancestral: normal, karras\n- lms: normal, karras\n- dpm_fast: normal, exponential\n- dpm_adaptive: normal, exponential\n- dpmpp_2s_ancestral: karras, exponential\n- dpmpp_sde: karras, exponential\n- dpmpp_sde_gpu: karras, exponential\n- dpmpp_2m: karras, sgm_uniform\n- dpmpp_2m_sde: karras, sgm_uniform\n- dpmpp_2m_sde_gpu: karras, sgm_uniform\n- dpmpp_3m_sde: karras, sgm_uniform\n- dpmpp_3m_sde_gpu: karras, sgm_uniform\n- ddpm: normal, simple\n- lcm: normal, exponential\n- ddim: normal, ddim_uniform\n- uni_pc: normal, karras, exponential\n- uni_pc_bh2: normal, karras, exponential"

    # Node "1" supplies the enum value; node "2" consumes output 0 of node "1".
    parameter_node = {
        "inputs": {
            "value": "euler",
            "name": "sampler_name",
            "title": "KSampler Node Sampler",
            "description": sampler_description,
            "__required": True,
        },
        "class_type": "StringEnumRequestParameter",
        "_meta": {
            "title": "StringEnumRequestParameter",
        },
    }
    sampler_select_node = {
        "inputs": {
            "sampler_name": ["1", 0],
        },
        "class_type": "KSamplerSelect",
        "_meta": {
            "title": "KSamplerSelect",
        },
    }
    prompt = {
        "1": parameter_node,
        "2": sampler_select_node,
    }

    from comfy.cmd.execution import validate_inputs

    validated: dict[str, ValidateInputsTuple] = {}
    # Validate in dependency order, storing each result before validating the next node.
    for node_id in ("1", "2"):
        validated[node_id] = validate_inputs(prompt, node_id, validated)
    assert validated["2"].valid
|
|
|
|
|
|
@pytest.mark.skip("issues")
def test_hash_images():
    """Check that identical images hash identically to 64-hex-digit strings.

    Currently skipped unconditionally via ``pytest.mark.skip``.
    """
    input_types = HashImage.INPUT_TYPES()
    assert input_types is not None

    node = HashImage()
    batch = [_image_1x1.clone(), _image_1x1.clone()]
    (hashes,) = node.execute(images=batch)

    # same image, same hash
    assert hashes[0] == hashes[1]

    # hash should be a valid sha256 hash
    sha256_hex = re.compile(r'^[0-9a-fA-F]{64}$')
    for digest in hashes:
        assert sha256_hex.match(digest)
|
|
|
|
|
|
def test_string_posix_path_join():
    """Check that value0..value2 are joined with ``/`` in index order.

    The keyword arguments are deliberately passed out of order.
    """
    input_types = StringPosixPathJoin.INPUT_TYPES()
    assert input_types is not None

    node = StringPosixPathJoin()
    (joined,) = node.execute(value2="c", value0="a", value1="b")
    assert joined == "a/b/c"
|
|
|
|
|
|
def test_legacy_output_uris(use_temporary_output_directory):
    """Compare LegacyOutputURIs file names with ``get_save_image_path`` naming.

    The expected names are built from the filename and counter returned by
    ``folder_paths.get_save_image_path`` for the "ComfyUI" prefix.
    """
    input_types = LegacyOutputURIs.INPUT_TYPES()
    assert input_types is not None

    node = LegacyOutputURIs()
    batch = [_image_1x1, _image_1x1]
    (output_paths,) = node.execute(images=batch)

    # from SaveImage node
    first_shape = batch[0].shape
    _, filename, counter, _, _ = folder_paths.get_save_image_path(
        "ComfyUI",
        str(use_temporary_output_directory),
        first_shape[1],
        first_shape[0],
    )
    expected_names = [
        f"{filename}_{counter:05}_.png",
        f"{filename}_{counter + 1:05}_.png",
    ]
    assert os.path.basename(output_paths[0]) == expected_names[0]
    assert os.path.basename(output_paths[1]) == expected_names[1]
|
|
|
|
|
|
def test_null_uris():
    """Check that DevNullUris returns only ``/dev/null`` entries."""
    input_types = DevNullUris.INPUT_TYPES()
    assert input_types is not None

    node = DevNullUris()
    (uris,) = node.execute([_image_1x1, _image_1x1])
    assert all(uri == "/dev/null" for uri in uris)
|
|
|
|
|
|
def test_string_join():
    """Check that StringJoin joins the supplied values with the separator.

    ``value2`` is intentionally not supplied; the expected result is
    ``"a*b*c"``.
    """
    assert StringJoin.INPUT_TYPES() is not None

    node = StringJoin()
    (joined,) = node.execute(separator="*", value1="b", value3="c", value0="a")
    assert joined == "a*b*c"
|
|
|
|
|
|
def test_string_to_uri():
    """Check that StringToUri repeats the string ``batch`` times."""
    assert StringToUri.INPUT_TYPES() is not None

    node = StringToUri()
    (uris,) = node.execute("x", batch=3)
    assert uris == ["x"] * 3
|
|
|
|
|
|
def test_uri_format(use_temporary_output_directory):
    """Check UriFormat template expansion.

    Templates starting with ``{output}`` must produce absolute URIs located
    under the temporary output directory; a template with an unknown
    placeholder must raise ``KeyError``.
    """
    assert UriFormat.INPUT_TYPES() is not None

    node = UriFormat()
    batch = [_image_1x1, _image_1x1]
    output_root = str(use_temporary_output_directory)

    # with defaults
    uris, metadata_uris = node.execute(images=batch, uri_template="{output}/{uuid}_{batch_index:05d}.png")
    for formatted in uris:
        assert os.path.isabs(formatted), "uri format returns absolute URIs when output appears"
        assert os.path.commonpath([formatted, use_temporary_output_directory]) == output_root, "should be under output dir"

    uris, metadata_uris = node.execute(images=batch, uri_template="{output}/{uuid}.png")
    for formatted in uris:
        assert os.path.isabs(formatted)
        assert os.path.commonpath([formatted, use_temporary_output_directory]) == output_root, "should be under output dir"

    with pytest.raises(KeyError):
        node.execute(images=batch, uri_template="{xyz}.png")
|
|
|
|
|
|
def test_image_exif_merge():
    """Merge four per-image EXIF lists and check the merged values.

    For the first image the test expects ``a == 3`` and ``b == "2"``; for the
    second image it expects ``a == "1"``.
    """
    assert ImageExifMerge.INPUT_TYPES() is not None

    node = ImageExifMerge()
    layer0 = [ExifContainer({"a": "1"}), ExifContainer({"a": "1"})]
    layer1 = [ExifContainer({"b": "2"}), ExifContainer({"a": "1"})]
    layer2 = [ExifContainer({"a": 3}), ExifContainer({})]
    layer4 = [ExifContainer({"a": ""}), ExifContainer({})]
    (merged,) = node.execute(value0=layer0, value1=layer1, value2=layer2, value4=layer4)

    assert merged[0].exif["a"] == 3
    assert merged[0].exif["b"] == "2"
    assert merged[1].exif["a"] == "1"
|
|
|
|
|
|
@freeze_time("2024-01-14 03:21:34", tz_offset=-4)
@pytest.mark.skipif(True, reason="Time freezing not reliable on many platforms and interacts incorrectly with transformers")
def test_image_exif_creation_date_and_batch_number():
    """Check the ImageNumber and CreationDate EXIF entries for a batch of two.

    Always skipped via ``skipif(True)``. Under the frozen clock, both images
    are expected to share one creation date and carry image numbers "0" and "1".
    """
    assert ImageExifCreationDateAndBatchNumber.INPUT_TYPES() is not None

    node = ImageExifCreationDateAndBatchNumber()
    (containers,) = node.execute(images=[_image_1x1, _image_1x1])

    # Frozen time shifted by the -4 hour offset given to freeze_time.
    expected_moment = datetime(2024, 1, 13, 23, 21, 34)
    expected_stamp = expected_moment.strftime("%Y:%m:%d %H:%M:%S%z")

    assert containers[0].exif["ImageNumber"] == "0"
    assert containers[1].exif["ImageNumber"] == "1"
    assert containers[0].exif["CreationDate"] == containers[1].exif["CreationDate"] == expected_stamp
|
|
|
|
|
|
def test_image_exif():
    """Check that ImageExif stores the given Title and Artist values."""
    assert ImageExif.INPUT_TYPES() is not None

    node = ImageExif()
    (containers,) = node.execute(images=[_image_1x1], Title="test", Artist="test2")

    first_exif = containers[0].exif
    assert first_exif["Title"] == "test"
    assert first_exif["Artist"] == "test2"
|
|
|
|
|
|
def test_image_exif_uncommon():
    """Check that DigitalZoomRatio is an optional input and execution succeeds."""
    optional_inputs = ImageExifUncommon.INPUT_TYPES()["optional"]
    assert "DigitalZoomRatio" in optional_inputs

    node = ImageExifUncommon()
    node.execute(images=[_image_1x1])
|
|
|
|
|
|
def test_posix_join_curly_brackets():
    """Check that curly-brace text in a path segment is passed through unchanged."""
    node = StringPosixPathJoin()
    (joined,) = node.execute(value2="c", value0="a_{test}", value1="b")
    assert joined == "a_{test}/b/c"
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA is not available")
def test_file_request_parameter(use_temporary_input_directory):
    """Load a 1x1 PNG through ImageRequestParameter and compare with LoadImage.

    A single red pixel is written into the temporary input directory, loaded
    by absolute path via ImageRequestParameter and by base name via LoadImage;
    both tensors must have the same shape and close values.
    """
    red_pixel = np.array([[[255, 0, 0]]], dtype=np.uint8)
    image_path = os.path.join(use_temporary_input_directory, "test_image.png")
    Image.fromarray(red_pixel).save(image_path)

    node = ImageRequestParameter()
    (loaded_image,) = node.execute(value=image_path)
    assert loaded_image.shape == (1, 1, 1, 3)

    from comfy.nodes.base_nodes import LoadImage

    reference_loader = LoadImage()
    reference_rgb, _ = reference_loader.load_image(image=os.path.basename(image_path))

    assert loaded_image.shape == reference_rgb.shape
    assert torch.allclose(loaded_image, reference_rgb)
|
|
|
|
|
|
def test_file_request_to_http_url_no_exceptions():
    """Load an image from an HTTPS URL and check its dimensions.

    The loaded tensor's last three dimensions are expected to be
    480 (height), 484 (width) and 3 (channels).
    """
    image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a6/A_rainbow_at_sunset_after_rain_in_Gaziantep%2C_Turkey.IMG_2448.jpg/484px-A_rainbow_at_sunset_after_rain_in_Gaziantep%2C_Turkey.IMG_2448.jpg"

    node = ImageRequestParameter()
    (loaded_image,) = node.execute(value=image_url)

    _, height, width, channels = loaded_image.shape
    assert width == 484
    assert height == 480
    assert channels == 3
|