mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-02-11 14:02:37 +08:00
- Fix all whitespace and formatting issues in runpod_handler.py - Remove trailing whitespace and blank line whitespace - Add comprehensive RunPod network storage setup - Enhance start_runpod.py with automatic model mounting - Update Dockerfile for optimized RunPod deployment - Add detailed setup documentation in runpod_setup.md - Improve .dockerignore for faster builds - Add .env to .gitignore for security
286 lines
10 KiB
Python
286 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
RunPod Serverless Worker Handler for ComfyUI
|
|
Optimized for the new ComfyUI features and performance improvements
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
import tempfile
|
|
import requests
|
|
import subprocess
|
|
from typing import Dict, Any
|
|
import runpod
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ComfyUIServerlessHandler:
|
|
def __init__(self):
|
|
self.comfyui_url = "http://127.0.0.1:8000"
|
|
self.client_id = "runpod_serverless_worker"
|
|
self.comfyui_process = None
|
|
self.setup_paths()
|
|
self.start_comfyui()
|
|
|
|
def setup_paths(self):
|
|
"""Setup required paths for serverless operation"""
|
|
os.makedirs("/tmp/inputs", exist_ok=True)
|
|
os.makedirs("/tmp/outputs", exist_ok=True)
|
|
os.makedirs("/tmp/comfyui", exist_ok=True)
|
|
|
|
def start_comfyui(self):
|
|
"""Start ComfyUI server in background"""
|
|
try:
|
|
logger.info("Starting ComfyUI server...")
|
|
|
|
# Check if main.py exists
|
|
if not os.path.exists("/workspace/ComfyUI/main.py"):
|
|
logger.error("main.py not found in /workspace/ComfyUI")
|
|
raise FileNotFoundError("ComfyUI main.py not found")
|
|
|
|
# Check if models directory exists (network storage)
|
|
if not os.path.exists("/workspace/ComfyUI/models"):
|
|
logger.warning("Models directory not found, creating symlink to network storage")
|
|
if os.path.exists("/runpod-volume/models"):
|
|
os.symlink("/runpod-volume/models", "/workspace/ComfyUI/models")
|
|
else:
|
|
logger.error("Network storage models not found at /runpod-volume/models")
|
|
|
|
# Start ComfyUI
|
|
self.comfyui_process = subprocess.Popen([
|
|
"python3", "main.py",
|
|
"--listen", "0.0.0.0",
|
|
"--port", "8000",
|
|
"--dont-print-server",
|
|
"--disable-auto-launch"
|
|
], cwd="/workspace/ComfyUI",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
|
|
logger.info("ComfyUI server started")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start ComfyUI: {str(e)}")
|
|
raise
|
|
|
|
def wait_for_comfyui(self, timeout: int = 120) -> bool:
|
|
"""Wait for ComfyUI to be ready"""
|
|
start_time = time.time()
|
|
while time.time() - start_time < timeout:
|
|
try:
|
|
response = requests.get(f"{self.comfyui_url}/system_stats")
|
|
if response.status_code == 200:
|
|
logger.info("ComfyUI is ready")
|
|
return True
|
|
except requests.exceptions.RequestException:
|
|
pass
|
|
time.sleep(2)
|
|
|
|
logger.error(f"ComfyUI not ready after {timeout} seconds")
|
|
return False
|
|
|
|
def download_input_files(self, input_data: Dict[str, Any]) -> Dict[str, str]:
|
|
"""Download input files and return local paths"""
|
|
local_files = {}
|
|
|
|
if "input_files" in input_data:
|
|
for file_key, file_url in input_data["input_files"].items():
|
|
try:
|
|
response = requests.get(file_url, timeout=60)
|
|
response.raise_for_status()
|
|
|
|
# Create temporary file
|
|
with tempfile.NamedTemporaryFile(
|
|
delete=False,
|
|
dir="/tmp/inputs",
|
|
suffix=os.path.splitext(file_url)[1]
|
|
) as tmp_file:
|
|
tmp_file.write(response.content)
|
|
local_files[file_key] = tmp_file.name
|
|
|
|
logger.info(f"Downloaded {file_key} to {local_files[file_key]}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to download {file_key}: {str(e)}")
|
|
raise
|
|
|
|
return local_files
|
|
|
|
def execute_workflow(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute ComfyUI workflow"""
|
|
try:
|
|
# Queue the workflow
|
|
queue_response = requests.post(
|
|
f"{self.comfyui_url}/prompt",
|
|
json={
|
|
"prompt": workflow,
|
|
"client_id": self.client_id
|
|
},
|
|
timeout=30
|
|
)
|
|
queue_response.raise_for_status()
|
|
|
|
prompt_id = queue_response.json()["prompt_id"]
|
|
logger.info(f"Queued workflow with prompt_id: {prompt_id}")
|
|
|
|
# Wait for completion
|
|
return self.wait_for_completion(prompt_id)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to execute workflow: {str(e)}")
|
|
raise
|
|
|
|
def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> Dict[str, Any]:
|
|
"""Wait for workflow completion and return results"""
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout:
|
|
try:
|
|
# Check queue status
|
|
queue_response = requests.get(f"{self.comfyui_url}/queue")
|
|
queue_data = queue_response.json()
|
|
|
|
# Check if our job is still in queue
|
|
running = any(item[1]["prompt_id"] == prompt_id for item in queue_data.get("queue_running", []))
|
|
pending = any(item[1]["prompt_id"] == prompt_id for item in queue_data.get("queue_pending", []))
|
|
|
|
if not running and not pending:
|
|
# Job completed, get results
|
|
history_response = requests.get(f"{self.comfyui_url}/history/{prompt_id}")
|
|
if history_response.status_code == 200:
|
|
history_data = history_response.json()
|
|
if prompt_id in history_data:
|
|
return self.process_results(history_data[prompt_id])
|
|
|
|
time.sleep(2)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking completion: {str(e)}")
|
|
time.sleep(5)
|
|
|
|
raise TimeoutError(f"Workflow {prompt_id} timed out after {timeout} seconds")
|
|
|
|
def process_results(self, history_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Process and upload results"""
|
|
results = {
|
|
"status": "completed",
|
|
"outputs": [],
|
|
"metadata": {}
|
|
}
|
|
|
|
if "outputs" in history_data:
|
|
for node_id, node_output in history_data["outputs"].items():
|
|
if "images" in node_output:
|
|
for image_info in node_output["images"]:
|
|
# Download image from ComfyUI
|
|
image_url = f"{self.comfyui_url}/view"
|
|
params = {
|
|
"filename": image_info["filename"],
|
|
"subfolder": image_info.get("subfolder", ""),
|
|
"type": image_info.get("type", "output")
|
|
}
|
|
|
|
try:
|
|
image_response = requests.get(image_url, params=params)
|
|
image_response.raise_for_status()
|
|
|
|
# Save to temp file for upload
|
|
output_path = f"/tmp/outputs/{image_info['filename']}"
|
|
with open(output_path, "wb") as f:
|
|
f.write(image_response.content)
|
|
|
|
results["outputs"].append({
|
|
"type": "image",
|
|
"filename": image_info["filename"],
|
|
"path": output_path,
|
|
"node_id": node_id
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to process image {image_info['filename']}: {str(e)}")
|
|
|
|
return results
|
|
|
|
def cleanup(self):
|
|
"""Clean up temporary files"""
|
|
try:
|
|
import shutil
|
|
shutil.rmtree("/tmp/inputs", ignore_errors=True)
|
|
shutil.rmtree("/tmp/outputs", ignore_errors=True)
|
|
os.makedirs("/tmp/inputs", exist_ok=True)
|
|
os.makedirs("/tmp/outputs", exist_ok=True)
|
|
logger.info("Cleaned up temporary files")
|
|
except Exception as e:
|
|
logger.error(f"Cleanup failed: {str(e)}")
|
|
|
|
|
|
def handler(job: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Main serverless handler function"""
|
|
handler_instance = ComfyUIServerlessHandler()
|
|
|
|
try:
|
|
# Wait for ComfyUI to be ready
|
|
if not handler_instance.wait_for_comfyui():
|
|
return {"error": "ComfyUI failed to start"}
|
|
|
|
# Get job input
|
|
job_input = job.get("input", {})
|
|
|
|
# Download input files if any
|
|
local_files = handler_instance.download_input_files(job_input)
|
|
|
|
# Update workflow with local file paths
|
|
workflow = job_input.get("workflow", {})
|
|
if local_files and "file_mappings" in job_input:
|
|
for node_id, mappings in job_input["file_mappings"].items():
|
|
if node_id in workflow:
|
|
for input_key, file_key in mappings.items():
|
|
if file_key in local_files:
|
|
workflow[node_id]["inputs"][input_key] = local_files[file_key]
|
|
|
|
# Execute workflow
|
|
results = handler_instance.execute_workflow(workflow)
|
|
|
|
# Upload output files to RunPod storage or return base64
|
|
output_urls = []
|
|
for output in results.get("outputs", []):
|
|
if output["type"] == "image":
|
|
# For serverless, we typically return base64 or upload to storage
|
|
with open(output["path"], "rb") as f:
|
|
import base64
|
|
image_data = base64.b64encode(f.read()).decode()
|
|
output_urls.append({
|
|
"filename": output["filename"],
|
|
"data": image_data,
|
|
"node_id": output["node_id"]
|
|
})
|
|
|
|
return {
|
|
"status": "success",
|
|
"outputs": output_urls,
|
|
"execution_time": time.time() - job.get("start_time", time.time())
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Handler error: {str(e)}")
|
|
return {
|
|
"error": str(e),
|
|
"status": "failed"
|
|
}
|
|
|
|
finally:
|
|
# Always cleanup
|
|
handler_instance.cleanup()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Start the serverless worker
|
|
runpod.serverless.start({"handler": handler})
|