diff --git a/openhands/server/conversation_manager/conversation_manager.py b/openhands/server/conversation_manager/conversation_manager.py index f8d1843d3f..330bcc14c8 100644 --- a/openhands/server/conversation_manager/conversation_manager.py +++ b/openhands/server/conversation_manager/conversation_manager.py @@ -107,6 +107,10 @@ class ConversationManager(ABC): async def send_to_event_stream(self, connection_id: str, data: dict): """Send data to an event stream.""" + @abstractmethod + async def send_event_to_conversation(self, sid: str, data: dict): + """Send an event to a conversation.""" + @abstractmethod async def disconnect_from_session(self, connection_id: str): """Disconnect from a session.""" diff --git a/openhands/server/conversation_manager/docker_nested_conversation_manager.py b/openhands/server/conversation_manager/docker_nested_conversation_manager.py index 22fb55ca47..e7dbdb6f6e 100644 --- a/openhands/server/conversation_manager/docker_nested_conversation_manager.py +++ b/openhands/server/conversation_manager/docker_nested_conversation_manager.py @@ -275,6 +275,18 @@ class DockerNestedConversationManager(ConversationManager): # Not supported - clients should connect directly to the nested server! raise ValueError('unsupported_operation') + async def send_event_to_conversation(self, sid, data): + async with httpx.AsyncClient( + headers={ + 'X-Session-API-Key': self._get_session_api_key_for_conversation(sid) + } + ) as client: + nested_url = self._get_nested_url(sid) + response = await client.post( + f'{nested_url}/api/conversations/{sid}/events', json=data + ) + response.raise_for_status() + async def disconnect_from_session(self, connection_id: str): # Not supported - clients should connect directly to the nested server! raise ValueError('unsupported_operation') diff --git a/openhands/server/conversation_manager/standalone_conversation_manager.py b/openhands/server/conversation_manager/standalone_conversation_manager.py index 9a372ad080..6b3c880add 100644 --- a/openhands/server/conversation_manager/standalone_conversation_manager.py +++ b/openhands/server/conversation_manager/standalone_conversation_manager.py @@ -331,13 +331,13 @@ class StandaloneConversationManager(ConversationManager): sid = self._local_connection_id_to_session_id.get(connection_id) if not sid: raise RuntimeError(f'no_connected_session:{connection_id}') + await self.send_event_to_conversation(sid, data) + async def send_event_to_conversation(self, sid: str, data: dict): session = self._local_agent_loops_by_sid.get(sid) - if session: - await session.dispatch(data) - return - - raise RuntimeError(f'no_connected_session:{connection_id}:{sid}') + if not session: + raise RuntimeError(f'no_conversation:{sid}') + await session.dispatch(data) async def disconnect_from_session(self, connection_id: str): sid = self._local_connection_id_to_session_id.pop(connection_id, None)