import ast
import re
import os
import json
from git import Repo
import concurrent
import datetime
import concurrent.futures
import requests
import warnings
import argparse

builtin_nodes = set()

import sys
from urllib.parse import urlparse
from github import Github, Auth
from pathlib import Path
from typing import Set, Dict, Optional

# Scanner version for cache invalidation
SCANNER_VERSION = "2.0.11"  # Multi-layer detection: class existence + display names

# Cache for extract_nodes and extract_nodes_enhanced results
_extract_nodes_cache: Dict[str, Set[str]] = {}
_extract_nodes_enhanced_cache: Dict[str, Set[str]] = {}
_file_mtime_cache: Dict[Path, float] = {}


def _get_repo_root(file_path: Path) -> Optional[Path]:
    """Find the repository root directory containing .git"""
    current = file_path if file_path.is_dir() else file_path.parent
    while current != current.parent:
        if (current / ".git").exists():
            return current
        current = current.parent
    return None


def _get_repo_hash(repo_path: Path) -> str:
    """Get git commit hash or fallback identifier"""
    git_dir = repo_path / ".git"
    if not git_dir.exists():
        return ""

    try:
        # Read HEAD to get current commit
        head_file = git_dir / "HEAD"
        if head_file.exists():
            head_content = head_file.read_text().strip()
            if head_content.startswith("ref:"):
                # HEAD points to a ref
                ref_path = git_dir / head_content[5:].strip()
                if ref_path.exists():
                    commit_hash = ref_path.read_text().strip()
                    return commit_hash[:16]  # First 16 chars
            else:
                # Detached HEAD
                return head_content[:16]
    except:
        pass

    return ""


def _load_per_repo_cache(repo_path: Path) -> Optional[tuple]:
    """Load nodes and metadata from per-repo cache

    Returns:
        tuple: (nodes_set, metadata_dict) or None if cache invalid
    """
    cache_file = repo_path / ".git" / "nodecache.json"
    if not cache_file.exists():
        return None

    try:
        with open(cache_file, 'r') as f:
            cache_data = json.load(f)

        # Verify scanner version
        if cache_data.get('scanner_version') != SCANNER_VERSION:
            return None

        # Verify git hash
        current_hash = _get_repo_hash(repo_path)
        if cache_data.get('git_hash') != current_hash:
            return None

        # Return nodes and metadata
        nodes = cache_data.get('nodes', [])
        metadata = cache_data.get('metadata', {})
        return (set(nodes) if nodes else set(), metadata)
    except:
        return None


def _save_per_repo_cache(repo_path: Path, all_nodes: Set[str], metadata: dict = None):
    """Save nodes and metadata to per-repo cache"""
    cache_file = repo_path / ".git" / "nodecache.json"
    if not cache_file.parent.exists():
        return

    git_hash = _get_repo_hash(repo_path)
    cache_data = {
        "scanner_version": SCANNER_VERSION,
        "git_hash": git_hash,
        "scanned_at": datetime.datetime.now().isoformat(),
        "nodes": sorted(list(all_nodes)),
        "metadata": metadata if metadata else {}
    }

    try:
        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)
    except:
        pass  # Silently fail - cache is optional
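

# Illustrative layout of the per-repo cache written by _save_per_repo_cache()
# above; the values are hypothetical, only the key names come from the code.
#
#     .git/nodecache.json
#     {
#         "scanner_version": "2.0.11",
#         "git_hash": "0123456789abcdef",
#         "scanned_at": "2025-01-01T00:00:00",
#         "nodes": ["ExampleNodeA", "ExampleNodeB"],
#         "metadata": {"author": "someone", "title": "Example Node Pack"}
#     }
#
# _load_per_repo_cache() discards the file whenever scanner_version or
# git_hash no longer matches the current values.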


def download_url(url, dest_folder, filename=None):
    # Ensure the destination folder exists
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)

    # Extract filename from URL if not provided
    if filename is None:
        filename = os.path.basename(url)

    # Full path to save the file
    dest_path = os.path.join(dest_folder, filename)

    # Download the file
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(dest_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
    else:
        raise Exception(f"Failed to download file from {url}")


def parse_arguments():
    """Parse command-line arguments"""
    parser = argparse.ArgumentParser(
        description='ComfyUI Manager Node Scanner',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Standard mode
  python3 scanner.py
  python3 scanner.py --skip-update
  python3 scanner.py --skip-all --force-rescan

  # Scan-only mode
  python3 scanner.py --scan-only temp-urls-clean.list
  python3 scanner.py --scan-only urls.list --temp-dir /custom/temp
  python3 scanner.py --scan-only urls.list --skip-update --force-rescan
'''
    )

    parser.add_argument('--scan-only', type=str, metavar='URL_LIST_FILE',
                        help='Scan-only mode: provide URL list file (one URL per line)')
    parser.add_argument('--temp-dir', type=str, metavar='DIR',
                        help='Temporary directory for cloned repositories')
    parser.add_argument('--skip-update', action='store_true',
                        help='Skip git clone/pull operations')
    parser.add_argument('--skip-stat-update', action='store_true',
                        help='Skip GitHub stats collection')
    parser.add_argument('--skip-all', action='store_true',
                        help='Skip all update operations')
    parser.add_argument('--force-rescan', action='store_true',
                        help='Force rescan all nodes (ignore cache)')

    # Backward compatibility: positional argument for temp_dir
    parser.add_argument('temp_dir_positional', nargs='?', metavar='TEMP_DIR',
                        help='(Legacy) Temporary directory path')

    args = parser.parse_args()
    return args


# Module-level variables (will be set in main if running as script)
args = None
scan_only_mode = False
url_list_file = None
temp_dir = None
skip_update = False
skip_stat_update = True
g = None

parse_cnt = 0


def extract_nodes(code_text):
    global parse_cnt

    # Check cache first
    cache_key = hash(code_text)
    if cache_key in _extract_nodes_cache:
        return _extract_nodes_cache[cache_key].copy()

    try:
        if parse_cnt % 100 == 0:
            print(".", end="", flush=True)

        parse_cnt += 1

        code_text = re.sub(r'\\[^"\']', '', code_text)

        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            parsed_code = ast.parse(code_text)

        # Support both ast.Assign and ast.AnnAssign (for type-annotated assignments)
        assignments = (node for node in parsed_code.body if isinstance(node, (ast.Assign, ast.AnnAssign)))

        for assignment in assignments:
            # Handle ast.AnnAssign (e.g., NODE_CLASS_MAPPINGS: Type = {...})
            if isinstance(assignment, ast.AnnAssign):
                if isinstance(assignment.target, ast.Name) and assignment.target.id in ['NODE_CONFIG', 'NODE_CLASS_MAPPINGS']:
                    node_class_mappings = assignment.value
                    break
            # Handle ast.Assign (e.g., NODE_CLASS_MAPPINGS = {...})
            elif isinstance(assignment.targets[0], ast.Name) and assignment.targets[0].id in ['NODE_CONFIG', 'NODE_CLASS_MAPPINGS']:
                node_class_mappings = assignment.value
                break
        else:
            node_class_mappings = None

        if node_class_mappings:
            s = set()
            for key in node_class_mappings.keys:
                if key is not None and isinstance(key.value, str):
                    s.add(key.value.strip())

            # Cache the result
            _extract_nodes_cache[cache_key] = s
            return s
        else:
            # Cache empty result
            _extract_nodes_cache[cache_key] = set()
            return set()
    except:
        # Cache empty result on error
        _extract_nodes_cache[cache_key] = set()
        return set()
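

# Illustrative inputs that extract_nodes() above recognizes (node and class
# names are hypothetical). Both plain and type-annotated assignments to
# NODE_CLASS_MAPPINGS (or NODE_CONFIG) are handled:
#
#     NODE_CLASS_MAPPINGS = {
#         "ExampleNodeA": ExampleNodeA,
#         "ExampleNodeB": ExampleNodeB,
#     }
#
#     NODE_CLASS_MAPPINGS: dict = {"AnnotatedNode": AnnotatedNode}
#
# For the first snippet extract_nodes() would return
# {"ExampleNodeA", "ExampleNodeB"} under these assumptions.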


def extract_nodes_from_repo(repo_path: Path, verbose: bool = False, force_rescan: bool = False) -> tuple:
    """
    Extract all nodes and metadata from a repository with per-repo caching.

    Automatically caches results in .git/nodecache.json.

    Cache is invalidated when:
    - Git commit hash changes
    - Scanner version changes
    - force_rescan flag is True

    Args:
        repo_path: Path to repository root
        verbose: If True, print UI-only extension detection messages
        force_rescan: If True, ignore cache and force fresh scan

    Returns:
        tuple: (nodes_set, metadata_dict)
    """
    # Ensure path is absolute
    repo_path = repo_path.resolve()

    # Check per-repo cache first (unless force_rescan is True)
    if not force_rescan:
        cached_result = _load_per_repo_cache(repo_path)
        if cached_result is not None:
            return cached_result

    # Cache miss - scan all .py files
    all_nodes = set()
    all_metadata = {}

    py_files = list(repo_path.rglob("*.py"))

    # Filter out __pycache__, .git, and other hidden directories
    filtered_files = []
    for f in py_files:
        try:
            rel_path = f.relative_to(repo_path)
            # Skip __pycache__, .git, and any directory starting with .
            if '__pycache__' not in str(rel_path) and not any(part.startswith('.') for part in rel_path.parts):
                filtered_files.append(f)
        except:
            continue

    py_files = filtered_files

    for py_file in py_files:
        try:
            # Read file with proper encoding
            with open(py_file, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()

            if code:
                # Extract nodes using SAME logic as scan_in_file
                # V1 nodes (enhanced with fallback patterns)
                nodes = extract_nodes_enhanced(code, py_file, visited=set(), verbose=verbose)
                all_nodes.update(nodes)

                # V3 nodes detection
                v3_nodes = extract_v3_nodes(code)
                all_nodes.update(v3_nodes)

                # Dict parsing - exclude commented NODE_CLASS_MAPPINGS lines
                pattern = r"_CLASS_MAPPINGS\s*(?::\s*\w+\s*)?=\s*(?:\\\s*)?{([^}]*)}"
                regex = re.compile(pattern, re.MULTILINE | re.DOTALL)

                for match_obj in regex.finditer(code):
                    # Get the line where NODE_CLASS_MAPPINGS is defined
                    match_start = match_obj.start()
                    line_start = code.rfind('\n', 0, match_start) + 1
                    line_end = code.find('\n', match_start)
                    if line_end == -1:
                        line_end = len(code)
                    line = code[line_start:line_end]

                    # Skip if line starts with # (commented)
                    if re.match(r'^\s*#', line):
                        continue

                    match = match_obj.group(1)

                    # Filter out commented lines from dict content
                    match_lines = match.split('\n')
                    match_filtered = '\n'.join(
                        line for line in match_lines
                        if not re.match(r'^\s*#', line)
                    )

                    # Extract key-value pairs with double quotes
                    key_value_pairs = re.findall(r"\"([^\"]*)\"\s*:\s*([^,\n]*)", match_filtered)
                    for key, value in key_value_pairs:
                        all_nodes.add(key.strip())

                    # Extract key-value pairs with single quotes
                    key_value_pairs = re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", match_filtered)
                    for key, value in key_value_pairs:
                        all_nodes.add(key.strip())

                # Handle .update() pattern (AFTER comment removal)
                code_cleaned = re.sub(r'^#.*?$', '', code, flags=re.MULTILINE)

                update_pattern = r"_CLASS_MAPPINGS\.update\s*\(\s*{([^}]*)}\s*\)"
                update_match = re.search(update_pattern, code_cleaned, re.DOTALL)
                if update_match:
                    update_dict_text = update_match.group(1)

                    # Extract key-value pairs (double quotes)
                    update_pairs = re.findall(r'"([^"]*)"\s*:\s*([^,\n]*)', update_dict_text)
                    for key, value in update_pairs:
                        all_nodes.add(key.strip())

                    # Extract key-value pairs (single quotes)
                    update_pairs_single = re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", update_dict_text)
                    for key, value in update_pairs_single:
                        all_nodes.add(key.strip())

                # Additional regex patterns (AFTER comment removal)
                patterns = [
                    r'^[^=]*_CLASS_MAPPINGS\["(.*?)"\]',
                    r'^[^=]*_CLASS_MAPPINGS\[\'(.*?)\'\]',
                    r'@register_node\("(.+)",\s*\".+"\)',
                    r'"(\w+)"\s*:\s*{"class":\s*\w+\s*'
                ]
                for pattern in patterns:
                    keys = re.findall(pattern, code_cleaned)
                    all_nodes.update(key.strip() for key in keys)

            # Extract metadata from this file
            metadata = extract_metadata_only(str(py_file))
            all_metadata.update(metadata)
        except Exception:
            # Silently skip files that can't be read
            continue

    # Save to per-repo cache
    _save_per_repo_cache(repo_path, all_nodes, all_metadata)

    return (all_nodes, all_metadata)
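

# Minimal usage sketch for extract_nodes_from_repo(); the path is hypothetical
# and the printed counts depend entirely on the repository being scanned:
#
#     repo_nodes, repo_metadata = extract_nodes_from_repo(
#         Path("/tmp/example_custom_nodes"), verbose=True, force_rescan=False)
#     print(len(repo_nodes), "nodes,", len(repo_metadata), "metadata keys")
#
# A second call on an unchanged checkout is served from .git/nodecache.json.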


def _verify_class_exists(node_name: str, code_text: str, file_path: Optional[Path] = None) -> tuple[bool, Optional[str], Optional[int]]:
    """
    Verify that a node class exists and has ComfyUI node structure.

    Returns:
        (exists: bool, file_path: str, line_number: int)

    A valid ComfyUI node must have:
    - Class definition (not commented)
    - At least one of: INPUT_TYPES, RETURN_TYPES, FUNCTION method/attribute
    """
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            tree = ast.parse(code_text)
    except:
        return (False, None, None)

    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            if node.name == node_name or node.name.replace('_', '') == node_name.replace('_', ''):
                # Found class definition - check if it has ComfyUI interface
                has_input_types = False
                has_return_types = False
                has_function = False

                for item in node.body:
                    # Check for INPUT_TYPES method
                    if isinstance(item, ast.FunctionDef) and item.name == 'INPUT_TYPES':
                        has_input_types = True
                    # Check for RETURN_TYPES attribute
                    elif isinstance(item, ast.Assign):
                        for target in item.targets:
                            if isinstance(target, ast.Name):
                                if target.id == 'RETURN_TYPES':
                                    has_return_types = True
                                elif target.id == 'FUNCTION':
                                    has_function = True
                    # Check for FUNCTION method
                    elif isinstance(item, ast.FunctionDef):
                        has_function = True

                # Valid if has any ComfyUI signature
                if has_input_types or has_return_types or has_function:
                    file_str = str(file_path) if file_path else None
                    return (True, file_str, node.lineno)

    return (False, None, None)


def _extract_display_name_mappings(code_text: str) -> Set[str]:
    """
    Extract node names from NODE_DISPLAY_NAME_MAPPINGS.

    Pattern:
        NODE_DISPLAY_NAME_MAPPINGS = {
            "node_key": "Display Name",
            ...
        }

    Returns:
        Set of node keys from NODE_DISPLAY_NAME_MAPPINGS
    """
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            tree = ast.parse(code_text)
    except:
        return set()

    nodes = set()
    for node in tree.body:
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == 'NODE_DISPLAY_NAME_MAPPINGS':
                    if isinstance(node.value, ast.Dict):
                        for key in node.value.keys:
                            if isinstance(key, ast.Constant) and isinstance(key.value, str):
                                nodes.add(key.value.strip())

    return nodes
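

# Illustrative input for _extract_display_name_mappings() (names hypothetical):
#
#     NODE_DISPLAY_NAME_MAPPINGS = {
#         "ExampleNodeA": "Example Node A (Label)",
#     }
#
# The helper returns {"ExampleNodeA"}; _verify_class_exists() can then confirm
# that a class of that name with an INPUT_TYPES/RETURN_TYPES/FUNCTION member
# actually exists before the key is trusted.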


def extract_nodes_enhanced(
    code_text: str,
    file_path: Optional[Path] = None,
    visited: Optional[Set[Path]] = None,
    verbose: bool = False
) -> Set[str]:
    """
    Enhanced node extraction with multi-layer detection system.

    Scanner 2.0.11 - Comprehensive detection strategy:
    - Phase 1: NODE_CLASS_MAPPINGS dict literal
    - Phase 2: Class.NAME attribute access (e.g., FreeChat.NAME)
    - Phase 3: Item assignment (NODE_CLASS_MAPPINGS["key"] = value)
    - Phase 4: NODE_DISPLAY_NAME_MAPPINGS cross-reference
    - Phase 5: Class existence verification for display-name candidates
      (detects active classes even if registration is commented out)
    - Phase 6: Empty dict detection (UI-only extensions, logging only)

    Fixed bugs:
    - Scanner 2.0.9: Fallback cascade prevented Phase 3 execution
    - Scanner 2.0.10: Missed active classes with commented registrations (15 false negatives)

    Args:
        code_text: Python source code
        file_path: Path to file (for logging and caching)
        visited: Visited paths (for circular import prevention)
        verbose: If True, print UI-only extension detection messages

    Returns:
        Set of node names (union of all detected patterns)
    """
    # Check file-based cache if file_path provided
    if file_path is not None:
        try:
            file_path_obj = Path(file_path) if not isinstance(file_path, Path) else file_path
            if file_path_obj.exists():
                current_mtime = file_path_obj.stat().st_mtime
                # Check if we have cached result with matching mtime and scanner version
                if file_path_obj in _file_mtime_cache:
                    cached_mtime = _file_mtime_cache[file_path_obj]
                    cache_key = (str(file_path_obj), cached_mtime, SCANNER_VERSION)
                    if current_mtime == cached_mtime and cache_key in _extract_nodes_enhanced_cache:
                        return _extract_nodes_enhanced_cache[cache_key].copy()
        except:
            pass  # Ignore cache errors, proceed with normal execution

    # Suppress warnings from AST parsing
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=SyntaxWarning)
        warnings.filterwarnings('ignore', category=DeprecationWarning)

        # Phase 1: Original extract_nodes() - dict literal
        phase1_nodes = extract_nodes(code_text)

        # Phase 2: Class.NAME pattern
        if visited is None:
            visited = set()
        phase2_nodes = _fallback_classname_resolver(code_text, file_path)

        # Phase 3: Item assignment pattern
        phase3_nodes = _fallback_item_assignment(code_text)

        # Phase 4: NODE_DISPLAY_NAME_MAPPINGS cross-reference (NEW in 2.0.11)
        # This catches nodes that are in display names but not in NODE_CLASS_MAPPINGS
        phase4_nodes = _extract_display_name_mappings(code_text)

        # Phase 5: Class existence verification ONLY for display name candidates (NEW in 2.0.11)
        # This phase is CONSERVATIVE - only verify classes that appear in display names
        # This catches the specific Scanner 2.0.10 bug pattern:
        #   - NODE_CLASS_MAPPINGS registration is commented
        #   - NODE_DISPLAY_NAME_MAPPINGS still has the entry
        #   - Class implementation exists
        # Example: Bjornulf_ollamaLoader in Bjornulf_custom_nodes
        phase5_nodes = set()
        for node_name in phase4_nodes:
            # Only check classes that appear in display names but not in registrations
            if node_name not in (phase1_nodes | phase2_nodes | phase3_nodes):
                exists, _, _ = _verify_class_exists(node_name, code_text, file_path)
                if exists:
                    phase5_nodes.add(node_name)

        # Union all results (FIX: Scanner 2.0.9 bug + Scanner 2.0.10 bug)
        # 2.0.9: Used early return which missed Phase 3 nodes
        # 2.0.10: Only checked registrations, missed classes referenced in display names
        all_nodes = phase1_nodes | phase2_nodes | phase3_nodes | phase4_nodes | phase5_nodes

        # Phase 6: Empty dict detector (logging only, doesn't add nodes)
        if not all_nodes:
            _fallback_empty_dict_detector(code_text, file_path, verbose)

    # Cache the result
    if file_path is not None:
        try:
            file_path_obj = Path(file_path) if not isinstance(file_path, Path) else file_path
            if file_path_obj.exists():
                current_mtime = file_path_obj.stat().st_mtime
                cache_key = (str(file_path_obj), current_mtime, SCANNER_VERSION)
                _extract_nodes_enhanced_cache[cache_key] = all_nodes
                _file_mtime_cache[file_path_obj] = current_mtime
        except:
            pass

    return all_nodes
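

# Example of the Scanner 2.0.10 false-negative pattern that Phases 4-5 above
# are meant to recover (identifiers hypothetical): the registration line is
# commented out, but the display-name entry and the class body still exist.
#
#     # NODE_CLASS_MAPPINGS = {"ExampleLoader": ExampleLoader}
#     NODE_DISPLAY_NAME_MAPPINGS = {"ExampleLoader": "Example Loader"}
#
#     class ExampleLoader:
#         RETURN_TYPES = ("STRING",)
#         FUNCTION = "load"
#
# Here Phases 1-3 find nothing, Phase 4 yields "ExampleLoader", and Phase 5
# confirms the class, so the node is still reported.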


def _fallback_classname_resolver(code_text: str, file_path: Optional[Path]) -> Set[str]:
    """
    Detect Class.NAME pattern in NODE_CLASS_MAPPINGS.

    Pattern:
        NODE_CLASS_MAPPINGS = {
            FreeChat.NAME: FreeChat,
            PaidChat.NAME: PaidChat
        }
    """
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            parsed = ast.parse(code_text)
    except:
        return set()

    nodes = set()
    for node in parsed.body:
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == 'NODE_CLASS_MAPPINGS':
                    if isinstance(node.value, ast.Dict):
                        for key in node.value.keys:
                            # Detect Class.NAME pattern
                            if isinstance(key, ast.Attribute):
                                if isinstance(key.value, ast.Name):
                                    # Use class name as node name
                                    nodes.add(key.value.id)
                            # Also handle literal strings
                            elif isinstance(key, ast.Constant) and isinstance(key.value, str):
                                nodes.add(key.value.strip())

    return nodes


def _fallback_item_assignment(code_text: str) -> Set[str]:
    """
    Detect item assignment pattern.

    Pattern:
        NODE_CLASS_MAPPINGS = {}
        NODE_CLASS_MAPPINGS["MyNode"] = MyNode
    """
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            parsed = ast.parse(code_text)
    except:
        return set()

    nodes = set()
    for node in ast.walk(parsed):
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Subscript):
                    if (isinstance(target.value, ast.Name)
                            and target.value.id in ['NODE_CLASS_MAPPINGS', 'NODE_CONFIG']):
                        # Extract key
                        if isinstance(target.slice, ast.Constant):
                            if isinstance(target.slice.value, str):
                                nodes.add(target.slice.value)

    return nodes
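

# For the docstring patterns above (class names hypothetical),
# _fallback_classname_resolver() maps a FreeChat.NAME key to the class name and
# returns {"FreeChat", "PaidChat"}, while _fallback_item_assignment() returns
# {"MyNode"} for NODE_CLASS_MAPPINGS["MyNode"] = MyNode.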


def _extract_repo_name(file_path: Path) -> str:
    """
    Extract repository name from file path.

    Path structure: /home/rho/.tmp/analysis/temp/{author}_{reponame}/{path/to/file.py}

    Returns:
        {author}_{reponame} or filename if extraction fails
    """
    try:
        parts = file_path.parts

        # Find 'temp' directory in path
        if 'temp' in parts:
            temp_idx = parts.index('temp')
            if temp_idx + 1 < len(parts):
                # Next part after 'temp' is the repo directory
                return parts[temp_idx + 1]
    except (ValueError, IndexError):
        pass

    # Fallback to filename if extraction fails
    return file_path.name if hasattr(file_path, 'name') else str(file_path)


def _fallback_empty_dict_detector(code_text: str, file_path: Optional[Path], verbose: bool = False) -> None:
    """
    Detect empty NODE_CLASS_MAPPINGS (UI-only extensions).

    Logs for documentation purposes only (when verbose=True).

    Args:
        code_text: Python source code to analyze
        file_path: Path to the file being analyzed
        verbose: If True, print detection messages
    """
    empty_patterns = [
        'NODE_CLASS_MAPPINGS = {}',
        'NODE_CLASS_MAPPINGS={}',
    ]

    code_normalized = code_text.replace(' ', '').replace('\n', '')

    for pattern in empty_patterns:
        pattern_normalized = pattern.replace(' ', '')
        if pattern_normalized in code_normalized:
            if file_path and verbose:
                repo_name = _extract_repo_name(file_path)
                print(f"Info: UI-only extension (empty NODE_CLASS_MAPPINGS): {repo_name}")
            return


def has_comfy_node_base(class_node):
    """Check if class inherits from io.ComfyNode or ComfyNode"""
    for base in class_node.bases:
        # Case 1: ComfyNode
        if isinstance(base, ast.Name) and base.id == 'ComfyNode':
            return True
        # Case 2: io.ComfyNode
        elif isinstance(base, ast.Attribute):
            if base.attr == 'ComfyNode':
                return True
    return False


def extract_keyword_value(call_node, keyword):
    """
    Extract string value of keyword argument

    Schema(node_id="MyNode") -> "MyNode"
    """
    for kw in call_node.keywords:
        if kw.arg == keyword:
            # ast.Constant (Python 3.8+)
            if isinstance(kw.value, ast.Constant):
                if isinstance(kw.value.value, str):
                    return kw.value.value
            # ast.Str (Python 3.7-) - suppress deprecation warning
            else:
                with warnings.catch_warnings():
                    warnings.filterwarnings('ignore', category=DeprecationWarning)
                    if hasattr(ast, 'Str') and isinstance(kw.value, ast.Str):
                        return kw.value.s
    return None


def is_schema_call(call_node):
    """Check if ast.Call is io.Schema() or Schema()"""
    func = call_node.func
    if isinstance(func, ast.Name) and func.id == 'Schema':
        return True
    elif isinstance(func, ast.Attribute) and func.attr == 'Schema':
        return True
    return False


def extract_node_id_from_schema(class_node):
    """
    Extract node_id from define_schema() method
    """
    for item in class_node.body:
        if isinstance(item, ast.FunctionDef) and item.name == 'define_schema':
            # Walk through function body
            for stmt in ast.walk(item):
                if isinstance(stmt, ast.Call):
                    # Check if it's Schema() call
                    if is_schema_call(stmt):
                        node_id = extract_keyword_value(stmt, 'node_id')
                        if node_id:
                            return node_id
    return None


def extract_v3_nodes(code_text):
    """
    Extract V3 node IDs using AST parsing

    Returns:
        set of node_id strings
    """
    global parse_cnt

    try:
        if parse_cnt % 100 == 0:
            print(".", end="", flush=True)

        parse_cnt += 1

        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=SyntaxWarning)
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            tree = ast.parse(code_text)
    except (SyntaxError, UnicodeDecodeError):
        return set()

    nodes = set()

    # Find io.ComfyNode subclasses
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            # Check if inherits from ComfyNode
            if has_comfy_node_base(node):
                node_id = extract_node_id_from_schema(node)
                if node_id:
                    nodes.add(node_id)

    return nodes
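

# Illustrative V3-style definition that extract_v3_nodes() detects. The
# comfy_api import path and class name are assumptions for the example; the
# detection itself only requires a ComfyNode base class and a
# Schema(node_id=...) call inside define_schema():
#
#     from comfy_api.latest import io
#
#     class ExampleV3Node(io.ComfyNode):
#         @classmethod
#         def define_schema(cls):
#             return io.Schema(node_id="ExampleV3Node", display_name="Example V3 Node")
#
# extract_v3_nodes() returns {"ExampleV3Node"} for this snippet.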


# scan
def extract_metadata_only(filename):
    """Extract only metadata (@author, @title, etc) without node scanning"""
    try:
        with open(filename, encoding='utf-8', errors='ignore') as file:
            code = file.read()

        metadata = {}
        lines = code.strip().split('\n')
        for line in lines:
            if line.startswith('@'):
                if line.startswith("@author:") or line.startswith("@title:") or line.startswith("@nickname:") or line.startswith("@description:"):
                    key, value = line[1:].strip().split(':', 1)
                    metadata[key.strip()] = value.strip()

        return metadata
    except:
        return {}


def scan_in_file(filename, is_builtin=False):
    global builtin_nodes

    with open(filename, encoding='utf-8', errors='ignore') as file:
        code = file.read()

    # Support type annotations (e.g., NODE_CLASS_MAPPINGS: Type = {...}) and line continuations (\)
    pattern = r"_CLASS_MAPPINGS\s*(?::\s*\w+\s*)?=\s*(?:\\\s*)?{([^}]*)}"
    regex = re.compile(pattern, re.MULTILINE | re.DOTALL)

    nodes = set()
    class_dict = {}

    # V1 nodes detection (enhanced with fallback patterns)
    nodes |= extract_nodes_enhanced(code, file_path=Path(filename), visited=set())

    # V3 nodes detection
    nodes |= extract_v3_nodes(code)

    code = re.sub(r'^#.*?$', '', code, flags=re.MULTILINE)

    def extract_keys(pattern, code):
        keys = re.findall(pattern, code)
        return {key.strip() for key in keys}

    def update_nodes(nodes, new_keys):
        nodes |= new_keys

    patterns = [
        r'^[^=]*_CLASS_MAPPINGS\["(.*?)"\]',
        r'^[^=]*_CLASS_MAPPINGS\[\'(.*?)\'\]',
        r'@register_node\("(.+)",\s*\".+"\)',
        r'"(\w+)"\s*:\s*{"class":\s*\w+\s*'
    ]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(extract_keys, pattern, code): pattern for pattern in patterns}
        for future in concurrent.futures.as_completed(futures):
            update_nodes(nodes, future.result())

    matches = regex.findall(code)

    for match in matches:
        dict_text = match

        key_value_pairs = re.findall(r"\"([^\"]*)\"\s*:\s*([^,\n]*)", dict_text)
        for key, value in key_value_pairs:
            class_dict[key.strip()] = value.strip()

        key_value_pairs = re.findall(r"'([^']*)'\s*:\s*([^,\n]*)", dict_text)
        for key, value in key_value_pairs:
            class_dict[key.strip()] = value.strip()

    for key, value in class_dict.items():
        nodes.add(key.strip())

    update_pattern = r"_CLASS_MAPPINGS.update\s*\({([^}]*)}\)"
    update_match = re.search(update_pattern, code)
    if update_match:
        update_dict_text = update_match.group(1)

        update_key_value_pairs = re.findall(r"\"([^\"]*)\"\s*:\s*([^,\n]*)", update_dict_text)
        for key, value in update_key_value_pairs:
            class_dict[key.strip()] = value.strip()
            nodes.add(key.strip())

    metadata = {}
    lines = code.strip().split('\n')
    for line in lines:
        if line.startswith('@'):
            if line.startswith("@author:") or line.startswith("@title:") or line.startswith("@nickname:") or line.startswith("@description:"):
                key, value = line[1:].strip().split(':', 1)
                metadata[key.strip()] = value.strip()

    if is_builtin:
        # set objects don't support '+='; use in-place union instead
        builtin_nodes |= set(nodes)
    else:
        for x in builtin_nodes:
            if x in nodes:
                nodes.remove(x)

    return nodes, metadata
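

# Illustrative header block parsed by extract_metadata_only() and by the
# metadata loop in scan_in_file() (values hypothetical; in practice these lines
# usually live in the module docstring). Each recognized line starts with '@'
# and is split on the first ':':
#
#     @author: someone
#     @title: Example Node Pack
#     @nickname: ExamplePack
#     @description: Collection of example nodes.
#
# This yields {"author": "someone", "title": "Example Node Pack", ...}.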


def get_py_file_paths(dirname):
    file_paths = []

    for root, dirs, files in os.walk(dirname):
        if ".git" in root or "__pycache__" in root:
            continue

        for file in files:
            if file.endswith(".py"):
                file_path = os.path.join(root, file)
                file_paths.append(file_path)

    return file_paths


def get_nodes(target_dir):
    py_files = []
    directories = []

    for item in os.listdir(target_dir):
        if ".git" in item or "__pycache__" in item:
            continue

        path = os.path.abspath(os.path.join(target_dir, item))

        if os.path.isfile(path) and item.endswith(".py"):
            py_files.append(path)
        elif os.path.isdir(path):
            directories.append(path)

    return py_files, directories


def get_urls_from_list_file(list_file):
    """
    Read URLs from list file for scan-only mode

    Args:
        list_file (str): Path to URL list file (one URL per line)

    Returns:
        list of tuples: [(url, "", None, None), ...]
        Format: (url, title, preemptions, nodename_pattern)
        - title: Empty string
        - preemptions: None
        - nodename_pattern: None

    File format:
        https://github.com/owner/repo1
        https://github.com/owner/repo2
        # Comments starting with # are ignored

    Raises:
        FileNotFoundError: If list_file does not exist
    """
    if not os.path.exists(list_file):
        raise FileNotFoundError(f"URL list file not found: {list_file}")

    urls = []
    with open(list_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()

            # Skip empty lines and comments
            if not line or line.startswith('#'):
                continue

            # Validate URL format (basic check)
            if not (line.startswith('http://') or line.startswith('https://')):
                print(f"WARNING: Line {line_num} is not a valid URL: {line}")
                continue

            # Add URL with empty metadata
            # (url, title, preemptions, nodename_pattern)
            urls.append((line, "", None, None))

    print(f"Loaded {len(urls)} URLs from {list_file}")
    return urls


def get_git_urls_from_json(json_file):
    with open(json_file, encoding='utf-8') as file:
        data = json.load(file)

    custom_nodes = data.get('custom_nodes', [])

    git_clone_files = []
    for node in custom_nodes:
        if node.get('install_type') == 'git-clone':
            files = node.get('files', [])
            if files:
                git_clone_files.append((files[0], node.get('title'), node.get('preemptions'), node.get('nodename_pattern')))

    git_clone_files.append(("https://github.com/comfyanonymous/ComfyUI", "ComfyUI", None, None))

    return git_clone_files


def get_py_urls_from_json(json_file):
    with open(json_file, encoding='utf-8') as file:
        data = json.load(file)

    custom_nodes = data.get('custom_nodes', [])

    py_files = []
    for node in custom_nodes:
        if node.get('install_type') == 'copy':
            files = node.get('files', [])
            if files:
                py_files.append((files[0], node.get('title'), node.get('preemptions'), node.get('nodename_pattern')))

    return py_files


def clone_or_pull_git_repository(git_url):
    repo_name = git_url.split("/")[-1]
    if repo_name.endswith(".git"):
        repo_name = repo_name[:-4]

    repo_dir = os.path.join(temp_dir, repo_name)

    if os.path.exists(repo_dir):
        try:
            repo = Repo(repo_dir)
            origin = repo.remote(name="origin")
            origin.pull()
            repo.git.submodule('update', '--init', '--recursive')

            print(f"Pulling {repo_name}...")
        except Exception as e:
            print(f"Failed to pull '{repo_name}': {e}")
    else:
        try:
            Repo.clone_from(git_url, repo_dir, recursive=True)
            print(f"Cloning {repo_name}...")
        except Exception as e:
            print(f"Failed to clone '{repo_name}': {e}")
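

# Minimal shape of a custom-node-list.json entry consumed by
# get_git_urls_from_json() / get_py_urls_from_json() above (values are
# illustrative; only the keys those functions actually read are shown):
#
#     {
#         "custom_nodes": [
#             {
#                 "title": "Example Node Pack",
#                 "install_type": "git-clone",
#                 "files": ["https://github.com/example/example-node-pack"],
#                 "preemptions": null,
#                 "nodename_pattern": null
#             }
#         ]
#     }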


def update_custom_nodes(scan_only_mode=False, url_list_file=None):
    """
    Update custom nodes by cloning/pulling repositories

    Args:
        scan_only_mode (bool): If True, use URL list file instead of custom-node-list.json
        url_list_file (str): Path to URL list file (required if scan_only_mode=True)

    Returns:
        dict: node_info mapping {repo_name: (url, title, preemptions, node_pattern)}
    """
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    node_info = {}

    # Select URL source based on mode
    if scan_only_mode:
        if not url_list_file:
            raise ValueError("url_list_file is required in scan-only mode")

        git_url_titles_preemptions = get_urls_from_list_file(url_list_file)

        print("\n[Scan-Only Mode]")
        print(f" - URL source: {url_list_file}")
        print(" - GitHub stats: DISABLED")
        print(f" - Git clone/pull: {'ENABLED' if not skip_update else 'DISABLED'}")
        print(" - Metadata: EMPTY")
    else:
        if not os.path.exists('custom-node-list.json'):
            raise FileNotFoundError("custom-node-list.json not found")

        git_url_titles_preemptions = get_git_urls_from_json('custom-node-list.json')

        print("\n[Standard Mode]")
        print(" - URL source: custom-node-list.json")
        print(f" - GitHub stats: {'ENABLED' if not skip_stat_update else 'DISABLED'}")
        print(f" - Git clone/pull: {'ENABLED' if not skip_update else 'DISABLED'}")
        print(" - Metadata: FULL")

    def process_git_url_title(url, title, preemptions, node_pattern):
        name = os.path.basename(url)
        if name.endswith(".git"):
            name = name[:-4]

        node_info[name] = (url, title, preemptions, node_pattern)

        if not skip_update:
            clone_or_pull_git_repository(url)

    def process_git_stats(git_url_titles_preemptions):
        GITHUB_STATS_CACHE_FILENAME = 'github-stats-cache.json'
        GITHUB_STATS_FILENAME = 'github-stats.json'

        github_stats = {}
        try:
            with open(GITHUB_STATS_CACHE_FILENAME, 'r', encoding='utf-8') as file:
                github_stats = json.load(file)
        except FileNotFoundError:
            pass

        def is_rate_limit_exceeded():
            return g.rate_limiting[0] <= 20

        if is_rate_limit_exceeded():
            print(f"GitHub API Rate Limit Exceeded: remained - {(g.rate_limiting_resettime - datetime.datetime.now().timestamp())/60:.2f} min")
        else:
            def renew_stat(url):
                if is_rate_limit_exceeded():
                    return

                if 'github.com' not in url:
                    return None

                print('.', end="")
                sys.stdout.flush()

                try:
                    # Parsing the URL
                    parsed_url = urlparse(url)
                    domain = parsed_url.netloc
                    path = parsed_url.path
                    path_parts = path.strip("/").split("/")

                    if len(path_parts) >= 2 and domain == "github.com":
                        owner_repo = "/".join(path_parts[-2:])
                        repo = g.get_repo(owner_repo)
                        owner = repo.owner

                        now = datetime.datetime.now(datetime.timezone.utc)
                        author_time_diff = now - owner.created_at

                        last_update = repo.pushed_at.strftime("%Y-%m-%d %H:%M:%S") if repo.pushed_at else 'N/A'

                        item = {
                            "stars": repo.stargazers_count,
                            "last_update": last_update,
                            "cached_time": now.timestamp(),
                            "author_account_age_days": author_time_diff.days,
                        }

                        return url, item
                    else:
                        print(f"\nInvalid URL format for GitHub repository: {url}\n")
                except Exception as e:
                    print(f"\nERROR on {url}\n{e}")

                return None

            # resolve unresolved urls
            with concurrent.futures.ThreadPoolExecutor(11) as executor:
                futures = []
                for url, title, preemptions, node_pattern in git_url_titles_preemptions:
                    if url not in github_stats:
                        futures.append(executor.submit(renew_stat, url))

                for future in concurrent.futures.as_completed(futures):
                    url_item = future.result()
                    if url_item is not None:
                        url, item = url_item
                        github_stats[url] = item

            # renew outdated cache
            outdated_urls = []
            for k, v in github_stats.items():
                elapsed = (datetime.datetime.now().timestamp() - v['cached_time'])
                if elapsed > 60*60*12:  # 12 hours
                    outdated_urls.append(k)

            with concurrent.futures.ThreadPoolExecutor(11) as executor:
                futures = []  # fresh list so already-completed futures aren't re-iterated
                for url in outdated_urls:
                    futures.append(executor.submit(renew_stat, url))

                for future in concurrent.futures.as_completed(futures):
                    url_item = future.result()
                    if url_item is not None:
                        url, item = url_item
                        github_stats[url] = item

        with open('github-stats-cache.json', 'w', encoding='utf-8') as file:
            json.dump(github_stats, file, ensure_ascii=False, indent=4)

        with open(GITHUB_STATS_FILENAME, 'w', encoding='utf-8') as file:
            for v in github_stats.values():
                if "cached_time" in v:
                    del v["cached_time"]

            github_stats = dict(sorted(github_stats.items()))
            json.dump(github_stats, file, ensure_ascii=False, indent=4)

        print(f"Successfully written to {GITHUB_STATS_FILENAME}.")

    if not skip_stat_update:
        process_git_stats(git_url_titles_preemptions)

    # Git clone/pull for all repositories
    with concurrent.futures.ThreadPoolExecutor(11) as executor:
        for url, title, preemptions, node_pattern in git_url_titles_preemptions:
            executor.submit(process_git_url_title, url, title, preemptions, node_pattern)

    # .py file download (skip in scan-only mode - only process git repos)
    if not scan_only_mode:
        py_url_titles_and_pattern = get_py_urls_from_json('custom-node-list.json')

        def download_and_store_info(url_title_preemptions_and_pattern):
            url, title, preemptions, node_pattern = url_title_preemptions_and_pattern
            name = os.path.basename(url)

            if name.endswith(".py"):
                node_info[name] = (url, title, preemptions, node_pattern)

            try:
                download_url(url, temp_dir)
            except:
                print(f"[ERROR] Cannot download '{url}'")

        with concurrent.futures.ThreadPoolExecutor(10) as executor:
            executor.map(download_and_store_info, py_url_titles_and_pattern)

    return node_info
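

# Illustrative github-stats.json entry produced by renew_stat() inside
# process_git_stats() (numbers are made up; the cached_time field is stripped
# before the final file is written):
#
#     "https://github.com/example/example-node-pack": {
#         "stars": 123,
#         "last_update": "2025-01-01 00:00:00",
#         "author_account_age_days": 1000
#     }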


def gen_json(node_info, scan_only_mode=False, force_rescan=False):
    """
    Generate extension-node-map.json from scanned node information

    Args:
        node_info (dict): Repository metadata mapping
        scan_only_mode (bool): If True, exclude metadata from output
        force_rescan (bool): If True, ignore cache and force rescan all nodes
    """
    # scan from .py file
    node_files, node_dirs = get_nodes(temp_dir)

    comfyui_path = os.path.abspath(os.path.join(temp_dir, "ComfyUI"))
    # Only reorder if ComfyUI exists in the list
    if comfyui_path in node_dirs:
        node_dirs.remove(comfyui_path)
        node_dirs = [comfyui_path] + node_dirs

    data = {}
    for dirname in node_dirs:
        py_files = get_py_file_paths(dirname)
        metadata = {}

        # Use per-repo cache for node AND metadata extraction
        try:
            nodes, metadata = extract_nodes_from_repo(Path(dirname), verbose=False, force_rescan=force_rescan)
        except:
            # Fallback to file-by-file scanning if extract_nodes_from_repo fails
            nodes = set()
            for py in py_files:
                nodes_in_file, metadata_in_file = scan_in_file(py, dirname == "ComfyUI")
                nodes.update(nodes_in_file)
                metadata.update(metadata_in_file)

        dirname = os.path.basename(dirname)

        if 'Jovimetrix' in dirname:
            pass

        if len(nodes) > 0 or (dirname in node_info and node_info[dirname][3] is not None):
            nodes = list(nodes)
            nodes.sort()

            if dirname in node_info:
                git_url, title, preemptions, node_pattern = node_info[dirname]

                # Conditionally add metadata based on mode
                if not scan_only_mode:
                    # Standard mode: include all metadata
                    metadata['title_aux'] = title

                    if preemptions is not None:
                        metadata['preemptions'] = preemptions

                    if node_pattern is not None:
                        metadata['nodename_pattern'] = node_pattern
                # Scan-only mode: metadata remains empty

                data[git_url] = (nodes, metadata)
            else:
                # Scan-only mode: Repository not in node_info (expected behavior)
                # Construct URL from dirname (author_repo format)
                if '_' in dirname:
                    parts = dirname.split('_', 1)
                    git_url = f"https://github.com/{parts[0]}/{parts[1]}"
                    data[git_url] = (nodes, metadata)
                else:
                    print(f"WARN: {dirname} is removed from custom-node-list.json")

    for file in node_files:
        nodes, metadata = scan_in_file(file)

        if len(nodes) > 0 or (dirname in node_info and node_info[dirname][3] is not None):
            nodes = list(nodes)
            nodes.sort()

            file = os.path.basename(file)

            if file in node_info:
                url, title, preemptions, node_pattern = node_info[file]

                # Conditionally add metadata based on mode
                if not scan_only_mode:
                    metadata['title_aux'] = title

                    if preemptions is not None:
                        metadata['preemptions'] = preemptions

                    if node_pattern is not None:
                        metadata['nodename_pattern'] = node_pattern

                data[url] = (nodes, metadata)
            else:
                print(f"Missing info: {file}")

    # scan from node_list.json file
    extensions = [name for name in os.listdir(temp_dir) if os.path.isdir(os.path.join(temp_dir, name))]

    for extension in extensions:
        node_list_json_path = os.path.join(temp_dir, extension, 'node_list.json')
        if os.path.exists(node_list_json_path):
            # Skip if extension not in node_info (scan-only mode with limited URLs)
            if extension not in node_info:
                continue

            git_url, title, preemptions, node_pattern = node_info[extension]

            with open(node_list_json_path, 'r', encoding='utf-8') as f:
                try:
                    node_list_json = json.load(f)
                except Exception as e:
                    print(f"\nERROR: Invalid json format '{node_list_json_path}'")
                    print("------------------------------------------------------")
                    print(e)
                    print("------------------------------------------------------")
                    node_list_json = {}

            metadata_in_url = {}
            if git_url not in data:
                nodes = set()
            else:
                nodes_in_url, metadata_in_url = data[git_url]
                nodes = set(nodes_in_url)

            try:
                for x, desc in node_list_json.items():
                    nodes.add(x.strip())
            except Exception as e:
                print(f"\nERROR: Invalid json format '{node_list_json_path}'")
                print("------------------------------------------------------")
                print(e)
                print("------------------------------------------------------")
                node_list_json = {}

            # Conditionally add metadata based on mode
            if not scan_only_mode:
                metadata_in_url['title_aux'] = title

                if preemptions is not None:
                    metadata_in_url['preemptions'] = preemptions

                if node_pattern is not None:
                    metadata_in_url['nodename_pattern'] = node_pattern

            nodes = list(nodes)
            nodes.sort()

            data[git_url] = (nodes, metadata_in_url)

    json_path = "extension-node-map.json"
    with open(json_path, "w", encoding='utf-8') as file:
        json.dump(data, file, indent=4, sort_keys=True)
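

# Illustrative extension-node-map.json entry written by gen_json() in standard
# mode (URL and node names hypothetical). Each value is a (nodes, metadata)
# pair, which json.dump() serializes as a two-element list:
#
#     "https://github.com/example/example-node-pack": [
#         ["ExampleNodeA", "ExampleNodeB"],
#         {"title_aux": "Example Node Pack"}
#     ]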


if __name__ == "__main__":
    # Parse arguments
    args = parse_arguments()

    # Determine mode
    scan_only_mode = args.scan_only is not None
    url_list_file = args.scan_only if scan_only_mode else None

    # Determine temp_dir
    if args.temp_dir:
        temp_dir = args.temp_dir
    elif args.temp_dir_positional:
        temp_dir = args.temp_dir_positional
    else:
        temp_dir = os.path.join(os.getcwd(), ".tmp")

    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    # Determine skip flags
    skip_update = args.skip_update or args.skip_all
    skip_stat_update = args.skip_stat_update or args.skip_all or scan_only_mode

    if not skip_stat_update:
        auth = Auth.Token(os.environ.get('GITHUB_TOKEN'))
        g = Github(auth=auth)
    else:
        g = None

    print("### ComfyUI Manager Node Scanner ###")

    if scan_only_mode:
        print(f"\n# [Scan-Only Mode] Processing URL list: {url_list_file}\n")
    else:
        print("\n# [Standard Mode] Updating extensions\n")

    # Update/clone repositories and collect node info
    updated_node_info = update_custom_nodes(scan_only_mode, url_list_file)

    print("\n# Generating 'extension-node-map.json'...\n")

    # Generate extension-node-map.json
    force_rescan = args.force_rescan if hasattr(args, 'force_rescan') else False
    if force_rescan:
        print("⚠️ Force rescan enabled - ignoring all cached results\n")

    gen_json(updated_node_info, scan_only_mode, force_rescan)

    print("\n✅ DONE.\n")
    if scan_only_mode:
        print("Output: extension-node-map.json (node mappings only)")
    else:
        print("Output: extension-node-map.json (full metadata)")