Make AST regression tests CWD-independent and fail-loud
Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled

Two robustness fixes to the workflow_id regression tests:

1. Resolve source-file paths relative to __file__ (repo_root =
   Path(__file__).resolve().parents[2]) instead of bare
   Path("main.py"). The bare form depends on the process CWD being the
   repo root, which made the tests flaky when run from tests-unit/, an
   IDE runner, or any other directory. Verified passing from three
   different CWDs (repo root, tests-unit/, /tmp).

2. Track matched_terminal_executing_send and assert it after the AST
   walk in TestTerminalExecutingResetInMainPy. Previously, if someone
   refactored the send_sync("executing", {...}) call to pass a local
   variable instead of an inline dict literal, the loop would simply
   find no match and the test would pass vacuously, losing its
   regression-protection value. Verified by temporarily refactoring
   main.py to use a variable payload: the test now fails with a clear
   message asking to update the regression test, and passes again once
   the inline dict is restored.
This commit is contained in:
Glary-Bot 2026-05-12 19:17:21 +00:00
parent 7ca4457fd1
commit 137de03c7b

View File

@ -138,7 +138,8 @@ class TestExecutionMessagePayloadsContainWorkflowId:
def _assert_workflow_id_in_every_prompt_id_dict(self, file_path: str): def _assert_workflow_id_in_every_prompt_id_dict(self, file_path: str):
from pathlib import Path from pathlib import Path
source = Path(file_path).read_text() repo_root = Path(__file__).resolve().parents[2]
source = (repo_root / file_path).read_text()
offenders = [] offenders = []
for node, keys in self._emitting_dicts(source): for node, keys in self._emitting_dicts(source):
if "workflow_id" not in keys: if "workflow_id" not in keys:
@ -241,7 +242,8 @@ class TestTerminalExecutingResetInMainPy:
import ast import ast
from pathlib import Path from pathlib import Path
source = Path("main.py").read_text() repo_root = Path(__file__).resolve().parents[2]
source = (repo_root / "main.py").read_text()
tree = ast.parse(source) tree = ast.parse(source)
worker = next( worker = next(
@ -262,6 +264,7 @@ class TestTerminalExecutingResetInMainPy:
"in finally)." "in finally)."
) )
matched_terminal_executing_send = False
for node in ast.walk(worker): for node in ast.walk(worker):
if not isinstance(node, ast.Call): if not isinstance(node, ast.Call):
continue continue
@ -276,6 +279,7 @@ class TestTerminalExecutingResetInMainPy:
and isinstance(node.args[1], ast.Dict) and isinstance(node.args[1], ast.Dict)
): ):
continue continue
matched_terminal_executing_send = True
payload = node.args[1] payload = node.args[1]
for key, value in zip(payload.keys, payload.values): for key, value in zip(payload.keys, payload.values):
if isinstance(key, ast.Constant) and key.value == "workflow_id": if isinstance(key, ast.Constant) and key.value == "workflow_id":
@ -285,3 +289,9 @@ class TestTerminalExecutingResetInMainPy:
"server.last_workflow_id; the executor clears it in its " "server.last_workflow_id; the executor clears it in its "
"finally block. Use a locally captured workflow_id instead." "finally block. Use a locally captured workflow_id instead."
) )
assert matched_terminal_executing_send, (
"main.py:prompt_worker no longer has an inline "
'send_sync("executing", {...}) payload; update this regression test '
"so it still verifies the terminal workflow_id source."
)