From a3f9d007d4d74ee0ada685a776c3eb649d7b7762 Mon Sep 17 00:00:00 2001 From: doctorpangloss <@hiddenswitch.com> Date: Fri, 9 Feb 2024 01:20:57 -0800 Subject: [PATCH] Fix CLI args issues --- comfy/cli_args.py | 17 ++++++++++++----- comfy/cli_args_types.py | 6 ++++-- comfy/cmd/main.py | 8 ++++---- comfy/cmd/server.py | 1 + comfy/cmd/worker.py | 3 ++- comfy/distributed/distributed_prompt_queue.py | 5 ++++- 6 files changed, 27 insertions(+), 13 deletions(-) diff --git a/comfy/cli_args.py b/comfy/cli_args.py index 33585b5d3..6ece40bf9 100644 --- a/comfy/cli_args.py +++ b/comfy/cli_args.py @@ -122,11 +122,18 @@ parser.add_argument("--analytics-use-identity-provider", action="store_true", parser.add_argument("--distributed-queue-connection-uri", type=str, default=None, help="EXAMPLE: \"amqp://guest:guest@127.0.0.1\" - Servers and clients will connect to this AMPQ URL to form a distributed queue and exchange prompt execution requests and progress updates.") parser.add_argument( - '--distributed-queue-roles', - action='append', - choices=['worker', 'frontend'], - default=["worker", "frontend"], - help='Specifies one or more roles for the distributed queue. Acceptable values are "worker" or "frontend", or both by writing the flag twice with each role. Prompters will start the web UI and connect to the provided AMQP URL to submit prompts; workers will pull requests off the AMQP URL.' + '--distributed-queue-worker', + required=False, + type=bool, + default=True, + help='Workers will pull requests off the AMQP URL.' +) +parser.add_argument( + '--distributed-queue-frontend', + required=False, + type=bool, + default=True, + help='Frontends will start the web UI and connect to the provided AMQP URL to submit prompts.' ) parser.add_argument("--distributed-queue-name", type=str, default="comfyui", help="This name will be used by the frontends and workers to exchange prompt requests and replies. Progress updates will be prefixed by the queue name, followed by a '.', then the user ID") diff --git a/comfy/cli_args_types.py b/comfy/cli_args_types.py index 1f9dfec9a..9be14b0a9 100644 --- a/comfy/cli_args_types.py +++ b/comfy/cli_args_types.py @@ -71,7 +71,8 @@ class Configuration(dict): write_out_config_file (bool): Enable writing out the configuration file. create_directories (bool): Creates the default models/, input/, output/ and temp/ directories, then exits. distributed_queue_connection_uri (Optional[str]): Servers and clients will connect to this AMQP URL to form a distributed queue and exchange prompt execution requests and progress updates. - distributed_queue_roles (List[str]): Specifies one or more roles for the distributed queue. Acceptable values are "worker" or "frontend", or both by writing the flag twice with each role. Frontends will start the web UI and connect to the provided AMQP URL to submit prompts; workers will pull requests off the AMQP URL. + distributed_queue_frontend (bool): Frontends will start the web UI and connect to the provided AMQP URL to submit prompts. + distributed_queue_worker (bool): Workers will pull requests off the AMQP URL. distributed_queue_name (str): This name will be used by the frontends and workers to exchange prompt requests and replies. Progress updates will be prefixed by the queue name, followed by a '.', then the user ID. """ def __init__(self, **kwargs): @@ -131,7 +132,8 @@ class Configuration(dict): self.write_out_config_file: bool = False self.create_directories: bool = False self.distributed_queue_connection_uri: Optional[str] = None - self.distributed_queue_roles: List[str] = ["worker", "frontend"] + self.distributed_queue_worker: bool = True + self.distributed_queue_frontend: bool = True self.distributed_queue_name: str = "comfyui" for key, value in kwargs.items(): self[key] = value diff --git a/comfy/cmd/main.py b/comfy/cmd/main.py index 41695d6c0..cacdc8bc1 100644 --- a/comfy/cmd/main.py +++ b/comfy/cmd/main.py @@ -227,10 +227,10 @@ async def main(): if args.distributed_queue_connection_uri is not None: distributed = True q = DistributedPromptQueue( - caller_server=server if "frontend" in args.distributed_queue_roles else None, + caller_server=server if args.distributed_queue_frontend else None, connection_uri=args.distributed_queue_connection_uri, - is_caller="frontend" in args.distributed_queue_roles, - is_callee="worker" in args.distributed_queue_roles, + is_caller=args.distributed_queue_frontend, + is_callee=args.distributed_queue_worker, loop=loop, queue_name=args.distributed_queue_name ) @@ -257,7 +257,7 @@ async def main(): # in a distributed setting, the default prompt worker will not be able to send execution events via the websocket worker_thread_server = server if not distributed else ServerStub() - if not distributed or "worker" in args.distributed_queue_roles: + if not distributed or args.distributed_queue_worker: if distributed: logging.warning(f"Distributed workers started in the default thread loop cannot notify clients of progress updates. Instead of comfyui or main.py, use comfyui-worker.") threading.Thread(target=prompt_worker, daemon=True, args=(q, worker_thread_server,)).start() diff --git a/comfy/cmd/server.py b/comfy/cmd/server.py index 8310e26b5..452672917 100644 --- a/comfy/cmd/server.py +++ b/comfy/cmd/server.py @@ -108,6 +108,7 @@ class PromptServer(ExecutorToClientProgress): routes = web.RouteTableDef() self.routes: web.RouteTableDef = routes self.last_node_id = None + self.last_prompt_id = None self.client_id = None self.on_prompt_handlers = [] diff --git a/comfy/cmd/worker.py b/comfy/cmd/worker.py index a3e155e66..454172dd9 100644 --- a/comfy/cmd/worker.py +++ b/comfy/cmd/worker.py @@ -10,7 +10,8 @@ from ..cli_args import args async def main(): # assume we are a worker - args.distributed_queue_roles = ["worker"] + args.distributed_queue_worker = True + args.distributed_queue_frontend = False assert args.distributed_queue_connection_uri is not None, "Set the --distributed-queue-connection-uri argument to your RabbitMQ server" async with DistributedPromptWorker(connection_uri=args.distributed_queue_connection_uri, diff --git a/comfy/distributed/distributed_prompt_queue.py b/comfy/distributed/distributed_prompt_queue.py index 8d3c810bd..91613db00 100644 --- a/comfy/distributed/distributed_prompt_queue.py +++ b/comfy/distributed/distributed_prompt_queue.py @@ -131,7 +131,10 @@ class DistributedPromptQueue(AbstractPromptQueue): assert self._is_callee # the loop receiving messages must not be mounted on the worker thread # otherwise receiving messages will be blocked forever - worker_event_loop = asyncio.get_event_loop() + try: + worker_event_loop = asyncio.get_event_loop() + except RuntimeError: + worker_event_loop = None assert self._loop != worker_event_loop, "get only makes sense in the context of the legacy comfyui prompt worker" # spin wait timeout = timeout or 30.0