mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-06-11 00:37:53 +08:00
322 lines
17 KiB
Python
322 lines
17 KiB
Python
"""SCAIL / SCAIL-2 nodes: the WanSCAILToVideo conditioning node and the SAM3
|
|
preprocessing that turns video tracks into the bundle the SCAIL-2 model consumes."""
|
|
|
|
from typing_extensions import override
|
|
|
|
import torch
|
|
import torch.nn.functional as F
|
|
|
|
import nodes
|
|
import node_helpers
|
|
import comfy.model_management
|
|
import comfy.utils
|
|
from comfy_api.latest import ComfyExtension, io
|
|
from comfy.ldm.sam3.tracker import unpack_masks
|
|
|
|
# Custom io type named "SAM3_TRACK_DATA"; SCAIL2ColoredMask declares its
# track-data inputs with it.
SAM3TrackData = io.Custom("SAM3_TRACK_DATA")


# Model was trained on these exact colors; deviating degrades multi-identity quality.
# _render_colored_masks gives object ``i`` the entry ``i % len(DEFAULT_PALETTE)``,
# so colors repeat once there are more objects than palette entries.
# Each entry is an (R, G, B) triple of 0.0 / 1.0 values.
DEFAULT_PALETTE = [
    (0.0, 0.0, 1.0),  # Blue
    (1.0, 0.0, 0.0),  # Red
    (0.0, 1.0, 0.0),  # Green
    (1.0, 0.0, 1.0),  # Magenta
    (0.0, 1.0, 1.0),  # Cyan
    (1.0, 1.0, 0.0),  # Yellow
]
|
|
|
|
|
|
def _unpack(track_data):
|
|
packed = track_data["packed_masks"]
|
|
if packed is None or packed.shape[1] == 0:
|
|
return None
|
|
return unpack_masks(packed)
|
|
|
|
|
|
def _first_frame_cx_area(masks_bool):
|
|
first = masks_bool[0].float()
|
|
H, W = first.shape[-2], first.shape[-1]
|
|
n_pixels = H * W
|
|
grid_x = torch.arange(W, device=first.device, dtype=first.dtype).view(1, W)
|
|
area = first.sum(dim=(-1, -2)).clamp_(min=1)
|
|
cx = (first * grid_x).sum(dim=(-1, -2)) / area
|
|
return (cx / W).tolist(), (area / n_pixels).tolist()
|
|
|
|
|
|
def _subset_track_data(track_data, obj_indices):
|
|
out = dict(track_data)
|
|
packed = track_data["packed_masks"]
|
|
if packed is None or not obj_indices:
|
|
out["packed_masks"] = None
|
|
if "scores" in out:
|
|
out["scores"] = []
|
|
return out
|
|
out["packed_masks"] = packed[:, obj_indices].contiguous()
|
|
scores = track_data.get("scores")
|
|
if scores is not None:
|
|
out["scores"] = [scores[i] for i in obj_indices if i < len(scores)]
|
|
return out
|
|
|
|
|
|
def _render_colored_masks(track_data, background="black"):
    """Paint the tracked objects of ``track_data`` as flat palette colors.

    Returns a ``(T, H, W, 3)`` tensor on the intermediate device/dtype, where
    ``(H, W)`` is ``track_data["orig_size"]``. Pixels covered by no object get
    the background color: white when ``background`` starts with ``"white"``,
    black otherwise. Object ``i`` is painted with
    ``DEFAULT_PALETTE[i % len(DEFAULT_PALETTE)]``.

    With no masks (``packed_masks`` is ``None`` or has zero objects) the result
    is a pure background video. Its length is ``track_data.get("n_frames", 1)``
    when ``packed_masks`` is ``None``, else ``packed_masks.shape[0]``.
    """
    packed = track_data["packed_masks"]
    out_h, out_w = track_data["orig_size"]
    device = comfy.model_management.intermediate_device()
    dtype = comfy.model_management.intermediate_dtype()
    if background.startswith("white"):
        bg_rgb = (1.0, 1.0, 1.0)
    else:
        bg_rgb = (0.0, 0.0, 0.0)

    no_objects = packed is None or packed.shape[1] == 0
    if no_objects:
        if packed is None:
            n_frames = track_data.get("n_frames", 1)
        else:
            n_frames = packed.shape[0]
        # Tile the background color over every pixel of every frame.
        return torch.tensor(bg_rgb, device=device, dtype=dtype).repeat(n_frames, out_h, out_w, 1)

    n_frames, n_objects = packed.shape[:2]
    palette_rows = [DEFAULT_PALETTE[k % len(DEFAULT_PALETTE)] for k in range(n_objects)]
    colors = torch.tensor(palette_rows, device=device, dtype=dtype)

    masks = unpack_masks(packed.to(device)).float()
    mask_h, mask_w = masks.shape[-2:]
    # Resize every (frame, object) mask to the output size, then re-binarize.
    resized = F.interpolate(
        masks.view(n_frames * n_objects, 1, mask_h, mask_w),
        size=(out_h, out_w),
        mode="nearest",
    )
    masks = resized.view(n_frames, n_objects, out_h, out_w) > 0.5

    covered = masks.any(dim=1)
    # Per pixel, argmax over the object axis selects which object's color is used.
    winner = masks.to(torch.uint8).argmax(dim=1)
    overlay = colors[winner]
    bg = torch.tensor(bg_rgb, device=device, dtype=overlay.dtype).view(1, 1, 1, 3)
    return torch.where(covered.unsqueeze(-1), overlay, bg.expand_as(overlay))
|
|
|
|
|
|
def _extract_mask_to_28ch(rgb_video):
|
|
"""Colored RGB mask (T, H, W, 3) in [0, 1] -> SCAIL-2 28-channel binary latent
|
|
(1, T_lat, 28, H_lat, W_lat). 7 per-color binary channels (white/r/g/b/y/m/c)
|
|
threshold-extracted at 225/255, 8x spatial downsample, 4-frame temporal stacking."""
|
|
T, H, W, _ = rgb_video.shape
|
|
_ON_THRESH = 225.0 / 255.0
|
|
mask = rgb_video.movedim(-1, 1).float()
|
|
R = (mask[:, 0:1] > _ON_THRESH).float()
|
|
G = (mask[:, 1:2] > _ON_THRESH).float()
|
|
B = (mask[:, 2:3] > _ON_THRESH).float()
|
|
nR, nG, nB = 1 - R, 1 - G, 1 - B
|
|
binary_7ch = torch.cat([
|
|
R * G * B, # white
|
|
R * nG * nB, # red
|
|
nR * G * nB, # green
|
|
nR * nG * B, # blue
|
|
R * G * nB, # yellow
|
|
R * nG * B, # magenta
|
|
nR * G * B, # cyan
|
|
], dim=1)
|
|
H_lat, W_lat = H, W
|
|
for _ in range(3):
|
|
H_lat = (H_lat + 1) // 2
|
|
W_lat = (W_lat + 1) // 2
|
|
binary_7ch = torch.nn.functional.interpolate(binary_7ch, size=(H_lat, W_lat), mode='area')
|
|
T_latent = (T - 1) // 4 + 1
|
|
padded = torch.cat([binary_7ch[:1].repeat(4, 1, 1, 1), binary_7ch[1:]], dim=0)
|
|
out = padded.view(T_latent, 28, H_lat, W_lat)
|
|
return out.unsqueeze(0)
|
|
|
|
|
|
class WanSCAILToVideo(io.ComfyNode):
    """Conditioning node for SCAIL / SCAIL-2 video generation.

    Attaches the optional reference latent, CLIP vision output, pose latent,
    28-channel driving/reference masks and a previous-chunk anchor to the
    positive and negative conditioning, and builds the latent to sample.
    """

    @classmethod
    def define_schema(cls):
        # Declares the node's id, category, inputs and outputs.
        return io.Schema(
            node_id="WanSCAILToVideo",
            category="model/conditioning/video_models",
            inputs=[
                io.Conditioning.Input("positive"),
                io.Conditioning.Input("negative"),
                io.Vae.Input("vae"),
                io.Int.Input("width", default=512, min=32, max=nodes.MAX_RESOLUTION, step=32),
                io.Int.Input("height", default=896, min=32, max=nodes.MAX_RESOLUTION, step=32),
                io.Int.Input("length", default=81, min=1, max=nodes.MAX_RESOLUTION, step=4),
                io.Int.Input("batch_size", default=1, min=1, max=4096),
                io.Image.Input("pose_video", optional=True, tooltip="Video used for pose conditioning. Will be downscaled to half the resolution of the main video."),
                io.Image.Input("pose_video_mask", optional=True, tooltip="SCAIL-2 only. Colored per-identity SAM3 mask video at the same resolution as pose_video."),
                io.Boolean.Input("replacement_mode", default=False, optional=True, tooltip="SCAIL-2 only. False = Animation Mode (pose_video_mask should have black background). True = Replacement Mode (pose_video_mask should have white background)."),
                io.Float.Input("pose_strength", default=1.0, min=0.0, max=10.0, step=0.01, tooltip="Strength of the pose latent."),
                io.Float.Input("pose_start", default=0.0, min=0.0, max=1.0, step=0.01, tooltip="Start step of the pose conditioning."),
                io.Float.Input("pose_end", default=1.0, min=0.0, max=1.0, step=0.01, tooltip="End step of the pose conditioning."),
                io.Image.Input("reference_image", optional=True, tooltip="Reference image, for multiple references composite all on single image."),
                io.Image.Input("reference_image_mask", optional=True, tooltip="SCAIL-2 only. Colored reference mask at the same resolution as reference_image."),
                io.ClipVisionOutput.Input("clip_vision_output", optional=True, tooltip="CLIP vision features for conditioning. Model is trained with stretch resize to aspect ratio."),
                io.Int.Input("video_frame_offset", default=0, min=0, max=nodes.MAX_RESOLUTION, step=1, tooltip="Cumulative output frame this chunk begins at. Wire from the previous chunk's video_frame_offset output."),
                io.Int.Input("previous_frame_count", default=5, min=1, max=nodes.MAX_RESOLUTION, step=4, tooltip="Tail frames of previous_frames to anchor. SCAIL-2 trained at 5 (81-frame chunks, 76-frame step)."),
                io.Image.Input("previous_frames", optional=True, tooltip="SCAIL-2 only. Full decoded output of the previous chunk. Only the last previous_frame_count are used as the extension anchor."),
            ],
            outputs=[
                io.Conditioning.Output(display_name="positive"),
                io.Conditioning.Output(display_name="negative"),
                io.Latent.Output(display_name="latent", tooltip="Empty latent of the generation size."),
                io.Int.Output(display_name="video_frame_offset", tooltip="Adjusted offset + length. Wire into the next chunk."),
            ],
            is_experimental=True,
        )

    @classmethod
    def execute(cls, positive, negative, vae, width, height, length, batch_size, pose_strength, pose_start, pose_end,
                video_frame_offset, previous_frame_count, replacement_mode=False, reference_image=None, clip_vision_output=None, pose_video=None,
                pose_video_mask=None, reference_image_mask=None, previous_frames=None) -> io.NodeOutput:
        """Build the SCAIL conditioning and the latent for one video chunk.

        Returns a NodeOutput of (positive, negative, latent dict, next offset):
          * the latent dict holds "samples" and, when previous frames were
            given, a "noise_mask" that is 0 over the anchored latent frames;
          * the next offset is the (possibly rewound) video_frame_offset plus
            ``length``.
        """
        # Zero latent: 16 channels, ((length - 1) // 4) + 1 frames, 1/8 spatial size.
        latent = torch.zeros([batch_size, 16, ((length - 1) // 4) + 1, height // 8, width // 8], device=comfy.model_management.intermediate_device())
        noise_mask = None

        # ref_mask_flag is the inverse of replacement_mode and is stored on both conditionings.
        ref_mask_flag = not replacement_mode
        positive = node_helpers.conditioning_set_values(positive, {"ref_mask_flag": ref_mask_flag})
        negative = node_helpers.conditioning_set_values(negative, {"ref_mask_flag": ref_mask_flag})

        # Keep only the tail of the previous chunk and rewind the offset by the
        # number of frames kept (never below 0), so the pose/mask slices below
        # start that many frames earlier.
        prev_trimmed = None
        if previous_frames is not None and previous_frames.shape[0] > 0:
            prev_trimmed = previous_frames[-previous_frame_count:]
            video_frame_offset -= prev_trimmed.shape[0]
            video_frame_offset = max(0, video_frame_offset)

        ref_latent = None
        if reference_image is not None:
            # Only the first reference image is used, resized to the output size.
            reference_image = comfy.utils.common_upscale(reference_image[:1].movedim(-1, 1), width, height, "bicubic", "center").movedim(1, -1)
            # Replacement Mode: composite ref on black bg using reference_image_mask as alpha matte
            if replacement_mode and reference_image_mask is not None:
                rm = comfy.utils.common_upscale(reference_image_mask[:1].movedim(-1, 1), width, height, "nearest-exact", "center").movedim(1, -1)
                # A pixel counts as character when any of its first three channels exceeds 0.1.
                is_char = (rm[..., :3].max(dim=-1, keepdim=True).values > 0.1).to(reference_image.dtype)
                reference_image = reference_image * is_char
            ref_latent = vae.encode(reference_image[:, :, :, :3])

        if ref_latent is not None:
            # append=True adds to any "reference_latents" already on the conditioning.
            positive = node_helpers.conditioning_set_values(positive, {"reference_latents": [ref_latent]}, append=True)
            negative = node_helpers.conditioning_set_values(negative, {"reference_latents": [ref_latent]}, append=True)

        if clip_vision_output is not None:
            positive = node_helpers.conditioning_set_values(positive, {"clip_vision_output": clip_vision_output})
            negative = node_helpers.conditioning_set_values(negative, {"clip_vision_output": clip_vision_output})

        # Skip the frames before video_frame_offset; an input with no frames
        # left after the offset is dropped entirely.
        if pose_video is not None:
            if pose_video.shape[0] <= video_frame_offset:
                pose_video = None
            else:
                pose_video = pose_video[video_frame_offset:]
        if pose_video_mask is not None:
            if pose_video_mask.shape[0] <= video_frame_offset:
                pose_video_mask = None
            else:
                pose_video_mask = pose_video_mask[video_frame_offset:]

        # Truncate pose+mask jointly to the shorter of the two, capped at length.
        ts = [v.shape[0] for v in (pose_video, pose_video_mask) if v is not None]
        if ts:
            # Round down to a frame count of the form 4k + 1.
            T_kept = ((min(min(ts), length) - 1) // 4) * 4 + 1
            if pose_video is not None:
                pose_video = pose_video[:T_kept]
            if pose_video_mask is not None:
                pose_video_mask = pose_video_mask[:T_kept]

        if pose_video is not None:
            # Pose video is encoded at half the output resolution and scaled by pose_strength.
            pose_video = comfy.utils.common_upscale(pose_video[:length].movedim(-1, 1), width // 2, height // 2, "area", "center").movedim(1, -1)
            pose_video_latent = vae.encode(pose_video[:, :, :, :3]) * pose_strength
            positive = node_helpers.conditioning_set_values_with_timestep_range(positive, {"pose_video_latent": pose_video_latent}, pose_start, pose_end)
            negative = node_helpers.conditioning_set_values_with_timestep_range(negative, {"pose_video_latent": pose_video_latent}, pose_start, pose_end)

        if pose_video_mask is not None:
            # Driving mask is resized to half resolution like the pose video, then converted to 28 channels.
            mask_video_hw = comfy.utils.common_upscale(pose_video_mask[:length].movedim(-1, 1), width // 2, height // 2, "area", "center").movedim(1, -1)
            driving_mask_28ch = _extract_mask_to_28ch(mask_video_hw)
            positive = node_helpers.conditioning_set_values(positive, {"driving_mask_28ch": driving_mask_28ch})
            negative = node_helpers.conditioning_set_values(negative, {"driving_mask_28ch": driving_mask_28ch})

        if reference_image_mask is not None:
            ref_mask_hw = comfy.utils.common_upscale(reference_image_mask[:1].movedim(-1, 1), width, height, "bicubic", "center").movedim(1, -1)
            ref_mask_1f = _extract_mask_to_28ch(ref_mask_hw)
            # The single reference-mask frame is followed by latent.shape[2] all-zero
            # frames along dim 1, giving 1 + latent.shape[2] entries on that axis.
            zeros = torch.zeros((1, latent.shape[2], 28, ref_mask_1f.shape[-2], ref_mask_1f.shape[-1]), device=ref_mask_1f.device, dtype=ref_mask_1f.dtype)
            ref_mask_28ch = torch.cat([ref_mask_1f, zeros], dim=1)
            positive = node_helpers.conditioning_set_values(positive, {"ref_mask_28ch": ref_mask_28ch})
            negative = node_helpers.conditioning_set_values(negative, {"ref_mask_28ch": ref_mask_28ch})

        if prev_trimmed is not None:
            pf = comfy.utils.common_upscale(prev_trimmed.movedim(-1, 1), width, height, "bicubic", "center").movedim(1, -1)
            prev_latent = vae.encode(pf[:, :, :, :3])
            # Copy as many encoded frames as fit into the start of the latent and
            # zero the noise mask over them; the rest of the mask stays 1.
            prev_latent_frames = min(prev_latent.shape[2], latent.shape[2])
            latent[:, :, :prev_latent_frames] = prev_latent[:, :, :prev_latent_frames].to(latent.dtype)
            noise_mask = torch.ones((1, 1, latent.shape[2], latent.shape[-2], latent.shape[-1]), device=latent.device, dtype=latent.dtype)
            noise_mask[:, :, :prev_latent_frames] = 0.0

        out_latent = {"samples": latent}
        if noise_mask is not None:
            out_latent["noise_mask"] = noise_mask
        # The returned offset uses the rewound video_frame_offset from above.
        return io.NodeOutput(positive, negative, out_latent, video_frame_offset + length)
|
|
|
|
|
|
class SCAIL2ColoredMask(io.ComfyNode):
    """Render SAM3 tracks for the driving pose video and (optionally) the reference
    image into the two colored masks WanSCAILToVideo consumes. Shared `sort_by`
    across both outputs guarantees identity K maps to the same color on both
    sides, for multi-person workflow consistency.

    reference_image_mask is always rendered black-bg (model convention)
    pose_video_mask bg follows replacement_mode: black = Animation Mode, white = Replacement Mode
    """

    @classmethod
    def define_schema(cls):
        """Declare the node's id, display name, category, inputs and outputs."""
        return io.Schema(
            node_id="SCAIL2ColoredMask",
            display_name="Create SCAIL-2 Colored Mask",
            category="conditioning/video_models/scail",
            inputs=[
                SAM3TrackData.Input("driving_track_data", tooltip="SAM3 track of the driving pose video. Will be rendered into the pose_video_mask output."),
                SAM3TrackData.Input("ref_track_data", optional=True,
                                    tooltip="SAM3 track of the reference image."),
                io.String.Input("object_indices", default="",
                                tooltip="Comma-separated list of person indices to include (e.g. '0,2,3'). Applied to both reference and pose video masks. Empty = all."),
                io.Combo.Input("sort_by", options=["none", "left_to_right", "area"], default="left_to_right",
                               tooltip="Order in which palette colors are assigned to the tracked objects (applied to both reference and pose video so each identity keeps the same color). left_to_right = leftmost object (by first-frame centroid) gets the first color; area = biggest object (by first-frame mask area) gets the first color; none = keep SAM3's order."),
                io.Boolean.Input("replacement_mode", default=False,
                                 tooltip="False = mask_video has black bg (Animation Mode). True = white bg (Replacement Mode). Set the matching replacement_mode on WanSCAILToVideo. reference_image_mask is always black-bg regardless."),
            ],
            outputs=[
                io.Image.Output("pose_video_mask"),
                io.Image.Output("reference_image_mask"),
            ],
            is_experimental=True,
        )

    @classmethod
    def execute(cls, driving_track_data, object_indices, sort_by, replacement_mode, ref_track_data=None):
        """Render the colored pose-video mask and the colored reference mask.

        Both track inputs go through the same preparation: objects are first
        reordered according to ``sort_by``, then filtered by ``object_indices``.
        Because filtering happens after sorting, the indices refer to positions
        in the sorted order. Tokens of ``object_indices`` that are not plain
        decimal integers, or that fall outside the available objects, are
        ignored.

        Without ``ref_track_data`` the reference mask is a single all-black
        frame sized like the driving track's ``orig_size``.
        """
        def _prep(td):
            # Reorder objects so palette colors follow the requested ordering.
            masks_bool = _unpack(td)
            if sort_by != "none" and masks_bool is not None:
                cx, area = _first_frame_cx_area(masks_bool)
                if sort_by == "left_to_right":
                    order = sorted(range(len(cx)), key=lambda i: cx[i])
                else:  # "area"
                    order = sorted(range(len(area)), key=lambda i: -area[i])
                td = _subset_track_data(td, order)

            if object_indices.strip():
                indices = []
                for token in object_indices.split(","):
                    token = token.strip()
                    # isdecimal() only accepts characters int() can parse;
                    # isdigit() also accepts e.g. superscripts ("²"), which
                    # would make int() raise ValueError.
                    if token.isdecimal():
                        indices.append(int(token))
                packed = td.get("packed_masks")
                n_obj = packed.shape[1] if packed is not None else 0
                indices = [i for i in indices if 0 <= i < n_obj]
                td = _subset_track_data(td, indices)
            return td

        drv = _prep(driving_track_data)
        mask_video = _render_colored_masks(drv, "white" if replacement_mode else "black")

        if ref_track_data is not None:
            ref = _prep(ref_track_data)
            reference_image_mask = _render_colored_masks(ref, "black")
        else:
            H, W = drv["orig_size"]
            reference_image_mask = torch.zeros(1, H, W, 3, device=comfy.model_management.intermediate_device(), dtype=comfy.model_management.intermediate_dtype())

        return io.NodeOutput(mask_video, reference_image_mask)
|
|
|
|
|
|
class SCAILExtension(ComfyExtension):
    """Extension that exposes the node classes defined in this file."""

    @override
    async def get_node_list(self) -> list[type[io.ComfyNode]]:
        """Return the node classes provided by this extension."""
        node_classes = [WanSCAILToVideo, SCAIL2ColoredMask]
        return node_classes
|
|
|
|
|
|
async def comfy_entrypoint() -> SCAILExtension:
    """Create and return a new ``SCAILExtension`` instance."""
    extension = SCAILExtension()
    return extension
|