From 1a410446e33c16e206d0ce77617753c2f19dabdf Mon Sep 17 00:00:00 2001
From: RUiNtheExtinct
Date: Sun, 28 Dec 2025 14:35:31 +0530
Subject: [PATCH] fix(logger): add max size limit for pending logs to prevent
 OOM

If flush callbacks persistently fail (e.g., network issues), logs would
accumulate indefinitely in _logs_since_flush, potentially causing OOM on
long-running servers.

Added MAX_PENDING_LOGS (10000) limit - when exceeded, oldest logs are
dropped. This is similar to how the global logs deque uses maxlen.
---
 app/logger.py                      |  7 ++++++
 tests-unit/app_test/test_logger.py | 38 ++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/app/logger.py b/app/logger.py
index 22ee2b11b..3c317882e 100644
--- a/app/logger.py
+++ b/app/logger.py
@@ -11,6 +11,10 @@ stderr_interceptor = None
 
 
 class LogInterceptor(io.TextIOWrapper):
+    # Maximum logs to buffer between flushes to prevent unbounded memory growth
+    # if callbacks persistently fail. 10000 entries is ~2-5MB depending on message size.
+    MAX_PENDING_LOGS = 10000
+
     def __init__(self, stream, *args, **kwargs):
         buffer = stream.buffer
         encoding = stream.encoding
@@ -23,6 +27,9 @@ class LogInterceptor(io.TextIOWrapper):
         entry = {"t": datetime.now().isoformat(), "m": data}
         with self._lock:
             self._logs_since_flush.append(entry)
+            # Enforce max size to prevent OOM if callbacks persistently fail
+            if len(self._logs_since_flush) > self.MAX_PENDING_LOGS:
+                self._logs_since_flush = self._logs_since_flush[-self.MAX_PENDING_LOGS:]
 
             # Simple handling for cr to overwrite the last output if it isnt a full line
             # else logs just get full of progress messages
diff --git a/tests-unit/app_test/test_logger.py b/tests-unit/app_test/test_logger.py
index b57ba1887..52e1522b1 100644
--- a/tests-unit/app_test/test_logger.py
+++ b/tests-unit/app_test/test_logger.py
@@ -240,3 +240,41 @@ class TestLogInterceptorWrite:
         # Check that it was added to _logs_since_flush
         assert len(interceptor._logs_since_flush) == 1
         assert interceptor._logs_since_flush[0]["m"] == "test message"
+
+    def test_write_enforces_max_pending_logs(self):
+        """Test that write() enforces MAX_PENDING_LOGS to prevent OOM."""
+        from app.logger import LogInterceptor
+
+        class MockStream:
+            def __init__(self):
+                self._buffer = io.BytesIO()
+                self.encoding = 'utf-8'
+                self.line_buffering = False
+
+            @property
+            def buffer(self):
+                return self._buffer
+
+        mock_stream = MockStream()
+        interceptor = LogInterceptor(mock_stream)
+
+        # Initialize the global logs
+        import app.logger
+        from collections import deque
+        app.logger.logs = deque(maxlen=100)
+
+        # Manually set _logs_since_flush to be at the limit
+        interceptor._logs_since_flush = [
+            {"t": "test", "m": f"old_message_{i}"}
+            for i in range(LogInterceptor.MAX_PENDING_LOGS)
+        ]
+
+        # Write one more message - should trigger trimming
+        interceptor.write("new_message")
+
+        # Should still be at MAX_PENDING_LOGS, oldest dropped
+        assert len(interceptor._logs_since_flush) == LogInterceptor.MAX_PENDING_LOGS
+        # The new message should be at the end
+        assert interceptor._logs_since_flush[-1]["m"] == "new_message"
+        # The oldest message should have been dropped (old_message_0)
+        assert interceptor._logs_since_flush[0]["m"] == "old_message_1"
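
Note: a minimal standalone sketch of the drop-oldest bounding strategy
applied above, shown next to the deque(maxlen=...) form the commit
message compares it to. Only MAX_PENDING_LOGS is taken from the patch;
the pending/buffer_log/history names are illustrative and are not part
of app/logger.py.

    # Both strategies cap memory at MAX_PENDING_LOGS entries by dropping
    # the oldest items; only the constant's value comes from the patch.
    from collections import deque

    MAX_PENDING_LOGS = 10000

    pending = []  # patch-style: plain list, trimmed after each append

    def buffer_log(entry):
        pending.append(entry)
        if len(pending) > MAX_PENDING_LOGS:
            # In-place equivalent of the patch's slice reassignment:
            # keep only the newest MAX_PENDING_LOGS entries.
            del pending[:-MAX_PENDING_LOGS]

    # deque-style: maxlen enforces the same bound on every append
    history = deque(maxlen=MAX_PENDING_LOGS)

    for i in range(MAX_PENDING_LOGS + 5):
        buffer_log({"t": "...", "m": f"msg {i}"})
        history.append({"t": "...", "m": f"msg {i}"})

    assert len(pending) == MAX_PENDING_LOGS == len(history)
    assert pending[0]["m"] == "msg 5" == history[0]["m"]  # oldest 5 dropped

Keeping _logs_since_flush a plain list rather than switching it to a
deque presumably leaves the existing flush path, which hands the
accumulated batch to callbacks, untouched; the deque form trades that
for automatic trimming on every append.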