Conversation Manager small refactor (#9286)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Tim O'Farrell 2025-06-22 19:27:03 -06:00 committed by GitHub
parent 5d69e606eb
commit c9bb0fc168
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 21 additions and 5 deletions

View File

@ -107,6 +107,10 @@ class ConversationManager(ABC):
async def send_to_event_stream(self, connection_id: str, data: dict):
"""Send data to an event stream."""
@abstractmethod
async def send_event_to_conversation(self, sid: str, data: dict):
"""Send an event to a conversation."""
@abstractmethod
async def disconnect_from_session(self, connection_id: str):
"""Disconnect from a session."""

View File

@ -275,6 +275,18 @@ class DockerNestedConversationManager(ConversationManager):
# Not supported - clients should connect directly to the nested server!
raise ValueError('unsupported_operation')
async def send_event_to_conversation(self, sid, data):
async with httpx.AsyncClient(
headers={
'X-Session-API-Key': self._get_session_api_key_for_conversation(sid)
}
) as client:
nested_url = self._get_nested_url(sid)
response = await client.post(
f'{nested_url}/api/conversations/{sid}/events', json=data
)
response.raise_for_status()
async def disconnect_from_session(self, connection_id: str):
# Not supported - clients should connect directly to the nested server!
raise ValueError('unsupported_operation')

View File

@ -331,13 +331,13 @@ class StandaloneConversationManager(ConversationManager):
sid = self._local_connection_id_to_session_id.get(connection_id)
if not sid:
raise RuntimeError(f'no_connected_session:{connection_id}')
await self.send_event_to_conversation(sid, data)
async def send_event_to_conversation(self, sid: str, data: dict):
session = self._local_agent_loops_by_sid.get(sid)
if session:
await session.dispatch(data)
return
raise RuntimeError(f'no_connected_session:{connection_id}:{sid}')
if not session:
raise RuntimeError(f'no_conversation:{sid}')
await session.dispatch(data)
async def disconnect_from_session(self, connection_id: str):
sid = self._local_connection_id_to_session_id.pop(connection_id, None)