mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>
This commit is contained in:
parent
84e28234e5
commit
dcfc2da428
@ -17,16 +17,26 @@ class LogStreamer:
|
||||
logFn: Callable,
|
||||
):
|
||||
self.log = logFn
|
||||
self.log_generator = container.logs(stream=True, follow=True)
|
||||
# Initialize all attributes before starting the thread on this instance
|
||||
self.stdout_thread = None
|
||||
self.log_generator = None
|
||||
self._stop_event = threading.Event()
|
||||
|
||||
# Start the stdout streaming thread
|
||||
self.stdout_thread = threading.Thread(target=self._stream_logs)
|
||||
self.stdout_thread.daemon = True
|
||||
self.stdout_thread.start()
|
||||
try:
|
||||
self.log_generator = container.logs(stream=True, follow=True)
|
||||
# Start the stdout streaming thread
|
||||
self.stdout_thread = threading.Thread(target=self._stream_logs)
|
||||
self.stdout_thread.daemon = True
|
||||
self.stdout_thread.start()
|
||||
except Exception as e:
|
||||
self.log('error', f'Failed to initialize log streaming: {e}')
|
||||
|
||||
def _stream_logs(self) -> None:
|
||||
"""Stream logs from the Docker container to stdout."""
|
||||
if not self.log_generator:
|
||||
self.log('error', 'Log generator not initialized')
|
||||
return
|
||||
|
||||
try:
|
||||
for log_line in self.log_generator:
|
||||
if self._stop_event.is_set():
|
||||
@ -38,7 +48,11 @@ class LogStreamer:
|
||||
self.log('error', f'Error streaming docker logs to stdout: {e}')
|
||||
|
||||
def __del__(self) -> None:
|
||||
if self.stdout_thread and self.stdout_thread.is_alive():
|
||||
if (
|
||||
hasattr(self, 'stdout_thread')
|
||||
and self.stdout_thread
|
||||
and self.stdout_thread.is_alive()
|
||||
):
|
||||
self.close(timeout=5)
|
||||
|
||||
def close(self, timeout: float = 5.0) -> None:
|
||||
@ -47,5 +61,5 @@ class LogStreamer:
|
||||
if self.stdout_thread and self.stdout_thread.is_alive():
|
||||
self.stdout_thread.join(timeout)
|
||||
# Close the log generator to release the file descriptor
|
||||
if hasattr(self.log_generator, 'close'):
|
||||
if self.log_generator is not None:
|
||||
self.log_generator.close()
|
||||
|
||||
90
tests/unit/test_log_streamer.py
Normal file
90
tests/unit/test_log_streamer.py
Normal file
@ -0,0 +1,90 @@
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from openhands.runtime.utils.log_streamer import LogStreamer
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_container():
|
||||
return Mock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_log_fn():
|
||||
return Mock()
|
||||
|
||||
|
||||
def test_init_failure_handling(mock_container, mock_log_fn):
|
||||
"""Test that LogStreamer handles initialization failures gracefully."""
|
||||
mock_container.logs.side_effect = Exception('Test error')
|
||||
|
||||
streamer = LogStreamer(mock_container, mock_log_fn)
|
||||
assert streamer.stdout_thread is None
|
||||
assert streamer.log_generator is None
|
||||
mock_log_fn.assert_called_with(
|
||||
'error', 'Failed to initialize log streaming: Test error'
|
||||
)
|
||||
|
||||
|
||||
def test_stream_logs_without_generator(mock_container, mock_log_fn):
|
||||
"""Test that _stream_logs handles missing log generator gracefully."""
|
||||
streamer = LogStreamer(mock_container, mock_log_fn)
|
||||
streamer.log_generator = None
|
||||
streamer._stream_logs()
|
||||
mock_log_fn.assert_called_with('error', 'Log generator not initialized')
|
||||
|
||||
|
||||
def test_cleanup_without_thread(mock_container, mock_log_fn):
|
||||
"""Test that cleanup works even if stdout_thread is not initialized."""
|
||||
streamer = LogStreamer(mock_container, mock_log_fn)
|
||||
streamer.stdout_thread = None
|
||||
streamer.close() # Should not raise any exceptions
|
||||
|
||||
|
||||
def test_normal_operation(mock_container, mock_log_fn):
|
||||
"""Test normal operation of LogStreamer."""
|
||||
|
||||
# Create a mock generator class that mimics Docker's log generator
|
||||
class MockLogGenerator:
|
||||
def __init__(self, logs):
|
||||
self.logs = iter(logs)
|
||||
self.closed = False
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
if self.closed:
|
||||
raise StopIteration
|
||||
return next(self.logs)
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
mock_logs = MockLogGenerator([b'test log 1\n', b'test log 2\n'])
|
||||
mock_container.logs.return_value = mock_logs
|
||||
|
||||
streamer = LogStreamer(mock_container, mock_log_fn)
|
||||
assert streamer.stdout_thread is not None
|
||||
assert streamer.log_generator is not None
|
||||
|
||||
streamer.close()
|
||||
|
||||
# Verify logs were processed
|
||||
expected_calls = [
|
||||
('debug', '[inside container] test log 1'),
|
||||
('debug', '[inside container] test log 2'),
|
||||
]
|
||||
actual_calls = [(args[0], args[1]) for args, _ in mock_log_fn.call_args_list]
|
||||
for expected in expected_calls:
|
||||
assert expected in actual_calls
|
||||
|
||||
|
||||
def test_del_without_thread(mock_container, mock_log_fn):
|
||||
"""Test that __del__ works even if stdout_thread was not initialized."""
|
||||
streamer = LogStreamer(mock_container, mock_log_fn)
|
||||
delattr(
|
||||
streamer, 'stdout_thread'
|
||||
) # Simulate case where the thread was never created
|
||||
streamer.__del__() # Should not raise any exceptions
|
||||
Loading…
x
Reference in New Issue
Block a user