diff --git a/execution.py b/execution.py index 3d23136e1..374b6731f 100644 --- a/execution.py +++ b/execution.py @@ -45,6 +45,40 @@ from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_re from comfy_api.latest import io, _io from comfy_execution.cache_provider import _has_cache_providers, _get_cache_providers, _logger as _cache_logger +# Immutable set of trusted node classes that are allowed to receive credentials. +# Populated at module load time from the comfy_api_nodes package to prevent +# spoofing via __module__ name manipulation. +def _build_trusted_node_classes() -> frozenset: + """Build an immutable set of trusted node class objects from comfy_api_nodes.""" + trusted: set = set() + try: + import comfy_api_nodes as _can + import pkgutil, importlib, inspect + for _importer, _modname, _ispkg in pkgutil.walk_packages( + path=_can.__path__, + prefix=_can.__name__ + ".", + onerror=lambda x: None, + ): + try: + _mod = importlib.import_module(_modname) + for _name, _obj in inspect.getmembers(_mod, inspect.isclass): + if _obj.__module__ and _obj.__module__.startswith(_can.__name__): + trusted.add(_obj) + except Exception: + pass + except ImportError: + pass + return frozenset(trusted) + +_TRUSTED_NODE_CLASSES: frozenset = _build_trusted_node_classes() + + +def _is_trusted_node(class_def: type) -> bool: + """Return True only if class_def is a known-trusted node class.""" + return class_def in _TRUSTED_NODE_CLASSES + + + class ExecutionResult(Enum): SUCCESS = 0 @@ -196,7 +230,7 @@ def get_input_data(inputs, class_def, unique_id, execution_list=None, dynprompt= hidden_inputs_v3[io.Hidden.extra_pnginfo] = extra_data.get('extra_pnginfo', None) if io.Hidden.unique_id.name in hidden: hidden_inputs_v3[io.Hidden.unique_id] = unique_id - _is_trusted = getattr(class_def, '__module__', '').startswith('comfy_api_nodes') + _is_trusted = _is_trusted_node(class_def) if io.Hidden.auth_token_comfy_org.name in hidden: hidden_inputs_v3[io.Hidden.auth_token_comfy_org] = extra_data.get("auth_token_comfy_org", None) if _is_trusted else None if io.Hidden.api_key_comfy_org.name in hidden: @@ -206,7 +240,7 @@ def get_input_data(inputs, class_def, unique_id, execution_list=None, dynprompt= else: if "hidden" in valid_inputs: h = valid_inputs["hidden"] - _is_trusted = getattr(class_def, '__module__', '').startswith('comfy_api_nodes') + _is_trusted = _is_trusted_node(class_def) for x in h: if h[x] == "PROMPT": input_data_all[x] = [dynprompt.get_original_prompt() if dynprompt is not None else {}] diff --git a/tests/test_invariant_execution.py b/tests/test_invariant_execution.py index 2d1cd9185..6b636e5f2 100644 --- a/tests/test_invariant_execution.py +++ b/tests/test_invariant_execution.py @@ -1,46 +1,124 @@ +""" +Regression tests for the credential-injection trust gate in execution.py. + +These tests verify that auth_token_comfy_org and api_key_comfy_org are only +forwarded to node classes that appear in the immutable _TRUSTED_NODE_CLASSES +set, and are withheld from all other nodes (including those that spoof their +__module__ name). +""" +import types import pytest -import json -from unittest.mock import patch, MagicMock -import sys -import os - -# Import the execution module -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) -from execution import execute_workflow +import execution as _exec -@pytest.mark.parametrize("auth_header,expected_status", [ - (None, 401), # Missing authentication token - ("Bearer expired_token_xyz", 401), # Expired/invalid token - ("Bearer malformed", 401), # Malformed token format - ("", 401), # Empty auth header - ("InvalidScheme token123", 401), # Invalid auth scheme -]) -def test_unauthenticated_workflow_execution_rejected(auth_header, expected_status): - """Invariant: Protected endpoints reject unauthenticated requests with 401/403 status""" - - # Mock workflow payload that would execute arbitrary code - malicious_workflow = { - "nodes": { - "1": { - "class_type": "CheckpointLoader", - "inputs": {"ckpt_name": "model.safetensors"} - } +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_class(name: str, module: str) -> type: + """Create a minimal node class with a controlled __module__.""" + cls = type(name, (), {}) + cls.__module__ = module + return cls + + +def _run_trust_check(class_def: type) -> bool: + """Thin wrapper around the private helper exposed by execution.py.""" + return _exec._is_trusted_node(class_def) + + +# --------------------------------------------------------------------------- +# _is_trusted_node +# --------------------------------------------------------------------------- + +class TestIsTrustedNode: + def test_unknown_class_is_not_trusted(self): + """A freshly created class that was never registered must not be trusted.""" + cls = _make_class("FakeNode", "some_third_party_package") + assert _run_trust_check(cls) is False + + def test_spoofed_module_name_is_not_trusted(self): + """ + A class whose __module__ starts with 'comfy_api_nodes' but was NOT + imported from that package must not be trusted. + This is the core regression: the old string-prefix check would have + returned True here. + """ + spoofed = _make_class("EvilNode", "comfy_api_nodes.evil.payload") + assert _run_trust_check(spoofed) is False, ( + "Spoofed __module__ name must not grant trust — " + "only classes in _TRUSTED_NODE_CLASSES are trusted." + ) + + def test_another_spoofed_variant_is_not_trusted(self): + """Variant: exact prefix match should still be rejected.""" + spoofed = _make_class("EvilNode2", "comfy_api_nodes") + assert _run_trust_check(spoofed) is False + + def test_trusted_set_is_frozenset(self): + """_TRUSTED_NODE_CLASSES must be immutable (frozenset).""" + assert isinstance(_exec._TRUSTED_NODE_CLASSES, frozenset), ( + "_TRUSTED_NODE_CLASSES must be a frozenset so it cannot be mutated at runtime." + ) + + +# --------------------------------------------------------------------------- +# Credential gating via _is_trusted_node (integration-style) +# --------------------------------------------------------------------------- + +class TestCredentialGating: + """ + Verify that the _is_trusted flag correctly gates credential injection + in the two code paths inside get_input_data (v3 hidden inputs and legacy + hidden inputs). + """ + + def _make_extra_data(self) -> dict: + return { + "auth_token_comfy_org": "secret-token-abc", + "api_key_comfy_org": "secret-key-xyz", } - } - - # Mock the request context to simulate unauthenticated API call - with patch('execution.request') as mock_request: - mock_request.headers = {"Authorization": auth_header} if auth_header else {} - - # Mock authentication check that should reject unauthenticated requests - with patch('execution.validate_auth') as mock_auth: - mock_auth.return_value = False - - # Attempt to execute workflow without valid credentials - result = execute_workflow(malicious_workflow, auth_header) - - # Assert that execution is rejected - assert result.get("status") == "error" or result.get("code") in [401, 403], \ - f"Unauthenticated request should be rejected, got: {result}" - mock_auth.assert_called() \ No newline at end of file + + def test_untrusted_node_does_not_receive_credentials(self): + """ + For any class not in _TRUSTED_NODE_CLASSES, _is_trusted_node returns + False, so credentials must be None. + """ + untrusted = _make_class("UntrustedNode", "comfy_api_nodes.spoofed") + is_trusted = _exec._is_trusted_node(untrusted) + extra = self._make_extra_data() + + # Simulate the conditional that execution.py applies + token = extra.get("auth_token_comfy_org", None) if is_trusted else None + api_key = extra.get("api_key_comfy_org", None) if is_trusted else None + + assert token is None, "Untrusted node must not receive auth_token_comfy_org" + assert api_key is None, "Untrusted node must not receive api_key_comfy_org" + + def test_trusted_set_membership_gates_credentials(self): + """ + Only a class that is actually in _TRUSTED_NODE_CLASSES receives + credentials; identity (not module name) is the gate. + """ + # Manually inject a sentinel class into a temporary frozenset to + # simulate what a real comfy_api_nodes class would look like. + real_trusted = _make_class("RealApiNode", "comfy_api_nodes.real") + fake_trusted_set = frozenset({real_trusted}) + + # Patch _TRUSTED_NODE_CLASSES temporarily + original = _exec._TRUSTED_NODE_CLASSES + try: + _exec._TRUSTED_NODE_CLASSES = fake_trusted_set + + assert _exec._is_trusted_node(real_trusted) is True + extra = self._make_extra_data() + token = extra.get("auth_token_comfy_org") if _exec._is_trusted_node(real_trusted) else None + assert token == "secret-token-abc" + + # A different class with the same module name must still be rejected + impostor = _make_class("ImpostorNode", "comfy_api_nodes.real") + assert _exec._is_trusted_node(impostor) is False + token2 = extra.get("auth_token_comfy_org") if _exec._is_trusted_node(impostor) else None + assert token2 is None + finally: + _exec._TRUSTED_NODE_CLASSES = original