diff --git a/app/logger.py b/app/logger.py
index 22ee2b11b..3c317882e 100644
--- a/app/logger.py
+++ b/app/logger.py
@@ -11,6 +11,10 @@ stderr_interceptor = None
 
 
 class LogInterceptor(io.TextIOWrapper):
+    # Maximum logs to buffer between flushes to prevent unbounded memory growth
+    # if callbacks persistently fail. 10000 entries is ~2-5MB depending on message size.
+    MAX_PENDING_LOGS = 10000
+
     def __init__(self, stream, *args, **kwargs):
         buffer = stream.buffer
         encoding = stream.encoding
@@ -23,6 +27,13 @@ class LogInterceptor(io.TextIOWrapper):
         entry = {"t": datetime.now().isoformat(), "m": data}
         with self._lock:
             self._logs_since_flush.append(entry)
+            # Enforce max size to prevent OOM if callbacks persistently fail.
+            # Drop the oldest entries in place; computing the overflow count
+            # (instead of slicing with [-MAX:]) avoids copying the whole list
+            # on every write and still trims when MAX_PENDING_LOGS is 0.
+            overflow = len(self._logs_since_flush) - self.MAX_PENDING_LOGS
+            if overflow > 0:
+                del self._logs_since_flush[:overflow]
 
             # Simple handling for cr to overwrite the last output if it isnt a full line
             # else logs just get full of progress messages
diff --git a/tests-unit/app_test/test_logger.py b/tests-unit/app_test/test_logger.py
index b57ba1887..52e1522b1 100644
--- a/tests-unit/app_test/test_logger.py
+++ b/tests-unit/app_test/test_logger.py
@@ -240,3 +240,44 @@ class TestLogInterceptorWrite:
         # Check that it was added to _logs_since_flush
         assert len(interceptor._logs_since_flush) == 1
         assert interceptor._logs_since_flush[0]["m"] == "test message"
+
+    def test_write_enforces_max_pending_logs(self):
+        """write() must cap _logs_since_flush at MAX_PENDING_LOGS, dropping the oldest entries."""
+        from collections import deque
+        from unittest.mock import patch
+
+        import app.logger
+        from app.logger import LogInterceptor
+
+        # Minimal stand-in for the wrapped stream: exposes the buffer,
+        # encoding and line_buffering attributes this test relies on.
+        class MockStream:
+            def __init__(self):
+                self._buffer = io.BytesIO()
+                self.encoding = 'utf-8'
+                self.line_buffering = False
+
+            @property
+            def buffer(self):
+                return self._buffer
+
+        interceptor = LogInterceptor(MockStream())
+
+        # Manually set _logs_since_flush to be at the limit
+        interceptor._logs_since_flush = [
+            {"t": "test", "m": f"old_message_{i}"}
+            for i in range(LogInterceptor.MAX_PENDING_LOGS)
+        ]
+
+        # Swap in a fresh global log buffer only for the duration of the
+        # write, so this test does not leak module state into other tests.
+        with patch.object(app.logger, "logs", deque(maxlen=100)):
+            # Write one more message - should trigger trimming
+            interceptor.write("new_message")
+
+        # Should still be at MAX_PENDING_LOGS, oldest dropped
+        assert len(interceptor._logs_since_flush) == LogInterceptor.MAX_PENDING_LOGS
+        # The new message should be at the end
+        assert interceptor._logs_since_flush[-1]["m"] == "new_message"
+        # The oldest message should have been dropped (old_message_0)
+        assert interceptor._logs_since_flush[0]["m"] == "old_message_1"