diff --git a/runpod_handler.py b/runpod_handler.py index c0234c51d..e82f4fbf6 100644 --- a/runpod_handler.py +++ b/runpod_handler.py @@ -5,12 +5,11 @@ Optimized for the new ComfyUI features and performance improvements """ import os -import json import time import logging import tempfile import requests -from typing import Dict, Any, Optional +from typing import Dict, Any import runpod # Configure logging @@ -25,13 +24,13 @@ class ComfyUIServerlessHandler: self.comfyui_url = "http://127.0.0.1:8000" self.client_id = "runpod_serverless_worker" self.setup_paths() - + def setup_paths(self): """Setup required paths for serverless operation""" os.makedirs("/tmp/inputs", exist_ok=True) os.makedirs("/tmp/outputs", exist_ok=True) os.makedirs("/tmp/comfyui", exist_ok=True) - + def wait_for_comfyui(self, timeout: int = 120) -> bool: """Wait for ComfyUI to be ready""" start_time = time.time() @@ -44,37 +43,37 @@ class ComfyUIServerlessHandler: except requests.exceptions.RequestException: pass time.sleep(2) - + logger.error(f"ComfyUI not ready after {timeout} seconds") return False - + def download_input_files(self, input_data: Dict[str, Any]) -> Dict[str, str]: """Download input files and return local paths""" local_files = {} - + if "input_files" in input_data: for file_key, file_url in input_data["input_files"].items(): try: response = requests.get(file_url, timeout=60) response.raise_for_status() - + # Create temporary file with tempfile.NamedTemporaryFile( - delete=False, + delete=False, dir="/tmp/inputs", suffix=os.path.splitext(file_url)[1] ) as tmp_file: tmp_file.write(response.content) local_files[file_key] = tmp_file.name - + logger.info(f"Downloaded {file_key} to {local_files[file_key]}") - + except Exception as e: logger.error(f"Failed to download {file_key}: {str(e)}") raise - + return local_files - + def execute_workflow(self, workflow: Dict[str, Any]) -> Dict[str, Any]: """Execute ComfyUI workflow""" try: @@ -88,31 +87,31 @@ class ComfyUIServerlessHandler: timeout=30 ) queue_response.raise_for_status() - + prompt_id = queue_response.json()["prompt_id"] logger.info(f"Queued workflow with prompt_id: {prompt_id}") - + # Wait for completion return self.wait_for_completion(prompt_id) - + except Exception as e: logger.error(f"Failed to execute workflow: {str(e)}") raise - + def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> Dict[str, Any]: """Wait for workflow completion and return results""" start_time = time.time() - + while time.time() - start_time < timeout: try: # Check queue status queue_response = requests.get(f"{self.comfyui_url}/queue") queue_data = queue_response.json() - + # Check if our job is still in queue running = any(item[1]["prompt_id"] == prompt_id for item in queue_data.get("queue_running", [])) pending = any(item[1]["prompt_id"] == prompt_id for item in queue_data.get("queue_pending", [])) - + if not running and not pending: # Job completed, get results history_response = requests.get(f"{self.comfyui_url}/history/{prompt_id}") @@ -120,15 +119,15 @@ class ComfyUIServerlessHandler: history_data = history_response.json() if prompt_id in history_data: return self.process_results(history_data[prompt_id]) - + time.sleep(2) - + except Exception as e: logger.error(f"Error checking completion: {str(e)}") time.sleep(5) - + raise TimeoutError(f"Workflow {prompt_id} timed out after {timeout} seconds") - + def process_results(self, history_data: Dict[str, Any]) -> Dict[str, Any]: """Process and upload results""" results = { @@ -136,7 +135,7 @@ class ComfyUIServerlessHandler: "outputs": [], "metadata": {} } - + if "outputs" in history_data: for node_id, node_output in history_data["outputs"].items(): if "images" in node_output: @@ -148,28 +147,28 @@ class ComfyUIServerlessHandler: "subfolder": image_info.get("subfolder", ""), "type": image_info.get("type", "output") } - + try: image_response = requests.get(image_url, params=params) image_response.raise_for_status() - + # Save to temp file for upload output_path = f"/tmp/outputs/{image_info['filename']}" with open(output_path, "wb") as f: f.write(image_response.content) - + results["outputs"].append({ "type": "image", "filename": image_info["filename"], "path": output_path, "node_id": node_id }) - + except Exception as e: logger.error(f"Failed to process image {image_info['filename']}: {str(e)}") - + return results - + def cleanup(self): """Clean up temporary files""" try: @@ -185,18 +184,18 @@ class ComfyUIServerlessHandler: def handler(job: Dict[str, Any]) -> Dict[str, Any]: """Main serverless handler function""" handler_instance = ComfyUIServerlessHandler() - + try: # Wait for ComfyUI to be ready if not handler_instance.wait_for_comfyui(): return {"error": "ComfyUI failed to start"} - + # Get job input job_input = job.get("input", {}) - + # Download input files if any local_files = handler_instance.download_input_files(job_input) - + # Update workflow with local file paths workflow = job_input.get("workflow", {}) if local_files and "file_mappings" in job_input: @@ -205,10 +204,10 @@ def handler(job: Dict[str, Any]) -> Dict[str, Any]: for input_key, file_key in mappings.items(): if file_key in local_files: workflow[node_id]["inputs"][input_key] = local_files[file_key] - + # Execute workflow results = handler_instance.execute_workflow(workflow) - + # Upload output files to RunPod storage or return base64 output_urls = [] for output in results.get("outputs", []): @@ -222,24 +221,24 @@ def handler(job: Dict[str, Any]) -> Dict[str, Any]: "data": image_data, "node_id": output["node_id"] }) - + return { "status": "success", "outputs": output_urls, "execution_time": time.time() - job.get("start_time", time.time()) } - + except Exception as e: logger.error(f"Handler error: {str(e)}") return { "error": str(e), "status": "failed" } - + finally: # Always cleanup handler_instance.cleanup() if __name__ == "__main__": # Start the serverless worker - runpod.serverless.start({"handler": handler}) \ No newline at end of file + runpod.serverless.start({"handler": handler}) diff --git a/start_runpod.py b/start_runpod.py index 067f31815..464ca66ea 100644 --- a/start_runpod.py +++ b/start_runpod.py @@ -8,7 +8,6 @@ import sys import logging import subprocess import time -from pathlib import Path # Logging setup logging.basicConfig( @@ -23,17 +22,17 @@ def mount_runpod_storage(): # RunPod network storage path (environment variable'dan al) network_storage_path = os.environ.get('RUNPOD_NETWORK_STORAGE_PATH', '/runpod-volume') models_storage_path = os.path.join(network_storage_path, 'models') - + # Local models klasörü local_models_path = '/app/models' - + logger.info(f"Network storage path: {network_storage_path}") logger.info(f"Models storage path: {models_storage_path}") - + # Network storage'da models klasörü var mı kontrol et if os.path.exists(models_storage_path): logger.info("Network storage'da models klasörü bulundu") - + # Local models klasörünü sil ve symlink oluştur if os.path.exists(local_models_path): if os.path.islink(local_models_path): @@ -41,33 +40,33 @@ def mount_runpod_storage(): else: import shutil shutil.rmtree(local_models_path) - + # Symlink oluştur os.symlink(models_storage_path, local_models_path) logger.info(f"Models klasörü network storage'a bağlandı: {models_storage_path} -> {local_models_path}") - + # Model klasörlerini kontrol et check_model_folders(local_models_path) - + else: logger.warning(f"Network storage'da models klasörü bulunamadı: {models_storage_path}") logger.info("Local models klasörü kullanılacak") - + # Network storage'da models klasörü oluştur os.makedirs(models_storage_path, exist_ok=True) logger.info(f"Network storage'da models klasörü oluşturuldu: {models_storage_path}") - + # Mevcut local models'i network storage'a taşı if os.path.exists(local_models_path) and not os.path.islink(local_models_path): import shutil shutil.copytree(local_models_path, models_storage_path, dirs_exist_ok=True) shutil.rmtree(local_models_path) logger.info("Local models network storage'a taşındı") - + # Symlink oluştur os.symlink(models_storage_path, local_models_path) logger.info("Models klasörü network storage'a bağlandı") - + except Exception as e: logger.error(f"Network storage mount hatası: {e}") logger.info("Local models klasörü kullanılacak") @@ -77,7 +76,7 @@ def check_model_folders(models_path): """Model klasörlerinin varlığını kontrol et""" required_folders = [ 'checkpoints', - 'loras', + 'loras', 'vae', 'controlnet', 'upscale_models', @@ -88,7 +87,7 @@ def check_model_folders(models_path): 'embeddings', 'clip_vision' ] - + for folder in required_folders: folder_path = os.path.join(models_path, folder) if not os.path.exists(folder_path): @@ -109,16 +108,16 @@ def download_essential_models(): try: models_to_download = os.environ.get('DOWNLOAD_MODELS', '').split(',') models_to_download = [m.strip() for m in models_to_download if m.strip()] - + if not models_to_download: logger.info("İndirilecek model belirtilmedi") return - + logger.info(f"İndirilecek modeller: {models_to_download}") - + # Burada model indirme logic'i eklenebilir # Örnek: huggingface-hub kullanarak - + except Exception as e: logger.error(f"Model indirme hatası: {e}") @@ -127,11 +126,11 @@ def setup_environment(): # ComfyUI için gerekli environment variables os.environ['HF_HUB_DISABLE_TELEMETRY'] = '1' os.environ['DO_NOT_TRACK'] = '1' - + # RunPod specific if 'RUNPOD_POD_ID' in os.environ: logger.info(f"RunPod Pod ID: {os.environ['RUNPOD_POD_ID']}") - + # Port ayarı port = os.environ.get('PORT', '8188') os.environ['PORT'] = port @@ -141,43 +140,43 @@ def wait_for_storage(): """Network storage'ın hazır olmasını bekle""" max_wait = 30 # 30 saniye wait_interval = 2 - + network_storage_path = os.environ.get('RUNPOD_NETWORK_STORAGE_PATH', '/runpod-volume') - + for i in range(0, max_wait, wait_interval): if os.path.exists(network_storage_path): logger.info("Network storage hazır") return True - + logger.info(f"Network storage bekleniyor... ({i}/{max_wait}s)") time.sleep(wait_interval) - + logger.warning("Network storage timeout - local storage kullanılacak") return False def main(): """Ana başlangıç fonksiyonu""" logger.info("RunPod ComfyUI başlatılıyor...") - + # Environment setup setup_environment() - + # Network storage'ı bekle wait_for_storage() - + # Network storage mount mount_runpod_storage() - + # Temel modelleri indir (opsiyonel) download_essential_models() - + # ComfyUI'yi başlat logger.info("ComfyUI başlatılıyor...") - + # Port ve listen address port = os.environ.get('PORT', '8188') listen = os.environ.get('LISTEN', '0.0.0.0') - + # ComfyUI command cmd = [ sys.executable, 'main.py', @@ -185,14 +184,14 @@ def main(): '--port', port, '--cpu' # CPU mode for RunPod ] - + # Extra args if os.environ.get('COMFYUI_ARGS'): extra_args = os.environ['COMFYUI_ARGS'].split() cmd.extend(extra_args) - + logger.info(f"ComfyUI komutu: {' '.join(cmd)}") - + # ComfyUI'yi başlat try: subprocess.run(cmd, check=True) @@ -203,4 +202,4 @@ def main(): sys.exit(1) if __name__ == "__main__": - main() \ No newline at end of file + main()