Apply code changes: @orbisai0security can you address code review comm...

This commit is contained in:
orbisai0security 2026-06-12 15:45:13 +00:00
parent 06e716ffd1
commit e66e6955f8
2 changed files with 156 additions and 44 deletions

View File

@ -45,6 +45,40 @@ from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_re
from comfy_api.latest import io, _io
from comfy_execution.cache_provider import _has_cache_providers, _get_cache_providers, _logger as _cache_logger
# Immutable set of trusted node classes that are allowed to receive credentials.
# Populated at module load time from the comfy_api_nodes package to prevent
# spoofing via __module__ name manipulation.
def _build_trusted_node_classes() -> frozenset:
"""Build an immutable set of trusted node class objects from comfy_api_nodes."""
trusted: set = set()
try:
import comfy_api_nodes as _can
import pkgutil, importlib, inspect
for _importer, _modname, _ispkg in pkgutil.walk_packages(
path=_can.__path__,
prefix=_can.__name__ + ".",
onerror=lambda x: None,
):
try:
_mod = importlib.import_module(_modname)
for _name, _obj in inspect.getmembers(_mod, inspect.isclass):
if _obj.__module__ and _obj.__module__.startswith(_can.__name__):
trusted.add(_obj)
except Exception:
pass
except ImportError:
pass
return frozenset(trusted)
_TRUSTED_NODE_CLASSES: frozenset = _build_trusted_node_classes()
def _is_trusted_node(class_def: type) -> bool:
"""Return True only if class_def is a known-trusted node class."""
return class_def in _TRUSTED_NODE_CLASSES
class ExecutionResult(Enum):
SUCCESS = 0
@ -196,7 +230,7 @@ def get_input_data(inputs, class_def, unique_id, execution_list=None, dynprompt=
hidden_inputs_v3[io.Hidden.extra_pnginfo] = extra_data.get('extra_pnginfo', None)
if io.Hidden.unique_id.name in hidden:
hidden_inputs_v3[io.Hidden.unique_id] = unique_id
_is_trusted = getattr(class_def, '__module__', '').startswith('comfy_api_nodes')
_is_trusted = _is_trusted_node(class_def)
if io.Hidden.auth_token_comfy_org.name in hidden:
hidden_inputs_v3[io.Hidden.auth_token_comfy_org] = extra_data.get("auth_token_comfy_org", None) if _is_trusted else None
if io.Hidden.api_key_comfy_org.name in hidden:
@ -206,7 +240,7 @@ def get_input_data(inputs, class_def, unique_id, execution_list=None, dynprompt=
else:
if "hidden" in valid_inputs:
h = valid_inputs["hidden"]
_is_trusted = getattr(class_def, '__module__', '').startswith('comfy_api_nodes')
_is_trusted = _is_trusted_node(class_def)
for x in h:
if h[x] == "PROMPT":
input_data_all[x] = [dynprompt.get_original_prompt() if dynprompt is not None else {}]

View File

@ -1,46 +1,124 @@
"""
Regression tests for the credential-injection trust gate in execution.py.
These tests verify that auth_token_comfy_org and api_key_comfy_org are only
forwarded to node classes that appear in the immutable _TRUSTED_NODE_CLASSES
set, and are withheld from all other nodes (including those that spoof their
__module__ name).
"""
import types
import pytest
import json
from unittest.mock import patch, MagicMock
import sys
import os
# Import the execution module
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from execution import execute_workflow
import execution as _exec
@pytest.mark.parametrize("auth_header,expected_status", [
(None, 401), # Missing authentication token
("Bearer expired_token_xyz", 401), # Expired/invalid token
("Bearer malformed", 401), # Malformed token format
("", 401), # Empty auth header
("InvalidScheme token123", 401), # Invalid auth scheme
])
def test_unauthenticated_workflow_execution_rejected(auth_header, expected_status):
"""Invariant: Protected endpoints reject unauthenticated requests with 401/403 status"""
# Mock workflow payload that would execute arbitrary code
malicious_workflow = {
"nodes": {
"1": {
"class_type": "CheckpointLoader",
"inputs": {"ckpt_name": "model.safetensors"}
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_class(name: str, module: str) -> type:
"""Create a minimal node class with a controlled __module__."""
cls = type(name, (), {})
cls.__module__ = module
return cls
def _run_trust_check(class_def: type) -> bool:
"""Thin wrapper around the private helper exposed by execution.py."""
return _exec._is_trusted_node(class_def)
# ---------------------------------------------------------------------------
# _is_trusted_node
# ---------------------------------------------------------------------------
class TestIsTrustedNode:
def test_unknown_class_is_not_trusted(self):
"""A freshly created class that was never registered must not be trusted."""
cls = _make_class("FakeNode", "some_third_party_package")
assert _run_trust_check(cls) is False
def test_spoofed_module_name_is_not_trusted(self):
"""
A class whose __module__ starts with 'comfy_api_nodes' but was NOT
imported from that package must not be trusted.
This is the core regression: the old string-prefix check would have
returned True here.
"""
spoofed = _make_class("EvilNode", "comfy_api_nodes.evil.payload")
assert _run_trust_check(spoofed) is False, (
"Spoofed __module__ name must not grant trust — "
"only classes in _TRUSTED_NODE_CLASSES are trusted."
)
def test_another_spoofed_variant_is_not_trusted(self):
"""Variant: exact prefix match should still be rejected."""
spoofed = _make_class("EvilNode2", "comfy_api_nodes")
assert _run_trust_check(spoofed) is False
def test_trusted_set_is_frozenset(self):
"""_TRUSTED_NODE_CLASSES must be immutable (frozenset)."""
assert isinstance(_exec._TRUSTED_NODE_CLASSES, frozenset), (
"_TRUSTED_NODE_CLASSES must be a frozenset so it cannot be mutated at runtime."
)
# ---------------------------------------------------------------------------
# Credential gating via _is_trusted_node (integration-style)
# ---------------------------------------------------------------------------
class TestCredentialGating:
"""
Verify that the _is_trusted flag correctly gates credential injection
in the two code paths inside get_input_data (v3 hidden inputs and legacy
hidden inputs).
"""
def _make_extra_data(self) -> dict:
return {
"auth_token_comfy_org": "secret-token-abc",
"api_key_comfy_org": "secret-key-xyz",
}
}
# Mock the request context to simulate unauthenticated API call
with patch('execution.request') as mock_request:
mock_request.headers = {"Authorization": auth_header} if auth_header else {}
# Mock authentication check that should reject unauthenticated requests
with patch('execution.validate_auth') as mock_auth:
mock_auth.return_value = False
# Attempt to execute workflow without valid credentials
result = execute_workflow(malicious_workflow, auth_header)
# Assert that execution is rejected
assert result.get("status") == "error" or result.get("code") in [401, 403], \
f"Unauthenticated request should be rejected, got: {result}"
mock_auth.assert_called()
def test_untrusted_node_does_not_receive_credentials(self):
"""
For any class not in _TRUSTED_NODE_CLASSES, _is_trusted_node returns
False, so credentials must be None.
"""
untrusted = _make_class("UntrustedNode", "comfy_api_nodes.spoofed")
is_trusted = _exec._is_trusted_node(untrusted)
extra = self._make_extra_data()
# Simulate the conditional that execution.py applies
token = extra.get("auth_token_comfy_org", None) if is_trusted else None
api_key = extra.get("api_key_comfy_org", None) if is_trusted else None
assert token is None, "Untrusted node must not receive auth_token_comfy_org"
assert api_key is None, "Untrusted node must not receive api_key_comfy_org"
def test_trusted_set_membership_gates_credentials(self):
"""
Only a class that is actually in _TRUSTED_NODE_CLASSES receives
credentials; identity (not module name) is the gate.
"""
# Manually inject a sentinel class into a temporary frozenset to
# simulate what a real comfy_api_nodes class would look like.
real_trusted = _make_class("RealApiNode", "comfy_api_nodes.real")
fake_trusted_set = frozenset({real_trusted})
# Patch _TRUSTED_NODE_CLASSES temporarily
original = _exec._TRUSTED_NODE_CLASSES
try:
_exec._TRUSTED_NODE_CLASSES = fake_trusted_set
assert _exec._is_trusted_node(real_trusted) is True
extra = self._make_extra_data()
token = extra.get("auth_token_comfy_org") if _exec._is_trusted_node(real_trusted) else None
assert token == "secret-token-abc"
# A different class with the same module name must still be rejected
impostor = _make_class("ImpostorNode", "comfy_api_nodes.real")
assert _exec._is_trusted_node(impostor) is False
token2 = extra.get("auth_token_comfy_org") if _exec._is_trusted_node(impostor) else None
assert token2 is None
finally:
_exec._TRUSTED_NODE_CLASSES = original