ComfyUI/custom_nodes/execution-inversion-demo-comfyui/utility_nodes.py
Jacob Segal 41142794ca Add syntactic sugar (for loops and more)
Note that For loops still require WAS-ns. They just expand to using the
integer operation node internally.

Also adds some nodes for accumulation operations.
2023-07-20 00:18:18 -07:00

247 lines
6.8 KiB
Python

import torch
from comfy.graph_utils import GraphBuilder
class AccumulateNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"to_add": ("*",),
},
"optional": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("ACCUMULATION",)
FUNCTION = "accumulate"
CATEGORY = "InversionDemo Nodes"
def accumulate(self, to_add, accumulation = None):
if accumulation is None:
value = [to_add]
else:
value = accumulation["accum"] + [to_add]
return ({"accum": value},)
class AccumulationHeadNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("ACCUMULATION", "*",)
FUNCTION = "accumulation_head"
CATEGORY = "InversionDemo Nodes"
def accumulation_head(self, accumulation):
accum = accumulation["accum"]
if len(accum) == 0:
return (accumulation, None)
else:
return ({"accum": accum[1:]}, accum[0])
class AccumulationTailNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("ACCUMULATION", "*",)
FUNCTION = "accumulation_tail"
CATEGORY = "InversionDemo Nodes"
def accumulation_tail(self, accumulation):
accum = accumulation["accum"]
if len(accum) == 0:
return (None, accumulation)
else:
return ({"accum": accum[:-1]}, accum[-1])
class AccumulationToListNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"accumulation": ("ACCUMULATION",),
},
}
RETURN_TYPES = ("*",)
OUTPUT_IS_LIST = (True,)
FUNCTION = "accumulation_to_list"
CATEGORY = "InversionDemo Nodes"
def accumulation_to_list(self, accumulation):
return (accumulation["accum"],)
class ListToAccumulationNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"list": ("*",),
},
}
RETURN_TYPES = ("*",)
INPUT_IS_LIST = (True,)
FUNCTION = "list_to_accumulation"
CATEGORY = "InversionDemo Nodes"
def accumulation_to_list(self, list):
return ({"accum": list},)
class IsTruthyNode:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"value": ("*",),
},
}
RETURN_TYPES = ("INT",)
FUNCTION = "is_truthy"
CATEGORY = "InversionDemo Nodes"
def is_truthy(self, value):
if isinstance(value, torch.Tensor):
if value.max().item() == 0 and value.min().item() == 0:
return (0,)
else:
return (1,)
try:
return (int(bool(value)),)
except:
# Can't convert it? Well then it's something or other. I dunno, I'm not a Python programmer.
return (1,)
from .flow_control import NUM_FLOW_SOCKETS
class ForLoopOpen:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"remaining": ("INT", {"default": 1, "min": 0, "max": 100000, "step": 1}),
},
"optional": {
"initial_value%d" % i: ("*",) for i in range(1, NUM_FLOW_SOCKETS)
},
"hidden": {
"initial_value0": ("*",)
}
}
RETURN_TYPES = tuple(["FLOW_CONTROL", "INT",] + ["*"] * (NUM_FLOW_SOCKETS-1))
RETURN_NAMES = tuple(["flow_control", "remaining"] + ["value%d" % i for i in range(1, NUM_FLOW_SOCKETS)])
FUNCTION = "for_loop_open"
CATEGORY = "Flow Control"
def for_loop_open(self, remaining, **kwargs):
graph = GraphBuilder()
if "initial_value0" in kwargs:
remaining = kwargs["initial_value0"]
while_open = graph.node("WhileLoopOpen", condition=remaining, initial_value0=remaining, **{("initial_value%d" % i): kwargs.get("initial_value%d" % i, None) for i in range(1, NUM_FLOW_SOCKETS)})
outputs = [kwargs.get("initial_value%d" % i, None) for i in range(1, NUM_FLOW_SOCKETS)]
return {
"result": tuple(["stub", remaining] + outputs),
"expand": graph.finalize(),
}
class ForLoopClose:
def __init__(self):
pass
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"flow_control": ("FLOW_CONTROL", {"raw_link": True}),
"old_remaining": ("INT", {"default": 1, "min": 0, "max": 100000, "step": 1}),
},
"optional": {
"initial_value%d" % i: ("*",{"raw_link": True}) for i in range(1, NUM_FLOW_SOCKETS)
},
}
RETURN_TYPES = tuple(["*"] * (NUM_FLOW_SOCKETS-1))
RETURN_NAMES = tuple(["value%d" % i for i in range(1, NUM_FLOW_SOCKETS)])
FUNCTION = "for_loop_close"
CATEGORY = "Flow Control"
def for_loop_close(self, flow_control, old_remaining, **kwargs):
graph = GraphBuilder()
while_open = flow_control[0]
# TODO - Requires WAS-ns. Will definitely want to solve before merging
sub = graph.node("Number Operation", operation="subtraction", number_a=[while_open,1], number_b=1)
input_values = {("initial_value%d" % i): kwargs.get("initial_value%d" % i, None) for i in range(1, NUM_FLOW_SOCKETS)}
while_close = graph.node("WhileLoopClose",
flow_control=flow_control,
condition=sub.out(0),
initial_value0=sub.out(0),
**input_values)
return {
"result": tuple([while_close.out(i) for i in range(1, NUM_FLOW_SOCKETS)]),
"expand": graph.finalize(),
}
UTILITY_NODE_CLASS_MAPPINGS = {
"AccumulateNode": AccumulateNode,
"AccumulationHeadNode": AccumulationHeadNode,
"AccumulationTailNode": AccumulationTailNode,
"AccumulationToListNode": AccumulationToListNode,
"ListToAccumulationNode": ListToAccumulationNode,
"IsTruthyNode": IsTruthyNode,
"ForLoopOpen": ForLoopOpen,
"ForLoopClose": ForLoopClose,
}
UTILITY_NODE_DISPLAY_NAME_MAPPINGS = {
"AccumulateNode": "Accumulate",
"AccumulationHeadNode": "Accumulation Head",
"AccumulationTailNode": "Accumulation Tail",
"AccumulationToListNode": "Accumulation to List",
"ListToAccumulationNode": "List to Accumulation",
"IsTruthyNode": "Is Truthy",
"ForLoopOpen": "For Loop Open",
"ForLoopClose": "For Loop Close",
}