Add warm server functionality to local runtime (#9033)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Tim O'Farrell
2025-07-10 12:12:39 -06:00
committed by GitHub
parent 5cc47ee592
commit 050e80cc34
5 changed files with 385 additions and 135 deletions

View File

@@ -70,6 +70,7 @@ from openhands.memory.condenser.impl.llm_summarizing_condenser import (
LLMSummarizingCondenserConfig,
)
from openhands.microagent.microagent import BaseMicroagent
from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.storage.settings.file_settings_store import FileSettingsStore
@@ -480,6 +481,9 @@ After reviewing the file, please ask the user what they would like to do with it
else:
task_str = read_task(args, config.cli_multiline_input)
# Setup the runtime
get_runtime_cls(config.runtime).setup(config)
# Run the first session
new_session_requested = await run_session(
loop,
@@ -497,6 +501,9 @@ After reviewing the file, please ask the user what they would like to do with it
loop, config, settings_store, current_dir, None
)
# Teardown the runtime
get_runtime_cls(config.runtime).teardown(config)
def main():
loop = asyncio.new_event_loop()

View File

@@ -1087,3 +1087,11 @@ fi
Returns False by default.
"""
return False
@classmethod
def setup(cls, config: OpenHandsConfig):
"""Set up the environment for runtimes to be created."""
@classmethod
def teardown(cls, config: OpenHandsConfig):
"""Tear down the environment in which runtimes are created."""

View File

@@ -60,6 +60,9 @@ class ActionExecutionServerInfo:
# Global dictionary to track running server processes by session ID
_RUNNING_SERVERS: dict[str, ActionExecutionServerInfo] = {}
# Global list to track warm servers waiting for use
_WARM_SERVERS: list[ActionExecutionServerInfo] = []
def get_user_info() -> tuple[int, str | None]:
"""Get user ID and username in a cross-platform way."""
@@ -205,6 +208,9 @@ class LocalRuntime(ActionExecutionClient):
"""Start the action_execution_server on the local machine or connect to an existing one."""
self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
# Get environment variables for warm server configuration
desired_num_warm_servers = int(os.getenv('DESIRED_NUM_WARM_SERVERS', '0'))
# Check if there's already a server running for this session ID
if self.sid in _RUNNING_SERVERS:
self.log('info', f'Connecting to existing server for session {self.sid}')
@@ -252,133 +258,96 @@ class LocalRuntime(ActionExecutionClient):
f'Using workspace directory: {self.config.workspace_mount_path_in_sandbox}'
)
# Start a new server
self._execution_server_port = self._find_available_port(
EXECUTION_SERVER_PORT_RANGE
)
self._vscode_port = int(
os.getenv('VSCODE_PORT')
or str(self._find_available_port(VSCODE_PORT_RANGE))
)
self._app_ports = [
int(
os.getenv('APP_PORT_1')
or str(self._find_available_port(APP_PORT_RANGE_1))
),
int(
os.getenv('APP_PORT_2')
or str(self._find_available_port(APP_PORT_RANGE_2))
),
]
self.api_url = (
f'{self.config.sandbox.local_runtime_url}:{self._execution_server_port}'
)
# Start the server process
cmd = get_action_execution_server_startup_command(
server_port=self._execution_server_port,
plugins=self.plugins,
app_config=self.config,
python_prefix=[],
python_executable=sys.executable,
override_user_id=self._user_id,
override_username=self._username,
)
self.log('info', f'Starting server with command: {cmd}')
env = os.environ.copy()
# Get the code repo path
code_repo_path = os.path.dirname(os.path.dirname(openhands.__file__))
env['PYTHONPATH'] = os.pathsep.join(
[code_repo_path, env.get('PYTHONPATH', '')]
)
env['OPENHANDS_REPO_PATH'] = code_repo_path
env['LOCAL_RUNTIME_MODE'] = '1'
env['VSCODE_PORT'] = str(self._vscode_port)
# Derive environment paths using sys.executable
interpreter_path = sys.executable
python_bin_path = os.path.dirname(interpreter_path)
# Prepend the interpreter's bin directory to PATH for subprocesses
env['PATH'] = f'{python_bin_path}{os.pathsep}{env.get("PATH", "")}'
logger.debug(f'Updated PATH for subprocesses: {env["PATH"]}')
# Check dependencies using the derived env_root_path if not skipped
if os.getenv('SKIP_DEPENDENCY_CHECK', '') != '1':
check_browser = self.config.enable_browser and sys.platform != 'win32'
check_dependencies(code_repo_path, check_browser)
self.server_process = subprocess.Popen( # noqa: S603
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
env=env,
cwd=code_repo_path, # Explicitly set the working directory
)
# Start a thread to read and log server output
def log_output() -> None:
if not self.server_process or not self.server_process.stdout:
self.log(
'error', 'Server process or stdout not available for logging.'
)
return
# Check if we have a warm server available
warm_server_available = False
if _WARM_SERVERS and not self.attach_to_existing:
try:
# Read lines while the process is running and stdout is available
while self.server_process.poll() is None:
if self._log_thread_exit_event.is_set(): # Check exit event
self.log('info', 'Log thread received exit signal.')
break # Exit loop if signaled
line = self.server_process.stdout.readline()
if not line:
# Process might have exited between poll() and readline()
break
self.log('info', f'Server: {line.strip()}')
# Pop a warm server from the list
self.log('info', 'Using a warm server')
server_info = _WARM_SERVERS.pop(0)
# Capture any remaining output after the process exits OR if signaled
if (
not self._log_thread_exit_event.is_set()
): # Check again before reading remaining
self.log(
'info', 'Server process exited, reading remaining output.'
# Use the warm server
self.server_process = server_info.process
self._execution_server_port = server_info.execution_server_port
self._log_thread = server_info.log_thread
self._log_thread_exit_event = server_info.log_thread_exit_event
self._vscode_port = server_info.vscode_port
self._app_ports = server_info.app_ports
# We need to clean up the warm server's temp workspace and create a new one
if server_info.temp_workspace:
shutil.rmtree(server_info.temp_workspace)
# Create a new temp workspace for this session
if self._temp_workspace is None:
self._temp_workspace = tempfile.mkdtemp(
prefix=f'openhands_workspace_{self.sid}',
)
self.config.workspace_mount_path_in_sandbox = (
self._temp_workspace
)
for line in self.server_process.stdout:
if (
self._log_thread_exit_event.is_set()
): # Check inside loop too
self.log(
'info',
'Log thread received exit signal while reading remaining output.',
)
break
self.log('info', f'Server (remaining): {line.strip()}')
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._execution_server_port}'
# Store the server process in the global dictionary with the new workspace
_RUNNING_SERVERS[self.sid] = ActionExecutionServerInfo(
process=self.server_process,
execution_server_port=self._execution_server_port,
vscode_port=self._vscode_port,
app_ports=self._app_ports,
log_thread=self._log_thread,
log_thread_exit_event=self._log_thread_exit_event,
temp_workspace=self._temp_workspace,
workspace_mount_path=self.config.workspace_mount_path_in_sandbox,
)
warm_server_available = True
except IndexError:
# No warm servers available
self.log('info', 'No warm servers available, starting a new server')
warm_server_available = False
except Exception as e:
# Log the error, but don't prevent the thread from potentially exiting
self.log('error', f'Error reading server output: {e}')
finally:
self.log(
'info', 'Log output thread finished.'
) # Add log for thread exit
# Error using warm server
self.log('error', f'Error using warm server: {e}')
warm_server_available = False
self._log_thread = threading.Thread(target=log_output, daemon=True)
self._log_thread.start()
# If no warm server is available, start a new one
if not warm_server_available:
# Create a new server
server_info, api_url = _create_server(
config=self.config,
plugins=self.plugins,
workspace_prefix=self.sid,
)
# Store the server process in the global dictionary
_RUNNING_SERVERS[self.sid] = ActionExecutionServerInfo(
process=self.server_process,
execution_server_port=self._execution_server_port,
vscode_port=self._vscode_port,
app_ports=self._app_ports,
log_thread=self._log_thread,
log_thread_exit_event=self._log_thread_exit_event,
temp_workspace=self._temp_workspace,
workspace_mount_path=self.config.workspace_mount_path_in_sandbox,
)
# Set instance variables
self.server_process = server_info.process
self._execution_server_port = server_info.execution_server_port
self._vscode_port = server_info.vscode_port
self._app_ports = server_info.app_ports
self._log_thread = server_info.log_thread
self._log_thread_exit_event = server_info.log_thread_exit_event
# We need to use the existing temp workspace, not the one created by _create_server
if (
server_info.temp_workspace
and server_info.temp_workspace != self._temp_workspace
):
shutil.rmtree(server_info.temp_workspace)
self.api_url = api_url
# Store the server process in the global dictionary with the correct workspace
_RUNNING_SERVERS[self.sid] = ActionExecutionServerInfo(
process=self.server_process,
execution_server_port=self._execution_server_port,
vscode_port=self._vscode_port,
app_ports=self._app_ports,
log_thread=self._log_thread,
log_thread_exit_event=self._log_thread_exit_event,
temp_workspace=self._temp_workspace,
workspace_mount_path=self.config.workspace_mount_path_in_sandbox,
)
self.log('info', f'Waiting for server to become ready at {self.api_url}...')
self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
@@ -396,14 +365,33 @@ class LocalRuntime(ActionExecutionClient):
self.set_runtime_status(RuntimeStatus.READY)
self._runtime_initialized = True
def _find_available_port(
self, port_range: tuple[int, int], max_attempts: int = 5
) -> int:
port = port_range[1]
for _ in range(max_attempts):
port = find_available_tcp_port(port_range[0], port_range[1])
return port
return port
# Check if we need to create more warm servers after connecting
if (
desired_num_warm_servers > 0
and len(_WARM_SERVERS) < desired_num_warm_servers
):
num_to_create = desired_num_warm_servers - len(_WARM_SERVERS)
self.log(
'info',
f'Creating {num_to_create} additional warm servers to reach desired count',
)
for _ in range(num_to_create):
_create_warm_server_in_background(self.config, self.plugins)
@classmethod
def setup(cls, config: OpenHandsConfig):
should_check_dependencies = os.getenv('SKIP_DEPENDENCY_CHECK', '') != '1'
if should_check_dependencies:
code_repo_path = os.path.dirname(os.path.dirname(openhands.__file__))
check_browser = config.enable_browser and sys.platform != 'win32'
check_dependencies(code_repo_path, check_browser)
initial_num_warm_servers = int(os.getenv('INITIAL_NUM_WARM_SERVERS', '0'))
# Initialize warm servers if needed
if initial_num_warm_servers > 0 and len(_WARM_SERVERS) == 0:
plugins = _get_plugins(config)
for _ in range(initial_num_warm_servers):
_create_warm_server_in_background(config, plugins)
@tenacity.retry(
wait=tenacity.wait_fixed(2),
@@ -453,6 +441,21 @@ class LocalRuntime(ActionExecutionClient):
json={'action': event_to_dict(action)},
)
)
# After executing the action, check if we need to create more warm servers
desired_num_warm_servers = int(
os.getenv('DESIRED_NUM_WARM_SERVERS', '0')
)
if (
desired_num_warm_servers > 0
and len(_WARM_SERVERS) < desired_num_warm_servers
):
self.log(
'info',
f'Creating a new warm server to maintain desired count of {desired_num_warm_servers}',
)
_create_warm_server_in_background(self.config, self.plugins)
return observation_from_dict(response.json())
except httpx.NetworkError:
raise AgentRuntimeDisconnectedError('Server connection lost')
@@ -519,6 +522,33 @@ class LocalRuntime(ActionExecutionClient):
del _RUNNING_SERVERS[conversation_id]
logger.info(f'LocalRuntime for conversation {conversation_id} deleted')
# Also clean up any warm servers if this is the last conversation being deleted
if not _RUNNING_SERVERS:
logger.info('No active conversations, cleaning up warm servers')
for server_info in _WARM_SERVERS[:]:
# Signal the log thread to exit
server_info.log_thread_exit_event.set()
# Terminate the server process
if server_info.process:
server_info.process.terminate()
try:
server_info.process.wait(timeout=5)
except subprocess.TimeoutExpired:
server_info.process.kill()
# Wait for the log thread to finish
server_info.log_thread.join(timeout=5)
# Clean up temp workspace
if server_info.temp_workspace:
shutil.rmtree(server_info.temp_workspace)
# Remove from warm servers list
_WARM_SERVERS.remove(server_info)
logger.info('All warm servers cleaned up')
@property
def runtime_url(self) -> str:
runtime_url = os.getenv('RUNTIME_URL')
@@ -556,3 +586,204 @@ class LocalRuntime(ActionExecutionClient):
for port in self._app_ports:
hosts[f'{self.runtime_url}:{port}'] = port
return hosts
def _python_bin_path():
# Derive environment paths using sys.executable
interpreter_path = sys.executable
python_bin_path = os.path.dirname(interpreter_path)
return python_bin_path
def _create_server(
config: OpenHandsConfig,
plugins: list[PluginRequirement],
workspace_prefix: str,
) -> tuple[ActionExecutionServerInfo, str]:
logger.info('Creating a server')
# Set up workspace directory
temp_workspace = tempfile.mkdtemp(
prefix=f'openhands_workspace_{workspace_prefix}',
)
workspace_mount_path = temp_workspace
# Find available ports
execution_server_port = find_available_tcp_port(*EXECUTION_SERVER_PORT_RANGE)
vscode_port = int(
os.getenv('VSCODE_PORT') or str(find_available_tcp_port(*VSCODE_PORT_RANGE))
)
app_ports = [
int(os.getenv('APP_PORT_1') or str(find_available_tcp_port(*APP_PORT_RANGE_1))),
int(os.getenv('APP_PORT_2') or str(find_available_tcp_port(*APP_PORT_RANGE_2))),
]
# Get user info
user_id, username = get_user_info()
# Start the server process
cmd = get_action_execution_server_startup_command(
server_port=execution_server_port,
plugins=plugins,
app_config=config,
python_prefix=['poetry', 'run'],
override_user_id=user_id,
override_username=username,
)
logger.info(f'Starting server with command: {cmd}')
env = os.environ.copy()
# Get the code repo path
code_repo_path = os.path.dirname(os.path.dirname(openhands.__file__))
env['PYTHONPATH'] = os.pathsep.join([code_repo_path, env.get('PYTHONPATH', '')])
env['OPENHANDS_REPO_PATH'] = code_repo_path
env['LOCAL_RUNTIME_MODE'] = '1'
env['VSCODE_PORT'] = str(vscode_port)
# Prepend the interpreter's bin directory to PATH for subprocesses
env['PATH'] = f'{_python_bin_path()}{os.pathsep}{env.get("PATH", "")}'
logger.debug(f'Updated PATH for subprocesses: {env["PATH"]}')
server_process = subprocess.Popen( # noqa: S603
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
env=env,
cwd=code_repo_path,
)
log_thread_exit_event = threading.Event()
# Start a thread to read and log server output
def log_output() -> None:
if not server_process or not server_process.stdout:
logger.error('server process or stdout not available for logging.')
return
try:
# Read lines while the process is running and stdout is available
while server_process.poll() is None:
if log_thread_exit_event.is_set():
logger.info('server log thread received exit signal.')
break
line = server_process.stdout.readline()
if not line:
break
logger.info(f'server: {line.strip()}')
# Capture any remaining output
if not log_thread_exit_event.is_set():
logger.info('server process exited, reading remaining output.')
for line in server_process.stdout:
if log_thread_exit_event.is_set():
break
logger.info(f'server (remaining): {line.strip()}')
except Exception as e:
logger.error(f'Error reading server output: {e}')
finally:
logger.info('server log output thread finished.')
log_thread = threading.Thread(target=log_output, daemon=True)
log_thread.start()
# Create server info object
server_info = ActionExecutionServerInfo(
process=server_process,
execution_server_port=execution_server_port,
vscode_port=vscode_port,
app_ports=app_ports,
log_thread=log_thread,
log_thread_exit_event=log_thread_exit_event,
temp_workspace=temp_workspace,
workspace_mount_path=workspace_mount_path,
)
# API URL for the server
api_url = f'{config.sandbox.local_runtime_url}:{execution_server_port}'
return server_info, api_url
def _create_warm_server(
config: OpenHandsConfig,
plugins: list[PluginRequirement],
) -> None:
"""Create a warm server in the background."""
try:
server_info, api_url = _create_server(
config=config,
plugins=plugins,
workspace_prefix='warm',
)
# Wait for the server to be ready
session = httpx.Client(timeout=30)
# Use tenacity to retry the connection
@tenacity.retry(
wait=tenacity.wait_fixed(2),
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
before_sleep=lambda retry_state: logger.debug(
f'Waiting for warm server to be ready... (attempt {retry_state.attempt_number})'
),
)
def wait_until_alive() -> bool:
if server_info.process.poll() is not None:
raise RuntimeError('Warm server process died')
try:
response = session.get(f'{api_url}/alive')
response.raise_for_status()
return True
except Exception as e:
logger.debug(f'Warm server not ready yet: {e}')
raise
wait_until_alive()
logger.info(f'Warm server ready at port {server_info.execution_server_port}')
# Add to the warm servers list
_WARM_SERVERS.append(server_info)
except Exception as e:
logger.error(f'Failed to create warm server: {e}')
# Clean up resources
if 'server_info' in locals():
server_info.log_thread_exit_event.set()
if server_info.process:
server_info.process.terminate()
try:
server_info.process.wait(timeout=5)
except subprocess.TimeoutExpired:
server_info.process.kill()
server_info.log_thread.join(timeout=5)
if server_info.temp_workspace:
shutil.rmtree(server_info.temp_workspace)
def _create_warm_server_in_background(
config: OpenHandsConfig,
plugins: list[PluginRequirement],
) -> None:
"""Start a new thread to create a warm server."""
thread = threading.Thread(
target=_create_warm_server, daemon=True, args=(config, plugins)
)
thread.start()
def _get_plugins(config: OpenHandsConfig) -> list[PluginRequirement]:
from openhands.controller.agent import Agent
from openhands.llm.llm import LLM
agent_config = config.get_agent_config(config.default_agent)
llm = LLM(
config=config.get_llm_config_from_agent(config.default_agent),
)
agent = Agent.get_cls(config.default_agent)(llm, agent_config)
plugins = agent.sandbox_plugins
return plugins

View File

@@ -22,6 +22,7 @@ from openhands.events.nested_event_store import NestedEventStore
from openhands.events.stream import EventStream
from openhands.integrations.provider import PROVIDER_TOKEN_TYPE, ProviderHandler
from openhands.llm.llm import LLM
from openhands.runtime import get_runtime_cls
from openhands.runtime.impl.docker.docker_runtime import DockerRuntime
from openhands.server.config.server_config import ServerConfig
from openhands.server.conversation_manager.conversation_manager import (
@@ -56,12 +57,12 @@ class DockerNestedConversationManager(ConversationManager):
_runtime_container_image: str | None = None
async def __aenter__(self):
# No action is required on startup for this implementation
pass
runtime_cls = get_runtime_cls(self.config.runtime)
runtime_cls.setup(self.config)
async def __aexit__(self, exc_type, exc_value, traceback):
# No action is required on shutdown for this implementation
pass
runtime_cls = get_runtime_cls(self.config.runtime)
runtime_cls.teardown(self.config)
async def attach_to_conversation(
self, sid: str, user_id: str | None = None
@@ -86,9 +87,7 @@ class DockerNestedConversationManager(ConversationManager):
async def get_running_agent_loops(
self, user_id: str | None = None, filter_to_sids: set[str] | None = None
) -> set[str]:
"""
Get the running agent loops directly from docker.
"""
"""Get the running agent loops directly from docker."""
containers: list[Container] = self.docker_client.containers.list()
names = (container.name or '' for container in containers)
conversation_ids = {
@@ -380,8 +379,10 @@ class DockerNestedConversationManager(ConversationManager):
def get_agent_session(self, sid: str):
"""Get the agent session for a given session ID.
Args:
sid: The session ID.
Returns:
The agent session, or None if not found.
"""

View File

@@ -12,6 +12,7 @@ from openhands.core.logger import openhands_logger as logger
from openhands.core.schema.agent import AgentState
from openhands.events.action import MessageAction
from openhands.events.stream import EventStreamSubscriber, session_exists
from openhands.runtime import get_runtime_cls
from openhands.server.config.server_config import ServerConfig
from openhands.server.data_models.agent_loop_info import AgentLoopInfo
from openhands.server.monitoring import MonitoringListener
@@ -72,12 +73,14 @@ class StandaloneConversationManager(ConversationManager):
# Grab a reference to the main event loop. This is the loop in which `await sio.emit` must be called
self._loop = asyncio.get_event_loop()
self._cleanup_task = asyncio.create_task(self._cleanup_stale())
get_runtime_cls(self.config.runtime).setup(self.config)
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if self._cleanup_task:
self._cleanup_task.cancel()
self._cleanup_task = None
get_runtime_cls(self.config.runtime).teardown(self.config)
async def attach_to_conversation(
self, sid: str, user_id: str | None = None