From fe791ccee9bbace4cec32a5ae4a825bb16b95dd9 Mon Sep 17 00:00:00 2001
From: "Dr.Lt.Data"
Date: Wed, 10 Dec 2025 18:39:02 +0900
Subject: [PATCH] improved: scanner.py, json-checker.py

---
 json-checker.py | 273 ++++++++++++++++++++--
 scanner.py      | 609 ++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 852 insertions(+), 30 deletions(-)

diff --git a/json-checker.py b/json-checker.py
index 1961ee68..c35e90eb 100644
--- a/json-checker.py
+++ b/json-checker.py
@@ -1,25 +1,264 @@
-import json
-import argparse
+#!/usr/bin/env python3
+"""JSON Entry Validator
 
-def check_json_syntax(file_path):
+Validates JSON entries based on content structure.
+
+Validation rules based on JSON content:
+- {"custom_nodes": [...]}: Validates required fields (author, title, reference, files, install_type, description)
+- {"models": [...]}: Validates JSON syntax only (no required fields)
+- Other JSON structures: Validates JSON syntax only
+
+Git repository URL validation (for custom_nodes):
+1. URLs must NOT end with .git
+2. URLs must follow format: https://github.com/{author}/{reponame}
+3. .py and .js files are exempt from this check
+
+Supported formats:
+- Array format: [{...}, {...}]
+- Object format: {"custom_nodes": [...]} or {"models": [...]}
+"""
+
+import json
+import re
+import sys
+from pathlib import Path
+from typing import Dict, List, Tuple
+
+
+# Required fields for each entry type
+REQUIRED_FIELDS_CUSTOM_NODE = ['author', 'title', 'reference', 'files', 'install_type', 'description']
+REQUIRED_FIELDS_MODEL = []  # model-list.json doesn't require field validation
+
+# Pattern for valid GitHub repository URL (without .git suffix)
+GITHUB_REPO_PATTERN = re.compile(r'^https://github\.com/[^/]+/[^/]+$')
+
+
+def get_entry_context(entry: Dict) -> str:
+    """Get identifying information from entry for error messages
+
+    Args:
+        entry: JSON entry
+
+    Returns:
+        String with author and reference info
+    """
+    parts = []
+    if 'author' in entry:
+        parts.append(f"author={entry['author']}")
+    if 'reference' in entry:
+        parts.append(f"ref={entry['reference']}")
+    if 'title' in entry:
+        parts.append(f"title={entry['title']}")
+
+    if parts:
+        return " | ".join(parts)
+    else:
+        # No identifying info - show actual entry content (truncated)
+        import json
+        entry_str = json.dumps(entry, ensure_ascii=False)
+        if len(entry_str) > 100:
+            entry_str = entry_str[:100] + "..."
+ return f"content={entry_str}" + + +def validate_required_fields(entry: Dict, entry_index: int, required_fields: List[str]) -> List[str]: + """Validate that all required fields are present + + Args: + entry: JSON entry to validate + entry_index: Index of entry in array (for error reporting) + required_fields: List of required field names + + Returns: + List of error descriptions (without entry prefix/context) + """ + errors = [] + + for field in required_fields: + if field not in entry: + errors.append(f"Missing required field '{field}'") + elif entry[field] is None: + errors.append(f"Field '{field}' is null") + elif isinstance(entry[field], str) and not entry[field].strip(): + errors.append(f"Field '{field}' is empty") + elif field == 'files' and not entry[field]: # Empty array + errors.append(f"Field 'files' is empty array") + + return errors + + +def validate_git_repo_urls(entry: Dict, entry_index: int) -> List[str]: + """Validate git repository URLs in 'files' array + + Requirements: + - Git repo URLs must NOT end with .git + - Must follow format: https://github.com/{author}/{reponame} + - .py and .js files are exempt + + Args: + entry: JSON entry to validate + entry_index: Index of entry in array (for error reporting) + + Returns: + List of error descriptions (without entry prefix/context) + """ + errors = [] + + if 'files' not in entry or not isinstance(entry['files'], list): + return errors + + for file_url in entry['files']: + if not isinstance(file_url, str): + continue + + # Skip .py and .js files - they're exempt from git repo validation + if file_url.endswith('.py') or file_url.endswith('.js'): + continue + + # Check if it's a GitHub URL (likely a git repo) + if 'github.com' in file_url: + # Error if URL ends with .git + if file_url.endswith('.git'): + errors.append(f"Git repo URL must NOT end with .git: {file_url}") + continue + + # Validate format: https://github.com/{author}/{reponame} + if not GITHUB_REPO_PATTERN.match(file_url): + errors.append(f"Invalid git repo URL format (expected https://github.com/author/reponame): {file_url}") + + return errors + + +def validate_entry(entry: Dict, entry_index: int, required_fields: List[str]) -> List[str]: + """Validate a single JSON entry + + Args: + entry: JSON entry to validate + entry_index: Index of entry in array (for error reporting) + required_fields: List of required field names + + Returns: + List of error messages (empty if valid) + """ + errors = [] + + # Check required fields + errors.extend(validate_required_fields(entry, entry_index, required_fields)) + + # Check git repository URLs + errors.extend(validate_git_repo_urls(entry, entry_index)) + + return errors + + +def validate_json_file(file_path: str) -> Tuple[bool, List[str]]: + """Validate JSON file containing entries + + Args: + file_path: Path to JSON file + + Returns: + Tuple of (is_valid, error_messages) + """ + errors = [] + + # Check file exists + path = Path(file_path) + if not path.exists(): + return False, [f"File not found: {file_path}"] + + # Load JSON try: - with open(file_path, 'r', encoding='utf-8') as file: - json_str = file.read() - json.loads(json_str) - print(f"[ OK ] {file_path}") - except UnicodeDecodeError as e: - print(f"Unicode decode error: {e}") + with open(path, 'r', encoding='utf-8') as f: + data = json.load(f) except json.JSONDecodeError as e: - print(f"[FAIL] {file_path}\n\n {e}\n") - except FileNotFoundError: - print(f"[FAIL] {file_path}\n\n File not found\n") + return False, [f"Invalid JSON: {e}"] + except Exception as e: + return 
False, [f"Error reading file: {e}"] + + # Determine required fields based on JSON content + required_fields = [] + + # Validate structure - support both array and object formats + entries_to_validate = [] + + if isinstance(data, list): + # Direct array format: [{...}, {...}] + entries_to_validate = data + elif isinstance(data, dict): + # Object format: {"custom_nodes": [...]} or {"models": [...]} + # Determine validation based on keys + if 'custom_nodes' in data and isinstance(data['custom_nodes'], list): + required_fields = REQUIRED_FIELDS_CUSTOM_NODE + entries_to_validate = data['custom_nodes'] + elif 'models' in data and isinstance(data['models'], list): + required_fields = REQUIRED_FIELDS_MODEL + entries_to_validate = data['models'] + else: + # Other JSON structures (extension-node-map.json, etc.) - just validate JSON syntax + return True, [] + else: + return False, ["JSON root must be either an array or an object containing arrays"] + + # Validate each entry + for idx, entry in enumerate(entries_to_validate, start=1): + if not isinstance(entry, dict): + # Show actual value for type errors + entry_str = json.dumps(entry, ensure_ascii=False) if not isinstance(entry, str) else repr(entry) + if len(entry_str) > 150: + entry_str = entry_str[:150] + "..." + errors.append(f"\n❌ Entry #{idx}: Must be an object, got {type(entry).__name__}") + errors.append(f" Actual value: {entry_str}") + continue + + entry_errors = validate_entry(entry, idx, required_fields) + if entry_errors: + # Group errors by entry with context + context = get_entry_context(entry) + errors.append(f"\n❌ Entry #{idx} ({context}):") + for error in entry_errors: + errors.append(f" - {error}") + + is_valid = len(errors) == 0 + return is_valid, errors + def main(): - parser = argparse.ArgumentParser(description="JSON File Syntax Checker") - parser.add_argument("file_path", type=str, help="Path to the JSON file for syntax checking") + """Main entry point""" + if len(sys.argv) < 2: + print("Usage: python json-checker.py ") + print("\nValidates JSON entries based on content:") + print(" - {\"custom_nodes\": [...]}: Validates required fields (author, title, reference, files, install_type, description)") + print(" - {\"models\": [...]}: Validates JSON syntax only (no required fields)") + print(" - Other JSON structures: Validates JSON syntax only") + print("\nGit repo URL validation (for custom_nodes):") + print(" - URLs must NOT end with .git") + print(" - URLs must follow: https://github.com/{author}/{reponame}") + sys.exit(1) - args = parser.parse_args() - check_json_syntax(args.file_path) + file_path = sys.argv[1] -if __name__ == "__main__": + is_valid, errors = validate_json_file(file_path) + + if is_valid: + print(f"✅ {file_path}: Validation passed") + sys.exit(0) + else: + print(f"Validating: {file_path}") + print("=" * 60) + print("❌ Validation failed!\n") + print("Errors:") + # Count actual errors (lines starting with " -") + error_count = sum(1 for e in errors if e.strip().startswith('-')) + for error in errors: + # Don't add ❌ prefix to grouped entries (they already have it) + if error.strip().startswith('❌'): + print(error) + else: + print(error) + print(f"\nTotal errors: {error_count}") + sys.exit(1) + + +if __name__ == '__main__': main() diff --git a/scanner.py b/scanner.py index 69539af9..7b7984a9 100644 --- a/scanner.py +++ b/scanner.py @@ -16,6 +16,108 @@ import sys from urllib.parse import urlparse from github import Github, Auth +from pathlib import Path +from typing import Set, Dict, Optional + +# Scanner version 
for cache invalidation +SCANNER_VERSION = "2.0.11" # Multi-layer detection: class existence + display names + +# Cache for extract_nodes and extract_nodes_enhanced results +_extract_nodes_cache: Dict[str, Set[str]] = {} +_extract_nodes_enhanced_cache: Dict[str, Set[str]] = {} +_file_mtime_cache: Dict[Path, float] = {} + + +def _get_repo_root(file_path: Path) -> Optional[Path]: + """Find the repository root directory containing .git""" + current = file_path if file_path.is_dir() else file_path.parent + while current != current.parent: + if (current / ".git").exists(): + return current + current = current.parent + return None + + +def _get_repo_hash(repo_path: Path) -> str: + """Get git commit hash or fallback identifier""" + git_dir = repo_path / ".git" + if not git_dir.exists(): + return "" + + try: + # Read HEAD to get current commit + head_file = git_dir / "HEAD" + if head_file.exists(): + head_content = head_file.read_text().strip() + if head_content.startswith("ref:"): + # HEAD points to a ref + ref_path = git_dir / head_content[5:].strip() + if ref_path.exists(): + commit_hash = ref_path.read_text().strip() + return commit_hash[:16] # First 16 chars + else: + # Detached HEAD + return head_content[:16] + except: + pass + + return "" + + +def _load_per_repo_cache(repo_path: Path) -> Optional[tuple]: + """Load nodes and metadata from per-repo cache + + Returns: + tuple: (nodes_set, metadata_dict) or None if cache invalid + """ + cache_file = repo_path / ".git" / "nodecache.json" + + if not cache_file.exists(): + return None + + try: + with open(cache_file, 'r') as f: + cache_data = json.load(f) + + # Verify scanner version + if cache_data.get('scanner_version') != SCANNER_VERSION: + return None + + # Verify git hash + current_hash = _get_repo_hash(repo_path) + if cache_data.get('git_hash') != current_hash: + return None + + # Return nodes and metadata + nodes = cache_data.get('nodes', []) + metadata = cache_data.get('metadata', {}) + return (set(nodes) if nodes else set(), metadata) + + except: + return None + + +def _save_per_repo_cache(repo_path: Path, all_nodes: Set[str], metadata: dict = None): + """Save nodes and metadata to per-repo cache""" + cache_file = repo_path / ".git" / "nodecache.json" + + if not cache_file.parent.exists(): + return + + git_hash = _get_repo_hash(repo_path) + cache_data = { + "scanner_version": SCANNER_VERSION, + "git_hash": git_hash, + "scanned_at": datetime.datetime.now().isoformat(), + "nodes": sorted(list(all_nodes)), + "metadata": metadata if metadata else {} + } + + try: + with open(cache_file, 'w') as f: + json.dump(cache_data, f, indent=2) + except: + pass # Silently fail - cache is optional def download_url(url, dest_folder, filename=None): @@ -51,11 +153,12 @@ Examples: # Standard mode python3 scanner.py python3 scanner.py --skip-update + python3 scanner.py --skip-all --force-rescan # Scan-only mode python3 scanner.py --scan-only temp-urls-clean.list python3 scanner.py --scan-only urls.list --temp-dir /custom/temp - python3 scanner.py --scan-only urls.list --skip-update + python3 scanner.py --scan-only urls.list --skip-update --force-rescan ''' ) @@ -69,6 +172,8 @@ Examples: help='Skip GitHub stats collection') parser.add_argument('--skip-all', action='store_true', help='Skip all update operations') + parser.add_argument('--force-rescan', action='store_true', + help='Force rescan all nodes (ignore cache)') # Backward compatibility: positional argument for temp_dir parser.add_argument('temp_dir_positional', nargs='?', metavar='TEMP_DIR', @@ -94,6 
+199,11 @@ parse_cnt = 0 def extract_nodes(code_text): global parse_cnt + # Check cache first + cache_key = hash(code_text) + if cache_key in _extract_nodes_cache: + return _extract_nodes_cache[cache_key].copy() + try: if parse_cnt % 100 == 0: print(".", end="", flush=True) @@ -128,12 +238,458 @@ def extract_nodes(code_text): if key is not None and isinstance(key.value, str): s.add(key.value.strip()) + # Cache the result + _extract_nodes_cache[cache_key] = s return s else: + # Cache empty result + _extract_nodes_cache[cache_key] = set() return set() + except: + # Cache empty result on error + _extract_nodes_cache[cache_key] = set() + return set() + +def extract_nodes_from_repo(repo_path: Path, verbose: bool = False, force_rescan: bool = False) -> tuple: + """ + Extract all nodes and metadata from a repository with per-repo caching. + + Automatically caches results in .git/nodecache.json. + Cache is invalidated when: + - Git commit hash changes + - Scanner version changes + - force_rescan flag is True + + Args: + repo_path: Path to repository root + verbose: If True, print UI-only extension detection messages + force_rescan: If True, ignore cache and force fresh scan + + Returns: + tuple: (nodes_set, metadata_dict) + """ + # Ensure path is absolute + repo_path = repo_path.resolve() + + # Check per-repo cache first (unless force_rescan is True) + if not force_rescan: + cached_result = _load_per_repo_cache(repo_path) + if cached_result is not None: + return cached_result + + # Cache miss - scan all .py files + all_nodes = set() + all_metadata = {} + py_files = list(repo_path.rglob("*.py")) + + # Filter out __pycache__, .git, and other hidden directories + filtered_files = [] + for f in py_files: + try: + rel_path = f.relative_to(repo_path) + # Skip __pycache__, .git, and any directory starting with . 
+ if '__pycache__' not in str(rel_path) and not any(part.startswith('.') for part in rel_path.parts): + filtered_files.append(f) + except: + continue + py_files = filtered_files + + for py_file in py_files: + try: + # Read file with proper encoding + with open(py_file, 'r', encoding='utf-8', errors='ignore') as f: + code = f.read() + + if code: + # Extract nodes using SAME logic as scan_in_file + # V1 nodes (enhanced with fallback patterns) + nodes = extract_nodes_enhanced(code, py_file, visited=set(), verbose=verbose) + all_nodes.update(nodes) + + # V3 nodes detection + v3_nodes = extract_v3_nodes(code) + all_nodes.update(v3_nodes) + + # Dict parsing - exclude commented NODE_CLASS_MAPPINGS lines + pattern = r"_CLASS_MAPPINGS\s*(?::\s*\w+\s*)?=\s*(?:\\\s*)?{([^}]*)}" + regex = re.compile(pattern, re.MULTILINE | re.DOTALL) + + for match_obj in regex.finditer(code): + # Get the line where NODE_CLASS_MAPPINGS is defined + match_start = match_obj.start() + line_start = code.rfind('\n', 0, match_start) + 1 + line_end = code.find('\n', match_start) + if line_end == -1: + line_end = len(code) + line = code[line_start:line_end] + + # Skip if line starts with # (commented) + if re.match(r'^\s*#', line): + continue + + match = match_obj.group(1) + + # Filter out commented lines from dict content + match_lines = match.split('\n') + match_filtered = '\n'.join( + line for line in match_lines + if not re.match(r'^\s*#', line) + ) + + # Extract key-value pairs with double quotes + key_value_pairs = re.findall(r"\"([^\"]*)\"\s*:\s*([^,\n]*)", match_filtered) + for key, value in key_value_pairs: + all_nodes.add(key.strip()) + + # Extract key-value pairs with single quotes + key_value_pairs = re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", match_filtered) + for key, value in key_value_pairs: + all_nodes.add(key.strip()) + + # Handle .update() pattern (AFTER comment removal) + code_cleaned = re.sub(r'^#.*?$', '', code, flags=re.MULTILINE) + + update_pattern = r"_CLASS_MAPPINGS\.update\s*\(\s*{([^}]*)}\s*\)" + update_match = re.search(update_pattern, code_cleaned, re.DOTALL) + if update_match: + update_dict_text = update_match.group(1) + # Extract key-value pairs (double quotes) + update_pairs = re.findall(r'"([^"]*)"\s*:\s*([^,\n]*)', update_dict_text) + for key, value in update_pairs: + all_nodes.add(key.strip()) + # Extract key-value pairs (single quotes) + update_pairs_single = re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", update_dict_text) + for key, value in update_pairs_single: + all_nodes.add(key.strip()) + + # Additional regex patterns (AFTER comment removal) + patterns = [ + r'^[^=]*_CLASS_MAPPINGS\["(.*?)"\]', + r'^[^=]*_CLASS_MAPPINGS\[\'(.*?)\'\]', + r'@register_node\("(.+)",\s*\".+"\)', + r'"(\w+)"\s*:\s*{"class":\s*\w+\s*' + ] + + for pattern in patterns: + keys = re.findall(pattern, code_cleaned) + all_nodes.update(key.strip() for key in keys) + + # Extract metadata from this file + metadata = extract_metadata_only(str(py_file)) + all_metadata.update(metadata) + except Exception as e: + # Silently skip files that can't be read + continue + + # Save to per-repo cache + _save_per_repo_cache(repo_path, all_nodes, all_metadata) + + return (all_nodes, all_metadata) + + +def _verify_class_exists(node_name: str, code_text: str, file_path: Optional[Path] = None) -> tuple[bool, Optional[str], Optional[int]]: + """ + Verify that a node class exists and has ComfyUI node structure. 
+ + Returns: (exists: bool, file_path: str, line_number: int) + + A valid ComfyUI node must have: + - Class definition (not commented) + - At least one of: INPUT_TYPES, RETURN_TYPES, FUNCTION method/attribute + """ + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore', category=SyntaxWarning) + tree = ast.parse(code_text) + except: + return (False, None, None) + + for node in ast.walk(tree): + if isinstance(node, ast.ClassDef): + if node.name == node_name or node.name.replace('_', '') == node_name.replace('_', ''): + # Found class definition - check if it has ComfyUI interface + has_input_types = False + has_return_types = False + has_function = False + + for item in node.body: + # Check for INPUT_TYPES method + if isinstance(item, ast.FunctionDef) and item.name == 'INPUT_TYPES': + has_input_types = True + # Check for RETURN_TYPES attribute + elif isinstance(item, ast.Assign): + for target in item.targets: + if isinstance(target, ast.Name): + if target.id == 'RETURN_TYPES': + has_return_types = True + elif target.id == 'FUNCTION': + has_function = True + # Check for FUNCTION method + elif isinstance(item, ast.FunctionDef): + has_function = True + + # Valid if has any ComfyUI signature + if has_input_types or has_return_types or has_function: + file_str = str(file_path) if file_path else None + return (True, file_str, node.lineno) + + return (False, None, None) + + +def _extract_display_name_mappings(code_text: str) -> Set[str]: + """ + Extract node names from NODE_DISPLAY_NAME_MAPPINGS. + + Pattern: + NODE_DISPLAY_NAME_MAPPINGS = { + "node_key": "Display Name", + ... + } + + Returns: + Set of node keys from NODE_DISPLAY_NAME_MAPPINGS + """ + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore', category=SyntaxWarning) + tree = ast.parse(code_text) except: return set() + nodes = set() + + for node in tree.body: + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name) and target.id == 'NODE_DISPLAY_NAME_MAPPINGS': + if isinstance(node.value, ast.Dict): + for key in node.value.keys: + if isinstance(key, ast.Constant) and isinstance(key.value, str): + nodes.add(key.value.strip()) + + return nodes + + +def extract_nodes_enhanced( + code_text: str, + file_path: Optional[Path] = None, + visited: Optional[Set[Path]] = None, + verbose: bool = False +) -> Set[str]: + """ + Enhanced node extraction with multi-layer detection system. 
+
+    Scanner 2.0.11 - Comprehensive detection strategy:
+    - Phase 1: NODE_CLASS_MAPPINGS dict literal
+    - Phase 2: Class.NAME attribute access (e.g., FreeChat.NAME)
+    - Phase 3: Item assignment (NODE_CLASS_MAPPINGS["key"] = value)
+    - Phase 4: NODE_DISPLAY_NAME_MAPPINGS cross-reference
+    - Phase 5: Class existence verification (detects active classes even if registration commented)
+    - Phase 6: Empty dict detection (UI-only extensions, logging only)
+
+    Fixed Bugs:
+    - Scanner 2.0.9: Fallback cascade prevented Phase 3 execution
+    - Scanner 2.0.10: Missed active classes with commented registrations (15 false negatives)
+
+    Args:
+        code_text: Python source code
+        file_path: Path to file (for logging and caching)
+        visited: Visited paths (for circular import prevention)
+        verbose: If True, print UI-only extension detection messages
+
+    Returns:
+        Set of node names (union of all detected patterns)
+    """
+    # Check file-based cache if file_path provided
+    if file_path is not None:
+        try:
+            file_path_obj = Path(file_path) if not isinstance(file_path, Path) else file_path
+            if file_path_obj.exists():
+                current_mtime = file_path_obj.stat().st_mtime
+
+                # Check if we have cached result with matching mtime and scanner version
+                if file_path_obj in _file_mtime_cache:
+                    cached_mtime = _file_mtime_cache[file_path_obj]
+                    cache_key = (str(file_path_obj), cached_mtime, SCANNER_VERSION)
+
+                    if current_mtime == cached_mtime and cache_key in _extract_nodes_enhanced_cache:
+                        return _extract_nodes_enhanced_cache[cache_key].copy()
+        except:
+            pass  # Ignore cache errors, proceed with normal execution
+
+    # Suppress warnings from AST parsing
+    with warnings.catch_warnings():
+        warnings.filterwarnings('ignore', category=SyntaxWarning)
+        warnings.filterwarnings('ignore', category=DeprecationWarning)
+
+        # Phase 1: Original extract_nodes() - dict literal
+        phase1_nodes = extract_nodes(code_text)
+
+        # Phase 2: Class.NAME pattern
+        if visited is None:
+            visited = set()
+        phase2_nodes = _fallback_classname_resolver(code_text, file_path)
+
+        # Phase 3: Item assignment pattern
+        phase3_nodes = _fallback_item_assignment(code_text)
+
+        # Phase 4: NODE_DISPLAY_NAME_MAPPINGS cross-reference (NEW in 2.0.11)
+        # This catches nodes that are in display names but not in NODE_CLASS_MAPPINGS
+        phase4_nodes = _extract_display_name_mappings(code_text)
+
+        # Phase 5: Class existence verification ONLY for display name candidates (NEW in 2.0.11)
+        # This phase is CONSERVATIVE - only verify classes that appear in display names
+        # This catches the specific Scanner 2.0.10 bug pattern:
+        #   - NODE_CLASS_MAPPINGS registration is commented
+        #   - NODE_DISPLAY_NAME_MAPPINGS still has the entry
+        #   - Class implementation exists
+        # Example: Bjornulf_ollamaLoader in Bjornulf_custom_nodes
+        phase5_nodes = set()
+        for node_name in phase4_nodes:
+            # Only check classes that appear in display names but not in registrations
+            if node_name not in (phase1_nodes | phase2_nodes | phase3_nodes):
+                exists, _, _ = _verify_class_exists(node_name, code_text, file_path)
+                if exists:
+                    phase5_nodes.add(node_name)
+
+        # Union all results (FIX: Scanner 2.0.9 bug + Scanner 2.0.10 bug)
+        # 2.0.9: Used early return which missed Phase 3 nodes
+        # 2.0.10: Only checked registrations, missed classes referenced in display names
+        all_nodes = phase1_nodes | phase2_nodes | phase3_nodes | phase4_nodes | phase5_nodes
+
+        # Phase 6: Empty dict detector (logging only, doesn't add nodes)
+        if not all_nodes:
+            _fallback_empty_dict_detector(code_text, file_path, verbose)
+
+    # Cache
the result + if file_path is not None: + try: + file_path_obj = Path(file_path) if not isinstance(file_path, Path) else file_path + if file_path_obj.exists(): + current_mtime = file_path_obj.stat().st_mtime + cache_key = (str(file_path_obj), current_mtime, SCANNER_VERSION) + _extract_nodes_enhanced_cache[cache_key] = all_nodes + _file_mtime_cache[file_path_obj] = current_mtime + except: + pass + + return all_nodes + + +def _fallback_classname_resolver(code_text: str, file_path: Optional[Path]) -> Set[str]: + """ + Detect Class.NAME pattern in NODE_CLASS_MAPPINGS. + + Pattern: + NODE_CLASS_MAPPINGS = { + FreeChat.NAME: FreeChat, + PaidChat.NAME: PaidChat + } + """ + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore', category=SyntaxWarning) + parsed = ast.parse(code_text) + except: + return set() + + nodes = set() + + for node in parsed.body: + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name) and target.id == 'NODE_CLASS_MAPPINGS': + if isinstance(node.value, ast.Dict): + for key in node.value.keys: + # Detect Class.NAME pattern + if isinstance(key, ast.Attribute): + if isinstance(key.value, ast.Name): + # Use class name as node name + nodes.add(key.value.id) + # Also handle literal strings + elif isinstance(key, ast.Constant) and isinstance(key.value, str): + nodes.add(key.value.strip()) + + return nodes + + +def _fallback_item_assignment(code_text: str) -> Set[str]: + """ + Detect item assignment pattern. + + Pattern: + NODE_CLASS_MAPPINGS = {} + NODE_CLASS_MAPPINGS["MyNode"] = MyNode + """ + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore', category=SyntaxWarning) + parsed = ast.parse(code_text) + except: + return set() + + nodes = set() + + for node in ast.walk(parsed): + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Subscript): + if (isinstance(target.value, ast.Name) and + target.value.id in ['NODE_CLASS_MAPPINGS', 'NODE_CONFIG']): + # Extract key + if isinstance(target.slice, ast.Constant): + if isinstance(target.slice.value, str): + nodes.add(target.slice.value) + + return nodes + + +def _extract_repo_name(file_path: Path) -> str: + """ + Extract repository name from file path. + + Path structure: /home/rho/.tmp/analysis/temp/{author}_{reponame}/{path/to/file.py} + Returns: {author}_{reponame} or filename if extraction fails + """ + try: + parts = file_path.parts + # Find 'temp' directory in path + if 'temp' in parts: + temp_idx = parts.index('temp') + if temp_idx + 1 < len(parts): + # Next part after 'temp' is the repo directory + return parts[temp_idx + 1] + except (ValueError, IndexError): + pass + + # Fallback to filename if extraction fails + return file_path.name if hasattr(file_path, 'name') else str(file_path) + + +def _fallback_empty_dict_detector(code_text: str, file_path: Optional[Path], verbose: bool = False) -> None: + """ + Detect empty NODE_CLASS_MAPPINGS (UI-only extensions). + Logs for documentation purposes only (when verbose=True). 
+ + Args: + code_text: Python source code to analyze + file_path: Path to the file being analyzed + verbose: If True, print detection messages + """ + empty_patterns = [ + 'NODE_CLASS_MAPPINGS = {}', + 'NODE_CLASS_MAPPINGS={}', + ] + + code_normalized = code_text.replace(' ', '').replace('\n', '') + + for pattern in empty_patterns: + pattern_normalized = pattern.replace(' ', '') + if pattern_normalized in code_normalized: + if file_path and verbose: + repo_name = _extract_repo_name(file_path) + print(f"Info: UI-only extension (empty NODE_CLASS_MAPPINGS): {repo_name}") + return def has_comfy_node_base(class_node): """Check if class inherits from io.ComfyNode or ComfyNode""" @@ -229,6 +785,25 @@ def extract_v3_nodes(code_text): # scan +def extract_metadata_only(filename): + """Extract only metadata (@author, @title, etc) without node scanning""" + try: + with open(filename, encoding='utf-8', errors='ignore') as file: + code = file.read() + + metadata = {} + lines = code.strip().split('\n') + for line in lines: + if line.startswith('@'): + if line.startswith("@author:") or line.startswith("@title:") or line.startswith("@nickname:") or line.startswith("@description:"): + key, value = line[1:].strip().split(':', 1) + metadata[key.strip()] = value.strip() + + return metadata + except: + return {} + + def scan_in_file(filename, is_builtin=False): global builtin_nodes @@ -242,8 +817,8 @@ def scan_in_file(filename, is_builtin=False): nodes = set() class_dict = {} - # V1 nodes detection - nodes |= extract_nodes(code) + # V1 nodes detection (enhanced with fallback patterns) + nodes |= extract_nodes_enhanced(code, file_path=Path(filename), visited=set()) # V3 nodes detection nodes |= extract_v3_nodes(code) @@ -620,13 +1195,14 @@ def update_custom_nodes(scan_only_mode=False, url_list_file=None): return node_info -def gen_json(node_info, scan_only_mode=False): +def gen_json(node_info, scan_only_mode=False, force_rescan=False): """ Generate extension-node-map.json from scanned node information Args: node_info (dict): Repository metadata mapping scan_only_mode (bool): If True, exclude metadata from output + force_rescan (bool): If True, ignore cache and force rescan all nodes """ # scan from .py file node_files, node_dirs = get_nodes(temp_dir) @@ -642,13 +1218,17 @@ def gen_json(node_info, scan_only_mode=False): py_files = get_py_file_paths(dirname) metadata = {} - nodes = set() - for py in py_files: - nodes_in_file, metadata_in_file = scan_in_file(py, dirname == "ComfyUI") - nodes.update(nodes_in_file) - # Include metadata from .py files in both modes - metadata.update(metadata_in_file) - + # Use per-repo cache for node AND metadata extraction + try: + nodes, metadata = extract_nodes_from_repo(Path(dirname), verbose=False, force_rescan=force_rescan) + except: + # Fallback to file-by-file scanning if extract_nodes_from_repo fails + nodes = set() + for py in py_files: + nodes_in_file, metadata_in_file = scan_in_file(py, dirname == "ComfyUI") + nodes.update(nodes_in_file) + metadata.update(metadata_in_file) + dirname = os.path.basename(dirname) if 'Jovimetrix' in dirname: @@ -810,11 +1390,14 @@ if __name__ == "__main__": print("\n# Generating 'extension-node-map.json'...\n") # Generate extension-node-map.json - gen_json(updated_node_info, scan_only_mode) + force_rescan = args.force_rescan if hasattr(args, 'force_rescan') else False + if force_rescan: + print("⚠️ Force rescan enabled - ignoring all cached results\n") + gen_json(updated_node_info, scan_only_mode, force_rescan) print("\n✅ DONE.\n") if 
scan_only_mode:
         print("Output: extension-node-map.json (node mappings only)")
     else:
-        print("Output: extension-node-map.json (full metadata)")
\ No newline at end of file
+        print("Output: extension-node-map.json (full metadata)")
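
Example (illustrative, hypothetical names and URLs): a minimal custom_nodes entry that would satisfy the json-checker.py rules in this patch; the files URL follows https://github.com/{author}/{reponame} with no .git suffix, and all required fields are non-empty.

    {
      "custom_nodes": [
        {
          "author": "example-author",
          "title": "Example Nodes",
          "reference": "https://github.com/example-author/example-nodes",
          "files": ["https://github.com/example-author/example-nodes"],
          "install_type": "git-clone",
          "description": "Minimal sketch of an entry that passes the required-field and URL checks."
        }
      ]
    }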
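
Example (illustrative values only): the per-repo cache that _save_per_repo_cache writes to .git/nodecache.json and that _load_per_repo_cache compares against the current SCANNER_VERSION and commit hash before reuse; git_hash is the first 16 characters of the repository's commit hash, and nodes/metadata mirror what the scan collected.

    {
      "scanner_version": "2.0.11",
      "git_hash": "0123456789abcdef",
      "scanned_at": "2025-12-10T18:39:02",
      "nodes": ["ExampleNodeA", "ExampleNodeB"],
      "metadata": {"title": "Example Nodes", "author": "example-author"}
    }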