From 517a72fd0da41bc70ee481bfbeb131a6284b694a Mon Sep 17 00:00:00 2001 From: Tim O'Farrell Date: Mon, 7 Jul 2025 14:37:17 -0600 Subject: [PATCH] Use the same event stream instance for conversations as sessions (#9545) Co-authored-by: openhands --- .../standalone_conversation_manager.py | 15 +++++- openhands/server/session/conversation.py | 34 ++++++++---- openhands/utils/conversation_summary.py | 8 +-- tests/unit/test_auto_generate_title.py | 52 +++++++++---------- 4 files changed, 68 insertions(+), 41 deletions(-) diff --git a/openhands/server/conversation_manager/standalone_conversation_manager.py b/openhands/server/conversation_manager/standalone_conversation_manager.py index 4835161e3e..977e29439d 100644 --- a/openhands/server/conversation_manager/standalone_conversation_manager.py +++ b/openhands/server/conversation_manager/standalone_conversation_manager.py @@ -105,9 +105,22 @@ class StandaloneConversationManager(ConversationManager): ) return conversation + # Get the event stream for the conversation - required to keep the cur_id up to date + event_stream = None + runtime = None + session = self._local_agent_loops_by_sid.get(sid) + if session: + event_stream = session.agent_session.event_stream + runtime = session.agent_session.runtime + # Create new conversation if none exists c = ServerConversation( - sid, file_store=self.file_store, config=self.config, user_id=user_id + sid, + file_store=self.file_store, + config=self.config, + user_id=user_id, + event_stream=event_stream, + runtime=runtime, ) try: await c.connect() diff --git a/openhands/server/session/conversation.py b/openhands/server/session/conversation.py index 85fa693121..5972a367e3 100644 --- a/openhands/server/session/conversation.py +++ b/openhands/server/session/conversation.py @@ -15,6 +15,7 @@ class ServerConversation: event_stream: EventStream runtime: Runtime user_id: str | None + _attach_to_existing: bool = False def __init__( self, @@ -22,30 +23,43 @@ class ServerConversation: file_store: FileStore, config: OpenHandsConfig, user_id: str | None, + event_stream: EventStream | None = None, + runtime: Runtime | None = None, ): self.sid = sid self.config = config self.file_store = file_store self.user_id = user_id - self.event_stream = EventStream(sid, file_store, user_id) + + if event_stream is None: + event_stream = EventStream(sid, file_store, user_id) + self.event_stream = event_stream + if config.security.security_analyzer: self.security_analyzer = options.SecurityAnalyzers.get( config.security.security_analyzer, SecurityAnalyzer )(self.event_stream) - runtime_cls = get_runtime_cls(self.config.runtime) - self.runtime = runtime_cls( - config=config, - event_stream=self.event_stream, - sid=self.sid, - attach_to_existing=True, - headless_mode=False, - ) + if runtime: + self._attach_to_existing = True + else: + runtime_cls = get_runtime_cls(self.config.runtime) + runtime = runtime_cls( + config=config, + event_stream=self.event_stream, + sid=self.sid, + attach_to_existing=True, + headless_mode=False, + ) + self.runtime = runtime async def connect(self) -> None: - await self.runtime.connect() + if not self._attach_to_existing: + await self.runtime.connect() async def disconnect(self) -> None: + if self._attach_to_existing: + return if self.event_stream: self.event_stream.close() asyncio.create_task(call_sync_from_async(self.runtime.close)) diff --git a/openhands/utils/conversation_summary.py b/openhands/utils/conversation_summary.py index c2fc0b75d9..f00792a5c5 100644 --- a/openhands/utils/conversation_summary.py +++ b/openhands/utils/conversation_summary.py @@ -6,7 +6,7 @@ from openhands.core.config import LLMConfig from openhands.core.logger import openhands_logger as logger from openhands.events.action.message import MessageAction from openhands.events.event import EventSource -from openhands.events.stream import EventStream +from openhands.events.event_store import EventStore from openhands.llm.llm import LLM from openhands.storage.data_models.settings import Settings from openhands.storage.files import FileStore @@ -90,12 +90,12 @@ async def auto_generate_title( A generated title string """ try: - # Create an event stream for the conversation - event_stream = EventStream(conversation_id, file_store, user_id) + # Create an event store for the conversation + event_store = EventStore(conversation_id, file_store, user_id) # Find the first user message first_user_message = None - for event in event_stream.search_events(): + for event in event_store.search_events(): if ( event.source == EventSource.USER and isinstance(event, MessageAction) diff --git a/tests/unit/test_auto_generate_title.py b/tests/unit/test_auto_generate_title.py index 76f373e927..f052c62e12 100644 --- a/tests/unit/test_auto_generate_title.py +++ b/tests/unit/test_auto_generate_title.py @@ -9,7 +9,7 @@ from openhands.core.config.llm_config import LLMConfig from openhands.core.config.openhands_config import OpenHandsConfig from openhands.events.action import MessageAction from openhands.events.event import EventSource -from openhands.events.stream import EventStream +from openhands.events.event_store import EventStore from openhands.server.conversation_manager.standalone_conversation_manager import ( StandaloneConversationManager, ) @@ -37,14 +37,14 @@ async def test_auto_generate_title_with_llm(): user_message._id = 1 user_message._timestamp = datetime.now(timezone.utc).isoformat() - # Mock the EventStream class + # Mock the EventStore class with patch( - 'openhands.utils.conversation_summary.EventStream' - ) as mock_event_stream_cls: + 'openhands.utils.conversation_summary.EventStore' + ) as mock_event_store_cls: # Configure the mock event stream to return our test message - mock_event_stream = MagicMock(spec=EventStream) - mock_event_stream.search_events.return_value = [user_message] - mock_event_stream_cls.return_value = mock_event_stream + mock_event_store = MagicMock(spec=EventStore) + mock_event_store.search_events.return_value = [user_message] + mock_event_store_cls.return_value = mock_event_store # Mock the LLM response with patch('openhands.utils.conversation_summary.LLM') as mock_llm_cls: @@ -69,8 +69,8 @@ async def test_auto_generate_title_with_llm(): # Verify the result assert title == 'Python Data Analysis Script' - # Verify EventStream was created with the correct parameters - mock_event_stream_cls.assert_called_once_with( + # Verify EventStore was created with the correct parameters + mock_event_store_cls.assert_called_once_with( conversation_id, file_store, user_id ) @@ -102,14 +102,14 @@ async def test_auto_generate_title_fallback(): user_message._id = 1 user_message._timestamp = datetime.now(timezone.utc).isoformat() - # Mock the EventStream class + # Mock the EventStore class with patch( - 'openhands.utils.conversation_summary.EventStream' - ) as mock_event_stream_cls: + 'openhands.utils.conversation_summary.EventStore' + ) as mock_event_store_cls: # Configure the mock event stream to return our test message - mock_event_stream = MagicMock(spec=EventStream) - mock_event_stream.search_events.return_value = [user_message] - mock_event_stream_cls.return_value = mock_event_stream + mock_event_store = MagicMock(spec=EventStore) + mock_event_store.search_events.return_value = [user_message] + mock_event_store_cls.return_value = mock_event_store # Mock the LLM to raise an exception with patch('openhands.utils.conversation_summary.LLM') as mock_llm_cls: @@ -132,8 +132,8 @@ async def test_auto_generate_title_fallback(): assert title == 'This is a very long message th...' assert len(title) <= 35 - # Verify EventStream was created with the correct parameters - mock_event_stream_cls.assert_called_once_with( + # Verify EventStore was created with the correct parameters + mock_event_store_cls.assert_called_once_with( conversation_id, file_store, user_id ) @@ -148,14 +148,14 @@ async def test_auto_generate_title_no_messages(): conversation_id = 'test-conversation' user_id = 'test-user' - # Mock the EventStream class + # Mock the EventStore class with patch( - 'openhands.utils.conversation_summary.EventStream' - ) as mock_event_stream_cls: - # Configure the mock event stream to return no events - mock_event_stream = MagicMock(spec=EventStream) - mock_event_stream.search_events.return_value = [] - mock_event_stream_cls.return_value = mock_event_stream + 'openhands.utils.conversation_summary.EventStore' + ) as mock_event_store_cls: + # Configure the mock event store to return no events + mock_event_store = MagicMock(spec=EventStore) + mock_event_store.search_events.return_value = [] + mock_event_store_cls.return_value = mock_event_store # Create test settings settings = Settings( @@ -172,8 +172,8 @@ async def test_auto_generate_title_no_messages(): # Verify the result is empty assert title == '' - # Verify EventStream was created with the correct parameters - mock_event_stream_cls.assert_called_once_with( + # Verify EventStore was created with the correct parameters + mock_event_store_cls.assert_called_once_with( conversation_id, file_store, user_id )