mirror of
https://github.com/OpenHands/OpenHands.git
synced 2026-03-22 13:47:19 +08:00
perf: remove the sleep before runtime initialization (#10033)
Signed-off-by: hereisok <hereisok@angai.wk@gmail.com> Co-authored-by: mamoodi <mamoodiha@gmail.com>
This commit is contained in:
@@ -362,11 +362,6 @@ class AgentSession:
|
||||
git_provider_tokens=git_provider_tokens,
|
||||
)
|
||||
|
||||
# FIXME: this sleep is a terrible hack.
|
||||
# This is to give the websocket a second to connect, so that
|
||||
# the status messages make it through to the frontend.
|
||||
# We should find a better way to plumb status messages through.
|
||||
await asyncio.sleep(1)
|
||||
try:
|
||||
await self.runtime.connect()
|
||||
except AgentRuntimeUnavailableError as e:
|
||||
|
||||
@@ -89,6 +89,12 @@ class Session:
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.user_id = user_id
|
||||
|
||||
self._publish_queue: asyncio.Queue = asyncio.Queue()
|
||||
self._monitor_publish_queue_task: asyncio.Task = self.loop.create_task(
|
||||
self._monitor_publish_queue()
|
||||
)
|
||||
self._wait_websocket_initial_complete: bool = True
|
||||
|
||||
async def close(self) -> None:
|
||||
if self.sio:
|
||||
await self.sio.emit(
|
||||
@@ -100,6 +106,7 @@ class Session:
|
||||
)
|
||||
self.is_alive = False
|
||||
await self.agent_session.close()
|
||||
self._monitor_publish_queue_task.cancel()
|
||||
|
||||
async def initialize_agent(
|
||||
self,
|
||||
@@ -338,17 +345,44 @@ class Session:
|
||||
self.agent_session.event_stream.add_event(event, EventSource.USER)
|
||||
|
||||
async def send(self, data: dict[str, object]) -> None:
|
||||
if asyncio.get_running_loop() != self.loop:
|
||||
self.loop.create_task(self._send(data))
|
||||
self._publish_queue.put_nowait(data)
|
||||
|
||||
async def _monitor_publish_queue(self):
|
||||
try:
|
||||
while True:
|
||||
data: dict = await self._publish_queue.get()
|
||||
await self._send(data)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
await self._send(data)
|
||||
|
||||
async def _send(self, data: dict[str, object]) -> bool:
|
||||
try:
|
||||
if not self.is_alive:
|
||||
return False
|
||||
|
||||
_start_time = time.time()
|
||||
_waiting_times = 1
|
||||
|
||||
if self.sio:
|
||||
# Wait once during initialization to avoid event push failures during websocket connection intervals
|
||||
while self._wait_websocket_initial_complete and (
|
||||
time.time() - _start_time < 2
|
||||
):
|
||||
if bool(
|
||||
self.sio.manager.rooms.get('/', {}).get(
|
||||
ROOM_KEY.format(sid=self.sid)
|
||||
)
|
||||
):
|
||||
break
|
||||
self.logger.warning(
|
||||
f'There is no listening client in the current room,'
|
||||
f' waiting for the {_waiting_times}th attempt: {self.sid}'
|
||||
)
|
||||
_waiting_times += 1
|
||||
await asyncio.sleep(0.1)
|
||||
self._wait_websocket_initial_complete = False
|
||||
await self.sio.emit('oh_event', data, to=ROOM_KEY.format(sid=self.sid))
|
||||
|
||||
await asyncio.sleep(0.001) # This flushes the data to the client
|
||||
self.last_active_ts = int(time.time())
|
||||
return True
|
||||
|
||||
Reference in New Issue
Block a user