diff --git a/comfy_api/latest/_ui.py b/comfy_api/latest/_ui.py index e238cdf3c..5d2b931df 100644 --- a/comfy_api/latest/_ui.py +++ b/comfy_api/latest/_ui.py @@ -65,6 +65,22 @@ class SavedAudios(_UIOutput): return {"audio": self.results} +def _is_isolated_child() -> bool: + return os.environ.get("PYISOLATE_CHILD") == "1" + + +def _get_preview_folder_type() -> FolderType: + if _is_isolated_child(): + return FolderType.output + return FolderType.temp + + +def _get_preview_route_prefix(folder_type: FolderType) -> str: + if folder_type == FolderType.output: + return "output" + return "temp" + + def _get_directory_by_folder_type(folder_type: FolderType) -> str: if folder_type == FolderType.input: return folder_paths.get_input_directory() @@ -388,10 +404,11 @@ class AudioSaveHelper: class PreviewImage(_UIOutput): def __init__(self, image: Image.Type, animated: bool = False, cls: type[ComfyNode] = None, **kwargs): + folder_type = _get_preview_folder_type() self.values = ImageSaveHelper.save_images( image, filename_prefix="ComfyUI_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for _ in range(5)), - folder_type=FolderType.temp, + folder_type=folder_type, cls=cls, compress_level=1, ) @@ -412,10 +429,11 @@ class PreviewMask(PreviewImage): class PreviewAudio(_UIOutput): def __init__(self, audio: dict, cls: type[ComfyNode] = None, **kwargs): + folder_type = _get_preview_folder_type() self.values = AudioSaveHelper.save_audio( audio, filename_prefix="ComfyUI_temp_" + "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(5)), - folder_type=FolderType.temp, + folder_type=folder_type, cls=cls, format="flac", quality="128k", @@ -438,15 +456,16 @@ class PreviewUI3D(_UIOutput): self.model_file = model_file self.camera_info = camera_info self.bg_image_path = None + folder_type = _get_preview_folder_type() bg_image = kwargs.get("bg_image", None) if bg_image is not None: img_array = (bg_image[0].cpu().numpy() * 255).astype(np.uint8) img = 
PILImage.fromarray(img_array) - temp_dir = folder_paths.get_temp_directory() + preview_dir = _get_directory_by_folder_type(folder_type) filename = f"bg_{uuid.uuid4().hex}.png" - bg_image_path = os.path.join(temp_dir, filename) + bg_image_path = os.path.join(preview_dir, filename) img.save(bg_image_path, compress_level=1) - self.bg_image_path = f"temp/{filename}" + self.bg_image_path = f"{_get_preview_route_prefix(folder_type)}/{filename}" def as_dict(self): return {"result": [self.model_file, self.camera_info, self.bg_image_path]} diff --git a/execution.py b/execution.py index d827ab3d1..6c8d43919 100644 --- a/execution.py +++ b/execution.py @@ -42,6 +42,7 @@ from comfy_execution.progress import get_progress_state, reset_progress_state, a from comfy_execution.utils import CurrentNodeContext from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func from comfy_api.latest import io, _io +from comfy_execution.cache_provider import _has_cache_providers, _get_cache_providers, _logger as _cache_logger _AIMDO_VBAR_RESET_UNAVAILABLE_LOGGED = False @@ -130,15 +131,15 @@ class CacheSet: # Performs like the old cache -- dump data ASAP def init_classic_cache(self): - self.outputs = HierarchicalCache(CacheKeySetInputSignature) + self.outputs = HierarchicalCache(CacheKeySetInputSignature, enable_providers=True) self.objects = HierarchicalCache(CacheKeySetID) def init_lru_cache(self, cache_size): - self.outputs = LRUCache(CacheKeySetInputSignature, max_size=cache_size) + self.outputs = LRUCache(CacheKeySetInputSignature, max_size=cache_size, enable_providers=True) self.objects = HierarchicalCache(CacheKeySetID) def init_ram_cache(self, min_headroom): - self.outputs = RAMPressureCache(CacheKeySetInputSignature) + self.outputs = RAMPressureCache(CacheKeySetInputSignature, enable_providers=True) self.objects = HierarchicalCache(CacheKeySetID) def init_null_cache(self): @@ -433,7 +434,7 @@ async def execute(server,
dynprompt, caches, current_item, extra_data, executed, inputs = dynprompt.get_node(unique_id)['inputs'] class_type = dynprompt.get_node(unique_id)['class_type'] class_def = nodes.NODE_CLASS_MAPPINGS[class_type] - cached = caches.outputs.get(unique_id) + cached = await caches.outputs.get(unique_id) if cached is not None: if server.client_id is not None: cached_ui = cached.ui or {} @@ -489,10 +490,10 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed, server.last_node_id = display_node_id server.send_sync("executing", { "node": unique_id, "display_node": display_node_id, "prompt_id": prompt_id }, server.client_id) - obj = caches.objects.get(unique_id) + obj = await caches.objects.get(unique_id) if obj is None: obj = class_def() - caches.objects.set(unique_id, obj) + await caches.objects.set(unique_id, obj) if issubclass(class_def, _ComfyNodeInternal): lazy_status_present = first_real_override(class_def, "check_lazy_status") is not None @@ -536,7 +537,15 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed, GraphBuilder.set_default_prefix(unique_id, call_index, 0) try: - output_data, output_ui, has_subgraph, has_pending_tasks = await get_output_data(prompt_id, unique_id, obj, input_data_all, execution_block_cb=execution_block_cb, pre_execute_cb=pre_execute_cb, v3_data=v3_data) + output_data, output_ui, has_subgraph, has_pending_tasks = await get_output_data( + prompt_id, + unique_id, + obj, + input_data_all, + execution_block_cb=execution_block_cb, + pre_execute_cb=pre_execute_cb, + v3_data=v3_data, + ) finally: if comfy.memory_management.aimdo_enabled: if args.verbose == "DEBUG": @@ -621,7 +630,7 @@ async def execute(server, dynprompt, caches, current_item, extra_data, executed, cache_entry = CacheEntry(ui=ui_outputs.get(unique_id), outputs=output_data) execution_list.cache_update(unique_id, cache_entry) - caches.outputs.set(unique_id, cache_entry) + await caches.outputs.set(unique_id, cache_entry) except 
comfy.model_management.InterruptProcessingException as iex: logging.info("Processing interrupted") @@ -685,7 +694,7 @@ class PromptExecutor: return try: from comfy.isolation import notify_execution_graph - await notify_execution_graph(class_types) + await notify_execution_graph(class_types, caches=self.caches.all) except Exception: if fail_loud: raise @@ -757,6 +766,19 @@ class PromptExecutor: } self.add_message("execution_error", mes, broadcast=False) + def _notify_prompt_lifecycle(self, event: str, prompt_id: str): + if not _has_cache_providers(): + return + + for provider in _get_cache_providers(): + try: + if event == "start": + provider.on_prompt_start(prompt_id) + elif event == "end": + provider.on_prompt_end(prompt_id) + except Exception as e: + _cache_logger.warning(f"Cache provider {provider.__class__.__name__} error on {event}: {e}") + def execute(self, prompt, prompt_id, extra_data={}, execute_outputs=[]): asyncio.run(self.execute_async(prompt, prompt_id, extra_data, execute_outputs)) @@ -785,98 +807,106 @@ class PromptExecutor: self.status_messages = [] self.add_message("execution_start", { "prompt_id": prompt_id}, broadcast=False) - with torch.inference_mode(): - if args.use_process_isolation: - try: - # Boundary cleanup runs at the start of the next workflow in - # isolation mode, matching non-isolated "next prompt" timing. 
- self.caches = CacheSet(cache_type=self.cache_type, cache_args=self.cache_args) - await self._wait_model_patcher_quiescence_safe( - fail_loud=False, - timeout_ms=120000, - marker="EX:boundary_cleanup_wait_idle", - ) - await self._flush_running_extensions_transport_state_safe() - comfy.model_management.unload_all_models() - comfy.model_management.cleanup_models_gc() - comfy.model_management.cleanup_models() - gc.collect() - comfy.model_management.soft_empty_cache() - except Exception: - logging.debug("][ EX:isolation_boundary_cleanup_start failed", exc_info=True) - - dynamic_prompt = DynamicPrompt(prompt) - reset_progress_state(prompt_id, dynamic_prompt) - add_progress_handler(WebUIProgressHandler(self.server)) - is_changed_cache = IsChangedCache(prompt_id, dynamic_prompt, self.caches.outputs) - for cache in self.caches.all: - await cache.set_prompt(dynamic_prompt, prompt.keys(), is_changed_cache) - cache.clean_unused() - - cached_nodes = [] - for node_id in prompt: - if self.caches.outputs.get(node_id) is not None: - cached_nodes.append(node_id) - - comfy.model_management.cleanup_models_gc() - self.add_message("execution_cached", - { "nodes": cached_nodes, "prompt_id": prompt_id}, - broadcast=False) - pending_subgraph_results = {} - pending_async_nodes = {} # TODO - Unify this with pending_subgraph_results - ui_node_outputs = {} - executed = set() - execution_list = ExecutionList(dynamic_prompt, self.caches.outputs) - current_outputs = self.caches.outputs.all_node_ids() - for node_id in list(execute_outputs): - execution_list.add_node(node_id) - - if args.use_process_isolation: - pending_class_types = set() - for node_id in execution_list.pendingNodes.keys(): - class_type = dynamic_prompt.get_node(node_id)["class_type"] - pending_class_types.add(class_type) + if args.use_process_isolation: + try: + # Boundary cleanup runs at the start of the next workflow in + # isolation mode, matching non-isolated "next prompt" timing. 
+ self.caches = CacheSet(cache_type=self.cache_type, cache_args=self.cache_args) await self._wait_model_patcher_quiescence_safe( - fail_loud=True, + fail_loud=False, timeout_ms=120000, - marker="EX:notify_graph_wait_idle", + marker="EX:boundary_cleanup_wait_idle", ) - await self._notify_execution_graph_safe(pending_class_types, fail_loud=True) - - while not execution_list.is_empty(): - node_id, error, ex = await execution_list.stage_node_execution() - if error is not None: - self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex) - break - - assert node_id is not None, "Node ID should not be None at this point" - result, error, ex = await execute(self.server, dynamic_prompt, self.caches, node_id, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_node_outputs) - self.success = result != ExecutionResult.FAILURE - if result == ExecutionResult.FAILURE: - self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex) - break - elif result == ExecutionResult.PENDING: - execution_list.unstage_node_execution() - else: # result == ExecutionResult.SUCCESS: - execution_list.complete_node_execution() - self.caches.outputs.poll(ram_headroom=self.cache_args["ram"]) - else: - # Only execute when the while-loop ends without break - self.add_message("execution_success", { "prompt_id": prompt_id }, broadcast=False) - - ui_outputs = {} - meta_outputs = {} - for node_id, ui_info in ui_node_outputs.items(): - ui_outputs[node_id] = ui_info["output"] - meta_outputs[node_id] = ui_info["meta"] - self.history_result = { - "outputs": ui_outputs, - "meta": meta_outputs, - } - comfy.model_management.cleanup_models_gc() - self.server.last_node_id = None - if comfy.model_management.DISABLE_SMART_MEMORY: + await self._flush_running_extensions_transport_state_safe() comfy.model_management.unload_all_models() + comfy.model_management.cleanup_models_gc() 
+ comfy.model_management.cleanup_models() + gc.collect() + comfy.model_management.soft_empty_cache() + except Exception: + logging.debug("][ EX:isolation_boundary_cleanup_start failed", exc_info=True) + + self._notify_prompt_lifecycle("start", prompt_id) + + try: + with torch.inference_mode(): + dynamic_prompt = DynamicPrompt(prompt) + reset_progress_state(prompt_id, dynamic_prompt) + add_progress_handler(WebUIProgressHandler(self.server)) + is_changed_cache = IsChangedCache(prompt_id, dynamic_prompt, self.caches.outputs) + for cache in self.caches.all: + await cache.set_prompt(dynamic_prompt, prompt.keys(), is_changed_cache) + cache.clean_unused() + + node_ids = list(prompt.keys()) + cache_results = await asyncio.gather( + *(self.caches.outputs.get(node_id) for node_id in node_ids) + ) + cached_nodes = [ + node_id for node_id, result in zip(node_ids, cache_results) + if result is not None + ] + + comfy.model_management.cleanup_models_gc() + self.add_message("execution_cached", + { "nodes": cached_nodes, "prompt_id": prompt_id}, + broadcast=False) + pending_subgraph_results = {} + pending_async_nodes = {} # TODO - Unify this with pending_subgraph_results + ui_node_outputs = {} + executed = set() + execution_list = ExecutionList(dynamic_prompt, self.caches.outputs) + current_outputs = self.caches.outputs.all_node_ids() + for node_id in list(execute_outputs): + execution_list.add_node(node_id) + + if args.use_process_isolation: + pending_class_types = set() + for node_id in execution_list.pendingNodes.keys(): + class_type = dynamic_prompt.get_node(node_id)["class_type"] + pending_class_types.add(class_type) + await self._wait_model_patcher_quiescence_safe( + fail_loud=True, + timeout_ms=120000, + marker="EX:notify_graph_wait_idle", + ) + await self._notify_execution_graph_safe(pending_class_types, fail_loud=True) + + while not execution_list.is_empty(): + node_id, error, ex = await execution_list.stage_node_execution() + if error is not None: + 
self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex) + break + + assert node_id is not None, "Node ID should not be None at this point" + result, error, ex = await execute(self.server, dynamic_prompt, self.caches, node_id, extra_data, executed, prompt_id, execution_list, pending_subgraph_results, pending_async_nodes, ui_node_outputs) + self.success = result != ExecutionResult.FAILURE + if result == ExecutionResult.FAILURE: + self.handle_execution_error(prompt_id, dynamic_prompt.original_prompt, current_outputs, executed, error, ex) + break + elif result == ExecutionResult.PENDING: + execution_list.unstage_node_execution() + else: # result == ExecutionResult.SUCCESS: + execution_list.complete_node_execution() + self.caches.outputs.poll(ram_headroom=self.cache_args["ram"]) + else: + # Only execute when the while-loop ends without break + self.add_message("execution_success", { "prompt_id": prompt_id }, broadcast=False) + + ui_outputs = {} + meta_outputs = {} + for node_id, ui_info in ui_node_outputs.items(): + ui_outputs[node_id] = ui_info["output"] + meta_outputs[node_id] = ui_info["meta"] + self.history_result = { + "outputs": ui_outputs, + "meta": meta_outputs, + } + self.server.last_node_id = None + if comfy.model_management.DISABLE_SMART_MEMORY: + comfy.model_management.unload_all_models() + finally: + self._notify_prompt_lifecycle("end", prompt_id) async def validate_inputs(prompt_id, prompt, item, validated): diff --git a/server.py b/server.py index 4c4779934..06b5cab9d 100644 --- a/server.py +++ b/server.py @@ -34,6 +34,8 @@ from app.frontend_management import FrontendManager, parse_version from comfy_api.internal import _ComfyNodeInternal from app.assets.seeder import asset_seeder from app.assets.api.routes import register_assets_routes +from app.assets.services.ingest import register_file_in_place +from app.assets.services.asset_management import resolve_hash_to_path from app.user_manager import 
UserManager from app.model_manager import ModelFileManager @@ -311,7 +313,7 @@ class PromptServer(): @routes.get("/") async def get_root(request): response = web.FileResponse(os.path.join(self.web_root, "index.html")) - response.headers['Cache-Control'] = 'no-cache' + response.headers['Cache-Control'] = 'no-store, must-revalidate' response.headers["Pragma"] = "no-cache" response.headers["Expires"] = "0" return response @@ -347,6 +349,18 @@ class PromptServer(): extensions.extend(list(map(lambda f: "/extensions/" + urllib.parse.quote( name) + "/" + os.path.relpath(f, dir).replace("\\", "/"), files))) + # Include JS files from proxied web directories (isolated nodes) + if args.use_process_isolation: + from comfy.isolation.proxies.web_directory_proxy import get_web_directory_cache + cache = get_web_directory_cache() + for ext_name in cache.extension_names: + for entry in cache.list_files(ext_name): + if entry["relative_path"].endswith(".js"): + extensions.append( + "/extensions/" + urllib.parse.quote(ext_name) + + "/" + entry["relative_path"] + ) + return web.json_response(extensions) def get_dir_by_type(dir_type): @@ -420,7 +434,24 @@ class PromptServer(): with open(filepath, "wb") as f: f.write(image.file.read()) - return web.json_response({"name" : filename, "subfolder": subfolder, "type": image_upload_type}) + resp = {"name" : filename, "subfolder": subfolder, "type": image_upload_type} + + if args.enable_assets: + try: + tag = image_upload_type if image_upload_type in ("input", "output") else "input" + result = register_file_in_place(abs_path=filepath, name=filename, tags=[tag]) + resp["asset"] = { + "id": result.ref.id, + "name": result.ref.name, + "asset_hash": result.asset.hash, + "size": result.asset.size_bytes, + "mime_type": result.asset.mime_type, + "tags": result.tags, + } + except Exception: + logging.warning("Failed to register uploaded image as asset", exc_info=True) + + return web.json_response(resp) else: return web.Response(status=400) @@ -480,30 
+511,43 @@ class PromptServer(): async def view_image(request): if "filename" in request.rel_url.query: filename = request.rel_url.query["filename"] - filename, output_dir = folder_paths.annotated_filepath(filename) - if not filename: - return web.Response(status=400) + # The frontend's LoadImage combo widget uses asset_hash values + # (e.g. "blake3:...") as widget values. When litegraph renders the + # node preview, it constructs /view?filename=<asset_hash>, so this + # endpoint must resolve blake3 hashes to their on-disk file paths. + if filename.startswith("blake3:"): + owner_id = self.user_manager.get_request_user_id(request) + result = resolve_hash_to_path(filename, owner_id=owner_id) + if result is None: + return web.Response(status=404) + file, filename, resolved_content_type = result.abs_path, result.download_name, result.content_type + else: + resolved_content_type = None + filename, output_dir = folder_paths.annotated_filepath(filename) - # validation for security: prevent accessing arbitrary path - if filename[0] == '/' or '..' in filename: - return web.Response(status=400) + if not filename: + return web.Response(status=400) - if output_dir is None: - type = request.rel_url.query.get("type", "output") - output_dir = folder_paths.get_directory_by_type(type) + # validation for security: prevent accessing arbitrary path + if filename[0] == '/' or '..' 
in filename: + return web.Response(status=400) - if output_dir is None: - return web.Response(status=400) + if output_dir is None: + type = request.rel_url.query.get("type", "output") + output_dir = folder_paths.get_directory_by_type(type) - if "subfolder" in request.rel_url.query: - full_output_dir = os.path.join(output_dir, request.rel_url.query["subfolder"]) - if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir: - return web.Response(status=403) - output_dir = full_output_dir + if output_dir is None: + return web.Response(status=400) - filename = os.path.basename(filename) - file = os.path.join(output_dir, filename) + if "subfolder" in request.rel_url.query: + full_output_dir = os.path.join(output_dir, request.rel_url.query["subfolder"]) + if os.path.commonpath((os.path.abspath(full_output_dir), output_dir)) != output_dir: + return web.Response(status=403) + output_dir = full_output_dir + + filename = os.path.basename(filename) + file = os.path.join(output_dir, filename) if os.path.isfile(file): if 'preview' in request.rel_url.query: @@ -563,8 +607,13 @@ class PromptServer(): return web.Response(body=alpha_buffer.read(), content_type='image/png', headers={"Content-Disposition": f"filename=\"{filename}\""}) else: - # Use the content type from asset resolution if available, - # otherwise guess from the filename. 
+ content_type = ( + resolved_content_type + or mimetypes.guess_type(filename)[0] + or 'application/octet-stream' + ) # For security, force certain mimetypes to download instead of display if content_type in {'text/html', 'text/html-sandboxed', 'application/xhtml+xml', 'text/javascript', 'text/css'}: @@ -1022,6 +1071,36 @@ class PromptServer(): for name, dir in nodes.EXTENSION_WEB_DIRS.items(): self.app.add_routes([web.static('/extensions/' + name, dir)]) + # Add dynamic handler for proxied web directories (isolated nodes) + if args.use_process_isolation: + from comfy.isolation.proxies.web_directory_proxy import ( + get_web_directory_cache, + ALLOWED_EXTENSIONS, + ) + + async def serve_proxied_web_file(request): + ext_name = request.match_info["extension_name"] + file_path = request.match_info["path"] + suffix = os.path.splitext(file_path)[1].lower() + + if suffix not in ALLOWED_EXTENSIONS: + return web.Response(status=403, text="Forbidden file type") + + cache = get_web_directory_cache() + result = cache.get_file(ext_name, file_path) + if result is None: + return web.Response(status=404, text="Not found") + + return web.Response( + body=result["content"], + content_type=result["content_type"], + ) + + self.app.router.add_get( + "/extensions/{extension_name}/{path:.+}", + serve_proxied_web_file, + ) + installed_templates_version = FrontendManager.get_installed_templates_version() use_legacy_templates = True if installed_templates_version: