Assorted fixes for the nested / docker runtimes. (#8899)

This commit is contained in:
tofarr 2025-06-04 13:56:03 -06:00 committed by GitHub
parent 7bea93b1b6
commit c6c2aafc4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 31 additions and 23 deletions

View File

@ -374,28 +374,11 @@ class DockerRuntime(ActionExecutionClient):
) )
self.log('debug', f'Container started. Server url: {self.api_url}') self.log('debug', f'Container started. Server url: {self.api_url}')
self.send_status_message('STATUS$CONTAINER_STARTED') self.send_status_message('STATUS$CONTAINER_STARTED')
except docker.errors.APIError as e:
if '409' in str(e):
self.log(
'warning',
f'Container {self.container_name} already exists. Removing...',
)
stop_all_containers(self.container_name)
return self.init_container()
else:
self.log(
'error',
f'Error: Instance {self.container_name} FAILED to start container!\n',
)
self.log('error', str(e))
raise e
except Exception as e: except Exception as e:
self.log( self.log(
'error', 'error',
f'Error: Instance {self.container_name} FAILED to start container!\n', f'Error: Instance {self.container_name} FAILED to start container!\n',
) )
self.log('error', str(e))
self.close() self.close()
raise e raise e

View File

@ -54,6 +54,7 @@ class DockerNestedConversationManager(ConversationManager):
docker_client: docker.DockerClient = field(default_factory=docker.from_env) docker_client: docker.DockerClient = field(default_factory=docker.from_env)
_conversation_store_class: type[ConversationStore] | None = None _conversation_store_class: type[ConversationStore] | None = None
_starting_conversation_ids: set[str] = field(default_factory=set) _starting_conversation_ids: set[str] = field(default_factory=set)
_runtime_container_image: str | None = None
async def __aenter__(self): async def __aenter__(self):
# No action is required on startup for this implementation # No action is required on startup for this implementation
@ -155,6 +156,12 @@ class DockerNestedConversationManager(ConversationManager):
try: try:
# Build the runtime container image if it is missing # Build the runtime container image if it is missing
await call_sync_from_async(runtime.maybe_build_runtime_container_image) await call_sync_from_async(runtime.maybe_build_runtime_container_image)
self._runtime_container_image = runtime.runtime_container_image
# check that the container already exists...
if await self._start_existing_container(runtime):
self._starting_conversation_ids.discard(sid)
return
# initialize the container but dont wait for it to start # initialize the container but dont wait for it to start
await call_sync_from_async(runtime.init_container) await call_sync_from_async(runtime.init_container)
@ -172,7 +179,7 @@ class DockerNestedConversationManager(ConversationManager):
) )
except Exception: except Exception:
self._starting_conversation_ids.remove(sid) self._starting_conversation_ids.discard(sid)
raise raise
async def _start_conversation( async def _start_conversation(
@ -262,7 +269,7 @@ class DockerNestedConversationManager(ConversationManager):
) )
assert response.status_code == status.HTTP_200_OK assert response.status_code == status.HTTP_200_OK
finally: finally:
self._starting_conversation_ids.remove(sid) self._starting_conversation_ids.discard(sid)
async def send_to_event_stream(self, connection_id: str, data: dict): async def send_to_event_stream(self, connection_id: str, data: dict):
# Not supported - clients should connect directly to the nested server! # Not supported - clients should connect directly to the nested server!
@ -431,6 +438,7 @@ class DockerNestedConversationManager(ConversationManager):
# We need to be able to specify the nested conversation id within the nested runtime # We need to be able to specify the nested conversation id within the nested runtime
env_vars['ALLOW_SET_CONVERSATION_ID'] = '1' env_vars['ALLOW_SET_CONVERSATION_ID'] = '1'
env_vars['WORKSPACE_BASE'] = f'/workspace' env_vars['WORKSPACE_BASE'] = f'/workspace'
env_vars['SANDBOX_CLOSE_DELAY'] = '0'
# Set up mounted volume for conversation directory within workspace # Set up mounted volume for conversation directory within workspace
# TODO: Check if we are using the standard event store and file store # TODO: Check if we are using the standard event store and file store
@ -442,9 +450,11 @@ class DockerNestedConversationManager(ConversationManager):
conversation_dir = get_conversation_dir(sid, user_id) conversation_dir = get_conversation_dir(sid, user_id)
volumes.append( volumes.append(
f'{config.file_store_path}/{conversation_dir}:/root/openhands/file_store/{conversation_dir}:rw' f'{config.file_store_path}/{conversation_dir}:/root/.openhands/file_store/{conversation_dir}:rw'
) )
config.sandbox.volumes = ','.join(volumes) config.sandbox.volumes = ','.join(volumes)
if not config.sandbox.runtime_container_image:
config.sandbox.runtime_container_image = self._runtime_container_image
# Currently this eventstream is never used and only exists because one is required in order to create a docker runtime # Currently this eventstream is never used and only exists because one is required in order to create a docker runtime
event_stream = EventStream(sid, self.file_store, user_id) event_stream = EventStream(sid, self.file_store, user_id)
@ -464,6 +474,18 @@ class DockerNestedConversationManager(ConversationManager):
return runtime return runtime
async def _start_existing_container(self, runtime: DockerRuntime) -> bool:
    """Reuse the container for this runtime if docker already has one.

    Looks up ``runtime.container_name`` through ``self.docker_client``. When
    a container with that name exists and its status is ``'exited'``, it is
    started again before returning.

    Args:
        runtime: Runtime whose ``container_name`` identifies the container.

    Returns:
        True if a container with that name exists, False if docker reports
        it as not found.
    """
    try:
        container = self.docker_client.containers.get(runtime.container_name)
        if not container:
            return False
        if container.status == 'exited':
            # Hand over the bound method itself, not its result: writing
            # ``container.start()`` would run the blocking docker call on
            # the event loop and pass its return value (not a callable) on
            # to call_sync_from_async.
            await call_sync_from_async(container.start)
        # NOTE(review): this assumes the original returned True for any
        # existing container, not only for one that was just restarted —
        # the indentation was lost in this view, so confirm.
        return True
    except docker.errors.NotFound:
        # No container with that name; the caller creates a fresh one.
        return False
def _last_updated_at_key(conversation: ConversationMetadata) -> float: def _last_updated_at_key(conversation: ConversationMetadata) -> float:
last_updated_at = conversation.last_updated_at last_updated_at = conversation.last_updated_at

View File

@ -154,6 +154,10 @@ class StandaloneConversationManager(ConversationManager):
await conversation.disconnect() await conversation.disconnect()
self._detached_conversations.pop(sid, None) self._detached_conversations.pop(sid, None)
# Implies disconnected sandboxes stay open indefinitely
if not self.config.sandbox.close_delay:
return
close_threshold = time.time() - self.config.sandbox.close_delay close_threshold = time.time() - self.config.sandbox.close_delay
running_loops = list(self._local_agent_loops_by_sid.items()) running_loops = list(self._local_agent_loops_by_sid.items())
running_loops.sort(key=lambda item: item[1].last_active_ts) running_loops.sort(key=lambda item: item[1].last_active_ts)

View File

@ -1,7 +1,7 @@
from fastapi import Depends, Request from fastapi import Depends, Request
from openhands.server.shared import ConversationStoreImpl, config, conversation_manager from openhands.server.shared import ConversationStoreImpl, config, conversation_manager
from openhands.server.user_auth import get_user_auth, get_user_id from openhands.server.user_auth import get_user_id
from openhands.storage.conversation.conversation_store import ConversationStore from openhands.storage.conversation.conversation_store import ConversationStore
@ -11,8 +11,7 @@ async def get_conversation_store(request: Request) -> ConversationStore | None:
) )
if conversation_store: if conversation_store:
return conversation_store return conversation_store
user_auth = await get_user_auth(request) user_id = get_user_id(request)
user_id = await user_auth.get_user_id()
conversation_store = await ConversationStoreImpl.get_instance(config, user_id) conversation_store = await ConversationStoreImpl.get_instance(config, user_id)
request.state.conversation_store = conversation_store request.state.conversation_store = conversation_store
return conversation_store return conversation_store