mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-03-02 15:57:40 +08:00
185 lines
6.8 KiB
Python
185 lines
6.8 KiB
Python
import re
|
|
import numbers
|
|
import torch
|
|
from comfy import sd1_clip
|
|
from comfy.text_encoders.qwen_image import Qwen25_7BVLITokenizer, Qwen25_7BVLIModel
|
|
import logging
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Opening/closing quote characters recognised as delimiters of a quoted span:
# ASCII single and double quotes plus the curly single and double quote pairs.
QUOTE_PAIRS = [("'", "'"), ('"', '"'), ("\u2018", "\u2019"), ("\u201c", "\u201d")]

# Regex source matching one quoted span (quotes included) for any pair above.
# The span body may not contain either quote character of its own pair.
QUOTE_PATTERN = "|".join(
    [
        re.escape(q1) + r"[^" + re.escape(q1 + q2) + r"]*?" + re.escape(q2)
        for q1, q2 in QUOTE_PAIRS
    ]
)

# Words with an apostrophe between ASCII letters (e.g. "don't"); that
# apostrophe must not be treated as a quote delimiter.
WORD_INTERNAL_QUOTE_RE = re.compile(r"[a-zA-Z]+'[a-zA-Z]+")

# QUOTE_PATTERN wrapped in a single capturing group so that re.split() keeps
# the quoted spans; they land at the odd indices of the split result.
_QUOTE_SPLIT_RE = re.compile(f"({QUOTE_PATTERN})")

# Placeholder for a protected word. The index is fenced by fixed text on both
# sides, so no placeholder is a substring of another one.
_PLACEHOLDER_TEMPLATE = "longcat_$##$_{}_longcat"


def split_quotation(prompt):
    """Split *prompt* into quoted and unquoted segments.

    Words containing an internal apostrophe (``don't``, ``it's``) are
    temporarily replaced by placeholders so their apostrophe cannot open or
    close a quoted span, and are restored in the returned segments.

    Args:
        prompt: The text to split.

    Returns:
        A list of ``(text, is_quoted)`` tuples in prompt order. Quoted
        segments include their surrounding quote characters. Empty segments
        are dropped, so an empty prompt yields an empty list.
    """
    # Unique contractions in first-seen order: deterministic, unlike set().
    contractions = list(dict.fromkeys(WORD_INTERNAL_QUOTE_RE.findall(prompt)))

    mapping = []
    for index, word_src in enumerate(contractions):
        word_tgt = _PLACEHOLDER_TEMPLATE.format(index)
        prompt = prompt.replace(word_src, word_tgt)
        mapping.append((word_src, word_tgt))

    result = []
    for position, part in enumerate(_QUOTE_SPLIT_RE.split(prompt)):
        if not part:
            continue
        # Decide quoting from the split position, before the contractions are
        # put back: a restored apostrophe could otherwise make an unquoted
        # segment look like it starts with a quoted span.
        is_quoted = position % 2 == 1
        for word_src, word_tgt in mapping:
            part = part.replace(word_tgt, word_src)
        result.append((part, is_quoted))
    return result
|
|
|
|
|
|
class LongCatImageBaseTokenizer(Qwen25_7BVLITokenizer):
    """Qwen tokenizer variant that encodes quoted spans character by character.

    The prompt is split with ``split_quotation``; unquoted segments are
    tokenized in one call, quoted segments one character at a time.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set after the parent initialiser so this value takes precedence.
        self.max_length = 512

    def tokenize_with_weights(self, text, return_word_ids=False, **kwargs):
        """Tokenize *text* into one list of ``(token_id, 1.0)`` pairs.

        The token list is truncated to ``self.max_length`` (with a warning)
        and then handed to ``self.pad_tokens`` with the remaining length.
        ``return_word_ids`` and extra keyword arguments are accepted but not
        used here.

        Returns:
            A one-element list holding the list of weighted tokens.
        """
        token_ids = []
        for segment, quoted in split_quotation(text):
            # Iterating a string yields its characters; an unquoted segment is
            # wrapped in a tuple so it is encoded as a single piece.
            pieces = segment if quoted else (segment,)
            for piece in pieces:
                encoded = self.tokenizer(piece, add_special_tokens=False)
                token_ids.extend(encoded["input_ids"])

        if len(token_ids) > self.max_length:
            token_ids = token_ids[: self.max_length]
            logger.warning(f"Truncated prompt to {self.max_length} tokens")

        weighted = [(token_id, 1.0) for token_id in token_ids]
        # pad_tokens is not defined in this class (presumably inherited); its
        # return value is ignored, so it is relied on to pad the list in place.
        self.pad_tokens(weighted, self.max_length - len(weighted))
        return [weighted]
|
|
|
|
|
|
class LongCatImageTokenizer(sd1_clip.SD1Tokenizer):
    """Tokenizer front-end that wraps prompts in the LongCat chat template.

    Prompts that already start with a chat-template marker are forwarded to
    the parent tokenizer unchanged; all others are surrounded by the template
    prefix and suffix stored on the instance.
    """

    def __init__(self, embedding_directory=None, tokenizer_data={}):
        super().__init__(
            embedding_directory=embedding_directory,
            tokenizer_data=tokenizer_data,
            name="qwen25_7b",
            tokenizer=LongCatImageBaseTokenizer,
        )
        # System turn plus the opening of the user turn, placed before the prompt.
        self.longcat_template_prefix = "<|im_start|>system\nAs an image captioning expert, generate a descriptive text prompt based on an image content, suitable for input to a text-to-image model.<|im_end|>\n<|im_start|>user\n"
        # End of the user turn plus the opening of the assistant turn.
        self.longcat_template_suffix = "<|im_end|>\n<|im_start|>assistant\n"

    def tokenize_with_weights(self, text, return_word_ids=False, **kwargs):
        """Tokenize *text*, adding the chat template unless it is already there.

        Returns:
            For pre-templated text, whatever the parent tokenizer returns
            (called with ``disable_weights=True``). Otherwise a dict with the
            single key ``"qwen25_7b"`` mapping to a one-element list of
            ``(token_id, weight)`` pairs: prefix + prompt + suffix.
        """
        already_templated = text.startswith(("<|im_start|>", "<|start_header_id|>"))
        if text == "":
            # An empty prompt is replaced by a single space before tokenizing.
            text = " "

        inner = self.qwen25_7b
        if already_templated:
            return super().tokenize_with_weights(
                text, return_word_ids=return_word_ids, disable_weights=True, **kwargs
            )

        def encode_fixed(fragment):
            # Template text is tokenized directly, with weight 1.0 per token.
            ids = inner.tokenizer(fragment, add_special_tokens=False)["input_ids"]
            return [(token_id, 1.0) for token_id in ids]

        head = encode_fixed(self.longcat_template_prefix)
        tail = encode_fixed(self.longcat_template_suffix)
        body = inner.tokenize_with_weights(
            text, return_word_ids=return_word_ids, **kwargs
        )[0]

        return {"qwen25_7b": [head + body + tail]}
|
|
|
|
|
|
class LongCatImageTEModel(sd1_clip.SD1ClipModel):
    """Text-encoder wrapper that crops the chat template from the output.

    After the parent class encodes the tokens, the columns belonging to the
    template prefix and to the template suffix are removed from the output
    and, when present, from ``extra["attention_mask"]``.
    """

    def __init__(self, device="cpu", dtype=None, model_options={}):
        super().__init__(
            device=device,
            dtype=dtype,
            name="qwen25_7b",
            clip_model=Qwen25_7BVLIModel,
            model_options=model_options,
        )

    def encode_token_weights(self, token_weight_pairs, template_end=-1):
        """Encode tokens and strip the template prefix/suffix positions.

        Args:
            token_weight_pairs: Dict whose ``"qwen25_7b"`` entry holds a list
                of token sequences; only the first sequence is inspected.
            template_end: Index where the prompt starts. With the default
                ``-1`` it is located by scanning for token id 151644.

        Returns:
            The ``(out, pooled, extra)`` tuple from the parent class, with
            ``out`` (and the attention mask, if any) cropped along dim 1.
        """
        out, pooled, extra = super().encode_token_weights(token_weight_pairs)
        tok_pairs = token_weight_pairs["qwen25_7b"][0]

        def is_token(elem, token_id):
            # Only plain integer entries are compared; tensors are skipped.
            return (
                not torch.is_tensor(elem)
                and isinstance(elem, numbers.Integral)
                and elem == token_id
            )

        def crop_mask(columns):
            # Crop the mask the same way as `out`; drop it when all ones.
            if "attention_mask" not in extra:
                return
            cropped = extra["attention_mask"][:, columns]
            if cropped.sum() == torch.numel(cropped):
                extra.pop("attention_mask")
            else:
                extra["attention_mask"] = cropped

        if template_end == -1:
            # Keep the position of the second 151644 (or the only one).
            im_start_seen = 0
            for position, pair in enumerate(tok_pairs):
                if is_token(pair[0], 151644):
                    template_end = position
                    im_start_seen += 1
                    if im_start_seen == 2:
                        break

        # Skip the marker and the two tokens after it when they are 872, 198
        # (presumably the "user" and newline tokens — confirm against vocab).
        # NOTE(review): this check also runs when the caller passes
        # template_end explicitly — confirm that is intended.
        if out.shape[1] > (template_end + 3):
            if (
                tok_pairs[template_end + 1][0] == 872
                and tok_pairs[template_end + 2][0] == 198
            ):
                template_end += 3

        if template_end == -1:
            # Nothing found: keep everything from the first position.
            template_end = 0

        # Last occurrence of token id 151645 marks the start of the suffix.
        suffix_start = next(
            (
                position
                for position in range(len(tok_pairs) - 1, -1, -1)
                if is_token(tok_pairs[position][0], 151645)
            ),
            None,
        )

        out = out[:, template_end:]
        crop_mask(slice(template_end, None))

        if suffix_start is not None:
            suffix_len = len(tok_pairs) - suffix_start
            # Only crop when something would remain afterwards.
            if suffix_len > 0 and out.shape[1] > suffix_len:
                out = out[:, :-suffix_len]
                crop_mask(slice(None, -suffix_len))

        return out, pooled, extra
|
|
|
|
|
|
def te(dtype_llama=None, llama_quantization_metadata=None):
    """Build a ``LongCatImageTEModel`` subclass with preset overrides.

    Args:
        dtype_llama: When not ``None``, replaces the ``dtype`` passed to the
            returned class's constructor.
        llama_quantization_metadata: When not ``None``, stored under
            ``"quantization_metadata"`` in a copy of ``model_options``.

    Returns:
        The subclass itself (not an instance).
    """

    class LongCatImageTEModel_(LongCatImageTEModel):
        def __init__(self, device="cpu", dtype=None, model_options={}):
            options = model_options
            if llama_quantization_metadata is not None:
                # Work on a copy so the caller's dict is left untouched.
                options = model_options.copy()
                options["quantization_metadata"] = llama_quantization_metadata

            chosen_dtype = dtype if dtype_llama is None else dtype_llama
            super().__init__(device=device, dtype=chosen_dtype, model_options=options)

    return LongCatImageTEModel_
|