Merge pull request #1 from pollockjj/trellis2-review-fixes

Trellis2 review fixes for batch, texture, and memory paths
This commit is contained in:
Yousef R. Gamaleldin 2026-04-24 01:26:33 +03:00 committed by GitHub
commit a8e729b42a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 1362 additions and 172 deletions

View File

@ -782,8 +782,11 @@ class Trellis2(nn.Module):
embeds = kwargs.get("embeds")
if embeds is None:
raise ValueError("Trellis2.forward requires 'embeds' in kwargs")
is_1024 = self.img2shape.resolution == 1024
# img2shape.resolution is the latent-grid size, not the input pixel size:
# 32 -> 512px path, 64 -> 1024px path.
uses_1024_conditioning = self.img2shape.resolution == 64
coords = transformer_options.get("coords", None)
coord_counts = transformer_options.get("coord_counts")
mode = transformer_options.get("generation_mode", "structure_generation")
is_512_run = False
timestep = timestep.to(self.dtype)
@ -797,7 +800,7 @@ class Trellis2(nn.Module):
mode = "structure_generation"
not_struct_mode = False
if is_1024 and not_struct_mode and not is_512_run:
if uses_1024_conditioning and not_struct_mode and not is_512_run:
context = embeds
sigmas = transformer_options.get("sigmas")[0].item()
@ -809,15 +812,33 @@ class Trellis2(nn.Module):
cond = context
shape_rule = sigmas < self.guidance_interval[0] or sigmas > self.guidance_interval[1]
txt_rule = sigmas < self.guidance_interval_txt[0] or sigmas > self.guidance_interval_txt[1]
dense_out = None
cond_or_uncond = transformer_options.get("cond_or_uncond") or []
def cond_group_indices(batch_groups):
if len(cond_or_uncond) == batch_groups:
cond_groups = [i for i, marker in enumerate(cond_or_uncond) if marker == 0]
if len(cond_groups) > 0:
return cond_groups
return [batch_groups - 1]
if not_struct_mode:
orig_bsz = x.shape[0]
rule = txt_rule if mode == "texture_generation" else shape_rule
if rule and orig_bsz > 1:
x_eval = x[1].unsqueeze(0)
t_eval = timestep[1].unsqueeze(0) if timestep.shape[0] > 1 else timestep
c_eval = cond
logical_batch = coord_counts.shape[0] if coord_counts is not None else 1
if rule and orig_bsz > logical_batch:
batch_groups = orig_bsz // logical_batch
selected_groups = cond_group_indices(batch_groups)
x_groups = x.reshape(batch_groups, logical_batch, *x.shape[1:])
x_eval = x_groups[selected_groups].reshape(-1, *x.shape[1:])
if timestep.shape[0] > 1:
t_groups = timestep.reshape(batch_groups, logical_batch, *timestep.shape[1:])
t_eval = t_groups[selected_groups].reshape(-1, *timestep.shape[1:])
else:
t_eval = timestep
c_groups = context.reshape(batch_groups, logical_batch, *context.shape[1:])
c_eval = c_groups[selected_groups].reshape(-1, *context.shape[1:])
else:
x_eval = x
t_eval = timestep
@ -826,23 +847,107 @@ class Trellis2(nn.Module):
B, N, C = x_eval.shape
if mode in ["shape_generation", "texture_generation"]:
feats_flat = x_eval.reshape(-1, C)
if coord_counts is not None:
logical_batch = coord_counts.shape[0]
if B % logical_batch != 0:
raise ValueError(
f"Trellis2 coord_counts batch {logical_batch} doesn't divide latent batch {B}"
)
if int(coord_counts.sum().item()) != coords.shape[0]:
raise ValueError(
f"Trellis2 coord_counts total {int(coord_counts.sum().item())} does not match coords rows {coords.shape[0]}"
)
batch_ids = coords[:, 0].to(torch.int64)
order = torch.argsort(batch_ids, stable=True)
sorted_coords = coords.index_select(0, order)
sorted_batch_ids = batch_ids.index_select(0, order)
offsets = coord_counts.cumsum(0) - coord_counts
coords_by_batch = []
for i in range(logical_batch):
count = int(coord_counts[i].item())
start = int(offsets[i].item())
coords_i = sorted_coords[start:start + count]
ids_i = sorted_batch_ids[start:start + count]
if coords_i.shape[0] != count or not torch.all(ids_i == i):
raise ValueError(
f"Trellis2 coords rows for batch {i} expected {count}, got {coords_i.shape[0]}"
)
coords_by_batch.append(coords_i)
repeat_factor = B // logical_batch
sparse_outs = []
active_coord_counts = []
for rep in range(repeat_factor):
for i in range(logical_batch):
out_index = rep * logical_batch + i
count = int(coord_counts[i].item())
if count > N:
raise ValueError(
f"Trellis2 coord count {count} exceeds latent token dimension {N} for batch {i}"
)
coords_i = coords_by_batch[i].clone()
coords_i[:, 0] = 0
feats_i = x_eval[out_index, :count].clone()
x_st_i = SparseTensor(feats=feats_i, coords=coords_i.to(torch.int32))
t_i = t_eval[out_index].unsqueeze(0).clone() if t_eval.shape[0] > 1 else t_eval
c_i = c_eval[out_index].unsqueeze(0).clone() if c_eval.shape[0] > 1 else c_eval
# inflate coords [N, 4] -> [B*N, 4]
coords_list = []
for i in range(B):
c = coords.clone()
c[:, 0] = i
coords_list.append(c)
if mode == "shape_generation":
if is_512_run:
sparse_out = self.img2shape_512(x_st_i, t_i, c_i)
else:
sparse_out = self.img2shape(x_st_i, t_i, c_i)
else:
slat = transformer_options.get("shape_slat")
if slat is None:
raise ValueError("shape_slat can't be None")
if slat.ndim == 3:
if slat.shape[0] != logical_batch:
raise ValueError(
f"shape_slat batch {slat.shape[0]} doesn't match coord_counts batch {logical_batch}"
)
if slat.shape[1] < count:
raise ValueError(
f"shape_slat tokens {slat.shape[1]} can't cover coord count {count} for batch {i}"
)
slat_feats = slat[i, :count].to(x_st_i.device)
else:
slat_feats = slat[:count].to(x_st_i.device)
x_st_i = x_st_i.replace(feats=torch.cat([x_st_i.feats, slat_feats], dim=-1))
sparse_out = self.shape2txt(x_st_i, t_i, c_i)
batched_coords = torch.cat(coords_list, dim=0)
sparse_outs.append(sparse_out.feats)
active_coord_counts.append(count)
out_channels = sparse_outs[0].shape[-1]
padded = sparse_outs[0].new_zeros((B, N, out_channels))
for out_index, (count, feats_i) in enumerate(zip(active_coord_counts, sparse_outs)):
padded[out_index, :count] = feats_i
dense_out = padded.transpose(1, 2).unsqueeze(-1)
elif coords.shape[0] == N:
feats_flat = x_eval.reshape(-1, C)
coords_list = []
for i in range(B):
c = coords.clone()
c[:, 0] = i
coords_list.append(c)
batched_coords = torch.cat(coords_list, dim=0)
elif coords.shape[0] == B * N:
feats_flat = x_eval.reshape(-1, C)
batched_coords = coords
else:
raise ValueError(
f"Trellis2 expected coords rows {N} or {B * N}, got {coords.shape[0]}"
)
else:
batched_coords = coords
feats_flat = x_eval
x_st = SparseTensor(feats=feats_flat, coords=batched_coords.to(torch.int32))
if dense_out is None:
x_st = SparseTensor(feats=feats_flat, coords=batched_coords.to(torch.int32))
if mode == "shape_generation":
if dense_out is not None:
out = dense_out
elif mode == "shape_generation":
if is_512_run:
out = self.img2shape_512(x_st, t_eval, c_eval)
else:
@ -854,22 +959,96 @@ class Trellis2(nn.Module):
if slat is None:
raise ValueError("shape_slat can't be None")
base_slat_feats = slat[:N]
slat_feats_batched = base_slat_feats.repeat(B, 1).to(x_st.device)
if slat.ndim == 3:
if coord_counts is not None:
logical_batch = coord_counts.shape[0]
if slat.shape[0] != logical_batch:
raise ValueError(
f"shape_slat batch {slat.shape[0]} doesn't match coord_counts batch {logical_batch}"
)
if B % logical_batch != 0:
raise ValueError(
f"Trellis2 coord_counts batch {logical_batch} doesn't divide latent batch {B}"
)
repeat_factor = B // logical_batch
slat_list = []
for _ in range(repeat_factor):
for i in range(logical_batch):
count = int(coord_counts[i].item())
if slat.shape[1] < count:
raise ValueError(
f"shape_slat tokens {slat.shape[1]} can't cover coord count {count} for batch {i}"
)
slat_list.append(slat[i, :count])
slat_feats_batched = torch.cat(slat_list, dim=0).to(x_st.device)
else:
if slat.shape[0] != B:
raise ValueError(f"shape_slat batch {slat.shape[0]} doesn't match latent batch {B}")
if slat.shape[1] != N:
raise ValueError(f"shape_slat tokens {slat.shape[1]} doesn't match latent tokens {N}")
slat_feats_batched = slat.reshape(B * N, -1).to(x_st.device)
else:
base_slat_feats = slat[:N]
slat_feats_batched = base_slat_feats.repeat(B, 1).to(x_st.device)
x_st = x_st.replace(feats=torch.cat([x_st.feats, slat_feats_batched], dim=-1))
out = self.shape2txt(x_st, t_eval, c_eval)
else: # structure
orig_bsz = x.shape[0]
if shape_rule:
x = x[1].unsqueeze(0)
timestep = timestep[1].unsqueeze(0)
out = self.structure_model(x, timestep, context if not shape_rule else cond)
if shape_rule:
out = out.repeat(orig_bsz, 1, 1, 1, 1)
batch_groups = len(cond_or_uncond) if len(cond_or_uncond) > 0 and orig_bsz % len(cond_or_uncond) == 0 else 1
logical_batch = orig_bsz // batch_groups
if logical_batch > 1:
x_groups = x.reshape(batch_groups, logical_batch, *x.shape[1:])
if timestep.shape[0] > 1:
t_groups = timestep.reshape(batch_groups, logical_batch, *timestep.shape[1:])
else:
t_groups = timestep
c_groups = context.reshape(batch_groups, logical_batch, *context.shape[1:])
if shape_rule and batch_groups > 1:
selected_group_indices = cond_group_indices(batch_groups)
else:
selected_group_indices = list(range(batch_groups))
out_groups = []
for sample_index in range(logical_batch):
if shape_rule and batch_groups > 1:
x_i = x_groups[selected_group_indices, sample_index]
if timestep.shape[0] > 1:
t_i = t_groups[selected_group_indices, sample_index]
else:
t_i = timestep
c_i = c_groups[selected_group_indices, sample_index]
else:
x_i = x_groups[selected_group_indices, sample_index]
if timestep.shape[0] > 1:
t_i = t_groups[selected_group_indices, sample_index]
else:
t_i = timestep
c_i = c_groups[selected_group_indices, sample_index]
out_groups.append(self.structure_model(x_i, t_i, c_i))
out = out_groups[0].new_zeros((orig_bsz, *out_groups[0].shape[1:]))
for sample_index, out_sample in enumerate(out_groups):
if shape_rule and batch_groups > 1:
repeated = out_sample[0]
for group_index in range(batch_groups):
out[group_index * logical_batch + sample_index] = repeated
else:
for local_group_index, group_index in enumerate(selected_group_indices):
out[group_index * logical_batch + sample_index] = out_sample[local_group_index]
else:
if shape_rule and orig_bsz > 1:
half = orig_bsz // 2
x = x[half:]
timestep = timestep[half:] if timestep.shape[0] > 1 else timestep
out = self.structure_model(x, timestep, cond if shape_rule and orig_bsz > 1 else context)
if shape_rule and orig_bsz > 1:
out = out.repeat(2, 1, 1, 1, 1)
if not_struct_mode:
out = out.feats
out = out.view(B, N, -1).transpose(1, 2).unsqueeze(-1)
if rule and orig_bsz > 1:
out = out.repeat(orig_bsz, 1, 1, 1)
if dense_out is None:
out = out.feats
out = out.view(B, N, -1).transpose(1, 2).unsqueeze(-1)
if rule and orig_bsz > B:
out = out.repeat(orig_bsz // B, 1, 1, 1)
return out

View File

@ -7,6 +7,50 @@ import logging
import comfy.nested_tensor
def prepare_noise_inner(latent_image, generator, noise_inds=None):
coord_counts = getattr(latent_image, "trellis_coord_counts", None)
if coord_counts is not None:
if coord_counts.ndim != 1:
raise ValueError(f"Trellis2 coord_counts must be 1D, got shape {tuple(coord_counts.shape)}")
if coord_counts.shape[0] != latent_image.size(0):
raise ValueError(
f"Trellis2 coord_counts length {coord_counts.shape[0]} does not match latent batch {latent_image.size(0)}"
)
if (coord_counts < 0).any() or (coord_counts > latent_image.size(2)).any():
raise ValueError(
f"Trellis2 coord_counts must be within [0, {latent_image.size(2)}], got {coord_counts.tolist()}"
)
noise = torch.zeros(latent_image.size(), dtype=torch.float32, layout=latent_image.layout, device="cpu")
if noise_inds is None:
noise_inds = np.arange(latent_image.size(0), dtype=np.int64)
else:
noise_inds = np.asarray(noise_inds, dtype=np.int64)
if noise_inds.shape[0] != latent_image.size(0):
raise ValueError(
f"Trellis2 noise_inds length {noise_inds.shape[0]} does not match latent batch {latent_image.size(0)}"
)
base_seed = int(generator.initial_seed())
unique_inds = np.unique(noise_inds)
sample_noises = {}
for noise_index in unique_inds.tolist():
rows = np.flatnonzero(noise_inds == noise_index)
max_count = max(int(coord_counts[row].item()) for row in rows.tolist())
local_generator = torch.Generator(device="cpu")
local_generator.manual_seed(base_seed + int(noise_index))
sample_noises[int(noise_index)] = torch.randn(
[1, latent_image.size(1), max_count, latent_image.size(3)],
dtype=torch.float32,
layout=latent_image.layout,
generator=local_generator,
device="cpu",
)
for batch_index, noise_index in enumerate(noise_inds.tolist()):
count = int(coord_counts[batch_index].item())
sample_noise = sample_noises[int(noise_index)]
noise[batch_index:batch_index + 1, :, :count, :] = sample_noise[:, :, :count, :]
return noise.to(dtype=latent_image.dtype)
if noise_inds is None:
return torch.randn(latent_image.size(), dtype=torch.float32, layout=latent_image.layout, generator=generator, device="cpu").to(dtype=latent_image.dtype)

View File

@ -443,7 +443,9 @@ class VoxelToMeshBasic(IO.ComfyNode):
vertices.append(v)
faces.append(f)
return IO.NodeOutput(Types.MESH(torch.stack(vertices), torch.stack(faces)))
if vertices and all(v.shape == vertices[0].shape for v in vertices) and all(f.shape == faces[0].shape for f in faces):
return IO.NodeOutput(Types.MESH(torch.stack(vertices), torch.stack(faces)))
return IO.NodeOutput(pack_variable_mesh_batch(vertices, faces))
decode = execute # TODO: remove
@ -479,7 +481,9 @@ class VoxelToMesh(IO.ComfyNode):
vertices.append(v)
faces.append(f)
return IO.NodeOutput(Types.MESH(torch.stack(vertices), torch.stack(faces)))
if vertices and all(v.shape == vertices[0].shape for v in vertices) and all(f.shape == faces[0].shape for f in faces):
return IO.NodeOutput(Types.MESH(torch.stack(vertices), torch.stack(faces)))
return IO.NodeOutput(pack_variable_mesh_batch(vertices, faces))
decode = execute # TODO: remove
@ -628,6 +632,56 @@ def save_glb(vertices, faces, filepath, metadata=None, colors=None):
return filepath
def pack_variable_mesh_batch(vertices, faces, colors=None):
batch_size = len(vertices)
max_vertices = max(v.shape[0] for v in vertices)
max_faces = max(f.shape[0] for f in faces)
packed_vertices = vertices[0].new_zeros((batch_size, max_vertices, vertices[0].shape[1]))
packed_faces = faces[0].new_zeros((batch_size, max_faces, faces[0].shape[1]))
vertex_counts = torch.tensor([v.shape[0] for v in vertices], device=vertices[0].device, dtype=torch.int64)
face_counts = torch.tensor([f.shape[0] for f in faces], device=faces[0].device, dtype=torch.int64)
for i, (v, f) in enumerate(zip(vertices, faces)):
packed_vertices[i, :v.shape[0]] = v
packed_faces[i, :f.shape[0]] = f
mesh = Types.MESH(packed_vertices, packed_faces)
mesh.vertex_counts = vertex_counts
mesh.face_counts = face_counts
if colors is not None:
max_colors = max(c.shape[0] for c in colors)
packed_colors = colors[0].new_zeros((batch_size, max_colors, colors[0].shape[1]))
color_counts = torch.tensor([c.shape[0] for c in colors], device=colors[0].device, dtype=torch.int64)
for i, c in enumerate(colors):
packed_colors[i, :c.shape[0]] = c
mesh.colors = packed_colors
mesh.color_counts = color_counts
return mesh
def get_mesh_batch_item(mesh, index):
if hasattr(mesh, "vertex_counts"):
vertex_count = int(mesh.vertex_counts[index].item())
face_count = int(mesh.face_counts[index].item())
vertices = mesh.vertices[index, :vertex_count]
faces = mesh.faces[index, :face_count]
colors = None
if hasattr(mesh, "colors") and mesh.colors is not None:
if hasattr(mesh, "color_counts"):
color_count = int(mesh.color_counts[index].item())
colors = mesh.colors[index, :color_count]
else:
colors = mesh.colors[index, :vertex_count]
return vertices, faces, colors
colors = None
if hasattr(mesh, "colors") and mesh.colors is not None:
colors = mesh.colors[index]
return mesh.vertices[index], mesh.faces[index], colors
class SaveGLB(IO.ComfyNode):
@classmethod
def define_schema(cls):
@ -682,10 +736,11 @@ class SaveGLB(IO.ComfyNode):
})
else:
# Handle Mesh input - save vertices and faces as GLB
for i in range(mesh.vertices.shape[0]):
bsz = mesh.vertices.shape[0]
for i in range(bsz):
f = f"{filename}_{counter:05}_.glb"
v_colors = mesh.colors[i] if hasattr(mesh, "colors") and mesh.colors is not None else None
save_glb(mesh.vertices[i], mesh.faces[i], os.path.join(full_output_folder, f), metadata, v_colors)
vertices, faces, v_colors = get_mesh_batch_item(mesh, i)
save_glb(vertices, faces, os.path.join(full_output_folder, f), metadata, v_colors)
results.append({
"filename": f,
"subfolder": subfolder,

View File

@ -8,6 +8,72 @@ import torch
import scipy
import copy
def prepare_trellis_vae_for_decode(vae, sample_shape):
memory_required = vae.memory_used_decode(sample_shape, vae.vae_dtype)
if len(sample_shape) == 5:
memory_required *= max(1, int(sample_shape[4]))
memory_required = max(1, int(memory_required))
device = comfy.model_management.get_torch_device()
comfy.model_management.load_models_gpu(
[vae.patcher],
memory_required=memory_required,
force_full_load=getattr(vae, "disable_offload", False),
)
free_memory = vae.patcher.get_free_memory(device)
batch_number = max(1, int(free_memory / memory_required))
return batch_number
def pack_variable_mesh_batch(vertices, faces, colors=None):
batch_size = len(vertices)
max_vertices = max(v.shape[0] for v in vertices)
max_faces = max(f.shape[0] for f in faces)
packed_vertices = vertices[0].new_zeros((batch_size, max_vertices, vertices[0].shape[1]))
packed_faces = faces[0].new_zeros((batch_size, max_faces, faces[0].shape[1]))
vertex_counts = torch.tensor([v.shape[0] for v in vertices], device=vertices[0].device, dtype=torch.int64)
face_counts = torch.tensor([f.shape[0] for f in faces], device=faces[0].device, dtype=torch.int64)
for i, (v, f) in enumerate(zip(vertices, faces)):
packed_vertices[i, :v.shape[0]] = v
packed_faces[i, :f.shape[0]] = f
mesh = Types.MESH(packed_vertices, packed_faces)
mesh.vertex_counts = vertex_counts
mesh.face_counts = face_counts
if colors is not None:
max_colors = max(c.shape[0] for c in colors)
packed_colors = colors[0].new_zeros((batch_size, max_colors, colors[0].shape[1]))
color_counts = torch.tensor([c.shape[0] for c in colors], device=colors[0].device, dtype=torch.int64)
for i, c in enumerate(colors):
packed_colors[i, :c.shape[0]] = c
mesh.colors = packed_colors
mesh.color_counts = color_counts
return mesh
def get_mesh_batch_item(mesh, index):
if hasattr(mesh, "vertex_counts"):
vertex_count = int(mesh.vertex_counts[index].item())
face_count = int(mesh.face_counts[index].item())
vertices = mesh.vertices[index, :vertex_count]
faces = mesh.faces[index, :face_count]
colors = None
if hasattr(mesh, "colors") and mesh.colors is not None:
if hasattr(mesh, "color_counts"):
color_count = int(mesh.color_counts[index].item())
colors = mesh.colors[index, :color_count]
else:
colors = mesh.colors[index, :vertex_count]
return vertices, faces, colors
colors = None
if hasattr(mesh, "colors") and mesh.colors is not None:
colors = mesh.colors[index]
return mesh.vertices[index], mesh.faces[index], colors
shape_slat_normalization = {
"mean": torch.tensor([
0.781296, 0.018091, -0.495192, -0.558457, 1.060530, 0.093252, 1.518149, -0.933218,
@ -45,6 +111,114 @@ def shape_norm(shape_latent, coords):
samples = samples * std + mean
return samples
def infer_batched_coord_layout(coords):
if coords.ndim != 2 or coords.shape[1] != 4:
raise ValueError(f"Expected Trellis2 coords with shape [N, 4], got {tuple(coords.shape)}")
if coords.shape[0] == 0:
raise ValueError("Trellis2 coords can't be empty")
batch_ids = coords[:, 0].to(torch.int64)
if (batch_ids < 0).any():
raise ValueError(f"Trellis2 batch ids must be non-negative, got {batch_ids.unique(sorted=True).tolist()}")
batch_size = int(batch_ids.max().item()) + 1
counts = torch.bincount(batch_ids, minlength=batch_size)
if (counts == 0).any():
raise ValueError(f"Non-contiguous Trellis2 batch ids in coords: {batch_ids.unique(sorted=True).tolist()}")
max_tokens = int(counts.max().item())
return batch_size, counts, max_tokens
def split_batched_coords(coords, coord_counts):
if coord_counts.ndim != 1:
raise ValueError(f"Trellis2 coord_counts must be 1D, got shape {tuple(coord_counts.shape)}")
if (coord_counts < 0).any():
raise ValueError(f"Trellis2 coord_counts must be non-negative, got {coord_counts.tolist()}")
if int(coord_counts.sum().item()) != coords.shape[0]:
raise ValueError(
f"Trellis2 coord_counts total {int(coord_counts.sum().item())} does not match coords rows {coords.shape[0]}"
)
batch_ids = coords[:, 0].to(torch.int64)
order = torch.argsort(batch_ids, stable=True)
sorted_coords = coords.index_select(0, order)
sorted_batch_ids = batch_ids.index_select(0, order)
offsets = coord_counts.cumsum(0) - coord_counts
items = []
for i in range(coord_counts.shape[0]):
count = int(coord_counts[i].item())
start = int(offsets[i].item())
coords_i = sorted_coords[start:start + count]
ids_i = sorted_batch_ids[start:start + count]
if coords_i.shape[0] != count or not torch.all(ids_i == i):
raise ValueError(f"Trellis2 coords rows for batch {i} expected {count}, got {coords_i.shape[0]}")
items.append(coords_i)
return items
def normalize_batch_index(batch_index):
if batch_index is None:
return None
if isinstance(batch_index, int):
return [int(batch_index)]
return list(batch_index)
def resolve_sample_indices(batch_index, batch_size):
sample_indices = normalize_batch_index(batch_index)
if sample_indices is None:
return list(range(batch_size))
if len(sample_indices) != batch_size:
raise ValueError(
f"Trellis2 batch_index length {len(sample_indices)} does not match batch size {batch_size}"
)
return sample_indices
def resolve_singleton_sample_index(batch_index):
sample_indices = normalize_batch_index(batch_index)
if sample_indices is None:
return 0
if len(sample_indices) != 1:
raise ValueError(
f"Trellis2 batch_index must be an int or single-element iterable for singleton coords, got {sample_indices}"
)
return int(sample_indices[0])
def flatten_batched_sparse_latent(samples, coords, coord_counts):
samples = samples.squeeze(-1).transpose(1, 2)
if coord_counts is None:
return samples.reshape(-1, samples.shape[-1]), coords
coords_items = split_batched_coords(coords, coord_counts)
feat_list = []
coord_list = []
for i, coords_i in enumerate(coords_items):
count = int(coord_counts[i].item())
feat_list.append(samples[i, :count])
coord_list.append(coords_i)
return torch.cat(feat_list, dim=0), torch.cat(coord_list, dim=0)
def split_batched_sparse_latent(samples, coords, coord_counts):
samples = samples.squeeze(-1).transpose(1, 2)
if coord_counts is None:
return [(samples.reshape(-1, samples.shape[-1]), coords)]
coords_items = split_batched_coords(coords, coord_counts)
items = []
for i, coords_i in enumerate(coords_items):
count = int(coord_counts[i].item())
items.append((samples[i, :count], coords_i))
return items
def paint_mesh_with_voxels(mesh, voxel_coords, voxel_colors, resolution):
"""
Generic function to paint a mesh using nearest-neighbor colors from a sparse voxel field.
@ -58,15 +232,15 @@ def paint_mesh_with_voxels(mesh, voxel_coords, voxel_colors, resolution):
# map voxels
voxel_pos = voxel_coords.to(device).float() * voxel_size + origin
verts = mesh.vertices.to(device).squeeze(0)
voxel_colors = voxel_colors.to(device)
voxel_colors = voxel_colors.cpu()
voxel_pos_np = voxel_pos.numpy()
verts_np = verts.numpy()
voxel_pos_np = voxel_pos.cpu().numpy()
verts_np = verts.cpu().numpy()
tree = scipy.spatial.cKDTree(voxel_pos_np)
# nearest neighbour k=1
_, nearest_idx_np = tree.query(verts_np, k=1, workers=-1)
_, nearest_idx_np = tree.query(verts_np, k=1, workers=1)
nearest_idx = torch.from_numpy(nearest_idx_np).long()
v_colors = voxel_colors[nearest_idx]
@ -79,11 +253,18 @@ def paint_mesh_with_voxels(mesh, voxel_coords, voxel_colors, resolution):
final_colors = linear_colors.unsqueeze(0)
out_mesh = copy.deepcopy(mesh)
out_mesh = copy.copy(mesh)
out_mesh.colors = final_colors
return out_mesh
def paint_mesh_default_colors(mesh):
out_mesh = copy.copy(mesh)
vertex_count = mesh.vertices.shape[1]
out_mesh.colors = mesh.vertices.new_zeros((1, vertex_count, 3))
return out_mesh
class VaeDecodeShapeTrellis(IO.ComfyNode):
@classmethod
def define_schema(cls):
@ -105,21 +286,43 @@ class VaeDecodeShapeTrellis(IO.ComfyNode):
def execute(cls, samples, vae, resolution):
resolution = int(resolution)
patcher = vae.patcher
sample_tensor = samples["samples"]
device = comfy.model_management.get_torch_device()
comfy.model_management.load_model_gpu(patcher)
vae = vae.first_stage_model
coords = samples["coords"]
prepare_trellis_vae_for_decode(vae, sample_tensor.shape)
trellis_vae = vae.first_stage_model
coord_counts = samples.get("coord_counts")
samples = samples["samples"]
samples = samples.squeeze(-1).transpose(1, 2).reshape(-1, 32).to(device)
samples = shape_norm(samples, coords)
if coord_counts is None:
samples, coords = flatten_batched_sparse_latent(samples, coords, coord_counts)
samples = shape_norm(samples.to(device), coords.to(device))
mesh, subs = trellis_vae.decode_shape_slat(samples, resolution)
else:
split_items = split_batched_sparse_latent(samples, coords, coord_counts)
mesh = []
subs_per_sample = []
for feats_i, coords_i in split_items:
coords_i = coords_i.to(device).clone()
coords_i[:, 0] = 0
sample_i = shape_norm(feats_i.to(device), coords_i)
mesh_i, subs_i = trellis_vae.decode_shape_slat(sample_i, resolution)
mesh.append(mesh_i[0])
subs_per_sample.append(subs_i)
mesh, subs = vae.decode_shape_slat(samples, resolution)
faces = torch.stack([m.faces for m in mesh])
verts = torch.stack([m.vertices for m in mesh])
mesh = Types.MESH(vertices=verts, faces=faces)
subs = []
for stage_index in range(len(subs_per_sample[0])):
stage_tensors = [sample_subs[stage_index] for sample_subs in subs_per_sample]
feats_list = [stage_tensor.feats for stage_tensor in stage_tensors]
coords_list = [stage_tensor.coords for stage_tensor in stage_tensors]
subs.append(SparseTensor.from_tensor_list(feats_list, coords_list))
face_list = [m.faces for m in mesh]
vert_list = [m.vertices for m in mesh]
if all(v.shape == vert_list[0].shape for v in vert_list) and all(f.shape == face_list[0].shape for f in face_list):
mesh = Types.MESH(vertices=torch.stack(vert_list), faces=torch.stack(face_list))
else:
mesh = pack_variable_mesh_batch(vert_list, face_list)
return IO.NodeOutput(mesh, subs)
class VaeDecodeTextureTrellis(IO.ComfyNode):
@ -133,6 +336,7 @@ class VaeDecodeTextureTrellis(IO.ComfyNode):
IO.Latent.Input("samples"),
IO.Vae.Input("vae"),
IO.AnyType.Input("shape_subs"),
IO.Combo.Input("resolution", options=["512", "1024"], default="1024")
],
outputs=[
IO.Mesh.Output("mesh"),
@ -140,28 +344,51 @@ class VaeDecodeTextureTrellis(IO.ComfyNode):
)
@classmethod
def execute(cls, shape_mesh, samples, vae, shape_subs):
def execute(cls, shape_mesh, samples, vae, shape_subs, resolution):
resolution = 1024
patcher = vae.patcher
sample_tensor = samples["samples"]
resolution = int(resolution)
device = comfy.model_management.get_torch_device()
comfy.model_management.load_model_gpu(patcher)
vae = vae.first_stage_model
coords = samples["coords"]
prepare_trellis_vae_for_decode(vae, sample_tensor.shape)
trellis_vae = vae.first_stage_model
coord_counts = samples.get("coord_counts")
samples = samples["samples"]
samples = samples.squeeze(-1).transpose(1, 2).reshape(-1, 32).to(device)
samples, coords = flatten_batched_sparse_latent(samples, coords, coord_counts)
samples = samples.to(device)
std = tex_slat_normalization["std"].to(samples)
mean = tex_slat_normalization["mean"].to(samples)
samples = SparseTensor(feats = samples, coords=coords)
samples = SparseTensor(feats = samples, coords=coords.to(device))
samples = samples * std + mean
voxel = vae.decode_tex_slat(samples, shape_subs)
voxel = trellis_vae.decode_tex_slat(samples, shape_subs)
color_feats = voxel.feats[:, :3]
voxel_coords = voxel.coords[:, 1:]
voxel_batch_idx = voxel.coords[:, 0]
out_mesh = paint_mesh_with_voxels(shape_mesh, voxel_coords, color_feats, resolution=resolution)
mesh_batch_size = shape_mesh.vertices.shape[0]
if mesh_batch_size > 1:
out_verts, out_faces, out_colors = [], [], []
for i in range(mesh_batch_size):
sel = voxel_batch_idx == i
item_coords = voxel_coords[sel]
item_colors = color_feats[sel]
item_vertices, item_faces, _ = get_mesh_batch_item(shape_mesh, i)
item_mesh = Types.MESH(vertices=item_vertices.unsqueeze(0), faces=item_faces.unsqueeze(0))
if item_coords.shape[0] == 0:
painted = paint_mesh_default_colors(item_mesh)
else:
painted = paint_mesh_with_voxels(item_mesh, item_coords, item_colors, resolution=resolution)
out_verts.append(painted.vertices.squeeze(0))
out_faces.append(painted.faces.squeeze(0))
out_colors.append(painted.colors.squeeze(0))
out_mesh = pack_variable_mesh_batch(out_verts, out_faces, out_colors)
else:
if voxel_coords.shape[0] == 0:
out_mesh = paint_mesh_default_colors(shape_mesh)
else:
out_mesh = paint_mesh_with_voxels(shape_mesh, voxel_coords, color_feats, resolution=resolution)
return IO.NodeOutput(out_mesh)
class VaeDecodeStructureTrellis2(IO.ComfyNode):
@ -183,21 +410,24 @@ class VaeDecodeStructureTrellis2(IO.ComfyNode):
@classmethod
def execute(cls, samples, vae, resolution):
resolution = int(resolution)
vae = vae.first_stage_model
decoder = vae.struct_dec
sample_tensor = samples["samples"]
batch_number = prepare_trellis_vae_for_decode(vae, sample_tensor.shape)
decoder = vae.first_stage_model.struct_dec
load_device = comfy.model_management.get_torch_device()
offload_device = comfy.model_management.vae_offload_device()
decoder = decoder.to(load_device)
samples = samples["samples"]
samples = samples.to(load_device)
decoded = decoder(samples)>0
decoder.to(offload_device)
batch_index = normalize_batch_index(samples.get("batch_index"))
decoded_batches = []
for start in range(0, sample_tensor.shape[0], batch_number):
sample_chunk = sample_tensor[start:start + batch_number].to(load_device)
decoded_batches.append(decoder(sample_chunk) > 0)
decoded = torch.cat(decoded_batches, dim=0)
current_res = decoded.shape[2]
if current_res != resolution:
ratio = current_res // resolution
decoded = torch.nn.functional.max_pool3d(decoded.float(), ratio, ratio, 0) > 0.5
out = Types.VOXEL(decoded.squeeze(1).float())
if batch_index is not None:
out.batch_index = normalize_batch_index(batch_index)
return IO.NodeOutput(out)
class Trellis2UpsampleCascade(IO.ComfyNode):
@ -220,34 +450,95 @@ class Trellis2UpsampleCascade(IO.ComfyNode):
@classmethod
def execute(cls, shape_latent_512, vae, target_resolution, max_tokens):
device = comfy.model_management.get_torch_device()
comfy.model_management.load_model_gpu(vae.patcher)
feats = shape_latent_512["samples"].squeeze(-1).transpose(1, 2).reshape(-1, 32).to(device)
coords_512 = shape_latent_512["coords"].to(device)
slat = shape_norm(feats, coords_512)
prepare_trellis_vae_for_decode(vae, shape_latent_512["samples"].shape)
coord_counts = shape_latent_512.get("coord_counts")
batch_index = normalize_batch_index(shape_latent_512.get("batch_index"))
decoder = vae.first_stage_model.shape_dec
slat.feats = slat.feats.to(next(decoder.parameters()).dtype)
hr_coords = decoder.upsample(slat, upsample_times=4)
lr_resolution = 512
hr_resolution = int(target_resolution)
target_resolution = int(target_resolution)
if coord_counts is None:
feats, coords_512 = flatten_batched_sparse_latent(
shape_latent_512["samples"],
shape_latent_512["coords"],
coord_counts,
)
feats = feats.to(device)
coords_512 = coords_512.to(device)
slat = shape_norm(feats, coords_512)
slat.feats = slat.feats.to(next(decoder.parameters()).dtype)
hr_coords = decoder.upsample(slat, upsample_times=4)
hr_resolution = target_resolution
while True:
quant_coords = torch.cat([
hr_coords[:, :1],
((hr_coords[:, 1:] + 0.5) / lr_resolution * (hr_resolution // 16)).int(),
], dim=1)
final_coords = quant_coords.unique(dim=0)
num_tokens = final_coords.shape[0]
if num_tokens < max_tokens or hr_resolution <= 1024:
break
hr_resolution -= 128
return IO.NodeOutput(final_coords,)
items = split_batched_sparse_latent(
shape_latent_512["samples"],
shape_latent_512["coords"],
coord_counts,
)
decoder_dtype = next(decoder.parameters()).dtype
sample_hr_coords = []
for feats_i, coords_i in items:
feats_i = feats_i.to(device)
coords_i = coords_i.to(device).clone()
coords_i[:, 0] = 0
slat_i = shape_norm(feats_i, coords_i)
slat_i.feats = slat_i.feats.to(decoder_dtype)
sample_hr_coords.append(decoder.upsample(slat_i, upsample_times=4))
hr_resolution = target_resolution
while True:
quant_coords = torch.cat([
hr_coords[:, :1],
((hr_coords[:, 1:] + 0.5) / lr_resolution * (hr_resolution // 16)).int(),
], dim=1)
final_coords = quant_coords.unique(dim=0)
num_tokens = final_coords.shape[0]
if num_tokens < max_tokens or hr_resolution <= 1024:
exceeds_limit = False
for hr_coords_i in sample_hr_coords:
quant_coords_i = torch.cat([
hr_coords_i[:, :1],
((hr_coords_i[:, 1:] + 0.5) / lr_resolution * (hr_resolution // 16)).int(),
], dim=1)
if quant_coords_i.unique(dim=0).shape[0] >= max_tokens:
exceeds_limit = True
break
if not exceeds_limit or hr_resolution <= 1024:
break
hr_resolution -= 128
return IO.NodeOutput(final_coords,)
final_coords_list = []
output_coord_counts = []
for sample_offset, hr_coords_i in enumerate(sample_hr_coords):
quant_coords_i = torch.cat([
hr_coords_i[:, :1],
((hr_coords_i[:, 1:] + 0.5) / lr_resolution * (hr_resolution // 16)).int(),
], dim=1)
final_coords_i = quant_coords_i.unique(dim=0)
final_coords_i = final_coords_i.clone()
final_coords_i[:, 0] = sample_offset
final_coords_list.append(final_coords_i)
output_coord_counts.append(int(final_coords_i.shape[0]))
normalized_batch_index = normalize_batch_index(batch_index)
output = {
"coords": torch.cat(final_coords_list, dim=0),
"coord_counts": torch.tensor(output_coord_counts, dtype=torch.int64),
"resolutions": torch.full((len(final_coords_list),), int(hr_resolution), dtype=torch.int64),
}
if normalized_batch_index is not None:
output["batch_index"] = normalized_batch_index
return IO.NodeOutput(output,)
dino_mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
dino_std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
@ -256,6 +547,8 @@ def run_conditioning(model, cropped_img_tensor, include_1024=True):
model_internal = model.model
device = comfy.model_management.intermediate_device()
torch_device = comfy.model_management.get_torch_device()
had_image_size = hasattr(model_internal, "image_size")
original_image_size = getattr(model_internal, "image_size", None)
def prepare_tensor(pil_img, size):
resized_pil = pil_img.resize((size, size), Image.Resampling.LANCZOS)
@ -263,15 +556,21 @@ def run_conditioning(model, cropped_img_tensor, include_1024=True):
img_t = torch.from_numpy(img_np).permute(2, 0, 1).unsqueeze(0).to(torch_device)
return (img_t - dino_mean.to(torch_device)) / dino_std.to(torch_device)
model_internal.image_size = 512
input_512 = prepare_tensor(cropped_img_tensor, 512)
cond_512 = model_internal(input_512, skip_norm_elementwise=True)[0]
cond_1024 = None
if include_1024:
model_internal.image_size = 1024
input_1024 = prepare_tensor(cropped_img_tensor, 1024)
cond_1024 = model_internal(input_1024, skip_norm_elementwise=True)[0]
try:
model_internal.image_size = 512
input_512 = prepare_tensor(cropped_img_tensor, 512)
cond_512 = model_internal(input_512, skip_norm_elementwise=True)[0]
if include_1024:
model_internal.image_size = 1024
input_1024 = prepare_tensor(cropped_img_tensor, 1024)
cond_1024 = model_internal(input_1024, skip_norm_elementwise=True)[0]
finally:
if not had_image_size:
delattr(model_internal, "image_size")
else:
model_internal.image_size = original_image_size
conditioning = {
'cond_512': cond_512.to(device),
@ -302,69 +601,87 @@ class Trellis2Conditioning(IO.ComfyNode):
@classmethod
def execute(cls, clip_vision_model, image, mask, background_color) -> IO.NodeOutput:
# Normalize to batched form so per-image conditioning loop below is uniform.
if image.ndim == 3:
image = image.unsqueeze(0)
if mask.ndim == 2:
mask = mask.unsqueeze(0)
batch_size = image.shape[0]
if mask.shape[0] == 1 and batch_size > 1:
mask = mask.expand(batch_size, -1, -1)
elif mask.shape[0] != batch_size:
raise ValueError(f"Trellis2Conditioning mask batch {mask.shape[0]} does not match image batch {batch_size}")
if image.ndim == 4:
image = image[0]
if mask.ndim == 3:
mask = mask[0]
cond_512_list = []
cond_1024_list = []
img_np = (image.cpu().numpy() * 255).clip(0, 255).astype(np.uint8)
mask_np = (mask.cpu().numpy() * 255).clip(0, 255).astype(np.uint8)
for b in range(batch_size):
item_image = image[b]
item_mask = mask[b]
pil_img = Image.fromarray(img_np)
pil_mask = Image.fromarray(mask_np)
img_np = (item_image.cpu().numpy() * 255).clip(0, 255).astype(np.uint8)
mask_np = (item_mask.cpu().numpy() * 255).clip(0, 255).astype(np.uint8)
max_size = max(pil_img.size)
scale = min(1.0, 1024 / max_size)
if scale < 1.0:
new_w, new_h = int(pil_img.width * scale), int(pil_img.height * scale)
pil_img = pil_img.resize((new_w, new_h), Image.Resampling.LANCZOS)
pil_mask = pil_mask.resize((new_w, new_h), Image.Resampling.NEAREST)
pil_img = Image.fromarray(img_np)
pil_mask = Image.fromarray(mask_np)
rgba_np = np.zeros((pil_img.height, pil_img.width, 4), dtype=np.uint8)
rgba_np[:, :, :3] = np.array(pil_img)
rgba_np[:, :, 3] = np.array(pil_mask)
max_size = max(pil_img.size)
scale = min(1.0, 1024 / max_size)
if scale < 1.0:
new_w, new_h = int(pil_img.width * scale), int(pil_img.height * scale)
pil_img = pil_img.resize((new_w, new_h), Image.Resampling.LANCZOS)
pil_mask = pil_mask.resize((new_w, new_h), Image.Resampling.NEAREST)
alpha = rgba_np[:, :, 3]
bbox_coords = np.argwhere(alpha > 0.8 * 255)
rgba_np = np.zeros((pil_img.height, pil_img.width, 4), dtype=np.uint8)
rgba_np[:, :, :3] = np.array(pil_img)
rgba_np[:, :, 3] = np.array(pil_mask)
if len(bbox_coords) > 0:
y_min, x_min = np.min(bbox_coords[:, 0]), np.min(bbox_coords[:, 1])
y_max, x_max = np.max(bbox_coords[:, 0]), np.max(bbox_coords[:, 1])
alpha = rgba_np[:, :, 3]
bbox_coords = np.argwhere(alpha > 0.8 * 255)
center_y, center_x = (y_min + y_max) / 2.0, (x_min + x_max) / 2.0
size = max(y_max - y_min, x_max - x_min)
if len(bbox_coords) > 0:
y_min, x_min = np.min(bbox_coords[:, 0]), np.min(bbox_coords[:, 1])
y_max, x_max = np.max(bbox_coords[:, 0]), np.max(bbox_coords[:, 1])
crop_x1 = int(center_x - size // 2)
crop_y1 = int(center_y - size // 2)
crop_x2 = int(center_x + size // 2)
crop_y2 = int(center_y + size // 2)
center_y, center_x = (y_min + y_max) / 2.0, (x_min + x_max) / 2.0
size = max(y_max - y_min, x_max - x_min)
rgba_pil = Image.fromarray(rgba_np)
cropped_rgba = rgba_pil.crop((crop_x1, crop_y1, crop_x2, crop_y2))
cropped_np = np.array(cropped_rgba).astype(np.float32) / 255.0
else:
import logging
logging.warning("Mask for the image is empty. Trellis2 requires an image with a mask for the best mesh quality.")
cropped_np = rgba_np.astype(np.float32) / 255.0
crop_x1 = int(center_x - size // 2)
crop_y1 = int(center_y - size // 2)
crop_x2 = int(center_x + size // 2)
crop_y2 = int(center_y + size // 2)
bg_colors = {"black":[0.0, 0.0, 0.0], "gray":[0.5, 0.5, 0.5], "white":[1.0, 1.0, 1.0]}
bg_rgb = np.array(bg_colors.get(background_color, [0.0, 0.0, 0.0]), dtype=np.float32)
rgba_pil = Image.fromarray(rgba_np)
cropped_rgba = rgba_pil.crop((crop_x1, crop_y1, crop_x2, crop_y2))
cropped_np = np.array(cropped_rgba).astype(np.float32) / 255.0
else:
import logging
logging.warning("Mask for the image is empty. Trellis2 requires an image with a mask for the best mesh quality.")
cropped_np = rgba_np.astype(np.float32) / 255.0
fg = cropped_np[:, :, :3]
alpha_float = cropped_np[:, :, 3:4]
composite_np = fg * alpha_float + bg_rgb * (1.0 - alpha_float)
bg_colors = {"black":[0.0, 0.0, 0.0], "gray":[0.5, 0.5, 0.5], "white":[1.0, 1.0, 1.0]}
bg_rgb = np.array(bg_colors.get(background_color, [0.0, 0.0, 0.0]), dtype=np.float32)
# to match trellis2 code (quantize -> dequantize)
composite_uint8 = (composite_np * 255.0).round().clip(0, 255).astype(np.uint8)
fg = cropped_np[:, :, :3]
alpha_float = cropped_np[:, :, 3:4]
composite_np = fg * alpha_float + bg_rgb * (1.0 - alpha_float)
cropped_pil = Image.fromarray(composite_uint8)
# to match trellis2 code (quantize -> dequantize)
composite_uint8 = (composite_np * 255.0).round().clip(0, 255).astype(np.uint8)
conditioning = run_conditioning(clip_vision_model, cropped_pil, include_1024=True)
cropped_pil = Image.fromarray(composite_uint8)
embeds = conditioning["cond_1024"]
positive = [[conditioning["cond_512"], {"embeds": embeds}]]
negative = [[conditioning["neg_cond"], {"embeds": torch.zeros_like(embeds)}]]
item_conditioning = run_conditioning(clip_vision_model, cropped_pil, include_1024=True)
cond_512_list.append(item_conditioning["cond_512"])
cond_1024_list.append(item_conditioning["cond_1024"])
cond_512_batched = torch.cat(cond_512_list, dim=0)
cond_1024_batched = torch.cat(cond_1024_list, dim=0)
neg_cond_batched = torch.zeros_like(cond_512_batched)
neg_embeds_batched = torch.zeros_like(cond_1024_batched)
positive = [[cond_512_batched, {"embeds": cond_1024_batched}]]
negative = [[neg_cond_batched, {"embeds": neg_embeds_batched}]]
return IO.NodeOutput(positive, negative)
class EmptyShapeLatentTrellis2(IO.ComfyNode):
@ -375,7 +692,8 @@ class EmptyShapeLatentTrellis2(IO.ComfyNode):
category="latent/3d",
inputs=[
IO.AnyType.Input("structure_or_coords"),
IO.Model.Input("model")
IO.Model.Input("model"),
IO.Int.Input("seed", default=0, min=0, max=0xffffffffffffffff),
],
outputs=[
IO.Latent.Output(),
@ -384,14 +702,25 @@ class EmptyShapeLatentTrellis2(IO.ComfyNode):
)
@classmethod
def execute(cls, structure_or_coords, model):
def execute(cls, structure_or_coords, model, seed):
# to accept the upscaled coords
is_512_pass = False
coord_counts = None
coord_resolutions = None
batch_index = None
if hasattr(structure_or_coords, "data") and structure_or_coords.data.ndim == 4:
decoded = structure_or_coords.data.unsqueeze(1)
coords = torch.argwhere(decoded.bool())[:, [0, 2, 3, 4]].int()
is_512_pass = True
batch_index = normalize_batch_index(getattr(structure_or_coords, "batch_index", None))
elif isinstance(structure_or_coords, dict):
coords = structure_or_coords["coords"].int()
coord_counts = structure_or_coords.get("coord_counts")
coord_resolutions = structure_or_coords.get("resolutions")
batch_index = normalize_batch_index(structure_or_coords.get("batch_index"))
is_512_pass = False
elif isinstance(structure_or_coords, torch.Tensor) and structure_or_coords.ndim == 2:
coords = structure_or_coords.int()
@ -400,8 +729,31 @@ class EmptyShapeLatentTrellis2(IO.ComfyNode):
else:
raise ValueError(f"Invalid input to EmptyShapeLatent: {type(structure_or_coords)}")
in_channels = 32
# image like format
latent = torch.randn(1, in_channels, coords.shape[0], 1)
batch_size, inferred_coord_counts, max_tokens = infer_batched_coord_layout(coords)
if coord_counts is not None:
coord_counts = coord_counts.to(dtype=torch.int64, device=coords.device)
if coord_counts.shape != inferred_coord_counts.shape or not torch.equal(coord_counts, inferred_coord_counts):
raise ValueError(
f"Trellis2 coord_counts metadata {coord_counts.tolist()} does not match coords layout {inferred_coord_counts.tolist()}"
)
else:
coord_counts = inferred_coord_counts
if batch_size == 1:
sample_index = resolve_singleton_sample_index(batch_index)
generator = torch.Generator(device="cpu")
generator.manual_seed(int(seed) + sample_index)
latent = torch.randn(1, in_channels, coords.shape[0], 1, generator=generator)
else:
sample_indices = resolve_sample_indices(batch_index, batch_size)
latent = torch.zeros(batch_size, in_channels, max_tokens, 1)
for i, sample_index in enumerate(sample_indices):
count = int(coord_counts[i].item())
generator = torch.Generator(device="cpu")
generator.manual_seed(int(seed) + int(sample_index))
latent_i = torch.randn(1, in_channels, count, 1, generator=generator)
latent[i, :, :count] = latent_i[0]
if coord_counts is not None:
latent.trellis_coord_counts = coord_counts.clone()
model = model.clone()
model.model_options = model.model_options.copy()
if "transformer_options" in model.model_options:
@ -410,11 +762,20 @@ class EmptyShapeLatentTrellis2(IO.ComfyNode):
model.model_options["transformer_options"] = {}
model.model_options["transformer_options"]["coords"] = coords
if coord_counts is not None:
model.model_options["transformer_options"]["coord_counts"] = coord_counts
if is_512_pass:
model.model_options["transformer_options"]["generation_mode"] = "shape_generation_512"
else:
model.model_options["transformer_options"]["generation_mode"] = "shape_generation"
return IO.NodeOutput({"samples": latent, "coords": coords, "type": "trellis2"}, model)
output = {"samples": latent, "coords": coords, "type": "trellis2"}
if batch_index is not None:
output["batch_index"] = normalize_batch_index(batch_index)
if coord_counts is not None:
output["coord_counts"] = coord_counts
if coord_resolutions is not None:
output["resolutions"] = coord_resolutions
return IO.NodeOutput(output, model)
class EmptyTextureLatentTrellis2(IO.ComfyNode):
@classmethod
@ -425,7 +786,8 @@ class EmptyTextureLatentTrellis2(IO.ComfyNode):
inputs=[
IO.Voxel.Input("structure_or_coords"),
IO.Latent.Input("shape_latent"),
IO.Model.Input("model")
IO.Model.Input("model"),
IO.Int.Input("seed", default=0, min=0, max=0xffffffffffffffff),
],
outputs=[
IO.Latent.Output(),
@ -434,20 +796,68 @@ class EmptyTextureLatentTrellis2(IO.ComfyNode):
)
@classmethod
def execute(cls, structure_or_coords, shape_latent, model):
def execute(cls, structure_or_coords, shape_latent, model, seed):
channels = 32
coord_counts = None
batch_index = None
if hasattr(structure_or_coords, "data") and structure_or_coords.data.ndim == 4:
decoded = structure_or_coords.data.unsqueeze(1)
coords = torch.argwhere(decoded.bool())[:, [0, 2, 3, 4]].int()
batch_index = normalize_batch_index(getattr(structure_or_coords, "batch_index", None))
elif isinstance(structure_or_coords, dict):
coords = structure_or_coords["coords"].int()
coord_counts = structure_or_coords.get("coord_counts")
batch_index = normalize_batch_index(structure_or_coords.get("batch_index"))
elif isinstance(structure_or_coords, torch.Tensor) and structure_or_coords.ndim == 2:
coords = structure_or_coords.int()
else:
raise ValueError(
"structure_or_coords must be a voxel input with data.ndim == 4, "
f'a dict containing "coords", or a 2D torch.Tensor; got {type(structure_or_coords).__name__}'
)
shape_batch_index = normalize_batch_index(shape_latent.get("batch_index"))
if batch_index is None:
batch_index = shape_batch_index
shape_latent = shape_latent["samples"]
batch_size, inferred_coord_counts, max_tokens = infer_batched_coord_layout(coords)
if coord_counts is not None:
coord_counts = coord_counts.to(dtype=torch.int64, device=coords.device)
if coord_counts.shape != inferred_coord_counts.shape or not torch.equal(coord_counts, inferred_coord_counts):
raise ValueError(
f"Trellis2 coord_counts metadata {coord_counts.tolist()} does not match coords layout {inferred_coord_counts.tolist()}"
)
else:
coord_counts = inferred_coord_counts
if shape_latent.ndim == 4:
shape_latent = shape_latent.squeeze(-1).transpose(1, 2).reshape(-1, channels)
if shape_latent.shape[0] != batch_size:
raise ValueError(
f"shape_latent batch {shape_latent.shape[0]} doesn't match coords batch {batch_size}"
)
shape_latent = shape_latent.squeeze(-1).transpose(1, 2)
if shape_latent.shape[1] < max_tokens:
raise ValueError(
f"shape_latent tokens {shape_latent.shape[1]} can't cover coords max tokens {max_tokens}"
)
latent = torch.randn(1, channels, coords.shape[0], 1)
if batch_size == 1:
sample_index = resolve_singleton_sample_index(batch_index)
generator = torch.Generator(device="cpu")
generator.manual_seed(int(seed) + sample_index)
latent = torch.randn(1, channels, coords.shape[0], 1, generator=generator)
else:
sample_indices = resolve_sample_indices(batch_index, batch_size)
latent = torch.zeros(batch_size, channels, max_tokens, 1)
for i, sample_index in enumerate(sample_indices):
count = int(coord_counts[i].item())
generator = torch.Generator(device="cpu")
generator.manual_seed(int(seed) + int(sample_index))
latent_i = torch.randn(1, channels, count, 1, generator=generator)
latent[i, :, :count] = latent_i[0]
if coord_counts is not None:
latent.trellis_coord_counts = coord_counts.clone()
model = model.clone()
model.model_options = model.model_options.copy()
if "transformer_options" in model.model_options:
@ -456,9 +866,16 @@ class EmptyTextureLatentTrellis2(IO.ComfyNode):
model.model_options["transformer_options"] = {}
model.model_options["transformer_options"]["coords"] = coords
if coord_counts is not None:
model.model_options["transformer_options"]["coord_counts"] = coord_counts
model.model_options["transformer_options"]["generation_mode"] = "texture_generation"
model.model_options["transformer_options"]["shape_slat"] = shape_latent
return IO.NodeOutput({"samples": latent, "coords": coords, "type": "trellis2"}, model)
output = {"samples": latent, "coords": coords, "type": "trellis2"}
if batch_index is not None:
output["batch_index"] = normalize_batch_index(batch_index)
if coord_counts is not None:
output["coord_counts"] = coord_counts
return IO.NodeOutput(output, model)
class EmptyStructureLatentTrellis2(IO.ComfyNode):
@ -469,17 +886,30 @@ class EmptyStructureLatentTrellis2(IO.ComfyNode):
category="latent/3d",
inputs=[
IO.Int.Input("batch_size", default=1, min=1, max=4096, tooltip="The number of latent images in the batch."),
IO.Int.Input("batch_index_start", default=0, min=0, max=4096, tooltip="Starting sample index for per-sample sampler noise."),
IO.Int.Input("seed", default=0, min=0, max=0xffffffffffffffff),
],
outputs=[
IO.Latent.Output(),
]
)
@classmethod
def execute(cls, batch_size):
def execute(cls, batch_size, batch_index_start, seed):
in_channels = 8
resolution = 16
latent = torch.randn(batch_size, in_channels, resolution, resolution, resolution)
return IO.NodeOutput({"samples": latent, "type": "trellis2"})
sample_indices = [int(batch_index_start) + i for i in range(batch_size)]
latent = torch.zeros(batch_size, in_channels, resolution, resolution, resolution)
for i, sample_index in enumerate(sample_indices):
generator = torch.Generator(device="cpu")
generator.manual_seed(int(seed) + sample_index)
latent[i] = torch.randn(1, in_channels, resolution, resolution, resolution, generator=generator)[0]
output = {
"samples": latent,
"type": "trellis2",
}
if batch_size > 1 or batch_index_start != 0:
output["batch_index"] = sample_indices
return IO.NodeOutput(output)
def simplify_fn(vertices, faces, colors=None, target=100000):
if vertices.ndim == 3:
@ -508,34 +938,49 @@ def simplify_fn(vertices, faces, colors=None, target=100000):
volume = (extent[0] * extent[1] * extent[2]).clamp(min=1e-8)
cell_size = (volume / target_v) ** (1/3.0)
quantized = ((vertices - min_v) / cell_size).round().long()
unique_coords, inverse_indices = torch.unique(quantized, dim=0, return_inverse=True)
# Use CPU-side ordered reductions here so repeated runs produce identical
# simplified meshes instead of relying on GPU scatter-add accumulation order.
vertices_np = vertices.detach().cpu().numpy()
faces_np = faces.detach().cpu().numpy()
colors_np = colors.detach().cpu().numpy() if colors is not None else None
min_v_np = min_v.detach().cpu().numpy()
cell_size_value = float(cell_size.detach().cpu())
quantized = np.rint((vertices_np - min_v_np) / cell_size_value).astype(np.int64)
unique_coords, inverse_indices = np.unique(quantized, axis=0, return_inverse=True)
num_cells = unique_coords.shape[0]
new_vertices = torch.zeros((num_cells, 3), dtype=vertices.dtype, device=device)
counts = torch.zeros((num_cells, 1), dtype=vertices.dtype, device=device)
new_vertices.scatter_add_(0, inverse_indices.unsqueeze(1).expand(-1, 3), vertices)
counts.scatter_add_(0, inverse_indices.unsqueeze(1), torch.ones_like(vertices[:, :1]))
new_vertices = new_vertices / counts.clamp(min=1)
new_vertices_np = np.zeros((num_cells, 3), dtype=vertices_np.dtype)
np.add.at(new_vertices_np, inverse_indices, vertices_np)
counts_np = np.bincount(inverse_indices, minlength=num_cells).astype(vertices_np.dtype).reshape(-1, 1)
new_vertices_np = new_vertices_np / np.clip(counts_np, 1, None)
new_colors = None
if colors is not None:
new_colors = torch.zeros((num_cells, colors.shape[1]), dtype=colors.dtype, device=device)
new_colors.scatter_add_(0, inverse_indices.unsqueeze(1).expand(-1, colors.shape[1]), colors)
new_colors = new_colors / counts.clamp(min=1)
if colors_np is not None:
new_colors_np = np.zeros((num_cells, colors_np.shape[1]), dtype=colors_np.dtype)
np.add.at(new_colors_np, inverse_indices, colors_np)
new_colors = new_colors_np / np.clip(counts_np, 1, None)
new_faces = inverse_indices[faces]
new_faces = inverse_indices[faces_np]
valid_mask = (new_faces[:, 0] != new_faces[:, 1]) & \
(new_faces[:, 1] != new_faces[:, 2]) & \
(new_faces[:, 2] != new_faces[:, 0])
new_faces = new_faces[valid_mask]
unique_face_indices, inv_face = torch.unique(new_faces.reshape(-1), return_inverse=True)
final_vertices = new_vertices[unique_face_indices]
final_faces = inv_face.reshape(-1, 3)
if new_faces.size == 0:
final_vertices_np = new_vertices_np[:0]
final_faces_np = np.empty((0, 3), dtype=np.int64)
final_colors_np = new_colors[:0] if new_colors is not None else None
else:
unique_face_indices, inv_face = np.unique(new_faces.reshape(-1), return_inverse=True)
final_vertices_np = new_vertices_np[unique_face_indices]
final_faces_np = inv_face.reshape(-1, 3).astype(np.int64)
final_colors_np = new_colors[unique_face_indices] if new_colors is not None else None
# assign colors
final_colors = new_colors[unique_face_indices] if new_colors is not None else None
final_vertices = torch.from_numpy(final_vertices_np).to(device=device, dtype=vertices.dtype)
final_faces = torch.from_numpy(final_faces_np).to(device=device, dtype=faces.dtype)
final_colors = torch.from_numpy(final_colors_np).to(device=device, dtype=colors.dtype) if final_colors_np is not None else None
return final_vertices, final_faces, final_colors
@ -651,7 +1096,22 @@ class PostProcessMesh(IO.ComfyNode):
@classmethod
def execute(cls, mesh, simplify, fill_holes_perimeter):
# TODO: batched mode may break
if hasattr(mesh, "vertex_counts"):
out_verts, out_faces, out_colors = [], [], []
for i in range(mesh.vertices.shape[0]):
v_i, f_i, c_i = get_mesh_batch_item(mesh, i)
actual_face_count = f_i.shape[0]
if fill_holes_perimeter > 0:
v_i, f_i = fill_holes_fn(v_i, f_i, max_perimeter=fill_holes_perimeter)
if simplify > 0 and actual_face_count > simplify:
v_i, f_i, c_i = simplify_fn(v_i, f_i, target=simplify, colors=c_i)
v_i, f_i = make_double_sided(v_i, f_i)
out_verts.append(v_i)
out_faces.append(f_i)
if c_i is not None:
out_colors.append(c_i)
out_mesh = pack_variable_mesh_batch(out_verts, out_faces, out_colors if len(out_colors) == len(out_verts) else None)
return IO.NodeOutput(out_mesh)
verts, faces = mesh.vertices, mesh.faces
colors = None
if hasattr(mesh, "colors"):

View File

@ -0,0 +1,376 @@
import importlib
import sys
import types
import unittest
from unittest.mock import patch
import torch
from PIL import Image
class _DummyPort:
@staticmethod
def Input(*args, **kwargs):
return None
@staticmethod
def Output(*args, **kwargs):
return None
class _DummyIO:
ComfyNode = object
@staticmethod
def Schema(*args, **kwargs):
return None
@staticmethod
def NodeOutput(*args, **kwargs):
return args
def __getattr__(self, name):
return _DummyPort
class _DummyTypes:
def __getattr__(self, name):
return lambda *args, **kwargs: None
dummy_comfy_api_latest = types.SimpleNamespace(
ComfyExtension=object,
IO=_DummyIO(),
Types=_DummyTypes(),
)
dummy_sparse_tensor = type("SparseTensor", (), {})
dummy_trellis_vae = types.SimpleNamespace(SparseTensor=dummy_sparse_tensor)
with patch.dict(sys.modules, {
"comfy_api.latest": dummy_comfy_api_latest,
"comfy.ldm.trellis2.vae": dummy_trellis_vae,
}):
nodes_trellis2 = importlib.import_module("comfy_extras.nodes_trellis2")
class DummyInnerModel:
def __init__(self, image_size=..., fail_on_call=None):
self.call_count = 0
self.fail_on_call = fail_on_call
if image_size is not ...:
self.image_size = image_size
def __call__(self, input_tensor, skip_norm_elementwise=True):
self.call_count += 1
if self.fail_on_call == self.call_count:
raise RuntimeError("expected conditioning failure")
return (torch.ones((1, 4), dtype=torch.float32),)
class DummyModel:
def __init__(self, inner_model):
self.model = inner_model
class DummyPatcher:
def __init__(self, free_memory):
self.free_memory = free_memory
def get_free_memory(self, device):
return self.free_memory
class DummyVAE:
vae_dtype = torch.float16
def __init__(self, free_memory, memory_factor=2):
self.patcher = DummyPatcher(free_memory)
self.memory_factor = memory_factor
def memory_used_decode(self, shape, dtype):
return shape[2] * shape[3] * self.memory_factor
class TestPrepareTrellisVaeForDecode(unittest.TestCase):
def test_uses_load_models_gpu_without_pre_freeing_memory(self):
vae = DummyVAE(free_memory=1000)
with patch.object(nodes_trellis2.comfy.model_management, "get_torch_device", return_value="cuda"):
with patch.object(nodes_trellis2.comfy.model_management, "free_memory") as free_memory:
with patch.object(nodes_trellis2.comfy.model_management, "load_models_gpu") as load_models_gpu:
batch_number = nodes_trellis2.prepare_trellis_vae_for_decode(vae, (3, 32, 10, 1))
free_memory.assert_not_called()
load_models_gpu.assert_called_once_with(
[vae.patcher],
memory_required=20,
force_full_load=False,
)
self.assertEqual(batch_number, 50)
def test_scales_memory_estimate_for_5d_structure_latents(self):
vae = DummyVAE(free_memory=40960, memory_factor=1)
with patch.object(nodes_trellis2.comfy.model_management, "get_torch_device", return_value="cuda"):
with patch.object(nodes_trellis2.comfy.model_management, "load_models_gpu") as load_models_gpu:
batch_number = nodes_trellis2.prepare_trellis_vae_for_decode(vae, (2, 8, 16, 16, 16))
load_models_gpu.assert_called_once_with(
[vae.patcher],
memory_required=4096,
force_full_load=False,
)
self.assertEqual(batch_number, 10)
class TestRunConditioningRestore(unittest.TestCase):
def setUp(self):
self.intermediate_patch = patch.object(
nodes_trellis2.comfy.model_management, "intermediate_device", lambda: "cpu"
)
self.torch_device_patch = patch.object(
nodes_trellis2.comfy.model_management, "get_torch_device", lambda: "cpu"
)
self.intermediate_patch.start()
self.torch_device_patch.start()
def tearDown(self):
self.intermediate_patch.stop()
self.torch_device_patch.stop()
@staticmethod
def make_test_image():
return Image.new("RGB", (8, 8), color="white")
def test_restores_existing_image_size_after_success(self):
inner_model = DummyInnerModel(image_size=777)
nodes_trellis2.run_conditioning(DummyModel(inner_model), self.make_test_image(), include_1024=True)
self.assertEqual(inner_model.image_size, 777)
def test_deletes_missing_image_size_after_success(self):
inner_model = DummyInnerModel()
nodes_trellis2.run_conditioning(DummyModel(inner_model), self.make_test_image(), include_1024=True)
self.assertFalse(hasattr(inner_model, "image_size"))
def test_restores_existing_image_size_after_512_failure(self):
inner_model = DummyInnerModel(image_size=777, fail_on_call=1)
with self.assertRaisesRegex(RuntimeError, "expected conditioning failure"):
nodes_trellis2.run_conditioning(DummyModel(inner_model), self.make_test_image(), include_1024=True)
self.assertEqual(inner_model.image_size, 777)
def test_deletes_missing_image_size_after_1024_failure(self):
inner_model = DummyInnerModel(fail_on_call=2)
with self.assertRaisesRegex(RuntimeError, "expected conditioning failure"):
nodes_trellis2.run_conditioning(DummyModel(inner_model), self.make_test_image(), include_1024=True)
self.assertFalse(hasattr(inner_model, "image_size"))
class DummyCloneModel:
def __init__(self):
self.model_options = {}
def clone(self):
cloned = DummyCloneModel()
cloned.model_options = self.model_options.copy()
return cloned
class TestTrellisBatchSemantics(unittest.TestCase):
def test_empty_structure_latent_is_deterministic_and_propagates_sample_indices(self):
batch_output = nodes_trellis2.EmptyStructureLatentTrellis2.execute(2, 0, 17)[0]
single_output = nodes_trellis2.EmptyStructureLatentTrellis2.execute(1, 5, 17)[0]
expected_batch = torch.zeros(2, 8, 16, 16, 16)
expected_batch[0] = torch.randn(1, 8, 16, 16, 16, generator=torch.Generator(device="cpu").manual_seed(17))[0]
expected_batch[1] = torch.randn(1, 8, 16, 16, 16, generator=torch.Generator(device="cpu").manual_seed(18))[0]
expected_single = torch.randn(1, 8, 16, 16, 16, generator=torch.Generator(device="cpu").manual_seed(22))
self.assertTrue(torch.equal(batch_output["samples"], expected_batch))
self.assertEqual(batch_output["batch_index"], [0, 1])
self.assertTrue(torch.equal(single_output["samples"], expected_single))
self.assertEqual(single_output["batch_index"], [5])
def test_empty_shape_latent_is_deterministic_and_propagates_batch_index(self):
coords = torch.tensor(
[
[1, 5, 5, 5],
[0, 1, 1, 1],
[1, 6, 6, 6],
[0, 2, 2, 2],
[1, 7, 7, 7],
],
dtype=torch.int32,
)
structure = {
"coords": coords,
"coord_counts": torch.tensor([2, 3], dtype=torch.int64),
"batch_index": [4, 9],
}
output, _ = nodes_trellis2.EmptyShapeLatentTrellis2.execute(structure, DummyCloneModel(), 23)
expected = torch.zeros(2, 32, 3, 1)
expected[0, :, :2, :] = torch.randn(1, 32, 2, 1, generator=torch.Generator(device="cpu").manual_seed(27))[0]
expected[1, :, :3, :] = torch.randn(1, 32, 3, 1, generator=torch.Generator(device="cpu").manual_seed(32))[0]
self.assertTrue(torch.equal(output["samples"], expected))
self.assertTrue(torch.equal(output["coord_counts"], torch.tensor([2, 3], dtype=torch.int64)))
self.assertEqual(output["batch_index"], [4, 9])
def test_empty_shape_latent_keeps_singleton_coord_counts(self):
structure = {
"coords": torch.tensor(
[
[0, 1, 1, 1],
[0, 2, 2, 2],
],
dtype=torch.int32,
),
}
output, _ = nodes_trellis2.EmptyShapeLatentTrellis2.execute(structure, DummyCloneModel(), 11)
self.assertTrue(torch.equal(output["coord_counts"], torch.tensor([2], dtype=torch.int64)))
def test_empty_shape_latent_rejects_multi_index_singleton(self):
structure = {
"coords": torch.tensor(
[
[0, 1, 1, 1],
[0, 2, 2, 2],
],
dtype=torch.int32,
),
"batch_index": [5, 6],
}
with self.assertRaises(ValueError):
nodes_trellis2.EmptyShapeLatentTrellis2.execute(structure, DummyCloneModel(), 11)
def test_empty_texture_latent_rejects_multi_index_singleton(self):
coords = torch.tensor(
[
[0, 1, 1, 1],
[0, 2, 2, 2],
],
dtype=torch.int32,
)
structure = {"coords": coords, "batch_index": [7, 8]}
shape_latent = {"samples": torch.zeros(1, 32, 2, 1)}
with self.assertRaises(ValueError):
nodes_trellis2.EmptyTextureLatentTrellis2.execute(
structure,
shape_latent,
DummyCloneModel(),
13,
)
def test_empty_texture_latent_rejects_invalid_structure_input(self):
with self.assertRaises(ValueError):
nodes_trellis2.EmptyTextureLatentTrellis2.execute(
"bad-input",
{"samples": torch.zeros(1, 32, 2, 1)},
DummyCloneModel(),
13,
)
def test_empty_texture_latent_uses_shape_batch_index_for_seed_fallback(self):
coords = torch.tensor(
[
[0, 1, 1, 1],
[1, 2, 2, 2],
[1, 3, 3, 3],
],
dtype=torch.int32,
)
structure = {"coords": coords}
shape_latent = {
"samples": torch.zeros(2, 32, 2, 1),
"batch_index": [4, 9],
}
output, _ = nodes_trellis2.EmptyTextureLatentTrellis2.execute(
structure,
shape_latent,
DummyCloneModel(),
13,
)
expected = torch.zeros(2, 32, 2, 1)
expected[0, :, :1, :] = torch.randn(1, 32, 1, 1, generator=torch.Generator(device="cpu").manual_seed(17))[0]
expected[1, :, :2, :] = torch.randn(1, 32, 2, 1, generator=torch.Generator(device="cpu").manual_seed(22))[0]
self.assertTrue(torch.equal(output["samples"], expected))
self.assertEqual(output["batch_index"], [4, 9])
def test_flatten_batched_sparse_latent_validates_coord_counts(self):
samples = torch.zeros(2, 32, 3, 1)
coords = torch.tensor(
[
[0, 1, 1, 1],
[1, 2, 2, 2],
[1, 3, 3, 3],
],
dtype=torch.int32,
)
coord_counts = torch.tensor([2, 1], dtype=torch.int64)
with self.assertRaises(ValueError):
nodes_trellis2.flatten_batched_sparse_latent(samples, coords, coord_counts)
def test_infer_batched_coord_layout_rejects_negative_batch_ids(self):
coords = torch.tensor(
[
[-1, 1, 1, 1],
[0, 2, 2, 2],
],
dtype=torch.int32,
)
with self.assertRaises(ValueError):
nodes_trellis2.infer_batched_coord_layout(coords)
def test_split_batched_coords_validates_total_count(self):
coords = torch.tensor(
[
[0, 1, 1, 1],
[1, 2, 2, 2],
[1, 3, 3, 3],
],
dtype=torch.int32,
)
coord_counts = torch.tensor([1, 1], dtype=torch.int64)
with self.assertRaises(ValueError):
nodes_trellis2.split_batched_coords(coords, coord_counts)
def test_empty_shape_latent_preserves_resolutions_key(self):
structure = {
"coords": torch.tensor(
[
[0, 1, 1, 1],
[0, 2, 2, 2],
],
dtype=torch.int32,
),
"resolutions": torch.tensor([1024], dtype=torch.int64),
}
output, model = nodes_trellis2.EmptyShapeLatentTrellis2.execute(structure, DummyCloneModel(), 11)
self.assertTrue(torch.equal(output["resolutions"], torch.tensor([1024], dtype=torch.int64)))
self.assertNotIn("coord_resolutions", model.model_options["transformer_options"])
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,76 @@
import unittest
import torch
import comfy.sample
class TestPrepareNoiseInnerTrellis(unittest.TestCase):
def test_coord_counts_noise_matches_per_index_prefix_draws(self):
latent = torch.zeros(2, 4, 5, 1)
latent.trellis_coord_counts = torch.tensor([3, 5], dtype=torch.int64)
generator = torch.Generator(device="cpu")
generator.manual_seed(123)
noise = comfy.sample.prepare_noise_inner(latent, generator)
expected = torch.zeros_like(noise, dtype=torch.float32)
row0 = torch.Generator(device="cpu")
row0.manual_seed(123)
expected[0, :, :3, :] = torch.randn(1, 4, 3, 1, generator=row0)[0]
row1 = torch.Generator(device="cpu")
row1.manual_seed(124)
expected[1] = torch.randn(1, 4, 5, 1, generator=row1)[0]
self.assertTrue(torch.equal(noise.float(), expected))
self.assertTrue(torch.equal(noise[0, :, 3:, :], torch.zeros_like(noise[0, :, 3:, :])))
def test_coord_counts_noise_inds_share_prefixes_for_duplicates(self):
latent = torch.zeros(2, 4, 5, 1)
latent.trellis_coord_counts = torch.tensor([3, 5], dtype=torch.int64)
generator = torch.Generator(device="cpu")
generator.manual_seed(456)
noise = comfy.sample.prepare_noise_inner(latent, generator, noise_inds=[7, 7])
replay = torch.Generator(device="cpu")
replay.manual_seed(463)
expected1 = torch.randn(1, 4, 5, 1, generator=replay)
expected0 = expected1[:, :, :3, :]
self.assertTrue(torch.equal(noise[0:1, :, :3, :], expected0))
self.assertTrue(torch.equal(noise[1:2, :, :5, :], expected1))
self.assertTrue(torch.equal(noise[0, :, 3:, :], torch.zeros_like(noise[0, :, 3:, :])))
def test_coord_counts_noise_inds_length_must_match_batch(self):
latent = torch.zeros(2, 4, 5, 1)
latent.trellis_coord_counts = torch.tensor([3, 5], dtype=torch.int64)
generator = torch.Generator(device="cpu")
generator.manual_seed(456)
with self.assertRaises(ValueError):
comfy.sample.prepare_noise_inner(latent, generator, noise_inds=[7])
def test_coord_counts_metadata_must_match_batch_and_bounds(self):
generator = torch.Generator(device="cpu")
generator.manual_seed(456)
latent = torch.zeros(2, 4, 5, 1)
latent.trellis_coord_counts = torch.tensor([[3, 5]], dtype=torch.int64)
with self.assertRaises(ValueError):
comfy.sample.prepare_noise_inner(latent, generator)
latent = torch.zeros(2, 4, 5, 1)
latent.trellis_coord_counts = torch.tensor([3], dtype=torch.int64)
with self.assertRaises(ValueError):
comfy.sample.prepare_noise_inner(latent, generator)
latent = torch.zeros(2, 4, 5, 1)
latent.trellis_coord_counts = torch.tensor([3, 6], dtype=torch.int64)
with self.assertRaises(ValueError):
comfy.sample.prepare_noise_inner(latent, generator)
if __name__ == "__main__":
unittest.main()