Fix event stream replay during new connections by replaying before joining (#8818)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Robert Brennan 2025-06-05 12:37:02 -07:00 committed by GitHub
parent afd8ee61e7
commit 309c086976
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -12,6 +12,7 @@ from openhands.events.action import (
)
from openhands.events.action.agent import RecallAction
from openhands.events.async_event_store_wrapper import AsyncEventStoreWrapper
from openhands.events.event_store import EventStore
from openhands.events.observation import (
NullObservation,
)
@ -124,26 +125,28 @@ async def connect(connection_id: str, environ: dict) -> None:
f'User {user_id} is allowed to connect to conversation {conversation_id}'
)
conversation_init_data = await setup_init_convo_settings(
user_id, conversation_id, providers_set
)
agent_loop_info = await conversation_manager.join_conversation(
conversation_id,
connection_id,
conversation_init_data,
user_id,
)
try:
event_store = EventStore(
conversation_id, conversation_manager.file_store, user_id
)
except FileNotFoundError as e:
logger.error(
f'Failed to create EventStore for conversation {conversation_id}: {e}'
)
raise ConnectionRefusedError(f'Failed to access conversation events: {e}')
logger.info(
f'Connected to conversation {conversation_id} with connection_id {connection_id}. Replaying event stream...'
f'Replaying event stream for conversation {conversation_id} with connection_id {connection_id}...'
)
agent_state_changed = None
if agent_loop_info is None:
raise ConnectionRefusedError('Failed to join conversation')
async_store = AsyncEventStoreWrapper(
agent_loop_info.event_store, latest_event_id + 1
)
# Create an async store to replay events
async_store = AsyncEventStoreWrapper(event_store, latest_event_id + 1)
# Process all available events
async for event in async_store:
logger.debug(f'oh_event: {event.__class__.__name__}')
if isinstance(
event,
(NullAction, NullObservation, RecallAction),
@ -153,13 +156,33 @@ async def connect(connection_id: str, environ: dict) -> None:
agent_state_changed = event
else:
await sio.emit('oh_event', event_to_dict(event), to=connection_id)
# Send the agent state changed event last if we have one
if agent_state_changed:
await sio.emit(
'oh_event', event_to_dict(agent_state_changed), to=connection_id
)
logger.info(
f'Finished replaying event stream for conversation {conversation_id}'
)
conversation_init_data = await setup_init_convo_settings(
user_id, conversation_id, providers_set
)
agent_loop_info = await conversation_manager.join_conversation(
conversation_id,
connection_id,
conversation_init_data,
user_id,
)
if agent_loop_info is None:
raise ConnectionRefusedError('Failed to join conversation')
logger.info(
f'Successfully joined conversation {conversation_id} with connection_id {connection_id}'
)
except ConnectionRefusedError:
# Close the broken connection after sending an error message
asyncio.create_task(sio.disconnect(connection_id))