ComfyUI/comfy_extras/nodes_cfg.py
2026-05-23 17:43:25 +03:00

123 lines
4.4 KiB
Python

from typing_extensions import override
import torch
from comfy_api.latest import ComfyExtension, io
# https://github.com/WeichenFan/CFG-Zero-star
def optimized_scale(positive, negative):
positive_flat = positive.reshape(positive.shape[0], -1)
negative_flat = negative.reshape(negative.shape[0], -1)
# Calculate dot production
dot_product = torch.sum(positive_flat * negative_flat, dim=1, keepdim=True)
# Squared norm of uncondition
squared_norm = torch.sum(negative_flat ** 2, dim=1, keepdim=True) + 1e-8
# st_star = v_cond^T * v_uncond / ||v_uncond||^2
st_star = dot_product / squared_norm
return st_star.reshape([positive.shape[0]] + [1] * (positive.ndim - 1))
class CFGZeroStar(io.ComfyNode):
@classmethod
def define_schema(cls) -> io.Schema:
return io.Schema(
node_id="CFGZeroStar",
category="advanced/guidance",
inputs=[
io.Model.Input("model"),
],
outputs=[io.Model.Output(display_name="patched_model")],
)
@classmethod
def execute(cls, model) -> io.NodeOutput:
m = model.clone()
def cfg_zero_star(args):
guidance_scale = args['cond_scale']
x = args['input']
cond_p = args['cond_denoised']
uncond_p = args['uncond_denoised']
out = args["denoised"]
alpha = optimized_scale(x - cond_p, x - uncond_p)
return out + uncond_p * (alpha - 1.0) + guidance_scale * uncond_p * (1.0 - alpha)
m.set_model_sampler_post_cfg_function(cfg_zero_star)
return io.NodeOutput(m)
class CFGNorm(io.ComfyNode):
@classmethod
def define_schema(cls) -> io.Schema:
return io.Schema(
node_id="CFGNorm",
category="advanced/guidance",
inputs=[
io.Model.Input("model"),
io.Float.Input("strength", default=1.0, min=0.0, max=100.0, step=0.01),
io.Boolean.Input(
"pre_cfg",
default=False,
optional=True,
tooltip=(
"If true, rescale the combined noise BEFORE the sampler's CFG combine, "
"without clamping (can amplify). Matches the norm-scaled CFG used by "
"models like Lens. Default false keeps the original post-CFG x0-space "
"attenuate-only behavior."
),
),
],
outputs=[io.Model.Output(display_name="patched_model")],
is_experimental=True,
)
@classmethod
def execute(cls, model, strength, pre_cfg=False) -> io.NodeOutput:
m = model.clone()
if pre_cfg:
def cfg_norm_pre(args):
cond = args["cond"]
uncond = args["uncond"]
cond_scale = args["cond_scale"]
comb = uncond + cond_scale * (cond - uncond)
cond_norm = torch.linalg.vector_norm(cond, dim=1, keepdim=True)
comb_norm = torch.linalg.vector_norm(comb, dim=1, keepdim=True)
rescale = torch.where(
comb_norm > 0,
cond_norm / comb_norm.clamp_min(1e-12),
torch.ones_like(comb_norm),
)
rescaled = comb * rescale
# strength blends back toward standard linear CFG (1.0 = full rescale).
if strength != 1.0:
rescaled = strength * rescaled + (1.0 - strength) * comb
return rescaled
m.set_model_sampler_cfg_function(cfg_norm_pre)
else:
def cfg_norm(args):
cond_p = args['cond_denoised']
pred_text_ = args["denoised"]
norm_full_cond = torch.norm(cond_p, dim=1, keepdim=True)
norm_pred_text = torch.norm(pred_text_, dim=1, keepdim=True)
scale = (norm_full_cond / (norm_pred_text + 1e-8)).clamp(min=0.0, max=1.0)
return pred_text_ * scale * strength
m.set_model_sampler_post_cfg_function(cfg_norm)
return io.NodeOutput(m)
class CfgExtension(ComfyExtension):
@override
async def get_node_list(self) -> list[type[io.ComfyNode]]:
return [
CFGZeroStar,
CFGNorm,
]
async def comfy_entrypoint() -> CfgExtension:
return CfgExtension()