mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-11 14:50:49 +08:00
If a callback modifies the logs list (e.g., clear()) and a subsequent callback raises an exception, the preserved logs for retry would have been corrupted. Now passes a shallow copy to callbacks.
322 lines
10 KiB
Python
322 lines
10 KiB
Python
"""Tests for the logger module, specifically LogInterceptor."""
|
|
|
|
import io
|
|
import pytest
|
|
from unittest.mock import MagicMock
|
|
|
|
|
|
class TestLogInterceptorFlush:
|
|
"""Test that LogInterceptor.flush() handles OSError gracefully."""
|
|
|
|
def test_flush_handles_errno_22(self):
|
|
"""Test that flush() catches OSError with errno 22 and still executes callbacks."""
|
|
# We can't easily mock the parent flush, so we test the behavior by
|
|
# creating a LogInterceptor and verifying the flush method exists
|
|
# with the try-except structure.
|
|
|
|
# Read the source to verify the fix is in place
|
|
import inspect
|
|
from app.logger import LogInterceptor
|
|
|
|
source = inspect.getsource(LogInterceptor.flush)
|
|
|
|
# Verify the try-except structure is present
|
|
assert 'try:' in source
|
|
assert 'super().flush()' in source
|
|
assert 'except OSError as e:' in source
|
|
assert 'e.errno != 22' in source or 'e.errno == 22' in source
|
|
|
|
def test_flush_callback_execution(self):
|
|
"""Test that flush callbacks are executed."""
|
|
from app.logger import LogInterceptor
|
|
|
|
# Create a proper stream for LogInterceptor
|
|
import sys
|
|
|
|
# Use a StringIO-based approach with a real buffer
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# Register a callback
|
|
callback_results = []
|
|
interceptor.on_flush(lambda logs: callback_results.append(len(logs)))
|
|
|
|
# Add some logs
|
|
interceptor._logs_since_flush = [
|
|
{"t": "test", "m": "message1"},
|
|
{"t": "test", "m": "message2"}
|
|
]
|
|
|
|
# Flush should execute callback
|
|
interceptor.flush()
|
|
|
|
assert len(callback_results) == 1
|
|
assert callback_results[0] == 2 # Two log entries
|
|
|
|
def test_flush_clears_logs_after_callback(self):
|
|
"""Test that logs are cleared after flush callbacks."""
|
|
from app.logger import LogInterceptor
|
|
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# Add a dummy callback
|
|
interceptor.on_flush(lambda logs: None)
|
|
|
|
# Add some logs
|
|
interceptor._logs_since_flush = [{"t": "test", "m": "message"}]
|
|
|
|
# Flush
|
|
interceptor.flush()
|
|
|
|
# Logs should be cleared
|
|
assert interceptor._logs_since_flush == []
|
|
|
|
def test_flush_multiple_callbacks_receive_same_logs(self):
|
|
"""Test that all callbacks receive the same logs, not just the first one."""
|
|
from app.logger import LogInterceptor
|
|
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# Register multiple callbacks
|
|
callback1_results = []
|
|
callback2_results = []
|
|
callback3_results = []
|
|
interceptor.on_flush(lambda logs: callback1_results.append(len(logs)))
|
|
interceptor.on_flush(lambda logs: callback2_results.append(len(logs)))
|
|
interceptor.on_flush(lambda logs: callback3_results.append(len(logs)))
|
|
|
|
# Add some logs
|
|
interceptor._logs_since_flush = [
|
|
{"t": "test", "m": "message1"},
|
|
{"t": "test", "m": "message2"},
|
|
{"t": "test", "m": "message3"}
|
|
]
|
|
|
|
# Flush should execute all callbacks with the same logs
|
|
interceptor.flush()
|
|
|
|
# All callbacks should have received 3 log entries
|
|
assert callback1_results == [3]
|
|
assert callback2_results == [3]
|
|
assert callback3_results == [3]
|
|
|
|
def test_flush_preserves_logs_when_callback_raises(self):
|
|
"""Test that logs are preserved for retry if a callback raises an exception."""
|
|
from app.logger import LogInterceptor
|
|
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# Register a callback that raises
|
|
def raising_callback(logs):
|
|
raise ValueError("Callback error")
|
|
|
|
interceptor.on_flush(raising_callback)
|
|
|
|
# Add some logs
|
|
original_logs = [
|
|
{"t": "test", "m": "message1"},
|
|
{"t": "test", "m": "message2"}
|
|
]
|
|
interceptor._logs_since_flush = original_logs.copy()
|
|
|
|
# Flush should raise
|
|
with pytest.raises(ValueError, match="Callback error"):
|
|
interceptor.flush()
|
|
|
|
# Logs should be preserved for retry on next flush
|
|
assert interceptor._logs_since_flush == original_logs
|
|
|
|
def test_flush_protects_logs_from_callback_mutation(self):
|
|
"""Test that callback mutations don't affect preserved logs on failure."""
|
|
from app.logger import LogInterceptor
|
|
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# First callback mutates the list, second raises
|
|
def mutating_callback(logs):
|
|
logs.clear() # Mutate the passed list
|
|
|
|
def raising_callback(logs):
|
|
raise ValueError("Callback error")
|
|
|
|
interceptor.on_flush(mutating_callback)
|
|
interceptor.on_flush(raising_callback)
|
|
|
|
# Add some logs
|
|
original_logs = [
|
|
{"t": "test", "m": "message1"},
|
|
{"t": "test", "m": "message2"}
|
|
]
|
|
interceptor._logs_since_flush = original_logs.copy()
|
|
|
|
# Flush should raise
|
|
with pytest.raises(ValueError, match="Callback error"):
|
|
interceptor.flush()
|
|
|
|
# Logs should be preserved despite mutation by first callback
|
|
assert interceptor._logs_since_flush == original_logs
|
|
|
|
def test_flush_clears_logs_after_all_callbacks_succeed(self):
|
|
"""Test that logs are cleared only after all callbacks execute successfully."""
|
|
from app.logger import LogInterceptor
|
|
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# Register multiple callbacks
|
|
callback1_results = []
|
|
callback2_results = []
|
|
interceptor.on_flush(lambda logs: callback1_results.append(len(logs)))
|
|
interceptor.on_flush(lambda logs: callback2_results.append(len(logs)))
|
|
|
|
# Add some logs
|
|
interceptor._logs_since_flush = [
|
|
{"t": "test", "m": "message1"},
|
|
{"t": "test", "m": "message2"}
|
|
]
|
|
|
|
# Flush should succeed
|
|
interceptor.flush()
|
|
|
|
# All callbacks should have executed
|
|
assert callback1_results == [2]
|
|
assert callback2_results == [2]
|
|
|
|
# Logs should be cleared after success
|
|
assert interceptor._logs_since_flush == []
|
|
|
|
|
|
class TestLogInterceptorWrite:
|
|
"""Test that LogInterceptor.write() works correctly."""
|
|
|
|
def test_write_adds_to_logs(self):
|
|
"""Test that write() adds entries to the log buffer."""
|
|
from app.logger import LogInterceptor
|
|
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# Initialize the global logs
|
|
import app.logger
|
|
from collections import deque
|
|
app.logger.logs = deque(maxlen=100)
|
|
|
|
# Write a message
|
|
interceptor.write("test message")
|
|
|
|
# Check that it was added to _logs_since_flush
|
|
assert len(interceptor._logs_since_flush) == 1
|
|
assert interceptor._logs_since_flush[0]["m"] == "test message"
|
|
|
|
def test_write_enforces_max_pending_logs(self):
|
|
"""Test that write() enforces MAX_PENDING_LOGS to prevent OOM."""
|
|
from app.logger import LogInterceptor
|
|
|
|
class MockStream:
|
|
def __init__(self):
|
|
self._buffer = io.BytesIO()
|
|
self.encoding = 'utf-8'
|
|
self.line_buffering = False
|
|
|
|
@property
|
|
def buffer(self):
|
|
return self._buffer
|
|
|
|
mock_stream = MockStream()
|
|
interceptor = LogInterceptor(mock_stream)
|
|
|
|
# Initialize the global logs
|
|
import app.logger
|
|
from collections import deque
|
|
app.logger.logs = deque(maxlen=100)
|
|
|
|
# Manually set _logs_since_flush to be at the limit
|
|
interceptor._logs_since_flush = [
|
|
{"t": "test", "m": f"old_message_{i}"}
|
|
for i in range(LogInterceptor.MAX_PENDING_LOGS)
|
|
]
|
|
|
|
# Write one more message - should trigger trimming
|
|
interceptor.write("new_message")
|
|
|
|
# Should still be at MAX_PENDING_LOGS, oldest dropped
|
|
assert len(interceptor._logs_since_flush) == LogInterceptor.MAX_PENDING_LOGS
|
|
# The new message should be at the end
|
|
assert interceptor._logs_since_flush[-1]["m"] == "new_message"
|
|
# The oldest message should have been dropped (old_message_0)
|
|
assert interceptor._logs_since_flush[0]["m"] == "old_message_1"
|