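import errno
import os
import sys
import asyncio
import traceback
import time

import nodes
import folder_paths
import execution
from comfy_execution.jobs import (
    JobStatus,
    get_job,
    get_all_jobs,
    validate_job_id,
    cancel_job,
    CANCEL_PENDING,
    CANCEL_RUNNING,
)
import uuid
import urllib
import urllib.parse  # `urllib.parse` is used below; `import urllib` alone does not guarantee the submodule is bound
import json
import glob
import struct
import ssl
import socket
import ipaddress

from PIL import Image, ImageOps
from PIL.PngImagePlugin import PngInfo
from io import BytesIO

import aiohttp
from aiohttp import web
import logging
import mimetypes

from comfy.cli_args import args
from comfy.deploy_environment import get_deploy_environment
import comfy.utils
import comfy.model_management

from comfy_api import feature_flags
import node_helpers
from comfyui_version import __version__
from app.frontend_management import FrontendManager, parse_version
from comfy_api.internal import _ComfyNodeInternal

from app.assets.seeder import asset_seeder
from app.assets.api.routes import register_assets_routes
from app.assets.services.ingest import register_file_in_place
from app.assets.services.asset_management import resolve_hash_to_path
from app.model_downloader.api.routes import register_routes as register_model_downloader_routes
from app.user_manager import UserManager
from app.model_manager import ModelFileManager
from app.custom_node_manager import CustomNodeManager
from app.subgraph_manager import SubgraphManager
from app.node_replace_manager import NodeReplaceManager
from typing import Optional, Union
from api_server.routes.internal.internal_routes import InternalRoutes
from protocol import BinaryEventTypes

# Import cache control middleware
from middleware.cache_middleware import cache_control

if args.enable_manager:
    import comfyui_manager


def _remove_sensitive_from_queue(queue: list) -> list:
    """Strip the sensitive field (index 5) from each queue item tuple.

    Only the first five positions of every item are kept; the caller
    receives a new list and the input is not modified.
    """
    return [item[:5] for item in queue]


async def send_socket_catch_exception(function, message):
    """Await ``function(message)`` and log, rather than raise, socket send failures.

    The caught set covers aiohttp client errors and the connection-level
    OS errors a closed peer produces; anything else propagates.
    """
    try:
        await function(message)
    except (aiohttp.ClientError, aiohttp.ClientPayloadError, ConnectionResetError, BrokenPipeError, ConnectionError) as err:
        logging.warning("send error: {}".format(err))


# Track deprecated paths that have been warned about to only warn once per file
_deprecated_paths_warned = set()


@web.middleware
async def deprecation_warning(request: web.Request, handler):
    """Middleware to warn about deprecated frontend API paths.

    Requests under ``/scripts/ui`` or ``/extensions/core/`` log one warning
    per distinct path (tracked in ``_deprecated_paths_warned``); the request
    itself is always passed through unchanged.
    """
    path = request.path
    if path.startswith(("/scripts/ui", "/extensions/core/")):
        # Only warn once per unique file path
        if path not in _deprecated_paths_warned:
            _deprecated_paths_warned.add(path)
            logging.warning(
                f"[DEPRECATION WARNING] Detected import of deprecated legacy API: {path}. "
                f"This is likely caused by a custom node extension using outdated APIs. "
                f"Please update your extensions or contact the extension author for an updated version."
            )
    response: web.Response = await handler(request)
    return response


@web.middleware
async def compress_body(request: web.Request, handler):
    """Enable gzip on JSON / plain-text responses when the client accepts it.

    Only ``web.Response`` objects with a non-empty body and a content type of
    ``application/json`` or ``text/plain`` are compressed; streaming and file
    responses are returned untouched.
    """
    accept_encoding = request.headers.get("Accept-Encoding", "")
    response: web.Response = await handler(request)
    if not isinstance(response, web.Response):
        return response
    if response.content_type not in ["application/json", "text/plain"]:
        return response
    if response.body and "gzip" in accept_encoding:
        response.enable_compression()
    return response


def create_cors_middleware(allowed_origin: str):
    """Build a middleware that answers pre-flight requests and sets CORS headers.

    ``allowed_origin`` is echoed verbatim into ``Access-Control-Allow-Origin``
    together with ``Access-Control-Allow-Credentials: true``, so the value
    chosen by the operator decides which sites may make credentialed calls.
    """
    @web.middleware
    async def cors_middleware(request: web.Request, handler):
        if request.method == "OPTIONS":
            # Pre-flight request. Reply successfully:
            response = web.Response()
        else:
            response = await handler(request)

        response.headers['Access-Control-Allow-Origin'] = allowed_origin
        response.headers['Access-Control-Allow-Methods'] = 'POST, GET, DELETE, PUT, OPTIONS, PATCH'
        response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
        response.headers['Access-Control-Allow-Credentials'] = 'true'
        return response

    return cors_middleware


def is_loopback(host):
    """Return whether ``host`` names the loopback interface.

    A literal IP address is decided directly. Otherwise the name is resolved
    for IPv4 and IPv6 (``socket.gaierror`` is treated as "no addresses") and
    the resolved addresses are inspected in resolver order. ``None`` yields
    False.

    NOTE(review): the result is order dependent — the first non-loopback
    address stops the scan and returns whatever earlier addresses established,
    so a mixed answer set can yield True or False depending on resolver order.
    """
    if host is None:
        return False
    try:
        # Literal address: no resolution needed.
        return bool(ipaddress.ip_address(host).is_loopback)
    except ValueError:
        # Not an IP literal; fall through to name resolution. A bare
        # ``except`` here would also swallow KeyboardInterrupt/SystemExit.
        pass

    loopback = False
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            addrinfo = socket.getaddrinfo(host, None, family, socket.SOCK_STREAM)
            for _fam, _type, _proto, _canon, sockaddr in addrinfo:
                if not ipaddress.ip_address(sockaddr[0]).is_loopback:
                    return loopback
                loopback = True
        except socket.gaierror:
            pass

    return loopback


def create_origin_only_middleware():
    """Build a middleware that rejects cross-site requests aimed at a loopback host.

    Two checks are applied, in order:
    1. ``Sec-Fetch-Site: cross-site`` is refused with 403.
    2. When both ``Host`` and ``Origin`` are present and the Host resolves to
       loopback (see ``is_loopback``), a mismatch between the two hostnames
       (ports compared only when both headers carry one) is refused with 403.

    NOTE(review): an ``Origin`` without a netloc (e.g. ``null``) produces an
    empty ``origin_domain`` and the hostname comparison is skipped; for such
    requests the Sec-Fetch-Site check above is the only guard.
    """
    @web.middleware
    async def origin_only_middleware(request: web.Request, handler):
        if 'Sec-Fetch-Site' in request.headers:
            sec_fetch_site = request.headers['Sec-Fetch-Site']
            if sec_fetch_site == 'cross-site':
                return web.Response(status=403)

        #this code is used to prevent the case where a random website can queue comfy workflows by making a POST to 127.0.0.1 which browsers don't prevent for some dumb reason.
        #in that case the Host and Origin hostnames won't match
        #I know the proper fix would be to add a cookie but this should take care of the problem in the meantime
        if 'Host' in request.headers and 'Origin' in request.headers:
            host = request.headers['Host']
            origin = request.headers['Origin']
            host_domain = host.lower()
            parsed = urllib.parse.urlparse(origin)
            origin_domain = parsed.netloc.lower()
            host_domain_parsed = urllib.parse.urlsplit('//' + host_domain)

            try:
                # ``.port`` raises ValueError on a non-numeric or out-of-range
                # port; both headers are client-controlled, so reject malformed
                # input instead of surfacing a 500.
                origin_port = parsed.port
                host_port = host_domain_parsed.port
            except ValueError:
                return web.Response(status=400)

            #limit the check to when the host domain is localhost, this makes it slightly less safe but should still prevent the exploit
            loopback = is_loopback(host_domain_parsed.hostname)

            if origin_port is None: #if origin doesn't have a port strip it from the host to handle weird browsers, same for host
                host_domain = host_domain_parsed.hostname
            if host_port is None:
                origin_domain = parsed.hostname

            if loopback and host_domain and origin_domain:
                if host_domain != origin_domain:
                    logging.warning("WARNING: request with non matching host and origin {} != {}, returning 403".format(host_domain, origin_domain))
                    return web.Response(status=403)

        if request.method == "OPTIONS":
            response = web.Response()
        else:
            response = await handler(request)
        return response

    return origin_only_middleware


def create_block_external_middleware():
    @web.middleware
    async def block_external_middleware(request: web.Request, handler):
        if request.method == "OPTIONS":
            # Pre-flight request.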
Reply successfully: response = web.Response() else: response = await handler(request) response.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval' blob:; style-src 'self' 'unsafe-inline'; img-src 'self' data: blob:; font-src 'self'; connect-src 'self' data:; frame-src 'self'; object-src 'self';" return response return block_external_middleware class PromptServer(): def __init__(self, loop): PromptServer.instance = self self.user_manager = UserManager() self.model_file_manager = ModelFileManager() self.custom_node_manager = CustomNodeManager() self.subgraph_manager = SubgraphManager() self.node_replace_manager = NodeReplaceManager() self.internal_routes = InternalRoutes(self) self.supports = ["custom_nodes_from_web"] self.prompt_queue = execution.PromptQueue(self) self.loop = loop self.messages = asyncio.Queue() self.client_session:Optional[aiohttp.ClientSession] = None self.number = 0 middlewares = [cache_control, deprecation_warning] if args.enable_compress_response_body: middlewares.append(compress_body) if args.enable_cors_header: middlewares.append(create_cors_middleware(args.enable_cors_header)) else: middlewares.append(create_origin_only_middleware()) if args.disable_api_nodes: middlewares.append(create_block_external_middleware()) if args.enable_manager: middlewares.append(comfyui_manager.create_middleware()) max_upload_size = round(args.max_upload_size * 1024 * 1024) self.app = web.Application(client_max_size=max_upload_size, middlewares=middlewares) self.sockets = dict() self.sockets_metadata = dict() self.web_root = ( FrontendManager.init_frontend(args.front_end_version) if args.front_end_root is None else args.front_end_root ) logging.info(f"[Prompt Server] web root: {self.web_root}") if args.enable_assets: register_assets_routes(self.app, self.user_manager) else: register_assets_routes(self.app) asset_seeder.disable() register_model_downloader_routes(self.app) routes = web.RouteTableDef() self.routes 
= routes self.last_node_id = None self.client_id = None self.on_prompt_handlers = [] @routes.get('/ws') async def websocket_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) sid = request.rel_url.query.get('clientId', '') if sid: # Reusing existing session, remove old self.sockets.pop(sid, None) else: sid = uuid.uuid4().hex # Store WebSocket for backward compatibility self.sockets[sid] = ws # Store metadata separately self.sockets_metadata[sid] = {"feature_flags": {}} try: # Send initial state to the new client await self.send("status", {"status": self.get_queue_info(), "sid": sid}, sid) # On reconnect if we are the currently executing client send the current node if self.client_id == sid and self.last_node_id is not None: await self.send("executing", { "node": self.last_node_id }, sid) # Flag to track if we've received the first message first_message = True async for msg in ws: if msg.type == aiohttp.WSMsgType.ERROR: logging.warning('ws connection closed with exception %s' % ws.exception()) elif msg.type == aiohttp.WSMsgType.TEXT: try: data = json.loads(msg.data) # Check if first message is feature flags if first_message and data.get("type") == "feature_flags": # Store client feature flags client_flags = data.get("data", {}) self.sockets_metadata[sid]["feature_flags"] = client_flags # Send server feature flags in response await self.send( "feature_flags", feature_flags.get_server_features(), sid, ) logging.debug( f"Feature flags negotiated for client {sid}: {client_flags}" ) first_message = False except json.JSONDecodeError: logging.warning( f"Invalid JSON received from client {sid}: {msg.data}" ) except Exception as e: logging.error(f"Error processing WebSocket message: {e}") finally: self.sockets.pop(sid, None) self.sockets_metadata.pop(sid, None) return ws @routes.get("/") async def get_root(request): response = web.FileResponse(os.path.join(self.web_root, "index.html")) response.headers['Cache-Control'] = 'no-store, must-revalidate' 
response.headers["Pragma"] = "no-cache" response.headers["Expires"] = "0" return response @routes.get("/embeddings") def get_embeddings(request): embeddings = folder_paths.get_filename_list("embeddings") return web.json_response(list(map(lambda a: os.path.splitext(a)[0], embeddings))) @routes.get("/models") def list_model_types(request): model_types = list(folder_paths.folder_names_and_paths.keys()) return web.json_response(model_types) @routes.get("/models/{folder}") async def get_models(request): folder = request.match_info.get("folder", None) if folder not in folder_paths.folder_names_and_paths: return web.Response(status=404) files = folder_paths.get_filename_list(folder) return web.json_response(files) @routes.get("/extensions") async def get_extensions(request): files = glob.glob(os.path.join( glob.escape(self.web_root), 'extensions/**/*.js'), recursive=True) extensions = list(map(lambda f: "/" + os.path.relpath(f, self.web_root).replace("\\", "/"), files)) for name, dir in nodes.EXTENSION_WEB_DIRS.items(): files = glob.glob(os.path.join(glob.escape(dir), '**/*.js'), recursive=True) extensions.extend(list(map(lambda f: "/extensions/" + urllib.parse.quote( name) + "/" + os.path.relpath(f, dir).replace("\\", "/"), files))) return web.json_response(extensions) def get_dir_by_type(dir_type): if dir_type is None: dir_type = "input" if dir_type == "input": type_dir = folder_paths.get_input_directory() elif dir_type == "temp": type_dir = folder_paths.get_temp_directory() elif dir_type == "output": type_dir = folder_paths.get_output_directory() return type_dir, dir_type def compare_image_hash(filepath, image): hasher = node_helpers.hasher() # function to compare hashes of two images to see if it already exists, fix to #3465 if os.path.exists(filepath): a = hasher() b = hasher() with open(filepath, "rb") as f: a.update(f.read()) b.update(image.file.read()) image.file.seek(0) return a.hexdigest() == b.hexdigest() return False def image_upload(post, 
image_save_function=None): image = post.get("image") overwrite = post.get("overwrite") image_is_duplicate = False image_upload_type = post.get("type") upload_dir, image_upload_type = get_dir_by_type(image_upload_type) if image and image.file: filename = image.filename if not filename: return web.Response(status=400) subfolder = post.get("subfolder", "") full_output_folder = os.path.join(upload_dir, os.path.normpath(subfolder)) filepath = os.path.abspath(os.path.join(full_output_folder, filename)) if os.path.commonpath((upload_dir, filepath)) != upload_dir: return web.Response(status=400) if not os.path.exists(full_output_folder): os.makedirs(full_output_folder) split = os.path.splitext(filename) if overwrite is not None and (overwrite == "true" or overwrite == "1"): pass else: i = 1 while os.path.exists(filepath): if compare_image_hash(filepath, image): #compare hash to prevent saving of duplicates with same name, fix for #3465 image_is_duplicate = True break filename = f"{split[0]} ({i}){split[1]}" filepath = os.path.join(full_output_folder, filename) i += 1 if not image_is_duplicate: if image_save_function is not None: image_save_function(image, post, filepath) else: with open(filepath, "wb") as f: f.write(image.file.read()) resp = {"name" : filename, "subfolder": subfolder, "type": image_upload_type} if args.enable_assets: try: tag = image_upload_type if image_upload_type in ("input", "output") else "input" result = register_file_in_place(abs_path=filepath, name=filename, tags=[tag]) resp["asset"] = { "id": result.ref.id, "name": result.ref.name, "asset_hash": result.asset.hash, "size": result.asset.size_bytes, "mime_type": result.asset.mime_type, "tags": result.tags, } except Exception: logging.warning("Failed to register uploaded image as asset", exc_info=True) return web.json_response(resp) else: return web.Response(status=400) @routes.post("/upload/image") async def upload_image(request): post = await request.post() return image_upload(post) 
@routes.post("/upload/mask") async def upload_mask(request): post = await request.post() def image_save_function(image, post, filepath): original_ref = json.loads(post.get("original_ref")) filename, output_dir = folder_paths.annotated_filepath(original_ref['filename']) if not filename: return web.Response(status=400) # validation for security: prevent accessing arbitrary path if filename[0] == '/' or '..' in filename: return web.Response(status=400) if output_dir is None: type = original_ref.get("type", "output") output_dir = folder_paths.get_directory_by_type(type) if output_dir is None: return web.Response(status=400) if original_ref.get("subfolder", "") != "": full_output_dir = os.path.join(output_dir, original_ref["subfolder"]) if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir: return web.Response(status=403) output_dir = full_output_dir file = os.path.join(output_dir, filename) if os.path.isfile(file): with Image.open(file) as original_pil: metadata = PngInfo() if hasattr(original_pil,'text'): for key in original_pil.text: metadata.add_text(key, original_pil.text[key]) original_pil = original_pil.convert('RGBA') mask_pil = Image.open(image.file).convert('RGBA') # alpha copy new_alpha = mask_pil.getchannel('A') original_pil.putalpha(new_alpha) original_pil.save(filepath, compress_level=4, pnginfo=metadata) return image_upload(post, image_save_function) @routes.get("/view") async def view_image(request): if "filename" in request.rel_url.query: filename = request.rel_url.query["filename"] # The frontend's LoadImage combo widget uses asset_hash values # (e.g. "blake3:...") as widget values. When litegraph renders the # node preview, it constructs /view?filename=, so this # endpoint must resolve blake3 hashes to their on-disk file paths. 
if filename.startswith("blake3:"): owner_id = self.user_manager.get_request_user_id(request) result = resolve_hash_to_path(filename, owner_id=owner_id) if result is None: return web.Response(status=404) file, filename, resolved_content_type = result.abs_path, result.download_name, result.content_type else: resolved_content_type = None filename, output_dir = folder_paths.annotated_filepath(filename) if not filename: return web.Response(status=400) # validation for security: prevent accessing arbitrary path if filename[0] == '/' or '..' in filename: return web.Response(status=400) if output_dir is None: type = request.rel_url.query.get("type", "output") output_dir = folder_paths.get_directory_by_type(type) if output_dir is None: return web.Response(status=400) if "subfolder" in request.rel_url.query: full_output_dir = os.path.join(output_dir, request.rel_url.query["subfolder"]) if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir: return web.Response(status=403) output_dir = full_output_dir filename = os.path.basename(filename) file = os.path.join(output_dir, filename) if os.path.isfile(file): if 'preview' in request.rel_url.query: with Image.open(file) as img: preview_info = request.rel_url.query['preview'].split(';') image_format = preview_info[0] if image_format not in ['webp', 'jpeg'] or 'a' in request.rel_url.query.get('channel', ''): image_format = 'webp' quality = 90 if preview_info[-1].isdigit(): quality = int(preview_info[-1]) buffer = BytesIO() if image_format in ['jpeg'] or request.rel_url.query.get('channel', '') == 'rgb': img = img.convert("RGB") img.save(buffer, format=image_format, quality=quality) buffer.seek(0) return web.Response(body=buffer.read(), content_type=f'image/{image_format}', headers={"Content-Disposition": f"filename=\"(unknown)\""}) if 'channel' not in request.rel_url.query: channel = 'rgba' else: channel = request.rel_url.query["channel"] if channel == 'rgb': with Image.open(file) as img: if img.mode == 
"RGBA": r, g, b, a = img.split() new_img = Image.merge('RGB', (r, g, b)) else: new_img = img.convert("RGB") buffer = BytesIO() new_img.save(buffer, format='PNG') buffer.seek(0) return web.Response(body=buffer.read(), content_type='image/png', headers={"Content-Disposition": f"filename=\"(unknown)\""}) elif channel == 'a': with Image.open(file) as img: if img.mode == "RGBA": _, _, _, a = img.split() else: a = Image.new('L', img.size, 255) # alpha img alpha_img = Image.new('RGBA', img.size) alpha_img.putalpha(a) alpha_buffer = BytesIO() alpha_img.save(alpha_buffer, format='PNG') alpha_buffer.seek(0) return web.Response(body=alpha_buffer.read(), content_type='image/png', headers={"Content-Disposition": f"filename=\"(unknown)\""}) else: # Use the content type from asset resolution if available, # otherwise guess from the filename. content_type = ( resolved_content_type or mimetypes.guess_type(filename)[0] or 'application/octet-stream' ) # For security, force renderable/active types (HTML, JS, # CSS, SVG, XML — anything that can carry inline