commit b95b132230
Godwin Iheuwa, 2026-01-08 06:03:29 +03:00 (committed by GitHub)
2 changed files with 342 additions and 3 deletions

app/logger.py

@@ -11,6 +11,10 @@ stderr_interceptor = None

 class LogInterceptor(io.TextIOWrapper):
+    # Maximum logs to buffer between flushes to prevent unbounded memory growth
+    # if callbacks persistently fail. 10000 entries is ~2-5 MB depending on message size.
+    MAX_PENDING_LOGS = 10000
+
     def __init__(self, stream, *args, **kwargs):
         buffer = stream.buffer
         encoding = stream.encoding
@@ -23,6 +27,9 @@ class LogInterceptor(io.TextIOWrapper):
         entry = {"t": datetime.now().isoformat(), "m": data}
         with self._lock:
             self._logs_since_flush.append(entry)
+            # Enforce the max size to prevent OOM if callbacks persistently fail
+            if len(self._logs_since_flush) > self.MAX_PENDING_LOGS:
+                self._logs_since_flush = self._logs_since_flush[-self.MAX_PENDING_LOGS:]
         # Simple handling for CR: overwrite the last output if it isn't a full line,
         # else the logs just fill up with progress messages
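
The cap is enforced with a keep-last-N slice, so the oldest pending entries are dropped first; at a few hundred bytes per entry, 10000 entries lands in the ~2-5 MB range the comment cites. A minimal standalone sketch of the same policy (the helper name here is illustrative, not the module's API):

MAX_PENDING_LOGS = 10000

def append_bounded(pending, entry, limit=MAX_PENDING_LOGS):
    # Append, then trim from the front so only the newest `limit` entries remain
    pending.append(entry)
    if len(pending) > limit:
        del pending[:-limit]

buf = []
for i in range(MAX_PENDING_LOGS + 5):
    append_bounded(buf, {"m": f"msg {i}"})
assert len(buf) == MAX_PENDING_LOGS
assert buf[0]["m"] == "msg 5"  # the five oldest entries were dropped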
@@ -32,10 +39,21 @@ class LogInterceptor(io.TextIOWrapper):
         super().write(data)

     def flush(self):
-        super().flush()
+        try:
+            super().flush()
+        except OSError as e:
+            # errno 22 (EINVAL) can occur on Windows with piped/redirected streams.
+            # This is safe to ignore, as write() has already succeeded.
+            if e.errno != 22:
+                raise
+        if not self._logs_since_flush:
+            return
+        # Copy to prevent callback mutations from affecting retry on failure
+        logs_to_send = list(self._logs_since_flush)
         for cb in self._flush_callbacks:
-            cb(self._logs_since_flush)
-        self._logs_since_flush = []
+            cb(logs_to_send)
+        # Only clear after all callbacks succeed - if any raises, logs remain for retry
+        self._logs_since_flush = []

     def on_flush(self, callback):
         self._flush_callbacks.append(callback)
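
With this change, flush() gives at-least-once delivery to callbacks: the buffer is only cleared after every callback returns, so a failing sink sees the same entries again on the next flush. A sketch of that contract, assuming an `interceptor` built over a stream as in the tests below (the flaky sink is hypothetical):

attempts = []

def flaky_sink(logs):
    attempts.append(list(logs))
    if len(attempts) == 1:
        raise ConnectionError("sink unavailable")  # first delivery fails

interceptor.on_flush(flaky_sink)
try:
    interceptor.flush()  # callback raises; buffered entries are kept for retry
except ConnectionError:
    pass
interceptor.flush()      # the same entries are delivered again, then cleared
assert attempts[0] == attempts[1]  # holds if nothing was written in between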

@@ -0,0 +1,321 @@
"""Tests for the logger module, specifically LogInterceptor."""

import io

import pytest

class TestLogInterceptorFlush:
    """Test that LogInterceptor.flush() handles OSError gracefully."""

    def test_flush_handles_errno_22(self):
        """Test that flush() catches OSError with errno 22."""
        # The parent flush (io.TextIOWrapper.flush) lives on a C extension type
        # and cannot easily be mocked, so instead read the source and verify
        # that the try-except structure of the fix is in place.
        import inspect
        from app.logger import LogInterceptor

        source = inspect.getsource(LogInterceptor.flush)
        # Verify the try-except structure is present
        assert 'try:' in source
        assert 'super().flush()' in source
        assert 'except OSError as e:' in source
        assert 'e.errno != 22' in source or 'e.errno == 22' in source
    def test_flush_callback_execution(self):
        """Test that flush callbacks are executed."""
        from app.logger import LogInterceptor

        # LogInterceptor needs a stream exposing .buffer, .encoding and
        # .line_buffering, so build a minimal stand-in around io.BytesIO.
        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # Register a callback
        callback_results = []
        interceptor.on_flush(lambda logs: callback_results.append(len(logs)))

        # Add some logs
        interceptor._logs_since_flush = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"}
        ]

        # Flush should execute the callback
        interceptor.flush()

        assert len(callback_results) == 1
        assert callback_results[0] == 2  # Two log entries
    def test_flush_clears_logs_after_callback(self):
        """Test that logs are cleared after flush callbacks."""
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # Add a dummy callback
        interceptor.on_flush(lambda logs: None)

        # Add some logs
        interceptor._logs_since_flush = [{"t": "test", "m": "message"}]

        # Flush
        interceptor.flush()

        # Logs should be cleared
        assert interceptor._logs_since_flush == []

    def test_flush_multiple_callbacks_receive_same_logs(self):
        """Test that all callbacks receive the same logs, not just the first one."""
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # Register multiple callbacks
        callback1_results = []
        callback2_results = []
        callback3_results = []
        interceptor.on_flush(lambda logs: callback1_results.append(len(logs)))
        interceptor.on_flush(lambda logs: callback2_results.append(len(logs)))
        interceptor.on_flush(lambda logs: callback3_results.append(len(logs)))

        # Add some logs
        interceptor._logs_since_flush = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"},
            {"t": "test", "m": "message3"}
        ]

        # Flush should execute all callbacks with the same logs
        interceptor.flush()

        # All callbacks should have received 3 log entries
        assert callback1_results == [3]
        assert callback2_results == [3]
        assert callback3_results == [3]

    def test_flush_preserves_logs_when_callback_raises(self):
        """Test that logs are preserved for retry if a callback raises an exception."""
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # Register a callback that raises
        def raising_callback(logs):
            raise ValueError("Callback error")

        interceptor.on_flush(raising_callback)

        # Add some logs
        original_logs = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"}
        ]
        interceptor._logs_since_flush = original_logs.copy()

        # Flush should raise
        with pytest.raises(ValueError, match="Callback error"):
            interceptor.flush()

        # Logs should be preserved for retry on next flush
        assert interceptor._logs_since_flush == original_logs

    def test_flush_protects_logs_from_callback_mutation(self):
        """Test that callback mutations don't affect preserved logs on failure."""
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # First callback mutates the list, second raises
        def mutating_callback(logs):
            logs.clear()  # Mutate the passed list

        def raising_callback(logs):
            raise ValueError("Callback error")

        interceptor.on_flush(mutating_callback)
        interceptor.on_flush(raising_callback)

        # Add some logs
        original_logs = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"}
        ]
        interceptor._logs_since_flush = original_logs.copy()

        # Flush should raise
        with pytest.raises(ValueError, match="Callback error"):
            interceptor.flush()

        # Logs should be preserved despite mutation by first callback
        assert interceptor._logs_since_flush == original_logs

    def test_flush_clears_logs_after_all_callbacks_succeed(self):
        """Test that logs are cleared only after all callbacks execute successfully."""
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # Register multiple callbacks
        callback1_results = []
        callback2_results = []
        interceptor.on_flush(lambda logs: callback1_results.append(len(logs)))
        interceptor.on_flush(lambda logs: callback2_results.append(len(logs)))

        # Add some logs
        interceptor._logs_since_flush = [
            {"t": "test", "m": "message1"},
            {"t": "test", "m": "message2"}
        ]

        # Flush should succeed
        interceptor.flush()

        # All callbacks should have executed
        assert callback1_results == [2]
        assert callback2_results == [2]
        # Logs should be cleared after success
        assert interceptor._logs_since_flush == []
class TestLogInterceptorWrite:
    """Test that LogInterceptor.write() works correctly."""

    def test_write_adds_to_logs(self):
        """Test that write() adds entries to the log buffer."""
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # Initialize the global logs
        import app.logger
        from collections import deque
        app.logger.logs = deque(maxlen=100)

        # Write a message
        interceptor.write("test message")

        # Check that it was added to _logs_since_flush
        assert len(interceptor._logs_since_flush) == 1
        assert interceptor._logs_since_flush[0]["m"] == "test message"

    def test_write_enforces_max_pending_logs(self):
        """Test that write() enforces MAX_PENDING_LOGS to prevent OOM."""
        from app.logger import LogInterceptor

        class MockStream:
            def __init__(self):
                self._buffer = io.BytesIO()
                self.encoding = 'utf-8'
                self.line_buffering = False

            @property
            def buffer(self):
                return self._buffer

        mock_stream = MockStream()
        interceptor = LogInterceptor(mock_stream)

        # Initialize the global logs
        import app.logger
        from collections import deque
        app.logger.logs = deque(maxlen=100)

        # Manually set _logs_since_flush to be at the limit
        interceptor._logs_since_flush = [
            {"t": "test", "m": f"old_message_{i}"}
            for i in range(LogInterceptor.MAX_PENDING_LOGS)
        ]

        # Write one more message - should trigger trimming
        interceptor.write("new_message")

        # Should still be at MAX_PENDING_LOGS, oldest dropped
        assert len(interceptor._logs_since_flush) == LogInterceptor.MAX_PENDING_LOGS
        # The new message should be at the end
        assert interceptor._logs_since_flush[-1]["m"] == "new_message"
        # The oldest message should have been dropped (old_message_0)
        assert interceptor._logs_since_flush[0]["m"] == "old_message_1"
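
Taken together, the pieces compose end to end. A minimal sketch under the same assumptions as the tests above (a BytesIO-backed stand-in stream, and the module-level logs deque initialized first); illustrative, not part of the suite:

import io
from collections import deque

import app.logger
from app.logger import LogInterceptor

app.logger.logs = deque(maxlen=100)  # write() appends to this module-level deque

class Stream:  # same shape as the tests' MockStream
    def __init__(self):
        self._buffer = io.BytesIO()
        self.encoding = 'utf-8'
        self.line_buffering = False

    @property
    def buffer(self):
        return self._buffer

interceptor = LogInterceptor(Stream())
received = []
interceptor.on_flush(received.extend)

interceptor.write("hello\n")
interceptor.flush()

assert received[-1]["m"] == "hello\n"
assert interceptor._logs_since_flush == []  # cleared after the successful flush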