Mirror of https://github.com/Comfy-Org/ComfyUI-Manager.git, synced 2025-12-15 01:27:05 +08:00
improved: scanner.py, json-checker.py
This commit is contained in: parent 414557eee0 · commit fe791ccee9

json-checker.py (273)
@@ -1,25 +1,264 @@
-import json
-import argparse
+#!/usr/bin/env python3
+"""JSON Entry Validator

-def check_json_syntax(file_path):
+Validates JSON entries based on content structure.

+Validation rules based on JSON content:
+- {"custom_nodes": [...]}: Validates required fields (author, title, reference, files, install_type, description)
+- {"models": [...]}: Validates JSON syntax only (no required fields)
+- Other JSON structures: Validates JSON syntax only
+
+Git repository URL validation (for custom_nodes):
+1. URLs must NOT end with .git
+2. URLs must follow format: https://github.com/{author}/{reponame}
+3. .py and .js files are exempt from this check
+
+Supported formats:
+- Array format: [{...}, {...}]
+- Object format: {"custom_nodes": [...]} or {"models": [...]}
+"""
+
+import json
+import re
+import sys
+from pathlib import Path
+from typing import Dict, List, Tuple
+
+
+# Required fields for each entry type
+REQUIRED_FIELDS_CUSTOM_NODE = ['author', 'title', 'reference', 'files', 'install_type', 'description']
+REQUIRED_FIELDS_MODEL = []  # model-list.json doesn't require field validation
+
+# Pattern for valid GitHub repository URL (without .git suffix)
+GITHUB_REPO_PATTERN = re.compile(r'^https://github\.com/[^/]+/[^/]+$')
+
+
+def get_entry_context(entry: Dict) -> str:
+    """Get identifying information from entry for error messages
+
+    Args:
+        entry: JSON entry
+
+    Returns:
+        String with author and reference info
+    """
+    parts = []
+    if 'author' in entry:
+        parts.append(f"author={entry['author']}")
+    if 'reference' in entry:
+        parts.append(f"ref={entry['reference']}")
+    if 'title' in entry:
+        parts.append(f"title={entry['title']}")
+
+    if parts:
+        return " | ".join(parts)
+    else:
+        # No identifying info - show actual entry content (truncated)
+        import json
+        entry_str = json.dumps(entry, ensure_ascii=False)
+        if len(entry_str) > 100:
+            entry_str = entry_str[:100] + "..."
+        return f"content={entry_str}"
+
+
+def validate_required_fields(entry: Dict, entry_index: int, required_fields: List[str]) -> List[str]:
+    """Validate that all required fields are present
+
+    Args:
+        entry: JSON entry to validate
+        entry_index: Index of entry in array (for error reporting)
+        required_fields: List of required field names
+
+    Returns:
+        List of error descriptions (without entry prefix/context)
+    """
+    errors = []
+
+    for field in required_fields:
+        if field not in entry:
+            errors.append(f"Missing required field '{field}'")
+        elif entry[field] is None:
+            errors.append(f"Field '{field}' is null")
+        elif isinstance(entry[field], str) and not entry[field].strip():
+            errors.append(f"Field '{field}' is empty")
+        elif field == 'files' and not entry[field]:  # Empty array
+            errors.append(f"Field 'files' is empty array")
+
+    return errors
+
+
+def validate_git_repo_urls(entry: Dict, entry_index: int) -> List[str]:
+    """Validate git repository URLs in 'files' array
+
+    Requirements:
+    - Git repo URLs must NOT end with .git
+    - Must follow format: https://github.com/{author}/{reponame}
+    - .py and .js files are exempt
+
+    Args:
+        entry: JSON entry to validate
+        entry_index: Index of entry in array (for error reporting)
+
+    Returns:
+        List of error descriptions (without entry prefix/context)
+    """
+    errors = []
+
+    if 'files' not in entry or not isinstance(entry['files'], list):
+        return errors
+
+    for file_url in entry['files']:
+        if not isinstance(file_url, str):
+            continue
+
+        # Skip .py and .js files - they're exempt from git repo validation
+        if file_url.endswith('.py') or file_url.endswith('.js'):
+            continue
+
+        # Check if it's a GitHub URL (likely a git repo)
+        if 'github.com' in file_url:
+            # Error if URL ends with .git
+            if file_url.endswith('.git'):
+                errors.append(f"Git repo URL must NOT end with .git: {file_url}")
+                continue
+
+            # Validate format: https://github.com/{author}/{reponame}
+            if not GITHUB_REPO_PATTERN.match(file_url):
+                errors.append(f"Invalid git repo URL format (expected https://github.com/author/reponame): {file_url}")
+
+    return errors
+
+
+def validate_entry(entry: Dict, entry_index: int, required_fields: List[str]) -> List[str]:
+    """Validate a single JSON entry
+
+    Args:
+        entry: JSON entry to validate
+        entry_index: Index of entry in array (for error reporting)
+        required_fields: List of required field names
+
+    Returns:
+        List of error messages (empty if valid)
+    """
+    errors = []
+
+    # Check required fields
+    errors.extend(validate_required_fields(entry, entry_index, required_fields))
+
+    # Check git repository URLs
+    errors.extend(validate_git_repo_urls(entry, entry_index))
+
+    return errors
+
+
+def validate_json_file(file_path: str) -> Tuple[bool, List[str]]:
+    """Validate JSON file containing entries
+
+    Args:
+        file_path: Path to JSON file
+
+    Returns:
+        Tuple of (is_valid, error_messages)
+    """
+    errors = []
+
+    # Check file exists
+    path = Path(file_path)
+    if not path.exists():
+        return False, [f"File not found: {file_path}"]
+
+    # Load JSON
     try:
-        with open(file_path, 'r', encoding='utf-8') as file:
-            json_str = file.read()
-            json.loads(json_str)
-            print(f"[ OK ] {file_path}")
-    except UnicodeDecodeError as e:
-        print(f"Unicode decode error: {e}")
+        with open(path, 'r', encoding='utf-8') as f:
+            data = json.load(f)
     except json.JSONDecodeError as e:
-        print(f"[FAIL] {file_path}\n\n {e}\n")
-    except FileNotFoundError:
-        print(f"[FAIL] {file_path}\n\n File not found\n")
+        return False, [f"Invalid JSON: {e}"]
+    except Exception as e:
+        return False, [f"Error reading file: {e}"]
+
+    # Determine required fields based on JSON content
+    required_fields = []
+
+    # Validate structure - support both array and object formats
+    entries_to_validate = []
+
+    if isinstance(data, list):
+        # Direct array format: [{...}, {...}]
+        entries_to_validate = data
+    elif isinstance(data, dict):
+        # Object format: {"custom_nodes": [...]} or {"models": [...]}
+        # Determine validation based on keys
+        if 'custom_nodes' in data and isinstance(data['custom_nodes'], list):
+            required_fields = REQUIRED_FIELDS_CUSTOM_NODE
+            entries_to_validate = data['custom_nodes']
+        elif 'models' in data and isinstance(data['models'], list):
+            required_fields = REQUIRED_FIELDS_MODEL
+            entries_to_validate = data['models']
+        else:
+            # Other JSON structures (extension-node-map.json, etc.) - just validate JSON syntax
+            return True, []
+    else:
+        return False, ["JSON root must be either an array or an object containing arrays"]
+
+    # Validate each entry
+    for idx, entry in enumerate(entries_to_validate, start=1):
+        if not isinstance(entry, dict):
+            # Show actual value for type errors
+            entry_str = json.dumps(entry, ensure_ascii=False) if not isinstance(entry, str) else repr(entry)
+            if len(entry_str) > 150:
+                entry_str = entry_str[:150] + "..."
+            errors.append(f"\n❌ Entry #{idx}: Must be an object, got {type(entry).__name__}")
+            errors.append(f"  Actual value: {entry_str}")
+            continue
+
+        entry_errors = validate_entry(entry, idx, required_fields)
+        if entry_errors:
+            # Group errors by entry with context
+            context = get_entry_context(entry)
+            errors.append(f"\n❌ Entry #{idx} ({context}):")
+            for error in entry_errors:
+                errors.append(f"  - {error}")
+
+    is_valid = len(errors) == 0
+    return is_valid, errors
+
+
 def main():
-    parser = argparse.ArgumentParser(description="JSON File Syntax Checker")
-    parser.add_argument("file_path", type=str, help="Path to the JSON file for syntax checking")
+    """Main entry point"""
+    if len(sys.argv) < 2:
+        print("Usage: python json-checker.py <json-file>")
+        print("\nValidates JSON entries based on content:")
+        print("  - {\"custom_nodes\": [...]}: Validates required fields (author, title, reference, files, install_type, description)")
+        print("  - {\"models\": [...]}: Validates JSON syntax only (no required fields)")
+        print("  - Other JSON structures: Validates JSON syntax only")
+        print("\nGit repo URL validation (for custom_nodes):")
+        print("  - URLs must NOT end with .git")
+        print("  - URLs must follow: https://github.com/{author}/{reponame}")
+        sys.exit(1)

-    args = parser.parse_args()
-    check_json_syntax(args.file_path)
+    file_path = sys.argv[1]

-if __name__ == "__main__":
+    is_valid, errors = validate_json_file(file_path)
+
+    if is_valid:
+        print(f"✅ {file_path}: Validation passed")
+        sys.exit(0)
+    else:
+        print(f"Validating: {file_path}")
+        print("=" * 60)
+        print("❌ Validation failed!\n")
+        print("Errors:")
+        # Count actual errors (lines starting with "  -")
+        error_count = sum(1 for e in errors if e.strip().startswith('-'))
+        for error in errors:
+            # Don't add ❌ prefix to grouped entries (they already have it)
+            if error.strip().startswith('❌'):
+                print(error)
+            else:
+                print(error)
+        print(f"\nTotal errors: {error_count}")
+        sys.exit(1)
+
+
+if __name__ == '__main__':
     main()
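As a quick illustration of the URL rule enforced above (an aside, not part of the commit; the URLs below are made-up examples), the pattern accepts exactly one author segment and one repository segment, and the separate .git check rejects clone-style URLs before the pattern is even consulted:

# Sketch: how GITHUB_REPO_PATTERN behaves on sample inputs (hypothetical URLs).
import re

GITHUB_REPO_PATTERN = re.compile(r'^https://github\.com/[^/]+/[^/]+$')

samples = [
    "https://github.com/someauthor/somerepo",             # ok: plain author/repo form
    "https://github.com/someauthor/somerepo.git",         # matches the regex, but the .git check above rejects it first
    "https://github.com/someauthor/somerepo/tree/main",   # rejected: extra path segment
    "http://github.com/someauthor/somerepo",              # rejected: not https
]
for url in samples:
    print(bool(GITHUB_REPO_PATTERN.match(url)), url)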
scanner.py (609)
@@ -16,6 +16,108 @@ import sys

 from urllib.parse import urlparse
 from github import Github, Auth
+from pathlib import Path
+from typing import Set, Dict, Optional
+
+# Scanner version for cache invalidation
+SCANNER_VERSION = "2.0.11"  # Multi-layer detection: class existence + display names
+
+# Cache for extract_nodes and extract_nodes_enhanced results
+_extract_nodes_cache: Dict[str, Set[str]] = {}
+_extract_nodes_enhanced_cache: Dict[str, Set[str]] = {}
+_file_mtime_cache: Dict[Path, float] = {}
+
+
+def _get_repo_root(file_path: Path) -> Optional[Path]:
+    """Find the repository root directory containing .git"""
+    current = file_path if file_path.is_dir() else file_path.parent
+    while current != current.parent:
+        if (current / ".git").exists():
+            return current
+        current = current.parent
+    return None
+
+
+def _get_repo_hash(repo_path: Path) -> str:
+    """Get git commit hash or fallback identifier"""
+    git_dir = repo_path / ".git"
+    if not git_dir.exists():
+        return ""
+
+    try:
+        # Read HEAD to get current commit
+        head_file = git_dir / "HEAD"
+        if head_file.exists():
+            head_content = head_file.read_text().strip()
+            if head_content.startswith("ref:"):
+                # HEAD points to a ref
+                ref_path = git_dir / head_content[5:].strip()
+                if ref_path.exists():
+                    commit_hash = ref_path.read_text().strip()
+                    return commit_hash[:16]  # First 16 chars
+            else:
+                # Detached HEAD
+                return head_content[:16]
+    except:
+        pass
+
+    return ""
+
+
+def _load_per_repo_cache(repo_path: Path) -> Optional[tuple]:
+    """Load nodes and metadata from per-repo cache
+
+    Returns:
+        tuple: (nodes_set, metadata_dict) or None if cache invalid
+    """
+    cache_file = repo_path / ".git" / "nodecache.json"
+
+    if not cache_file.exists():
+        return None
+
+    try:
+        with open(cache_file, 'r') as f:
+            cache_data = json.load(f)
+
+        # Verify scanner version
+        if cache_data.get('scanner_version') != SCANNER_VERSION:
+            return None
+
+        # Verify git hash
+        current_hash = _get_repo_hash(repo_path)
+        if cache_data.get('git_hash') != current_hash:
+            return None
+
+        # Return nodes and metadata
+        nodes = cache_data.get('nodes', [])
+        metadata = cache_data.get('metadata', {})
+        return (set(nodes) if nodes else set(), metadata)
+
+    except:
+        return None
+
+
+def _save_per_repo_cache(repo_path: Path, all_nodes: Set[str], metadata: dict = None):
+    """Save nodes and metadata to per-repo cache"""
+    cache_file = repo_path / ".git" / "nodecache.json"
+
+    if not cache_file.parent.exists():
+        return
+
+    git_hash = _get_repo_hash(repo_path)
+    cache_data = {
+        "scanner_version": SCANNER_VERSION,
+        "git_hash": git_hash,
+        "scanned_at": datetime.datetime.now().isoformat(),
+        "nodes": sorted(list(all_nodes)),
+        "metadata": metadata if metadata else {}
+    }
+
+    try:
+        with open(cache_file, 'w') as f:
+            json.dump(cache_data, f, indent=2)
+    except:
+        pass  # Silently fail - cache is optional


 def download_url(url, dest_folder, filename=None):
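For orientation (an aside, not part of the diff): given the fields written by _save_per_repo_cache above, a .git/nodecache.json produced by this commit would look roughly like the sketch below; the hash, timestamp, node names, and metadata values are placeholders.

# Sketch only - illustrates the cache layout _save_per_repo_cache writes; all values are placeholders.
import datetime
import json

example_cache = {
    "scanner_version": "2.0.11",
    "git_hash": "0123456789abcdef",             # first 16 chars of the repo's HEAD commit (placeholder)
    "scanned_at": datetime.datetime.now().isoformat(),
    "nodes": ["ExampleNodeA", "ExampleNodeB"],  # sorted node names found in the repo (placeholders)
    "metadata": {"title": "Example Pack"},      # @title/@author style metadata, if any (placeholder)
}
print(json.dumps(example_cache, indent=2))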
@@ -51,11 +153,12 @@ Examples:
   # Standard mode
   python3 scanner.py
   python3 scanner.py --skip-update
+  python3 scanner.py --skip-all --force-rescan

   # Scan-only mode
   python3 scanner.py --scan-only temp-urls-clean.list
   python3 scanner.py --scan-only urls.list --temp-dir /custom/temp
   python3 scanner.py --scan-only urls.list --skip-update
+  python3 scanner.py --scan-only urls.list --skip-update --force-rescan
 '''
 )

@@ -69,6 +172,8 @@ Examples:
                     help='Skip GitHub stats collection')
 parser.add_argument('--skip-all', action='store_true',
                     help='Skip all update operations')
+parser.add_argument('--force-rescan', action='store_true',
+                    help='Force rescan all nodes (ignore cache)')

 # Backward compatibility: positional argument for temp_dir
 parser.add_argument('temp_dir_positional', nargs='?', metavar='TEMP_DIR',

@@ -94,6 +199,11 @@ parse_cnt = 0
 def extract_nodes(code_text):
     global parse_cnt

+    # Check cache first
+    cache_key = hash(code_text)
+    if cache_key in _extract_nodes_cache:
+        return _extract_nodes_cache[cache_key].copy()
+
     try:
         if parse_cnt % 100 == 0:
             print(".", end="", flush=True)
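A side note on the caching pattern added above (illustrative only, not from the commit): keying an in-process memo table by hash(code_text) avoids re-parsing identical sources within one run. A minimal standalone version of the same idea, with a hypothetical parse_source stand-in, looks like this:

# Minimal sketch of the same in-process memoization idea (parse_source is a hypothetical stand-in).
from typing import Dict, Set

_memo: Dict[int, Set[str]] = {}

def parse_source(code_text: str) -> Set[str]:
    # stand-in for the expensive AST walk done by extract_nodes()
    return {line.split("=")[0].strip() for line in code_text.splitlines() if "=" in line}

def parse_source_cached(code_text: str) -> Set[str]:
    key = hash(code_text)        # note: str hashes are only stable within the current process
    if key not in _memo:
        _memo[key] = parse_source(code_text)
    return _memo[key].copy()     # copy so callers cannot mutate the cached set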
@@ -128,12 +238,458 @@ def extract_nodes(code_text):
                 if key is not None and isinstance(key.value, str):
                     s.add(key.value.strip())

+            # Cache the result
+            _extract_nodes_cache[cache_key] = s
             return s
         else:
+            # Cache empty result
+            _extract_nodes_cache[cache_key] = set()
             return set()
     except:
+        # Cache empty result on error
+        _extract_nodes_cache[cache_key] = set()
         return set()

+
+def extract_nodes_from_repo(repo_path: Path, verbose: bool = False, force_rescan: bool = False) -> tuple:
+    """
+    Extract all nodes and metadata from a repository with per-repo caching.
+
+    Automatically caches results in .git/nodecache.json.
+    Cache is invalidated when:
+    - Git commit hash changes
+    - Scanner version changes
+    - force_rescan flag is True
+
+    Args:
+        repo_path: Path to repository root
+        verbose: If True, print UI-only extension detection messages
+        force_rescan: If True, ignore cache and force fresh scan
+
+    Returns:
+        tuple: (nodes_set, metadata_dict)
+    """
+    # Ensure path is absolute
+    repo_path = repo_path.resolve()
+
+    # Check per-repo cache first (unless force_rescan is True)
+    if not force_rescan:
+        cached_result = _load_per_repo_cache(repo_path)
+        if cached_result is not None:
+            return cached_result
+
+    # Cache miss - scan all .py files
+    all_nodes = set()
+    all_metadata = {}
+    py_files = list(repo_path.rglob("*.py"))
+
+    # Filter out __pycache__, .git, and other hidden directories
+    filtered_files = []
+    for f in py_files:
+        try:
+            rel_path = f.relative_to(repo_path)
+            # Skip __pycache__, .git, and any directory starting with .
+            if '__pycache__' not in str(rel_path) and not any(part.startswith('.') for part in rel_path.parts):
+                filtered_files.append(f)
+        except:
+            continue
+    py_files = filtered_files
+
+    for py_file in py_files:
+        try:
+            # Read file with proper encoding
+            with open(py_file, 'r', encoding='utf-8', errors='ignore') as f:
+                code = f.read()
+
+            if code:
+                # Extract nodes using SAME logic as scan_in_file
+                # V1 nodes (enhanced with fallback patterns)
+                nodes = extract_nodes_enhanced(code, py_file, visited=set(), verbose=verbose)
+                all_nodes.update(nodes)
+
+                # V3 nodes detection
+                v3_nodes = extract_v3_nodes(code)
+                all_nodes.update(v3_nodes)
+
+                # Dict parsing - exclude commented NODE_CLASS_MAPPINGS lines
+                pattern = r"_CLASS_MAPPINGS\s*(?::\s*\w+\s*)?=\s*(?:\\\s*)?{([^}]*)}"
+                regex = re.compile(pattern, re.MULTILINE | re.DOTALL)
+
+                for match_obj in regex.finditer(code):
+                    # Get the line where NODE_CLASS_MAPPINGS is defined
+                    match_start = match_obj.start()
+                    line_start = code.rfind('\n', 0, match_start) + 1
+                    line_end = code.find('\n', match_start)
+                    if line_end == -1:
+                        line_end = len(code)
+                    line = code[line_start:line_end]
+
+                    # Skip if line starts with # (commented)
+                    if re.match(r'^\s*#', line):
+                        continue
+
+                    match = match_obj.group(1)
+
+                    # Filter out commented lines from dict content
+                    match_lines = match.split('\n')
+                    match_filtered = '\n'.join(
+                        line for line in match_lines
+                        if not re.match(r'^\s*#', line)
+                    )
+
+                    # Extract key-value pairs with double quotes
+                    key_value_pairs = re.findall(r"\"([^\"]*)\"\s*:\s*([^,\n]*)", match_filtered)
+                    for key, value in key_value_pairs:
+                        all_nodes.add(key.strip())
+
+                    # Extract key-value pairs with single quotes
+                    key_value_pairs = re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", match_filtered)
+                    for key, value in key_value_pairs:
+                        all_nodes.add(key.strip())
+
+                # Handle .update() pattern (AFTER comment removal)
+                code_cleaned = re.sub(r'^#.*?$', '', code, flags=re.MULTILINE)
+
+                update_pattern = r"_CLASS_MAPPINGS\.update\s*\(\s*{([^}]*)}\s*\)"
+                update_match = re.search(update_pattern, code_cleaned, re.DOTALL)
+                if update_match:
+                    update_dict_text = update_match.group(1)
+                    # Extract key-value pairs (double quotes)
+                    update_pairs = re.findall(r'"([^"]*)"\s*:\s*([^,\n]*)', update_dict_text)
+                    for key, value in update_pairs:
+                        all_nodes.add(key.strip())
+                    # Extract key-value pairs (single quotes)
+                    update_pairs_single = re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", update_dict_text)
+                    for key, value in update_pairs_single:
+                        all_nodes.add(key.strip())
+
+                # Additional regex patterns (AFTER comment removal)
+                patterns = [
+                    r'^[^=]*_CLASS_MAPPINGS\["(.*?)"\]',
+                    r'^[^=]*_CLASS_MAPPINGS\[\'(.*?)\'\]',
+                    r'@register_node\("(.+)",\s*\".+"\)',
+                    r'"(\w+)"\s*:\s*{"class":\s*\w+\s*'
+                ]
+
+                for pattern in patterns:
+                    keys = re.findall(pattern, code_cleaned)
+                    all_nodes.update(key.strip() for key in keys)
+
+            # Extract metadata from this file
+            metadata = extract_metadata_only(str(py_file))
+            all_metadata.update(metadata)
+        except Exception as e:
+            # Silently skip files that can't be read
+            continue
+
+    # Save to per-repo cache
+    _save_per_repo_cache(repo_path, all_nodes, all_metadata)
+
+    return (all_nodes, all_metadata)
+
+
+def _verify_class_exists(node_name: str, code_text: str, file_path: Optional[Path] = None) -> tuple[bool, Optional[str], Optional[int]]:
+    """
+    Verify that a node class exists and has ComfyUI node structure.
+
+    Returns: (exists: bool, file_path: str, line_number: int)
+
+    A valid ComfyUI node must have:
+    - Class definition (not commented)
+    - At least one of: INPUT_TYPES, RETURN_TYPES, FUNCTION method/attribute
+    """
+    try:
+        with warnings.catch_warnings():
+            warnings.filterwarnings('ignore', category=SyntaxWarning)
+            tree = ast.parse(code_text)
+    except:
+        return (False, None, None)
+
+    for node in ast.walk(tree):
+        if isinstance(node, ast.ClassDef):
+            if node.name == node_name or node.name.replace('_', '') == node_name.replace('_', ''):
+                # Found class definition - check if it has ComfyUI interface
+                has_input_types = False
+                has_return_types = False
+                has_function = False
+
+                for item in node.body:
+                    # Check for INPUT_TYPES method
+                    if isinstance(item, ast.FunctionDef) and item.name == 'INPUT_TYPES':
+                        has_input_types = True
+                    # Check for RETURN_TYPES attribute
+                    elif isinstance(item, ast.Assign):
+                        for target in item.targets:
+                            if isinstance(target, ast.Name):
+                                if target.id == 'RETURN_TYPES':
+                                    has_return_types = True
+                                elif target.id == 'FUNCTION':
+                                    has_function = True
+                    # Check for FUNCTION method
+                    elif isinstance(item, ast.FunctionDef):
+                        has_function = True
+
+                # Valid if has any ComfyUI signature
+                if has_input_types or has_return_types or has_function:
+                    file_str = str(file_path) if file_path else None
+                    return (True, file_str, node.lineno)
+
+    return (False, None, None)
+
+
+def _extract_display_name_mappings(code_text: str) -> Set[str]:
+    """
+    Extract node names from NODE_DISPLAY_NAME_MAPPINGS.
+
+    Pattern:
+        NODE_DISPLAY_NAME_MAPPINGS = {
+            "node_key": "Display Name",
+            ...
+        }
+
+    Returns:
+        Set of node keys from NODE_DISPLAY_NAME_MAPPINGS
+    """
+    try:
+        with warnings.catch_warnings():
+            warnings.filterwarnings('ignore', category=SyntaxWarning)
+            tree = ast.parse(code_text)
+    except:
+        return set()
+
+    nodes = set()
+
+    for node in tree.body:
+        if isinstance(node, ast.Assign):
+            for target in node.targets:
+                if isinstance(target, ast.Name) and target.id == 'NODE_DISPLAY_NAME_MAPPINGS':
+                    if isinstance(node.value, ast.Dict):
+                        for key in node.value.keys:
+                            if isinstance(key, ast.Constant) and isinstance(key.value, str):
+                                nodes.add(key.value.strip())
+
+    return nodes
+
+
+def extract_nodes_enhanced(
+    code_text: str,
+    file_path: Optional[Path] = None,
+    visited: Optional[Set[Path]] = None,
+    verbose: bool = False
+) -> Set[str]:
+    """
+    Enhanced node extraction with multi-layer detection system.
+
+    Scanner 2.0.11 - Comprehensive detection strategy:
+    - Phase 1: NODE_CLASS_MAPPINGS dict literal
+    - Phase 2: Class.NAME attribute access (e.g., FreeChat.NAME)
+    - Phase 3: Item assignment (NODE_CLASS_MAPPINGS["key"] = value)
+    - Phase 4: Class existence verification (detects active classes even if registration commented)
+    - Phase 5: NODE_DISPLAY_NAME_MAPPINGS cross-reference
+    - Phase 6: Empty dict detection (UI-only extensions, logging only)
+
+    Fixed Bugs:
+    - Scanner 2.0.9: Fallback cascade prevented Phase 3 execution
+    - Scanner 2.0.10: Missed active classes with commented registrations (15 false negatives)
+
+    Args:
+        code_text: Python source code
+        file_path: Path to file (for logging and caching)
+        visited: Visited paths (for circular import prevention)
+        verbose: If True, print UI-only extension detection messages
+
+    Returns:
+        Set of node names (union of all detected patterns)
+    """
+    # Check file-based cache if file_path provided
+    if file_path is not None:
+        try:
+            file_path_obj = Path(file_path) if not isinstance(file_path, Path) else file_path
+            if file_path_obj.exists():
+                current_mtime = file_path_obj.stat().st_mtime
+
+                # Check if we have cached result with matching mtime and scanner version
+                if file_path_obj in _file_mtime_cache:
+                    cached_mtime = _file_mtime_cache[file_path_obj]
+                    cache_key = (str(file_path_obj), cached_mtime, SCANNER_VERSION)
+
+                    if current_mtime == cached_mtime and cache_key in _extract_nodes_enhanced_cache:
+                        return _extract_nodes_enhanced_cache[cache_key].copy()
+        except:
+            pass  # Ignore cache errors, proceed with normal execution
+
+    # Suppress warnings from AST parsing
+    with warnings.catch_warnings():
+        warnings.filterwarnings('ignore', category=SyntaxWarning)
+        warnings.filterwarnings('ignore', category=DeprecationWarning)
+
+        # Phase 1: Original extract_nodes() - dict literal
+        phase1_nodes = extract_nodes(code_text)
+
+        # Phase 2: Class.NAME pattern
+        if visited is None:
+            visited = set()
+        phase2_nodes = _fallback_classname_resolver(code_text, file_path)
+
+        # Phase 3: Item assignment pattern
+        phase3_nodes = _fallback_item_assignment(code_text)
+
+        # Phase 4: NODE_DISPLAY_NAME_MAPPINGS cross-reference (NEW in 2.0.11)
+        # This catches nodes that are in display names but not in NODE_CLASS_MAPPINGS
+        phase4_nodes = _extract_display_name_mappings(code_text)
+
+        # Phase 5: Class existence verification ONLY for display name candidates (NEW in 2.0.11)
+        # This phase is CONSERVATIVE - only verify classes that appear in display names
+        # This catches the specific Scanner 2.0.10 bug pattern:
+        #   - NODE_CLASS_MAPPINGS registration is commented
+        #   - NODE_DISPLAY_NAME_MAPPINGS still has the entry
+        #   - Class implementation exists
+        # Example: Bjornulf_ollamaLoader in Bjornulf_custom_nodes
+        phase5_nodes = set()
+        for node_name in phase4_nodes:
+            # Only check classes that appear in display names but not in registrations
+            if node_name not in (phase1_nodes | phase2_nodes | phase3_nodes):
+                exists, _, _ = _verify_class_exists(node_name, code_text, file_path)
+                if exists:
+                    phase5_nodes.add(node_name)
+
+        # Union all results (FIX: Scanner 2.0.9 bug + Scanner 2.0.10 bug)
+        # 2.0.9: Used early return which missed Phase 3 nodes
+        # 2.0.10: Only checked registrations, missed classes referenced in display names
+        all_nodes = phase1_nodes | phase2_nodes | phase3_nodes | phase4_nodes | phase5_nodes
+
+        # Phase 6: Empty dict detector (logging only, doesn't add nodes)
+        if not all_nodes:
+            _fallback_empty_dict_detector(code_text, file_path, verbose)
+
+    # Cache the result
+    if file_path is not None:
+        try:
+            file_path_obj = Path(file_path) if not isinstance(file_path, Path) else file_path
+            if file_path_obj.exists():
+                current_mtime = file_path_obj.stat().st_mtime
+                cache_key = (str(file_path_obj), current_mtime, SCANNER_VERSION)
+                _extract_nodes_enhanced_cache[cache_key] = all_nodes
+                _file_mtime_cache[file_path_obj] = current_mtime
+        except:
+            pass
+
+    return all_nodes
+
+
+def _fallback_classname_resolver(code_text: str, file_path: Optional[Path]) -> Set[str]:
+    """
+    Detect Class.NAME pattern in NODE_CLASS_MAPPINGS.
+
+    Pattern:
+        NODE_CLASS_MAPPINGS = {
+            FreeChat.NAME: FreeChat,
+            PaidChat.NAME: PaidChat
+        }
+    """
+    try:
+        with warnings.catch_warnings():
+            warnings.filterwarnings('ignore', category=SyntaxWarning)
+            parsed = ast.parse(code_text)
+    except:
+        return set()
+
+    nodes = set()
+
+    for node in parsed.body:
+        if isinstance(node, ast.Assign):
+            for target in node.targets:
+                if isinstance(target, ast.Name) and target.id == 'NODE_CLASS_MAPPINGS':
+                    if isinstance(node.value, ast.Dict):
+                        for key in node.value.keys:
+                            # Detect Class.NAME pattern
+                            if isinstance(key, ast.Attribute):
+                                if isinstance(key.value, ast.Name):
+                                    # Use class name as node name
+                                    nodes.add(key.value.id)
+                            # Also handle literal strings
+                            elif isinstance(key, ast.Constant) and isinstance(key.value, str):
+                                nodes.add(key.value.strip())
+
+    return nodes
+
+
+def _fallback_item_assignment(code_text: str) -> Set[str]:
+    """
+    Detect item assignment pattern.
+
+    Pattern:
+        NODE_CLASS_MAPPINGS = {}
+        NODE_CLASS_MAPPINGS["MyNode"] = MyNode
+    """
+    try:
+        with warnings.catch_warnings():
+            warnings.filterwarnings('ignore', category=SyntaxWarning)
+            parsed = ast.parse(code_text)
+    except:
+        return set()
+
+    nodes = set()
+
+    for node in ast.walk(parsed):
+        if isinstance(node, ast.Assign):
+            for target in node.targets:
+                if isinstance(target, ast.Subscript):
+                    if (isinstance(target.value, ast.Name) and
+                            target.value.id in ['NODE_CLASS_MAPPINGS', 'NODE_CONFIG']):
+                        # Extract key
+                        if isinstance(target.slice, ast.Constant):
+                            if isinstance(target.slice.value, str):
+                                nodes.add(target.slice.value)
+
+    return nodes
+
+
+def _extract_repo_name(file_path: Path) -> str:
+    """
+    Extract repository name from file path.
+
+    Path structure: /home/rho/.tmp/analysis/temp/{author}_{reponame}/{path/to/file.py}
+    Returns: {author}_{reponame} or filename if extraction fails
+    """
+    try:
+        parts = file_path.parts
+        # Find 'temp' directory in path
+        if 'temp' in parts:
+            temp_idx = parts.index('temp')
+            if temp_idx + 1 < len(parts):
+                # Next part after 'temp' is the repo directory
+                return parts[temp_idx + 1]
+    except (ValueError, IndexError):
+        pass
+
+    # Fallback to filename if extraction fails
+    return file_path.name if hasattr(file_path, 'name') else str(file_path)
+
+
+def _fallback_empty_dict_detector(code_text: str, file_path: Optional[Path], verbose: bool = False) -> None:
+    """
+    Detect empty NODE_CLASS_MAPPINGS (UI-only extensions).
+    Logs for documentation purposes only (when verbose=True).
+
+    Args:
+        code_text: Python source code to analyze
+        file_path: Path to the file being analyzed
+        verbose: If True, print detection messages
+    """
+    empty_patterns = [
+        'NODE_CLASS_MAPPINGS = {}',
+        'NODE_CLASS_MAPPINGS={}',
+    ]
+
+    code_normalized = code_text.replace(' ', '').replace('\n', '')
+
+    for pattern in empty_patterns:
+        pattern_normalized = pattern.replace(' ', '')
+        if pattern_normalized in code_normalized:
+            if file_path and verbose:
+                repo_name = _extract_repo_name(file_path)
+                print(f"Info: UI-only extension (empty NODE_CLASS_MAPPINGS): {repo_name}")
+            return

 def has_comfy_node_base(class_node):
     """Check if class inherits from io.ComfyNode or ComfyNode"""
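For context on the Phase 4/5 comments above, here is a minimal standalone sketch (an aside, not scanner code; the node and class names are invented) of the source pattern Scanner 2.0.10 missed - registration commented out, display-name entry and class still present - and of how the key can be recovered from NODE_DISPLAY_NAME_MAPPINGS with ast:

# Illustration only: the kind of extension source the display-name cross-reference targets.
import ast

SAMPLE_SOURCE = '''
class ExampleLoader:                      # class implementation still exists
    RETURN_TYPES = ("MODEL",)
    FUNCTION = "load"

# NODE_CLASS_MAPPINGS = {"ExampleLoader": ExampleLoader}   # registration commented out
NODE_DISPLAY_NAME_MAPPINGS = {"ExampleLoader": "Example Loader"}
'''

tree = ast.parse(SAMPLE_SOURCE)
display_keys = set()
for node in tree.body:
    if isinstance(node, ast.Assign):
        for target in node.targets:
            if isinstance(target, ast.Name) and target.id == "NODE_DISPLAY_NAME_MAPPINGS":
                if isinstance(node.value, ast.Dict):
                    for key in node.value.keys:
                        if isinstance(key, ast.Constant) and isinstance(key.value, str):
                            display_keys.add(key.value)

print(display_keys)  # {'ExampleLoader'} - a candidate that Phase 5 then verifies against the class definitions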
@@ -229,6 +785,25 @@ def extract_v3_nodes(code_text):


 # scan
+def extract_metadata_only(filename):
+    """Extract only metadata (@author, @title, etc) without node scanning"""
+    try:
+        with open(filename, encoding='utf-8', errors='ignore') as file:
+            code = file.read()
+
+        metadata = {}
+        lines = code.strip().split('\n')
+        for line in lines:
+            if line.startswith('@'):
+                if line.startswith("@author:") or line.startswith("@title:") or line.startswith("@nickname:") or line.startswith("@description:"):
+                    key, value = line[1:].strip().split(':', 1)
+                    metadata[key.strip()] = value.strip()
+
+        return metadata
+    except:
+        return {}
+
+
 def scan_in_file(filename, is_builtin=False):
     global builtin_nodes

@@ -242,8 +817,8 @@ def scan_in_file(filename, is_builtin=False):
     nodes = set()
     class_dict = {}

-    # V1 nodes detection
-    nodes |= extract_nodes(code)
+    # V1 nodes detection (enhanced with fallback patterns)
+    nodes |= extract_nodes_enhanced(code, file_path=Path(filename), visited=set())

     # V3 nodes detection
     nodes |= extract_v3_nodes(code)

@@ -620,13 +1195,14 @@ def update_custom_nodes(scan_only_mode=False, url_list_file=None):
     return node_info


-def gen_json(node_info, scan_only_mode=False):
+def gen_json(node_info, scan_only_mode=False, force_rescan=False):
     """
     Generate extension-node-map.json from scanned node information

     Args:
         node_info (dict): Repository metadata mapping
         scan_only_mode (bool): If True, exclude metadata from output
+        force_rescan (bool): If True, ignore cache and force rescan all nodes
     """
     # scan from .py file
     node_files, node_dirs = get_nodes(temp_dir)

@@ -642,13 +1218,17 @@ def gen_json(node_info, scan_only_mode=False):
         py_files = get_py_file_paths(dirname)
         metadata = {}

-        nodes = set()
-        for py in py_files:
-            nodes_in_file, metadata_in_file = scan_in_file(py, dirname == "ComfyUI")
-            nodes.update(nodes_in_file)
-            # Include metadata from .py files in both modes
-            metadata.update(metadata_in_file)
+        # Use per-repo cache for node AND metadata extraction
+        try:
+            nodes, metadata = extract_nodes_from_repo(Path(dirname), verbose=False, force_rescan=force_rescan)
+        except:
+            # Fallback to file-by-file scanning if extract_nodes_from_repo fails
+            nodes = set()
+            for py in py_files:
+                nodes_in_file, metadata_in_file = scan_in_file(py, dirname == "ComfyUI")
+                nodes.update(nodes_in_file)
+                metadata.update(metadata_in_file)

         dirname = os.path.basename(dirname)

         if 'Jovimetrix' in dirname:

@@ -810,11 +1390,14 @@ if __name__ == "__main__":
     print("\n# Generating 'extension-node-map.json'...\n")

     # Generate extension-node-map.json
-    gen_json(updated_node_info, scan_only_mode)
+    force_rescan = args.force_rescan if hasattr(args, 'force_rescan') else False
+    if force_rescan:
+        print("⚠️ Force rescan enabled - ignoring all cached results\n")
+    gen_json(updated_node_info, scan_only_mode, force_rescan)

     print("\n✅ DONE.\n")

     if scan_only_mode:
         print("Output: extension-node-map.json (node mappings only)")
     else:
-        print("Output: extension-node-map.json (full metadata)")
+        print("Output: extension-node-map.json (full metadata)")