From e7aae1495c2a2c914e4b13618cd704153ab954d1 Mon Sep 17 00:00:00 2001 From: hereisok Date: Tue, 26 Aug 2025 01:56:57 +0800 Subject: [PATCH] perf: remove the sleep before runtime initialization (#10033) Signed-off-by: hereisok Co-authored-by: mamoodi --- openhands/server/session/agent_session.py | 5 --- openhands/server/session/session.py | 40 +++++++++++++++++++++-- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/openhands/server/session/agent_session.py b/openhands/server/session/agent_session.py index cc7a6f76c8..984a5985b5 100644 --- a/openhands/server/session/agent_session.py +++ b/openhands/server/session/agent_session.py @@ -362,11 +362,6 @@ class AgentSession: git_provider_tokens=git_provider_tokens, ) - # FIXME: this sleep is a terrible hack. - # This is to give the websocket a second to connect, so that - # the status messages make it through to the frontend. - # We should find a better way to plumb status messages through. - await asyncio.sleep(1) try: await self.runtime.connect() except AgentRuntimeUnavailableError as e: diff --git a/openhands/server/session/session.py b/openhands/server/session/session.py index abdfd56a83..c45f11e9d4 100644 --- a/openhands/server/session/session.py +++ b/openhands/server/session/session.py @@ -89,6 +89,12 @@ class Session: self.loop = asyncio.get_event_loop() self.user_id = user_id + self._publish_queue: asyncio.Queue = asyncio.Queue() + self._monitor_publish_queue_task: asyncio.Task = self.loop.create_task( + self._monitor_publish_queue() + ) + self._wait_websocket_initial_complete: bool = True + async def close(self) -> None: if self.sio: await self.sio.emit( @@ -100,6 +106,7 @@ class Session: ) self.is_alive = False await self.agent_session.close() + self._monitor_publish_queue_task.cancel() async def initialize_agent( self, @@ -338,17 +345,44 @@ class Session: self.agent_session.event_stream.add_event(event, EventSource.USER) async def send(self, data: dict[str, object]) -> None: - if asyncio.get_running_loop() != self.loop: - self.loop.create_task(self._send(data)) + self._publish_queue.put_nowait(data) + + async def _monitor_publish_queue(self): + try: + while True: + data: dict = await self._publish_queue.get() + await self._send(data) + except asyncio.CancelledError: return - await self._send(data) async def _send(self, data: dict[str, object]) -> bool: try: if not self.is_alive: return False + + _start_time = time.time() + _waiting_times = 1 + if self.sio: + # Wait once during initialization to avoid event push failures during websocket connection intervals + while self._wait_websocket_initial_complete and ( + time.time() - _start_time < 2 + ): + if bool( + self.sio.manager.rooms.get('/', {}).get( + ROOM_KEY.format(sid=self.sid) + ) + ): + break + self.logger.warning( + f'There is no listening client in the current room,' + f' waiting for the {_waiting_times}th attempt: {self.sid}' + ) + _waiting_times += 1 + await asyncio.sleep(0.1) + self._wait_websocket_initial_complete = False await self.sio.emit('oh_event', data, to=ROOM_KEY.format(sid=self.sid)) + await asyncio.sleep(0.001) # This flushes the data to the client self.last_active_ts = int(time.time()) return True