Use the same event stream instance for conversations as sessions (#9545)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Tim O'Farrell 2025-07-07 14:37:17 -06:00 committed by GitHub
parent 7cfecb6e52
commit 517a72fd0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 68 additions and 41 deletions

View File

@ -105,9 +105,22 @@ class StandaloneConversationManager(ConversationManager):
)
return conversation
# Get the event stream for the conversation - required to keep the cur_id up to date
event_stream = None
runtime = None
session = self._local_agent_loops_by_sid.get(sid)
if session:
event_stream = session.agent_session.event_stream
runtime = session.agent_session.runtime
# Create new conversation if none exists
c = ServerConversation(
sid, file_store=self.file_store, config=self.config, user_id=user_id
sid,
file_store=self.file_store,
config=self.config,
user_id=user_id,
event_stream=event_stream,
runtime=runtime,
)
try:
await c.connect()

View File

@ -15,6 +15,7 @@ class ServerConversation:
event_stream: EventStream
runtime: Runtime
user_id: str | None
_attach_to_existing: bool = False
def __init__(
self,
@ -22,30 +23,43 @@ class ServerConversation:
file_store: FileStore,
config: OpenHandsConfig,
user_id: str | None,
event_stream: EventStream | None = None,
runtime: Runtime | None = None,
):
self.sid = sid
self.config = config
self.file_store = file_store
self.user_id = user_id
self.event_stream = EventStream(sid, file_store, user_id)
if event_stream is None:
event_stream = EventStream(sid, file_store, user_id)
self.event_stream = event_stream
if config.security.security_analyzer:
self.security_analyzer = options.SecurityAnalyzers.get(
config.security.security_analyzer, SecurityAnalyzer
)(self.event_stream)
runtime_cls = get_runtime_cls(self.config.runtime)
self.runtime = runtime_cls(
config=config,
event_stream=self.event_stream,
sid=self.sid,
attach_to_existing=True,
headless_mode=False,
)
if runtime:
self._attach_to_existing = True
else:
runtime_cls = get_runtime_cls(self.config.runtime)
runtime = runtime_cls(
config=config,
event_stream=self.event_stream,
sid=self.sid,
attach_to_existing=True,
headless_mode=False,
)
self.runtime = runtime
async def connect(self) -> None:
await self.runtime.connect()
if not self._attach_to_existing:
await self.runtime.connect()
async def disconnect(self) -> None:
if self._attach_to_existing:
return
if self.event_stream:
self.event_stream.close()
asyncio.create_task(call_sync_from_async(self.runtime.close))

View File

@ -6,7 +6,7 @@ from openhands.core.config import LLMConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events.action.message import MessageAction
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.events.event_store import EventStore
from openhands.llm.llm import LLM
from openhands.storage.data_models.settings import Settings
from openhands.storage.files import FileStore
@ -90,12 +90,12 @@ async def auto_generate_title(
A generated title string
"""
try:
# Create an event stream for the conversation
event_stream = EventStream(conversation_id, file_store, user_id)
# Create an event store for the conversation
event_store = EventStore(conversation_id, file_store, user_id)
# Find the first user message
first_user_message = None
for event in event_stream.search_events():
for event in event_store.search_events():
if (
event.source == EventSource.USER
and isinstance(event, MessageAction)

View File

@ -9,7 +9,7 @@ from openhands.core.config.llm_config import LLMConfig
from openhands.core.config.openhands_config import OpenHandsConfig
from openhands.events.action import MessageAction
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.events.event_store import EventStore
from openhands.server.conversation_manager.standalone_conversation_manager import (
StandaloneConversationManager,
)
@ -37,14 +37,14 @@ async def test_auto_generate_title_with_llm():
user_message._id = 1
user_message._timestamp = datetime.now(timezone.utc).isoformat()
# Mock the EventStream class
# Mock the EventStore class
with patch(
'openhands.utils.conversation_summary.EventStream'
) as mock_event_stream_cls:
'openhands.utils.conversation_summary.EventStore'
) as mock_event_store_cls:
# Configure the mock event stream to return our test message
mock_event_stream = MagicMock(spec=EventStream)
mock_event_stream.search_events.return_value = [user_message]
mock_event_stream_cls.return_value = mock_event_stream
mock_event_store = MagicMock(spec=EventStore)
mock_event_store.search_events.return_value = [user_message]
mock_event_store_cls.return_value = mock_event_store
# Mock the LLM response
with patch('openhands.utils.conversation_summary.LLM') as mock_llm_cls:
@ -69,8 +69,8 @@ async def test_auto_generate_title_with_llm():
# Verify the result
assert title == 'Python Data Analysis Script'
# Verify EventStream was created with the correct parameters
mock_event_stream_cls.assert_called_once_with(
# Verify EventStore was created with the correct parameters
mock_event_store_cls.assert_called_once_with(
conversation_id, file_store, user_id
)
@ -102,14 +102,14 @@ async def test_auto_generate_title_fallback():
user_message._id = 1
user_message._timestamp = datetime.now(timezone.utc).isoformat()
# Mock the EventStream class
# Mock the EventStore class
with patch(
'openhands.utils.conversation_summary.EventStream'
) as mock_event_stream_cls:
'openhands.utils.conversation_summary.EventStore'
) as mock_event_store_cls:
# Configure the mock event stream to return our test message
mock_event_stream = MagicMock(spec=EventStream)
mock_event_stream.search_events.return_value = [user_message]
mock_event_stream_cls.return_value = mock_event_stream
mock_event_store = MagicMock(spec=EventStore)
mock_event_store.search_events.return_value = [user_message]
mock_event_store_cls.return_value = mock_event_store
# Mock the LLM to raise an exception
with patch('openhands.utils.conversation_summary.LLM') as mock_llm_cls:
@ -132,8 +132,8 @@ async def test_auto_generate_title_fallback():
assert title == 'This is a very long message th...'
assert len(title) <= 35
# Verify EventStream was created with the correct parameters
mock_event_stream_cls.assert_called_once_with(
# Verify EventStore was created with the correct parameters
mock_event_store_cls.assert_called_once_with(
conversation_id, file_store, user_id
)
@ -148,14 +148,14 @@ async def test_auto_generate_title_no_messages():
conversation_id = 'test-conversation'
user_id = 'test-user'
# Mock the EventStream class
# Mock the EventStore class
with patch(
'openhands.utils.conversation_summary.EventStream'
) as mock_event_stream_cls:
# Configure the mock event stream to return no events
mock_event_stream = MagicMock(spec=EventStream)
mock_event_stream.search_events.return_value = []
mock_event_stream_cls.return_value = mock_event_stream
'openhands.utils.conversation_summary.EventStore'
) as mock_event_store_cls:
# Configure the mock event store to return no events
mock_event_store = MagicMock(spec=EventStore)
mock_event_store.search_events.return_value = []
mock_event_store_cls.return_value = mock_event_store
# Create test settings
settings = Settings(
@ -172,8 +172,8 @@ async def test_auto_generate_title_no_messages():
# Verify the result is empty
assert title == ''
# Verify EventStream was created with the correct parameters
mock_event_stream_cls.assert_called_once_with(
# Verify EventStore was created with the correct parameters
mock_event_store_cls.assert_called_once_with(
conversation_id, file_store, user_id
)