Compare commits

...

5 Commits

Author SHA1 Message Date
Miklos Nagy
7a7ae38d3e
Merge c463359308 into 85fc35e8fa 2026-02-03 09:20:01 -08:00
comfyanonymous
85fc35e8fa
Fix mac issue. (#12250)
2026-02-03 12:19:39 -05:00
comfyanonymous
223364743c
llama: cast logits as a comfy-weight (#12248)
This is using a different layer's weight with .to(). Change it to use
the ops caster if the original layer is a comfy weight so that it picks
up dynamic_vram and async_offload functionality in full.

Co-authored-by: Rattus <rattus128@gmail.com>
2026-02-03 11:31:36 -05:00
comfyanonymous
affe881354
Fix some issues with mac. (#12247) 2026-02-03 11:07:04 -05:00
Miklos Nagy
c463359308
Update execution.py
Grouping nodes by input type (revision: my linter got rid of comments in the previous round)
2026-01-28 11:38:01 +01:00
3 changed files with 188 additions and 7 deletions

View File

@@ -57,8 +57,9 @@ def sample_manual_loop_no_classes(
if eos_token_id is not None and eos_token_id < audio_start_id and min_tokens < step:
eos_score = cfg_logits[:, eos_token_id].clone()
remove_logit_value = torch.finfo(cfg_logits.dtype).min
# Only generate audio tokens
cfg_logits[:, :audio_start_id] = float('-inf')
cfg_logits[:, :audio_start_id] = remove_logit_value
if eos_token_id is not None and eos_token_id < audio_start_id and min_tokens < step:
cfg_logits[:, eos_token_id] = eos_score
@@ -66,7 +67,7 @@ def sample_manual_loop_no_classes(
if top_k is not None and top_k > 0:
top_k_vals, _ = torch.topk(cfg_logits, top_k)
min_val = top_k_vals[..., -1, None]
cfg_logits[cfg_logits < min_val] = float('-inf')
cfg_logits[cfg_logits < min_val] = remove_logit_value
if top_p is not None and top_p < 1.0:
sorted_logits, sorted_indices = torch.sort(cfg_logits, descending=True)
@@ -75,7 +76,7 @@ def sample_manual_loop_no_classes(
sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
sorted_indices_to_remove[..., 0] = 0
indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
cfg_logits[indices_to_remove] = float('-inf')
cfg_logits[indices_to_remove] = remove_logit_value
if temperature > 0:
cfg_logits = cfg_logits / temperature
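
The three hunks above all make the same substitution: the literal float('-inf') sentinel is replaced by torch.finfo(cfg_logits.dtype).min, the smallest finite value the logits' dtype can represent, presumably as part of the Mac fixes in this compare. A minimal standalone sketch of the pattern (plain PyTorch, no ComfyUI imports; the helper name is hypothetical):

    import torch

    def mask_non_audio_logits(cfg_logits: torch.Tensor, audio_start_id: int) -> torch.Tensor:
        # Use the dtype's smallest finite value as the "removed" score so the
        # tensor never holds real -inf entries before top-k/top-p filtering.
        remove_logit_value = torch.finfo(cfg_logits.dtype).min
        out = cfg_logits.clone()
        out[:, :audio_start_id] = remove_logit_value
        return out

    # For float16 logits the sentinel is -65504.0, i.e. still finite:
    logits = torch.randn(1, 16).half()
    masked = mask_non_audio_logits(logits, audio_start_id=8)
    assert torch.isfinite(masked).all()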

View File

@@ -6,6 +6,7 @@ import math
from comfy.ldm.modules.attention import optimized_attention_for_device
import comfy.model_management
import comfy.ops
import comfy.ldm.common_dit
import comfy.clip_model
@@ -627,10 +628,10 @@ class Llama2_(nn.Module):
mask = None
if attention_mask is not None:
mask = 1.0 - attention_mask.to(x.dtype).reshape((attention_mask.shape[0], 1, -1, attention_mask.shape[-1])).expand(attention_mask.shape[0], 1, seq_len, attention_mask.shape[-1])
mask = mask.masked_fill(mask.to(torch.bool), float("-inf"))
mask = mask.masked_fill(mask.to(torch.bool), torch.finfo(x.dtype).min)
if seq_len > 1:
causal_mask = torch.empty(past_len + seq_len, past_len + seq_len, dtype=x.dtype, device=x.device).fill_(float("-inf")).triu_(1)
causal_mask = torch.empty(past_len + seq_len, past_len + seq_len, dtype=x.dtype, device=x.device).fill_(torch.finfo(x.dtype).min).triu_(1)
if mask is not None:
mask += causal_mask
else:
@@ -794,7 +795,19 @@ class Qwen3_2B_ACE15_lm(BaseLlama, torch.nn.Module):
self.dtype = dtype
def logits(self, x):
return torch.nn.functional.linear(x[:, -1:], self.model.embed_tokens.weight.to(x), None)
input = x[:, -1:]
module = self.model.embed_tokens
offload_stream = None
if module.comfy_cast_weights:
weight, _, offload_stream = comfy.ops.cast_bias_weight(module, input, offloadable=True)
else:
weight = self.model.embed_tokens.weight.to(x)
x = torch.nn.functional.linear(input, weight, None)
comfy.ops.uncast_bias_weight(module, weight, None, offload_stream)
return x
class Qwen3_4B(BaseLlama, torch.nn.Module):
def __init__(self, config_dict, dtype, device, operations):
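
The rendering above drops indentation, so the branch structure of the rewritten logits() is easy to misread. Restated below with the indentation it most plausibly carries in the file (layout inferred; the cast_bias_weight / uncast_bias_weight calls and their arguments are copied verbatim from the hunk, comments added here):

    def logits(self, x):
        input = x[:, -1:]
        module = self.model.embed_tokens
        offload_stream = None
        if module.comfy_cast_weights:
            # Comfy-managed weight: go through the ops caster so dynamic_vram and
            # async_offload apply; the unused second return value is presumably the bias.
            weight, _, offload_stream = comfy.ops.cast_bias_weight(module, input, offloadable=True)
        else:
            # Plain parameter: fall back to casting the tied embedding weight to x.
            weight = self.model.embed_tokens.weight.to(x)
        x = torch.nn.functional.linear(input, weight, None)
        comfy.ops.uncast_bias_weight(module, weight, None, offload_stream)
        return x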

View File

@@ -48,6 +48,167 @@ class ExecutionResult(Enum):
class DuplicateNodeError(Exception):
pass
# ======================================================================================
# ADDED: Node grouping helpers for "input-type locality" execution ordering
# --------------------------------------------------------------------------------------
# We cluster ready-to-run nodes by a signature derived from:
# - Declared INPUT_TYPES (required/optional socket types)
# - Upstream linked RETURN_TYPES (when available from prompt links)
#
# This is a SCHEDULING optimization only:
# - It must not change correctness or dependency satisfaction.
# - It only reorders nodes that ExecutionList already deems ready/executable.
# - It is stable to avoid churn and to preserve deterministic behavior.
#
# IMPORTANT: ExecutionList is imported from comfy_execution.graph; we avoid invasive
# changes by using a small subclass + defensive introspection of its internal queues.
# ======================================================================================
def _safe_stringify_type(t):
try:
return str(t)
except Exception:
return repr(t)
def _node_input_signature_from_prompt(prompt: dict, node_id: str):
"""
Build a stable, hashable signature representing a node's *input requirements*.
Includes:
- Declared input socket types via INPUT_TYPES() (required + optional)
- Linked upstream output RETURN_TYPES, when input is a link
This signature is used ONLY for grouping/sorting ready nodes.
"""
node = prompt.get(node_id)
if node is None:
return ("<missing-node>", node_id)
class_type = node.get("class_type")
class_def = nodes.NODE_CLASS_MAPPINGS.get(class_type)
if class_def is None:
return ("<missing-class>", class_type, node_id)
sig = []
# Declared socket types (required/optional)
try:
input_types = class_def.INPUT_TYPES()
except Exception:
input_types = {}
for cat in ("required", "optional"):
cat_dict = input_types.get(cat, {})
if isinstance(cat_dict, dict):
# Sort keys for stability
for k in sorted(cat_dict.keys()):
v = cat_dict[k]
sig.append(("decl", cat, k, _safe_stringify_type(v)))
# Linked upstream return types (helps cluster by latent/model flows)
inputs = node.get("inputs", {}) or {}
if isinstance(inputs, dict):
for k in sorted(inputs.keys()):
v = inputs[k]
if is_link(v) and isinstance(v, (list, tuple)) and len(v) == 2:
src_id, out_idx = v[0], v[1]
src_node = prompt.get(src_id)
if src_node is None:
sig.append(("link", k, "<missing-src-node>"))
continue
src_class_type = src_node.get("class_type")
src_class_def = nodes.NODE_CLASS_MAPPINGS.get(src_class_type)
if src_class_def is None:
sig.append(("link", k, "<missing-src-class>", src_class_type))
continue
ret_types = getattr(src_class_def, "RETURN_TYPES", ())
try:
if isinstance(out_idx, int) and out_idx < len(ret_types):
sig.append(("link", k, _safe_stringify_type(ret_types[out_idx])))
else:
sig.append(("link", k, "<bad-out-idx>", _safe_stringify_type(out_idx)))
except Exception:
sig.append(("link", k, "<ret-type-error>"))
return tuple(sig)
def _try_group_sort_execution_list_ready_nodes(execution_list: ExecutionList, prompt: dict):
"""
Attempt to reorder the ExecutionList's *ready* nodes in-place, grouping by input signature.
This is intentionally defensive because ExecutionList is external; we only touch
well-known/observed internal attributes when they match expected shapes.
Supported patterns (best-effort):
- execution_list.nodes_to_execute : list[node_id, ...]
- execution_list._nodes_to_execute : list[node_id, ...] (fallback)
We DO NOT rewrite heaps/tuples with priority keys, because that risks breaking invariants.
If the internal structure is not a simple list of node_ids, we do nothing.
"""
# Candidate attribute names that (in some ComfyUI revisions) hold ready-to-run node IDs
candidates = ("nodes_to_execute", "_nodes_to_execute")
for attr in candidates:
if not hasattr(execution_list, attr):
continue
value = getattr(execution_list, attr)
# Only operate on a plain list of node ids (strings/ints)
if isinstance(value, list) and all(isinstance(x, (str, int)) for x in value):
# Stable grouping sort:
# primary: signature (to cluster similar input requirements)
# secondary: original order (stability)
# NOTE: include length of signature in key to reduce expensive stringification
indexed = list(enumerate(value))
indexed.sort(
key=lambda it: (
# signature key
_node_input_signature_from_prompt(prompt, str(it[1])),
# keep stable within same signature
it[0],
)
)
new_list = [node_id for _, node_id in indexed]
setattr(execution_list, attr, new_list)
return True
return False
class GroupedExecutionList(ExecutionList):
"""
ADDED: Thin wrapper around ExecutionList that reorders *ready* nodes before staging
to improve model/tensor locality (reduce VRAM/RAM chatter).
This does not change dependency logic; it only reorders nodes that are already ready.
"""
def _apply_group_sort_if_possible(self):
try:
# dynprompt.original_prompt is the canonical prompt graph dict
prompt = getattr(self, "dynprompt", None)
prompt_dict = None
if prompt is not None:
prompt_dict = getattr(prompt, "original_prompt", None)
if isinstance(prompt_dict, dict):
_try_group_sort_execution_list_ready_nodes(self, prompt_dict)
except Exception:
# Must never break execution
pass
# NOTE: stage_node_execution is awaited in the caller in this file, so we keep it async-compatible.
async def stage_node_execution(self):
# Group-sort the ready list *before* choosing next node
self._apply_group_sort_if_possible()
return await super().stage_node_execution()
def add_node(self, node_id):
# Keep original behavior, then regroup for future staging
super().add_node(node_id)
self._apply_group_sort_if_possible()
class IsChangedCache:
def __init__(self, prompt_id: str, dynprompt: DynamicPrompt, outputs_cache: BasicCache):
self.prompt_id = prompt_id
@@ -721,7 +882,13 @@ class PromptExecutor:
pending_async_nodes = {} # TODO - Unify this with pending_subgraph_results
ui_node_outputs = {}
executed = set()
execution_list = ExecutionList(dynamic_prompt, self.caches.outputs)
# ==================================================================================
# CHANGED: Use GroupedExecutionList to group ready-to-run nodes by input signature.
# This reduces VRAM/RAM chatter when workflows reuse the same models/tensor types.
# ==================================================================================
execution_list = GroupedExecutionList(dynamic_prompt, self.caches.outputs)
current_outputs = self.caches.outputs.all_node_ids()
for node_id in list(execute_outputs):
execution_list.add_node(node_id)
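
To make the scheduling idea concrete outside of ComfyUI, here is a standalone toy sketch of the same grouping: ready node IDs are stably sorted by a signature built from declared INPUT_TYPES plus the RETURN_TYPES of linked upstream nodes, so nodes with similar input requirements run back to back. The node classes, prompt dict, and helper names below are simplified stand-ins, not the real NODE_CLASS_MAPPINGS or ExecutionList internals:

    class LoadCheckpoint:
        RETURN_TYPES = ("MODEL",)
        @classmethod
        def INPUT_TYPES(cls):
            return {"required": {"ckpt_name": ("STRING",)}}

    class KSampler:
        RETURN_TYPES = ("LATENT",)
        @classmethod
        def INPUT_TYPES(cls):
            return {"required": {"model": ("MODEL",), "seed": ("INT",)}}

    class VAEDecode:
        RETURN_TYPES = ("IMAGE",)
        @classmethod
        def INPUT_TYPES(cls):
            return {"required": {"samples": ("LATENT",)}}

    CLASSES = {"LoadCheckpoint": LoadCheckpoint, "KSampler": KSampler, "VAEDecode": VAEDecode}

    PROMPT = {
        "1": {"class_type": "LoadCheckpoint", "inputs": {"ckpt_name": "sd15.safetensors"}},
        "2": {"class_type": "KSampler", "inputs": {"model": ["1", 0], "seed": 7}},
        "3": {"class_type": "VAEDecode", "inputs": {"samples": ["2", 0]}},
        "4": {"class_type": "KSampler", "inputs": {"model": ["1", 0], "seed": 8}},
    }

    def input_signature(prompt, node_id):
        node = prompt[node_id]
        cls = CLASSES[node["class_type"]]
        sig = []
        for cat in ("required", "optional"):
            for name, typ in sorted(cls.INPUT_TYPES().get(cat, {}).items()):
                sig.append(("decl", cat, name, str(typ)))
        for name, value in sorted(node["inputs"].items()):
            if isinstance(value, list) and len(value) == 2:  # link: [source_node_id, output_index]
                src_cls = CLASSES[prompt[value[0]]["class_type"]]
                sig.append(("link", name, str(src_cls.RETURN_TYPES[value[1]])))
        return tuple(sig)

    def group_ready(ready, prompt):
        # Stable sort: primary key is the signature, secondary key is the original position.
        return [nid for _, nid in sorted(enumerate(ready),
                                         key=lambda it: (input_signature(prompt, it[1]), it[0]))]

    print(group_ready(["2", "3", "4"], PROMPT))  # -> ['2', '4', '3']: the two KSampler nodes are now adjacent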