ComfyUI/runpod_handler.py

#!/usr/bin/env python3
"""
RunPod Serverless Worker Handler for ComfyUI

Waits for a local ComfyUI instance, queues workflows over its HTTP API,
polls for completion, and returns the generated images to the caller.
"""
import base64
import logging
import os
import shutil
import tempfile
import time
from typing import Any, Dict

import requests
import runpod

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class ComfyUIServerlessHandler:
    def __init__(self):
        self.comfyui_url = "http://127.0.0.1:8000"
        self.client_id = "runpod_serverless_worker"
        self.setup_paths()

    def setup_paths(self):
        """Setup required paths for serverless operation"""
        os.makedirs("/tmp/inputs", exist_ok=True)
        os.makedirs("/tmp/outputs", exist_ok=True)
        os.makedirs("/tmp/comfyui", exist_ok=True)

    def wait_for_comfyui(self, timeout: int = 120) -> bool:
        """Wait for ComfyUI to be ready"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Short per-request timeout so a hung connection can't stall the poll loop
                response = requests.get(f"{self.comfyui_url}/system_stats", timeout=5)
                if response.status_code == 200:
                    logger.info("ComfyUI is ready")
                    return True
            except requests.exceptions.RequestException:
                pass
            time.sleep(2)
        logger.error(f"ComfyUI not ready after {timeout} seconds")
        return False

    def download_input_files(self, input_data: Dict[str, Any]) -> Dict[str, str]:
        """Download input files and return local paths"""
        local_files = {}
        if "input_files" in input_data:
            for file_key, file_url in input_data["input_files"].items():
                try:
                    response = requests.get(file_url, timeout=60)
                    response.raise_for_status()
                    # Derive the suffix from the URL path so query strings
                    # don't leak into the file extension
                    suffix = os.path.splitext(file_url.split("?")[0])[1]
                    # Create temporary file
                    with tempfile.NamedTemporaryFile(
                        delete=False,
                        dir="/tmp/inputs",
                        suffix=suffix
                    ) as tmp_file:
                        tmp_file.write(response.content)
                        local_files[file_key] = tmp_file.name
                    logger.info(f"Downloaded {file_key} to {local_files[file_key]}")
                except Exception as e:
                    logger.error(f"Failed to download {file_key}: {str(e)}")
                    raise
        return local_files

    def execute_workflow(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Execute ComfyUI workflow"""
        try:
            # Queue the workflow
            queue_response = requests.post(
                f"{self.comfyui_url}/prompt",
                json={
                    "prompt": workflow,
                    "client_id": self.client_id
                },
                timeout=30
            )
            queue_response.raise_for_status()
            prompt_id = queue_response.json()["prompt_id"]
            logger.info(f"Queued workflow with prompt_id: {prompt_id}")
            # Wait for completion
            return self.wait_for_completion(prompt_id)
        except Exception as e:
            logger.error(f"Failed to execute workflow: {str(e)}")
            raise
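
    # For reference, a successful POST /prompt responds with JSON roughly of
    # the form {"prompt_id": "<uuid>", "number": 3, "node_errors": {}};
    # only "prompt_id" is relied on above.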

    def wait_for_completion(self, prompt_id: str, timeout: int = 300) -> Dict[str, Any]:
        """Wait for workflow completion and return results"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Check queue status; ComfyUI queue entries are lists shaped
                # like [number, prompt_id, prompt, ...], so index 1 is the id
                queue_response = requests.get(f"{self.comfyui_url}/queue", timeout=10)
                queue_data = queue_response.json()
                running = any(item[1] == prompt_id for item in queue_data.get("queue_running", []))
                pending = any(item[1] == prompt_id for item in queue_data.get("queue_pending", []))
                if not running and not pending:
                    # Job has left the queue; fetch its results from history
                    history_response = requests.get(f"{self.comfyui_url}/history/{prompt_id}", timeout=10)
                    if history_response.status_code == 200:
                        history_data = history_response.json()
                        if prompt_id in history_data:
                            return self.process_results(history_data[prompt_id])
                time.sleep(2)
            except Exception as e:
                logger.error(f"Error checking completion: {str(e)}")
                time.sleep(5)
        raise TimeoutError(f"Workflow {prompt_id} timed out after {timeout} seconds")
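
    # Sketch of the /history/{prompt_id} entry that process_results() walks
    # (only the fields read below are shown; the filename is hypothetical):
    #
    # {
    #     "outputs": {
    #         "9": {
    #             "images": [
    #                 {"filename": "ComfyUI_00001_.png", "subfolder": "", "type": "output"}
    #             ]
    #         }
    #     }
    # }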

    def process_results(self, history_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process and upload results"""
        results = {
            "status": "completed",
            "outputs": [],
            "metadata": {}
        }
        if "outputs" in history_data:
            for node_id, node_output in history_data["outputs"].items():
                if "images" in node_output:
                    for image_info in node_output["images"]:
                        # Download image from ComfyUI
                        image_url = f"{self.comfyui_url}/view"
                        params = {
                            "filename": image_info["filename"],
                            "subfolder": image_info.get("subfolder", ""),
                            "type": image_info.get("type", "output")
                        }
                        try:
                            image_response = requests.get(image_url, params=params, timeout=60)
                            image_response.raise_for_status()
                            # Save to temp file for upload
                            output_path = f"/tmp/outputs/{image_info['filename']}"
                            with open(output_path, "wb") as f:
                                f.write(image_response.content)
                            results["outputs"].append({
                                "type": "image",
                                "filename": image_info["filename"],
                                "path": output_path,
                                "node_id": node_id
                            })
                        except Exception as e:
                            logger.error(f"Failed to process image {image_info['filename']}: {str(e)}")
        return results

    def cleanup(self):
        """Clean up temporary files"""
        try:
            shutil.rmtree("/tmp/inputs", ignore_errors=True)
            shutil.rmtree("/tmp/outputs", ignore_errors=True)
            os.makedirs("/tmp/inputs", exist_ok=True)
            os.makedirs("/tmp/outputs", exist_ok=True)
            logger.info("Cleaned up temporary files")
        except Exception as e:
            logger.error(f"Cleanup failed: {str(e)}")


def handler(job: Dict[str, Any]) -> Dict[str, Any]:
    """Main serverless handler function"""
    # Measure from handler entry rather than relying on a start_time field
    # in the job payload, which is not guaranteed to be present
    start_time = time.time()
    handler_instance = ComfyUIServerlessHandler()
    try:
        # Wait for ComfyUI to be ready
        if not handler_instance.wait_for_comfyui():
            return {"error": "ComfyUI failed to start"}
        # Get job input
        job_input = job.get("input", {})
        # Download input files if any
        local_files = handler_instance.download_input_files(job_input)
        # Update workflow with local file paths
        workflow = job_input.get("workflow", {})
        if local_files and "file_mappings" in job_input:
            for node_id, mappings in job_input["file_mappings"].items():
                if node_id in workflow:
                    for input_key, file_key in mappings.items():
                        if file_key in local_files:
                            workflow[node_id]["inputs"][input_key] = local_files[file_key]
        # Execute workflow
        results = handler_instance.execute_workflow(workflow)
        # Return output images as base64; uploading to external storage would
        # also work, but serverless responses commonly inline small results
        output_urls = []
        for output in results.get("outputs", []):
            if output["type"] == "image":
                with open(output["path"], "rb") as f:
                    image_data = base64.b64encode(f.read()).decode()
                output_urls.append({
                    "filename": output["filename"],
                    "data": image_data,
                    "node_id": output["node_id"]
                })
        return {
            "status": "success",
            "outputs": output_urls,
            "execution_time": time.time() - start_time
        }
    except Exception as e:
        logger.error(f"Handler error: {str(e)}")
        return {
            "error": str(e),
            "status": "failed"
        }
    finally:
        # Always cleanup
        handler_instance.cleanup()


if __name__ == "__main__":
    # Start the serverless worker
    runpod.serverless.start({"handler": handler})
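
# Local smoke test (a sketch, not an endpoint deployment): with a ComfyUI
# instance already serving on 127.0.0.1:8000, the runpod SDK can invoke the
# handler once from the CLI via its --test_input flag, e.g.:
#
#   python runpod_handler.py --test_input '{"input": {"workflow": {}}}'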