diff --git a/openhands/cli/main.py b/openhands/cli/main.py index 89ed49d3ce..0433d022a9 100644 --- a/openhands/cli/main.py +++ b/openhands/cli/main.py @@ -70,6 +70,7 @@ from openhands.memory.condenser.impl.llm_summarizing_condenser import ( LLMSummarizingCondenserConfig, ) from openhands.microagent.microagent import BaseMicroagent +from openhands.runtime import get_runtime_cls from openhands.runtime.base import Runtime from openhands.storage.settings.file_settings_store import FileSettingsStore @@ -480,6 +481,9 @@ After reviewing the file, please ask the user what they would like to do with it else: task_str = read_task(args, config.cli_multiline_input) + # Setup the runtime + get_runtime_cls(config.runtime).setup(config) + # Run the first session new_session_requested = await run_session( loop, @@ -497,6 +501,9 @@ After reviewing the file, please ask the user what they would like to do with it loop, config, settings_store, current_dir, None ) + # Teardown the runtime + get_runtime_cls(config.runtime).teardown(config) + def main(): loop = asyncio.new_event_loop() diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index ce3624acd4..b5f983dc28 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -1087,3 +1087,11 @@ fi Returns False by default. """ return False + + @classmethod + def setup(cls, config: OpenHandsConfig): + """Set up the environment for runtimes to be created.""" + + @classmethod + def teardown(cls, config: OpenHandsConfig): + """Tear down the environment in which runtimes are created.""" diff --git a/openhands/runtime/impl/local/local_runtime.py b/openhands/runtime/impl/local/local_runtime.py index 34eea74056..90b21954b6 100644 --- a/openhands/runtime/impl/local/local_runtime.py +++ b/openhands/runtime/impl/local/local_runtime.py @@ -60,6 +60,9 @@ class ActionExecutionServerInfo: # Global dictionary to track running server processes by session ID _RUNNING_SERVERS: dict[str, ActionExecutionServerInfo] = {} +# Global list to track warm servers waiting for use +_WARM_SERVERS: list[ActionExecutionServerInfo] = [] + def get_user_info() -> tuple[int, str | None]: """Get user ID and username in a cross-platform way.""" @@ -205,6 +208,9 @@ class LocalRuntime(ActionExecutionClient): """Start the action_execution_server on the local machine or connect to an existing one.""" self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME) + # Get environment variables for warm server configuration + desired_num_warm_servers = int(os.getenv('DESIRED_NUM_WARM_SERVERS', '0')) + # Check if there's already a server running for this session ID if self.sid in _RUNNING_SERVERS: self.log('info', f'Connecting to existing server for session {self.sid}') @@ -252,133 +258,96 @@ class LocalRuntime(ActionExecutionClient): f'Using workspace directory: {self.config.workspace_mount_path_in_sandbox}' ) - # Start a new server - self._execution_server_port = self._find_available_port( - EXECUTION_SERVER_PORT_RANGE - ) - self._vscode_port = int( - os.getenv('VSCODE_PORT') - or str(self._find_available_port(VSCODE_PORT_RANGE)) - ) - self._app_ports = [ - int( - os.getenv('APP_PORT_1') - or str(self._find_available_port(APP_PORT_RANGE_1)) - ), - int( - os.getenv('APP_PORT_2') - or str(self._find_available_port(APP_PORT_RANGE_2)) - ), - ] - self.api_url = ( - f'{self.config.sandbox.local_runtime_url}:{self._execution_server_port}' - ) - - # Start the server process - cmd = get_action_execution_server_startup_command( - server_port=self._execution_server_port, - plugins=self.plugins, - app_config=self.config, - python_prefix=[], - python_executable=sys.executable, - override_user_id=self._user_id, - override_username=self._username, - ) - - self.log('info', f'Starting server with command: {cmd}') - env = os.environ.copy() - # Get the code repo path - code_repo_path = os.path.dirname(os.path.dirname(openhands.__file__)) - env['PYTHONPATH'] = os.pathsep.join( - [code_repo_path, env.get('PYTHONPATH', '')] - ) - env['OPENHANDS_REPO_PATH'] = code_repo_path - env['LOCAL_RUNTIME_MODE'] = '1' - env['VSCODE_PORT'] = str(self._vscode_port) - - # Derive environment paths using sys.executable - interpreter_path = sys.executable - python_bin_path = os.path.dirname(interpreter_path) - - # Prepend the interpreter's bin directory to PATH for subprocesses - env['PATH'] = f'{python_bin_path}{os.pathsep}{env.get("PATH", "")}' - logger.debug(f'Updated PATH for subprocesses: {env["PATH"]}') - - # Check dependencies using the derived env_root_path if not skipped - if os.getenv('SKIP_DEPENDENCY_CHECK', '') != '1': - check_browser = self.config.enable_browser and sys.platform != 'win32' - check_dependencies(code_repo_path, check_browser) - - self.server_process = subprocess.Popen( # noqa: S603 - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1, - env=env, - cwd=code_repo_path, # Explicitly set the working directory - ) - - # Start a thread to read and log server output - def log_output() -> None: - if not self.server_process or not self.server_process.stdout: - self.log( - 'error', 'Server process or stdout not available for logging.' - ) - return - + # Check if we have a warm server available + warm_server_available = False + if _WARM_SERVERS and not self.attach_to_existing: try: - # Read lines while the process is running and stdout is available - while self.server_process.poll() is None: - if self._log_thread_exit_event.is_set(): # Check exit event - self.log('info', 'Log thread received exit signal.') - break # Exit loop if signaled - line = self.server_process.stdout.readline() - if not line: - # Process might have exited between poll() and readline() - break - self.log('info', f'Server: {line.strip()}') + # Pop a warm server from the list + self.log('info', 'Using a warm server') + server_info = _WARM_SERVERS.pop(0) - # Capture any remaining output after the process exits OR if signaled - if ( - not self._log_thread_exit_event.is_set() - ): # Check again before reading remaining - self.log( - 'info', 'Server process exited, reading remaining output.' + # Use the warm server + self.server_process = server_info.process + self._execution_server_port = server_info.execution_server_port + self._log_thread = server_info.log_thread + self._log_thread_exit_event = server_info.log_thread_exit_event + self._vscode_port = server_info.vscode_port + self._app_ports = server_info.app_ports + + # We need to clean up the warm server's temp workspace and create a new one + if server_info.temp_workspace: + shutil.rmtree(server_info.temp_workspace) + + # Create a new temp workspace for this session + if self._temp_workspace is None: + self._temp_workspace = tempfile.mkdtemp( + prefix=f'openhands_workspace_{self.sid}', + ) + self.config.workspace_mount_path_in_sandbox = ( + self._temp_workspace ) - for line in self.server_process.stdout: - if ( - self._log_thread_exit_event.is_set() - ): # Check inside loop too - self.log( - 'info', - 'Log thread received exit signal while reading remaining output.', - ) - break - self.log('info', f'Server (remaining): {line.strip()}') + self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._execution_server_port}' + + # Store the server process in the global dictionary with the new workspace + _RUNNING_SERVERS[self.sid] = ActionExecutionServerInfo( + process=self.server_process, + execution_server_port=self._execution_server_port, + vscode_port=self._vscode_port, + app_ports=self._app_ports, + log_thread=self._log_thread, + log_thread_exit_event=self._log_thread_exit_event, + temp_workspace=self._temp_workspace, + workspace_mount_path=self.config.workspace_mount_path_in_sandbox, + ) + + warm_server_available = True + except IndexError: + # No warm servers available + self.log('info', 'No warm servers available, starting a new server') + warm_server_available = False except Exception as e: - # Log the error, but don't prevent the thread from potentially exiting - self.log('error', f'Error reading server output: {e}') - finally: - self.log( - 'info', 'Log output thread finished.' - ) # Add log for thread exit + # Error using warm server + self.log('error', f'Error using warm server: {e}') + warm_server_available = False - self._log_thread = threading.Thread(target=log_output, daemon=True) - self._log_thread.start() + # If no warm server is available, start a new one + if not warm_server_available: + # Create a new server + server_info, api_url = _create_server( + config=self.config, + plugins=self.plugins, + workspace_prefix=self.sid, + ) - # Store the server process in the global dictionary - _RUNNING_SERVERS[self.sid] = ActionExecutionServerInfo( - process=self.server_process, - execution_server_port=self._execution_server_port, - vscode_port=self._vscode_port, - app_ports=self._app_ports, - log_thread=self._log_thread, - log_thread_exit_event=self._log_thread_exit_event, - temp_workspace=self._temp_workspace, - workspace_mount_path=self.config.workspace_mount_path_in_sandbox, - ) + # Set instance variables + self.server_process = server_info.process + self._execution_server_port = server_info.execution_server_port + self._vscode_port = server_info.vscode_port + self._app_ports = server_info.app_ports + self._log_thread = server_info.log_thread + self._log_thread_exit_event = server_info.log_thread_exit_event + + # We need to use the existing temp workspace, not the one created by _create_server + if ( + server_info.temp_workspace + and server_info.temp_workspace != self._temp_workspace + ): + shutil.rmtree(server_info.temp_workspace) + + self.api_url = api_url + + # Store the server process in the global dictionary with the correct workspace + _RUNNING_SERVERS[self.sid] = ActionExecutionServerInfo( + process=self.server_process, + execution_server_port=self._execution_server_port, + vscode_port=self._vscode_port, + app_ports=self._app_ports, + log_thread=self._log_thread, + log_thread_exit_event=self._log_thread_exit_event, + temp_workspace=self._temp_workspace, + workspace_mount_path=self.config.workspace_mount_path_in_sandbox, + ) self.log('info', f'Waiting for server to become ready at {self.api_url}...') self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME) @@ -396,14 +365,33 @@ class LocalRuntime(ActionExecutionClient): self.set_runtime_status(RuntimeStatus.READY) self._runtime_initialized = True - def _find_available_port( - self, port_range: tuple[int, int], max_attempts: int = 5 - ) -> int: - port = port_range[1] - for _ in range(max_attempts): - port = find_available_tcp_port(port_range[0], port_range[1]) - return port - return port + # Check if we need to create more warm servers after connecting + if ( + desired_num_warm_servers > 0 + and len(_WARM_SERVERS) < desired_num_warm_servers + ): + num_to_create = desired_num_warm_servers - len(_WARM_SERVERS) + self.log( + 'info', + f'Creating {num_to_create} additional warm servers to reach desired count', + ) + for _ in range(num_to_create): + _create_warm_server_in_background(self.config, self.plugins) + + @classmethod + def setup(cls, config: OpenHandsConfig): + should_check_dependencies = os.getenv('SKIP_DEPENDENCY_CHECK', '') != '1' + if should_check_dependencies: + code_repo_path = os.path.dirname(os.path.dirname(openhands.__file__)) + check_browser = config.enable_browser and sys.platform != 'win32' + check_dependencies(code_repo_path, check_browser) + + initial_num_warm_servers = int(os.getenv('INITIAL_NUM_WARM_SERVERS', '0')) + # Initialize warm servers if needed + if initial_num_warm_servers > 0 and len(_WARM_SERVERS) == 0: + plugins = _get_plugins(config) + for _ in range(initial_num_warm_servers): + _create_warm_server_in_background(config, plugins) @tenacity.retry( wait=tenacity.wait_fixed(2), @@ -453,6 +441,21 @@ class LocalRuntime(ActionExecutionClient): json={'action': event_to_dict(action)}, ) ) + + # After executing the action, check if we need to create more warm servers + desired_num_warm_servers = int( + os.getenv('DESIRED_NUM_WARM_SERVERS', '0') + ) + if ( + desired_num_warm_servers > 0 + and len(_WARM_SERVERS) < desired_num_warm_servers + ): + self.log( + 'info', + f'Creating a new warm server to maintain desired count of {desired_num_warm_servers}', + ) + _create_warm_server_in_background(self.config, self.plugins) + return observation_from_dict(response.json()) except httpx.NetworkError: raise AgentRuntimeDisconnectedError('Server connection lost') @@ -519,6 +522,33 @@ class LocalRuntime(ActionExecutionClient): del _RUNNING_SERVERS[conversation_id] logger.info(f'LocalRuntime for conversation {conversation_id} deleted') + # Also clean up any warm servers if this is the last conversation being deleted + if not _RUNNING_SERVERS: + logger.info('No active conversations, cleaning up warm servers') + for server_info in _WARM_SERVERS[:]: + # Signal the log thread to exit + server_info.log_thread_exit_event.set() + + # Terminate the server process + if server_info.process: + server_info.process.terminate() + try: + server_info.process.wait(timeout=5) + except subprocess.TimeoutExpired: + server_info.process.kill() + + # Wait for the log thread to finish + server_info.log_thread.join(timeout=5) + + # Clean up temp workspace + if server_info.temp_workspace: + shutil.rmtree(server_info.temp_workspace) + + # Remove from warm servers list + _WARM_SERVERS.remove(server_info) + + logger.info('All warm servers cleaned up') + @property def runtime_url(self) -> str: runtime_url = os.getenv('RUNTIME_URL') @@ -556,3 +586,204 @@ class LocalRuntime(ActionExecutionClient): for port in self._app_ports: hosts[f'{self.runtime_url}:{port}'] = port return hosts + + +def _python_bin_path(): + # Derive environment paths using sys.executable + interpreter_path = sys.executable + python_bin_path = os.path.dirname(interpreter_path) + return python_bin_path + + +def _create_server( + config: OpenHandsConfig, + plugins: list[PluginRequirement], + workspace_prefix: str, +) -> tuple[ActionExecutionServerInfo, str]: + logger.info('Creating a server') + + # Set up workspace directory + temp_workspace = tempfile.mkdtemp( + prefix=f'openhands_workspace_{workspace_prefix}', + ) + workspace_mount_path = temp_workspace + + # Find available ports + execution_server_port = find_available_tcp_port(*EXECUTION_SERVER_PORT_RANGE) + vscode_port = int( + os.getenv('VSCODE_PORT') or str(find_available_tcp_port(*VSCODE_PORT_RANGE)) + ) + app_ports = [ + int(os.getenv('APP_PORT_1') or str(find_available_tcp_port(*APP_PORT_RANGE_1))), + int(os.getenv('APP_PORT_2') or str(find_available_tcp_port(*APP_PORT_RANGE_2))), + ] + + # Get user info + user_id, username = get_user_info() + + # Start the server process + cmd = get_action_execution_server_startup_command( + server_port=execution_server_port, + plugins=plugins, + app_config=config, + python_prefix=['poetry', 'run'], + override_user_id=user_id, + override_username=username, + ) + + logger.info(f'Starting server with command: {cmd}') + + env = os.environ.copy() + # Get the code repo path + code_repo_path = os.path.dirname(os.path.dirname(openhands.__file__)) + env['PYTHONPATH'] = os.pathsep.join([code_repo_path, env.get('PYTHONPATH', '')]) + env['OPENHANDS_REPO_PATH'] = code_repo_path + env['LOCAL_RUNTIME_MODE'] = '1' + env['VSCODE_PORT'] = str(vscode_port) + + # Prepend the interpreter's bin directory to PATH for subprocesses + env['PATH'] = f'{_python_bin_path()}{os.pathsep}{env.get("PATH", "")}' + + logger.debug(f'Updated PATH for subprocesses: {env["PATH"]}') + + server_process = subprocess.Popen( # noqa: S603 + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1, + env=env, + cwd=code_repo_path, + ) + + log_thread_exit_event = threading.Event() + + # Start a thread to read and log server output + def log_output() -> None: + if not server_process or not server_process.stdout: + logger.error('server process or stdout not available for logging.') + return + + try: + # Read lines while the process is running and stdout is available + while server_process.poll() is None: + if log_thread_exit_event.is_set(): + logger.info('server log thread received exit signal.') + break + line = server_process.stdout.readline() + if not line: + break + logger.info(f'server: {line.strip()}') + + # Capture any remaining output + if not log_thread_exit_event.is_set(): + logger.info('server process exited, reading remaining output.') + for line in server_process.stdout: + if log_thread_exit_event.is_set(): + break + logger.info(f'server (remaining): {line.strip()}') + + except Exception as e: + logger.error(f'Error reading server output: {e}') + finally: + logger.info('server log output thread finished.') + + log_thread = threading.Thread(target=log_output, daemon=True) + log_thread.start() + + # Create server info object + server_info = ActionExecutionServerInfo( + process=server_process, + execution_server_port=execution_server_port, + vscode_port=vscode_port, + app_ports=app_ports, + log_thread=log_thread, + log_thread_exit_event=log_thread_exit_event, + temp_workspace=temp_workspace, + workspace_mount_path=workspace_mount_path, + ) + + # API URL for the server + api_url = f'{config.sandbox.local_runtime_url}:{execution_server_port}' + + return server_info, api_url + + +def _create_warm_server( + config: OpenHandsConfig, + plugins: list[PluginRequirement], +) -> None: + """Create a warm server in the background.""" + try: + server_info, api_url = _create_server( + config=config, + plugins=plugins, + workspace_prefix='warm', + ) + + # Wait for the server to be ready + session = httpx.Client(timeout=30) + + # Use tenacity to retry the connection + @tenacity.retry( + wait=tenacity.wait_fixed(2), + stop=tenacity.stop_after_delay(120) | stop_if_should_exit(), + before_sleep=lambda retry_state: logger.debug( + f'Waiting for warm server to be ready... (attempt {retry_state.attempt_number})' + ), + ) + def wait_until_alive() -> bool: + if server_info.process.poll() is not None: + raise RuntimeError('Warm server process died') + + try: + response = session.get(f'{api_url}/alive') + response.raise_for_status() + return True + except Exception as e: + logger.debug(f'Warm server not ready yet: {e}') + raise + + wait_until_alive() + logger.info(f'Warm server ready at port {server_info.execution_server_port}') + + # Add to the warm servers list + _WARM_SERVERS.append(server_info) + except Exception as e: + logger.error(f'Failed to create warm server: {e}') + # Clean up resources + if 'server_info' in locals(): + server_info.log_thread_exit_event.set() + if server_info.process: + server_info.process.terminate() + try: + server_info.process.wait(timeout=5) + except subprocess.TimeoutExpired: + server_info.process.kill() + server_info.log_thread.join(timeout=5) + if server_info.temp_workspace: + shutil.rmtree(server_info.temp_workspace) + + +def _create_warm_server_in_background( + config: OpenHandsConfig, + plugins: list[PluginRequirement], +) -> None: + """Start a new thread to create a warm server.""" + thread = threading.Thread( + target=_create_warm_server, daemon=True, args=(config, plugins) + ) + thread.start() + + +def _get_plugins(config: OpenHandsConfig) -> list[PluginRequirement]: + from openhands.controller.agent import Agent + from openhands.llm.llm import LLM + + agent_config = config.get_agent_config(config.default_agent) + llm = LLM( + config=config.get_llm_config_from_agent(config.default_agent), + ) + agent = Agent.get_cls(config.default_agent)(llm, agent_config) + plugins = agent.sandbox_plugins + return plugins diff --git a/openhands/server/conversation_manager/docker_nested_conversation_manager.py b/openhands/server/conversation_manager/docker_nested_conversation_manager.py index 25dfba44f5..4cc47adb44 100644 --- a/openhands/server/conversation_manager/docker_nested_conversation_manager.py +++ b/openhands/server/conversation_manager/docker_nested_conversation_manager.py @@ -22,6 +22,7 @@ from openhands.events.nested_event_store import NestedEventStore from openhands.events.stream import EventStream from openhands.integrations.provider import PROVIDER_TOKEN_TYPE, ProviderHandler from openhands.llm.llm import LLM +from openhands.runtime import get_runtime_cls from openhands.runtime.impl.docker.docker_runtime import DockerRuntime from openhands.server.config.server_config import ServerConfig from openhands.server.conversation_manager.conversation_manager import ( @@ -56,12 +57,12 @@ class DockerNestedConversationManager(ConversationManager): _runtime_container_image: str | None = None async def __aenter__(self): - # No action is required on startup for this implementation - pass + runtime_cls = get_runtime_cls(self.config.runtime) + runtime_cls.setup(self.config) async def __aexit__(self, exc_type, exc_value, traceback): - # No action is required on shutdown for this implementation - pass + runtime_cls = get_runtime_cls(self.config.runtime) + runtime_cls.teardown(self.config) async def attach_to_conversation( self, sid: str, user_id: str | None = None @@ -86,9 +87,7 @@ class DockerNestedConversationManager(ConversationManager): async def get_running_agent_loops( self, user_id: str | None = None, filter_to_sids: set[str] | None = None ) -> set[str]: - """ - Get the running agent loops directly from docker. - """ + """Get the running agent loops directly from docker.""" containers: list[Container] = self.docker_client.containers.list() names = (container.name or '' for container in containers) conversation_ids = { @@ -380,8 +379,10 @@ class DockerNestedConversationManager(ConversationManager): def get_agent_session(self, sid: str): """Get the agent session for a given session ID. + Args: sid: The session ID. + Returns: The agent session, or None if not found. """ diff --git a/openhands/server/conversation_manager/standalone_conversation_manager.py b/openhands/server/conversation_manager/standalone_conversation_manager.py index 977e29439d..cb0b1d5e25 100644 --- a/openhands/server/conversation_manager/standalone_conversation_manager.py +++ b/openhands/server/conversation_manager/standalone_conversation_manager.py @@ -12,6 +12,7 @@ from openhands.core.logger import openhands_logger as logger from openhands.core.schema.agent import AgentState from openhands.events.action import MessageAction from openhands.events.stream import EventStreamSubscriber, session_exists +from openhands.runtime import get_runtime_cls from openhands.server.config.server_config import ServerConfig from openhands.server.data_models.agent_loop_info import AgentLoopInfo from openhands.server.monitoring import MonitoringListener @@ -72,12 +73,14 @@ class StandaloneConversationManager(ConversationManager): # Grab a reference to the main event loop. This is the loop in which `await sio.emit` must be called self._loop = asyncio.get_event_loop() self._cleanup_task = asyncio.create_task(self._cleanup_stale()) + get_runtime_cls(self.config.runtime).setup(self.config) return self async def __aexit__(self, exc_type, exc_value, traceback): if self._cleanup_task: self._cleanup_task.cancel() self._cleanup_task = None + get_runtime_cls(self.config.runtime).teardown(self.config) async def attach_to_conversation( self, sid: str, user_id: str | None = None