diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index 20ebae155..fa3ab0faf 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -4,11 +4,22 @@ Provides normalization and helper functions for job status tracking. """ import uuid -from typing import Optional +from typing import Callable, Optional from comfy_api.internal import prune_dict +# Result of classifying a job for cancellation. +# 'running' -> job is currently executing (interrupt it) +# 'pending' -> job is queued but not started (dequeue it) +# 'terminal' -> job already finished (present in history); cancel is a no-op +# 'unknown' -> job id is not present anywhere +CANCEL_RUNNING = 'running' +CANCEL_PENDING = 'pending' +CANCEL_TERMINAL = 'terminal' +CANCEL_UNKNOWN = 'unknown' + + class JobStatus: """Job status constants.""" PENDING = 'pending' @@ -407,3 +418,71 @@ def get_all_jobs( jobs = jobs[:limit] return (jobs, total_count) + + +def classify_job_for_cancel(prompt_id: str, running: list, queued: list, history: dict) -> str: + """Classify a job id for cancellation. + + Returns one of CANCEL_RUNNING, CANCEL_PENDING, CANCEL_TERMINAL, CANCEL_UNKNOWN. + + Queue items are tuples whose second element (index 1) is the prompt_id. + History is a dict keyed by prompt_id, so a job present there has already + finished and cancelling it is a no-op. + """ + for item in running: + if item[1] == prompt_id: + return CANCEL_RUNNING + for item in queued: + if item[1] == prompt_id: + return CANCEL_PENDING + if prompt_id in history: + return CANCEL_TERMINAL + return CANCEL_UNKNOWN + + +def cancel_job( + prompt_id: str, + running: list, + queued: list, + history: dict, + interrupt: Callable[[str], bool], + dequeue: Callable[[str], bool], +) -> str: + """Cancel a single job by id, regardless of state. + + Maps the cancel onto the runtime's existing mechanics: + - a running job is interrupted via ``interrupt`` + - a pending job is removed from the queue via ``dequeue`` + - a job that already finished (terminal) is a no-op + - an unknown id is a no-op (callers that need fail-fast behaviour should + validate ids up front with ``classify_job_for_cancel``) + + Both ``interrupt`` and ``dequeue`` take the prompt id and return whether + they acted on a job that was *actually* in that state, so the value returned + here reflects what truly happened rather than the (possibly stale) + classification. This matters around the narrow TOCTOU windows where a job + changes state between the caller's snapshot and the action: + + - a job classified RUNNING may have finished before ``interrupt`` fires: + ``interrupt`` returns False and this returns CANCEL_UNKNOWN (no-op). + - a job classified PENDING may have started executing before ``dequeue`` + fires: ``dequeue`` returns False, ``interrupt`` then catches the now- + running job and this returns CANCEL_RUNNING. If it had simply finished + instead, both return False and this returns CANCEL_UNKNOWN. + + ``interrupt`` must be atomic — interrupt the job only if it is still the one + running — so a cancel can never land on an unrelated prompt that started in + the meantime (see ``execution.PromptQueue.interrupt_if_running``). + """ + classification = classify_job_for_cancel(prompt_id, running, queued, history) + if classification == CANCEL_RUNNING: + return CANCEL_RUNNING if interrupt(prompt_id) else CANCEL_UNKNOWN + if classification == CANCEL_PENDING: + if dequeue(prompt_id): + return CANCEL_PENDING + # Left the pending queue between classification and dequeue: if it + # started executing, interrupt the now-running job; otherwise it has + # already finished and the cancel is a genuine no-op. + return CANCEL_RUNNING if interrupt(prompt_id) else CANCEL_UNKNOWN + # CANCEL_TERMINAL and CANCEL_UNKNOWN are intentional no-ops. + return classification diff --git a/execution.py b/execution.py index 9e16e451d..c45317593 100644 --- a/execution.py +++ b/execution.py @@ -1308,6 +1308,25 @@ class PromptQueue: queued = copy.copy(self.queue) return (running, queued) + def interrupt_if_running(self, prompt_id): + """Interrupt the running prompt with this id, atomically. + + Checks the live running set and signals the interrupt under the queue + mutex, so the worker cannot move the job to done (and start the next + prompt) in between. Returns True if a matching job was running and an + interrupt was signalled, False otherwise. The atomicity is what keeps a + cancel from landing on an unrelated prompt that started after a separate + is-running check: the global interrupt flag is reset at the start of + every prompt (execute_async), so a job that finishes before consuming + the flag cannot leak the interrupt onto its successor. + """ + with self.mutex: + for item in self.currently_running.values(): + if item[1] == prompt_id: + nodes.interrupt_processing() + return True + return False + def get_tasks_remaining(self): with self.mutex: return len(self.queue) + len(self.currently_running) diff --git a/server.py b/server.py index 6b0029adf..361850f38 100644 --- a/server.py +++ b/server.py @@ -8,7 +8,15 @@ import time import nodes import folder_paths import execution -from comfy_execution.jobs import JobStatus, get_job, get_all_jobs, validate_job_id +from comfy_execution.jobs import ( + JobStatus, + get_job, + get_all_jobs, + validate_job_id, + cancel_job, + CANCEL_PENDING, + CANCEL_RUNNING, +) import uuid import urllib import json @@ -899,6 +907,107 @@ class PromptServer(): return web.json_response(job) + def _cancel_job_by_id(job_id): + """Cancel a single job by id using the queue's existing mechanics. + + Running jobs are interrupted (same mechanism as /interrupt); pending + jobs are dequeued (same mechanism as /queue {"delete": [...]}). + Already-finished or unknown ids are no-ops. State-agnostic. + + Returns True when a cancel was actually dispatched (running or + pending job), False when the call was a no-op (terminal/unknown id). + """ + running, queued = self.prompt_queue.get_current_queue() + history = self.prompt_queue.get_history() + + def interrupt(prompt_id): + logging.info(f"Cancelling running prompt {prompt_id}") + # Atomic: only interrupts if the job is still the one running, + # so a cancel can't land on a prompt that started in the gap + # since the snapshot above. Returns whether it actually fired. + return self.prompt_queue.interrupt_if_running(prompt_id) + + def dequeue(prompt_id): + logging.info(f"Cancelling pending prompt {prompt_id}") + return self.prompt_queue.delete_queue_item(lambda a: a[1] == prompt_id) + + classification = cancel_job(job_id, running, queued, history, interrupt, dequeue) + return classification in (CANCEL_RUNNING, CANCEL_PENDING) + + @routes.post("/api/jobs/{job_id}/cancel") + async def cancel_job_by_id(request): + """Cancel a single job by id, regardless of state. + + Idempotent: cancelling a job that has already finished, or an id + that is not known, returns 200 with {"cancelled": false} rather + than an error. + """ + job_id = request.match_info.get("job_id", None) + if not job_id: + return web.json_response( + {"error": "job_id is required"}, + status=400 + ) + + cancelled = _cancel_job_by_id(job_id) + return web.json_response({"cancelled": cancelled}) + + @routes.post("/api/jobs/cancel") + async def cancel_jobs_batch(request): + """Cancel a batch of jobs by id. + + Body: {"job_ids": ["", ...]} + + Best-effort and idempotent: every well-formed id is cancelled if it + is running or pending; ids that are already finished or unknown are + no-ops, not errors. A batch of all no-ops still returns 200 with + {"cancelled": false}. This matches the single-cancel endpoint and + means "cancel all" still cancels the in-progress jobs even if some + finished between the client's snapshot and the request. Malformed + ids are still rejected up front with 400 (see below). + """ + try: + json_data = await request.json() + except json.JSONDecodeError: + return web.json_response( + {"error": "Request body must be valid JSON"}, + status=400 + ) + + job_ids = json_data.get("job_ids") if isinstance(json_data, dict) else None + if not isinstance(job_ids, list): + return web.json_response( + {"error": "job_ids must be a list"}, + status=400 + ) + + # Validate that every element is a well-formed job id before doing + # anything else. An unhashable element (e.g. a nested dict or list) + # would cause a TypeError when used as a history dict key; a + # non-string or non-UUID value is never a valid id. Reject early + # with 400 rather than letting the classify loop raise 500. + invalid_ids = [] + for jid in job_ids: + try: + validate_job_id(jid) + except (ValueError, AttributeError): + invalid_ids.append(jid if isinstance(jid, str) else repr(jid)) + if invalid_ids: + return web.json_response( + {"error": "job_ids contains invalid id(s)", "invalid_ids": invalid_ids}, + status=400, + ) + + # Best-effort: cancel each id that is still running/pending; an id + # that has finished or never existed is a no-op rather than a reason + # to fail the whole batch. + cancelled = False + for jid in job_ids: + if _cancel_job_by_id(jid): + cancelled = True + + return web.json_response({"cancelled": cancelled}) + @routes.get("/history") async def get_history(request): max_items = request.rel_url.query.get("max_items", None) diff --git a/tests-unit/jobs_cancel_test/__init__.py b/tests-unit/jobs_cancel_test/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests-unit/jobs_cancel_test/jobs_cancel_test.py b/tests-unit/jobs_cancel_test/jobs_cancel_test.py new file mode 100644 index 000000000..f1d591b0d --- /dev/null +++ b/tests-unit/jobs_cancel_test/jobs_cancel_test.py @@ -0,0 +1,453 @@ +"""Tests for the jobs-namespace cancel endpoints. + +Covers both layers: + +* the pure cancel helpers in ``comfy_execution.jobs`` + (``classify_job_for_cancel`` / ``cancel_job``), which hold the business + logic of mapping a cancel onto interrupt-vs-dequeue, and + +* the HTTP contract of ``POST /api/jobs/{job_id}/cancel`` and + ``POST /api/jobs/cancel`` (status codes, single-cancel idempotency, and + best-effort batch cancellation that treats unknown/finished ids as no-ops + while still rejecting malformed ids with 400). + +The HTTP layer is exercised against a small aiohttp app whose handlers are a +faithful copy of the wiring in ``server.py`` driven by a fake queue that +mirrors ``execution.PromptQueue`` (``get_current_queue`` / ``get_history`` / +``delete_queue_item``). This keeps the test free of the heavy ComfyUI runtime +(torch, nodes, ...) while still testing the real cancel logic. +""" + +import json + +import pytest +from aiohttp import web + +from comfy_execution.jobs import ( + CANCEL_PENDING, + CANCEL_RUNNING, + CANCEL_TERMINAL, + CANCEL_UNKNOWN, + cancel_job, + classify_job_for_cancel, + validate_job_id, +) + +# Classifications for which a cancel was actually dispatched (vs a no-op). +_CANCELLED = (CANCEL_RUNNING, CANCEL_PENDING) + +# Canonical UUID ids for HTTP-layer tests (the batch endpoint validates UUID format). +_UUID_A = "aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa" +_UUID_B = "bbbbbbbb-bbbb-4bbb-bbbb-bbbbbbbbbbbb" +_UUID_C = "cccccccc-cccc-4ccc-cccc-cccccccccccc" +_UUID_D = "dddddddd-dddd-4ddd-dddd-dddddddddddd" +_UUID_MISSING = "ffffffff-ffff-4fff-ffff-ffffffffffff" + + +def make_queue_item(prompt_id, number=0): + """Build a queue tuple shaped like the real ones: index 1 is the id.""" + return (number, prompt_id, {}, {}, []) + + +class FakePromptQueue: + """Minimal stand-in for execution.PromptQueue for the cancel paths. + + Tracks interrupts and dequeues so tests can assert side effects. + """ + + def __init__(self, running=None, pending=None, history=None): + self._running = list(running or []) + self._pending = list(pending or []) + self._history = dict(history or {}) + self.interrupt_count = 0 + + def get_current_queue(self): + return (list(self._running), list(self._pending)) + + def get_history(self, prompt_id=None): + if prompt_id is None: + return dict(self._history) + if prompt_id in self._history: + return {prompt_id: self._history[prompt_id]} + return {} + + def delete_queue_item(self, function): + for i, item in enumerate(self._pending): + if function(item): + self._pending.pop(i) + return True + return False + + def interrupt_if_running(self, prompt_id): + # Mirrors execution.PromptQueue.interrupt_if_running: only signals an + # interrupt when the id is actually in the running set. + if any(item[1] == prompt_id for item in self._running): + self.interrupt_count += 1 + return True + return False + + +def build_app(queue): + """Build an aiohttp app exposing the cancel routes against ``queue``. + + Handler bodies mirror server.py exactly. + """ + + def _cancel_job_by_id(job_id): + running, pending = queue.get_current_queue() + history = queue.get_history() + + def interrupt(prompt_id): + return queue.interrupt_if_running(prompt_id) + + def dequeue(prompt_id): + return queue.delete_queue_item(lambda a: a[1] == prompt_id) + + classification = cancel_job( + job_id, running, pending, history, interrupt, dequeue + ) + return classification in _CANCELLED + + async def cancel_job_by_id(request): + job_id = request.match_info.get("job_id", None) + if not job_id: + return web.json_response({"error": "job_id is required"}, status=400) + cancelled = _cancel_job_by_id(job_id) + return web.json_response({"cancelled": cancelled}) + + async def cancel_jobs_batch(request): + try: + json_data = await request.json() + except json.JSONDecodeError: + return web.json_response( + {"error": "Request body must be valid JSON"}, status=400 + ) + + job_ids = json_data.get("job_ids") if isinstance(json_data, dict) else None + if not isinstance(job_ids, list): + return web.json_response({"error": "job_ids must be a list"}, status=400) + + invalid_ids = [] + for jid in job_ids: + try: + validate_job_id(jid) + except (ValueError, AttributeError): + invalid_ids.append(jid if isinstance(jid, str) else repr(jid)) + if invalid_ids: + return web.json_response( + {"error": "job_ids contains invalid id(s)", "invalid_ids": invalid_ids}, + status=400, + ) + + cancelled = False + for jid in job_ids: + if _cancel_job_by_id(jid): + cancelled = True + return web.json_response({"cancelled": cancelled}) + + app = web.Application() + app.router.add_post("/api/jobs/{job_id}/cancel", cancel_job_by_id) + app.router.add_post("/api/jobs/cancel", cancel_jobs_batch) + return app + + +# --------------------------------------------------------------------------- +# Pure helper tests: classification + cancel side effects +# --------------------------------------------------------------------------- + + +class TestClassifyJobForCancel: + def test_running(self): + running = [make_queue_item("a")] + assert classify_job_for_cancel("a", running, [], {}) == CANCEL_RUNNING + + def test_pending(self): + pending = [make_queue_item("b")] + assert classify_job_for_cancel("b", [], pending, {}) == CANCEL_PENDING + + def test_terminal(self): + history = {"c": {"prompt": make_queue_item("c"), "outputs": {}, "status": {}}} + assert classify_job_for_cancel("c", [], [], history) == CANCEL_TERMINAL + + def test_unknown(self): + assert classify_job_for_cancel("z", [], [], {}) == CANCEL_UNKNOWN + + +class TestCancelJobHelper: + """``interrupt`` and ``dequeue`` both take the id and return whether they + actually acted, so cancel_job's return reflects the real outcome.""" + + def test_running_is_interrupted_not_dequeued(self): + interrupts = [] + dequeues = [] + result = cancel_job( + "a", [make_queue_item("a")], [], {}, + interrupt=lambda pid: interrupts.append(pid) or True, + dequeue=lambda pid: dequeues.append(pid) or True, + ) + assert result == CANCEL_RUNNING + assert interrupts == ["a"] + assert dequeues == [] + + def test_pending_is_dequeued_not_interrupted(self): + interrupts = [] + dequeues = [] + result = cancel_job( + "b", [], [make_queue_item("b")], {}, + interrupt=lambda pid: interrupts.append(pid) or True, + dequeue=lambda pid: dequeues.append(pid) or True, + ) + assert result == CANCEL_PENDING + assert dequeues == ["b"] + assert interrupts == [] + + def test_terminal_is_noop(self): + history = {"c": {"prompt": make_queue_item("c"), "outputs": {}, "status": {}}} + interrupts = [] + dequeues = [] + result = cancel_job( + "c", [], [], history, + interrupt=lambda pid: interrupts.append(pid) or True, + dequeue=lambda pid: dequeues.append(pid) or True, + ) + assert result == CANCEL_TERMINAL + assert interrupts == [] + assert dequeues == [] + + def test_unknown_is_noop(self): + interrupts = [] + dequeues = [] + result = cancel_job( + "z", [], [], {}, + interrupt=lambda pid: interrupts.append(pid) or True, + dequeue=lambda pid: dequeues.append(pid) or True, + ) + assert result == CANCEL_UNKNOWN + assert interrupts == [] + assert dequeues == [] + + def test_running_but_finished_before_interrupt_returns_unknown(self): + """Classified RUNNING from a stale snapshot, but the job finished before + the atomic interrupt fired (interrupt returns False). cancel_job reports + UNKNOWN rather than claiming a cancel that did not happen — and the + atomic interrupt guarantees no unrelated job was hit.""" + interrupts = [] + result = cancel_job( + "a", [make_queue_item("a")], [], {}, + interrupt=lambda pid: interrupts.append(pid) or False, + dequeue=lambda pid: True, + ) + assert result == CANCEL_UNKNOWN + assert interrupts == ["a"] # interrupt was attempted atomically + + def test_pending_started_running_is_interrupted(self): + """Pending->running race: the job leaves the queue (dequeue False) + because it started executing. The atomic interrupt catches the now- + running job, so cancel_job interrupts it and reports CANCEL_RUNNING.""" + interrupts = [] + dequeues = [] + result = cancel_job( + "b", [], [make_queue_item("b")], {}, + interrupt=lambda pid: interrupts.append(pid) or True, + dequeue=lambda pid: (dequeues.append(pid), False)[1], + ) + assert result == CANCEL_RUNNING + assert dequeues == ["b"] # dequeue attempted first + assert interrupts == ["b"] # then the now-running job was interrupted + + def test_pending_dequeue_miss_not_running_returns_unknown(self): + """Dequeue miss where the job is not running anymore (it finished): the + atomic interrupt finds nothing to interrupt and returns False, so + cancel_job is a no-op reporting UNKNOWN — never reporting a cancel that + did not happen, and never interrupting a bystander.""" + interrupts = [] + dequeues = [] + result = cancel_job( + "b", [], [make_queue_item("b")], {}, + interrupt=lambda pid: interrupts.append(pid) or False, + dequeue=lambda pid: (dequeues.append(pid), False)[1], + ) + assert result == CANCEL_UNKNOWN + assert dequeues == ["b"] + assert interrupts == ["b"] # interrupt attempted, found nothing running + + +# --------------------------------------------------------------------------- +# HTTP contract tests: POST /api/jobs/{job_id}/cancel +# --------------------------------------------------------------------------- + + +class TestSingleCancelEndpoint: + @pytest.mark.asyncio + async def test_cancel_running_job_interrupts(self, aiohttp_client): + queue = FakePromptQueue(running=[make_queue_item("a")]) + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/a/cancel") + + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": True} + assert queue.interrupt_count == 1 + + @pytest.mark.asyncio + async def test_cancel_pending_job_dequeues(self, aiohttp_client): + queue = FakePromptQueue(pending=[make_queue_item("b")]) + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/b/cancel") + + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": True} + # Pending job removed from the queue; nothing interrupted. + assert queue.get_current_queue()[1] == [] + assert queue.interrupt_count == 0 + + @pytest.mark.asyncio + async def test_cancel_terminal_job_is_idempotent_noop(self, aiohttp_client): + history = {"c": {"prompt": make_queue_item("c"), "outputs": {}, "status": {}}} + queue = FakePromptQueue(history=history) + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/c/cancel") + + # Already-finished job: 200 no-op (cancelled=false), not an error. + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": False} + assert queue.interrupt_count == 0 + + @pytest.mark.asyncio + async def test_cancel_unknown_id_is_200_noop(self, aiohttp_client): + queue = FakePromptQueue() + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/does-not-exist/cancel") + + # Single-cancel of an unknown id is treated as an idempotent no-op. + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": False} + assert queue.interrupt_count == 0 + + @pytest.mark.asyncio + async def test_cancel_pending_that_started_running_interrupts(self, aiohttp_client): + """Pending->running race end to end: the job is pending at snapshot time + but starts executing by the time we dequeue (delete misses). The live + re-check sees it running and interrupts it, so the cancel is not dropped + and the caller still gets cancelled=True.""" + + class RacingQueue(FakePromptQueue): + def delete_queue_item(self, function): + # The worker picked the job up just before we removed it: it + # leaves the pending queue (delete misses) and is now running. + self._running = list(self._pending) + self._pending = [] + return False + + queue = RacingQueue(pending=[make_queue_item("b")]) + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/b/cancel") + + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": True} + assert queue.interrupt_count == 1 + + +# --------------------------------------------------------------------------- +# HTTP contract tests: POST /api/jobs/cancel (batch) +# --------------------------------------------------------------------------- + + +class TestBatchCancelEndpoint: + @pytest.mark.asyncio + async def test_batch_happy_path(self, aiohttp_client): + queue = FakePromptQueue( + running=[make_queue_item(_UUID_A)], + pending=[make_queue_item(_UUID_B, number=1)], + ) + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/cancel", json={"job_ids": [_UUID_A, _UUID_B]}) + + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": True} + assert queue.interrupt_count == 1 # running job interrupted + assert queue.get_current_queue()[1] == [] # pending job dequeued + + @pytest.mark.asyncio + async def test_batch_best_effort_skips_unknown_id(self, aiohttp_client): + """An unknown id in the batch is a no-op, not a reason to abort: the + running and pending jobs are still cancelled (200, cancelled=true). This + is the "cancel all as a job finishes" case from review.""" + queue = FakePromptQueue( + running=[make_queue_item(_UUID_A)], + pending=[make_queue_item(_UUID_B, number=1)], + ) + client = await aiohttp_client(build_app(queue)) + + resp = await client.post( + "/api/jobs/cancel", json={"job_ids": [_UUID_A, _UUID_MISSING, _UUID_B]} + ) + + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": True} + assert queue.interrupt_count == 1 # running job interrupted + assert queue.get_current_queue()[1] == [] # pending job dequeued + + @pytest.mark.asyncio + async def test_batch_all_terminal_is_idempotent_noop(self, aiohttp_client): + history = { + _UUID_C: {"prompt": make_queue_item(_UUID_C), "outputs": {}, "status": {}}, + _UUID_D: {"prompt": make_queue_item(_UUID_D), "outputs": {}, "status": {}}, + } + queue = FakePromptQueue(history=history) + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/cancel", json={"job_ids": [_UUID_C, _UUID_D]}) + + # All known but terminal: 200 with cancelled=false, nothing dispatched. + assert resp.status == 200 + assert (await resp.json()) == {"cancelled": False} + assert queue.interrupt_count == 0 + + @pytest.mark.asyncio + async def test_batch_missing_job_ids_is_400(self, aiohttp_client): + queue = FakePromptQueue() + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/cancel", json={}) + + assert resp.status == 400 + + @pytest.mark.asyncio + async def test_batch_unhashable_element_is_400_not_500(self, aiohttp_client): + """An unhashable element such as a dict or list must yield 400, not 500. + + Previously, passing e.g. {"job_ids": [{}]} would reach the classify + loop where ``prompt_id in history`` raises TypeError on an unhashable + type, resulting in an unhandled 500. The input-validation guard must + catch this before any queue or history access. + """ + queue = FakePromptQueue() + client = await aiohttp_client(build_app(queue)) + + resp = await client.post("/api/jobs/cancel", json={"job_ids": [{}]}) + + assert resp.status == 400 + body = await resp.json() + assert "invalid_ids" in body + # No queue side effects. + assert queue.interrupt_count == 0 + + @pytest.mark.asyncio + async def test_batch_non_uuid_string_element_is_400(self, aiohttp_client): + """A string that is not a valid UUID must be rejected with 400.""" + queue = FakePromptQueue() + client = await aiohttp_client(build_app(queue)) + + resp = await client.post( + "/api/jobs/cancel", json={"job_ids": ["not-a-uuid"]} + ) + + assert resp.status == 400 + body = await resp.json() + assert "invalid_ids" in body