diff --git a/evaluation/benchmarks/EDA/run_infer.py b/evaluation/benchmarks/EDA/run_infer.py index c866b5090b..e8cee3df3e 100644 --- a/evaluation/benchmarks/EDA/run_infer.py +++ b/evaluation/benchmarks/EDA/run_infer.py @@ -63,7 +63,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/agent_bench/run_infer.py b/evaluation/benchmarks/agent_bench/run_infer.py index f008c9dc8a..a64c66f22c 100644 --- a/evaluation/benchmarks/agent_bench/run_infer.py +++ b/evaluation/benchmarks/agent_bench/run_infer.py @@ -43,7 +43,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-slim', diff --git a/evaluation/benchmarks/aider_bench/run_infer.py b/evaluation/benchmarks/aider_bench/run_infer.py index e059a6b46f..bc850dbc62 100644 --- a/evaluation/benchmarks/aider_bench/run_infer.py +++ b/evaluation/benchmarks/aider_bench/run_infer.py @@ -50,7 +50,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.11-bookworm', diff --git a/evaluation/benchmarks/biocoder/run_infer.py b/evaluation/benchmarks/biocoder/run_infer.py index 2da7b09f0f..c33c75e5a2 100644 --- a/evaluation/benchmarks/biocoder/run_infer.py +++ b/evaluation/benchmarks/biocoder/run_infer.py @@ -61,7 +61,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - 
runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image=BIOCODER_BENCH_CONTAINER_IMAGE, diff --git a/evaluation/benchmarks/bird/run_infer.py b/evaluation/benchmarks/bird/run_infer.py index d35084fdbc..14946ebacb 100644 --- a/evaluation/benchmarks/bird/run_infer.py +++ b/evaluation/benchmarks/bird/run_infer.py @@ -74,7 +74,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/browsing_delegation/run_infer.py b/evaluation/benchmarks/browsing_delegation/run_infer.py index 38fb6cae25..016b6c3f58 100644 --- a/evaluation/benchmarks/browsing_delegation/run_infer.py +++ b/evaluation/benchmarks/browsing_delegation/run_infer.py @@ -39,7 +39,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/commit0_bench/run_infer.py b/evaluation/benchmarks/commit0_bench/run_infer.py index 1ef347931f..d8f1f64b1a 100644 --- a/evaluation/benchmarks/commit0_bench/run_infer.py +++ b/evaluation/benchmarks/commit0_bench/run_infer.py @@ -124,7 +124,7 @@ def get_config( default_agent=metadata.agent_class, run_as_openhands=False, max_iterations=metadata.max_iterations, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), sandbox=SandboxConfig( base_container_image=base_container_image, enable_auto_lint=True, diff --git a/evaluation/benchmarks/discoverybench/run_infer.py b/evaluation/benchmarks/discoverybench/run_infer.py index 55e958d9fd..0d5b47410c 100644 --- a/evaluation/benchmarks/discoverybench/run_infer.py +++ 
b/evaluation/benchmarks/discoverybench/run_infer.py @@ -65,7 +65,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/gaia/run_infer.py b/evaluation/benchmarks/gaia/run_infer.py index 99c29b211d..8aaa479e92 100644 --- a/evaluation/benchmarks/gaia/run_infer.py +++ b/evaluation/benchmarks/gaia/run_infer.py @@ -50,7 +50,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/gorilla/run_infer.py b/evaluation/benchmarks/gorilla/run_infer.py index 64263242d7..e453b1f570 100644 --- a/evaluation/benchmarks/gorilla/run_infer.py +++ b/evaluation/benchmarks/gorilla/run_infer.py @@ -43,7 +43,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/gpqa/run_infer.py b/evaluation/benchmarks/gpqa/run_infer.py index d9e1caec77..08e6682792 100644 --- a/evaluation/benchmarks/gpqa/run_infer.py +++ b/evaluation/benchmarks/gpqa/run_infer.py @@ -64,7 +64,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/humanevalfix/run_infer.py b/evaluation/benchmarks/humanevalfix/run_infer.py index 3b5a5bca2f..b2fb6d677a 100644 --- 
a/evaluation/benchmarks/humanevalfix/run_infer.py +++ b/evaluation/benchmarks/humanevalfix/run_infer.py @@ -85,7 +85,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/logic_reasoning/run_infer.py b/evaluation/benchmarks/logic_reasoning/run_infer.py index 0a1447f061..d84c5f8ca8 100644 --- a/evaluation/benchmarks/logic_reasoning/run_infer.py +++ b/evaluation/benchmarks/logic_reasoning/run_infer.py @@ -48,7 +48,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='xingyaoww/od-eval-logic-reasoning:v1.0', diff --git a/evaluation/benchmarks/miniwob/run_infer.py b/evaluation/benchmarks/miniwob/run_infer.py index dd93fbaf0a..acc1431c81 100644 --- a/evaluation/benchmarks/miniwob/run_infer.py +++ b/evaluation/benchmarks/miniwob/run_infer.py @@ -58,7 +58,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='xingyaoww/od-eval-miniwob:v1.0', diff --git a/evaluation/benchmarks/mint/run_infer.py b/evaluation/benchmarks/mint/run_infer.py index 7106f4a59d..a98fa8d918 100644 --- a/evaluation/benchmarks/mint/run_infer.py +++ b/evaluation/benchmarks/mint/run_infer.py @@ -106,7 +106,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='xingyaoww/od-eval-mint:v1.0', diff --git 
a/evaluation/benchmarks/ml_bench/run_infer.py b/evaluation/benchmarks/ml_bench/run_infer.py index ab94b925ab..1c084fc149 100644 --- a/evaluation/benchmarks/ml_bench/run_infer.py +++ b/evaluation/benchmarks/ml_bench/run_infer.py @@ -80,7 +80,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='public.ecr.aws/i5g0m1f6/ml-bench', diff --git a/evaluation/benchmarks/scienceagentbench/run_infer.py b/evaluation/benchmarks/scienceagentbench/run_infer.py index db4abf0f48..ebe1b783cf 100644 --- a/evaluation/benchmarks/scienceagentbench/run_infer.py +++ b/evaluation/benchmarks/scienceagentbench/run_infer.py @@ -62,7 +62,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), max_budget_per_task=4, max_iterations=metadata.max_iterations, sandbox=SandboxConfig( diff --git a/evaluation/benchmarks/swe_bench/eval_infer.py b/evaluation/benchmarks/swe_bench/eval_infer.py index 95f65245f2..c5d479dd50 100644 --- a/evaluation/benchmarks/swe_bench/eval_infer.py +++ b/evaluation/benchmarks/swe_bench/eval_infer.py @@ -76,7 +76,7 @@ def get_config(instance: pd.Series) -> AppConfig: ) config = AppConfig( run_as_openhands=False, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), sandbox=SandboxConfig( base_container_image=base_container_image, use_host_network=False, diff --git a/evaluation/benchmarks/swe_bench/run_infer.py b/evaluation/benchmarks/swe_bench/run_infer.py index be4761da13..61c045037b 100644 --- a/evaluation/benchmarks/swe_bench/run_infer.py +++ b/evaluation/benchmarks/swe_bench/run_infer.py @@ -121,7 +121,7 @@ def get_config( default_agent=metadata.agent_class, run_as_openhands=False, 
max_iterations=metadata.max_iterations, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), sandbox=SandboxConfig( base_container_image=base_container_image, enable_auto_lint=True, diff --git a/evaluation/benchmarks/toolqa/run_infer.py b/evaluation/benchmarks/toolqa/run_infer.py index f88163a048..6f6f1a0e20 100644 --- a/evaluation/benchmarks/toolqa/run_infer.py +++ b/evaluation/benchmarks/toolqa/run_infer.py @@ -44,7 +44,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/benchmarks/webarena/run_infer.py b/evaluation/benchmarks/webarena/run_infer.py index d18918cf96..ac51a201a7 100644 --- a/evaluation/benchmarks/webarena/run_infer.py +++ b/evaluation/benchmarks/webarena/run_infer.py @@ -53,7 +53,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', + runtime='docker', max_iterations=metadata.max_iterations, sandbox=SandboxConfig( base_container_image='python:3.12-bookworm', diff --git a/evaluation/integration_tests/run_infer.py b/evaluation/integration_tests/run_infer.py index 2da68b9b82..fe85d23bf5 100644 --- a/evaluation/integration_tests/run_infer.py +++ b/evaluation/integration_tests/run_infer.py @@ -42,7 +42,7 @@ def get_config( config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime=os.environ.get('RUNTIME', 'eventstream'), + runtime=os.environ.get('RUNTIME', 'docker'), max_iterations=metadata.max_iterations, sandbox=SandboxConfig( # use default base_container_image diff --git a/openhands/core/config/app_config.py b/openhands/core/config/app_config.py index c049e68174..bec59f1dd5 100644 --- a/openhands/core/config/app_config.py +++ b/openhands/core/config/app_config.py @@ -49,7 +49,7 @@ 
class AppConfig: default_agent: str = OH_DEFAULT_AGENT sandbox: SandboxConfig = field(default_factory=SandboxConfig) security: SecurityConfig = field(default_factory=SecurityConfig) - runtime: str = 'eventstream' + runtime: str = 'docker' file_store: str = 'memory' file_store_path: str = '/tmp/file_store' trajectories_path: str | None = None diff --git a/openhands/resolver/resolve_issue.py b/openhands/resolver/resolve_issue.py index 0eb072df65..42f2ba05d8 100644 --- a/openhands/resolver/resolve_issue.py +++ b/openhands/resolver/resolve_issue.py @@ -182,7 +182,7 @@ async def process_issue( config = AppConfig( default_agent='CodeActAgent', - runtime='eventstream', + runtime='docker', max_budget_per_task=4, max_iterations=max_iterations, sandbox=SandboxConfig( diff --git a/openhands/runtime/README.md b/openhands/runtime/README.md index ca084706fb..3018433c1a 100644 --- a/openhands/runtime/README.md +++ b/openhands/runtime/README.md @@ -3,13 +3,13 @@ ## Introduction The OpenHands Runtime folder contains the core components responsible for executing actions and managing the runtime environment for the OpenHands project. This README provides an overview of the main components and their interactions. -You can learn more about how the runtime works in the [EventStream Runtime](https://docs.all-hands.dev/modules/usage/architecture/runtime) documentation. +You can learn more about how the runtime works in the [Docker Runtime](https://docs.all-hands.dev/modules/usage/architecture/runtime) documentation. ## Main Components -### 1. impl/*runtime.py +### 1. base.py -The `impl/*runtime.py` file defines the `Runtime` class, which serves as the primary [interface](./base.py) for agent interactions with the external environment. It handles various operations including: +The `base.py` file defines the `Runtime` class, which serves as the primary [interface](./base.py) for agent interactions with the external environment. 
It handles various operations including: - Bash sandbox execution - Browser interactions @@ -23,9 +23,16 @@ Key features of the `Runtime` class: - Action execution methods for different types of actions (run, read, write, browse, etc.) - Abstract methods for file operations (to be implemented by subclasses) -### 2. action_execution_server.py +### 2. impl/action_execution/action_execution_client.py +The `action_execution_client.py` file contains the `ActionExecutionClient` class, which implements the Runtime interface. It is an abstract implementation, meaning +it still needs to be extended by a concrete implementation to be used. -The `action_executor_server.py` file contains the `ActionExecutor` class, which is responsible for executing actions received from the OpenHands backend and producing observations. This client runs inside a Docker sandbox. +This client interacts with an action_execution_server (defined below) via HTTP +calls to actually perform runtime actions. + +### 3. action_execution_server.py + +The `action_execution_server.py` file contains the `ActionExecutor` class, which is responsible for executing actions received via the `/execute_action` HTTP endpoint. It returns observations in the HTTP response. Key features of the `ActionExecutor` class: - Initialization of user environment and bash shell @@ -33,6 +40,19 @@ Key features of the `ActionExecutor` class: - Execution of various action types (bash commands, IPython cells, file operations, browsing) - Integration with BrowserEnv for web interactions +### 4. Other Implementations +The `./impl/` directory contains a few different Runtime implementations, all of +which extend the `ActionExecutionClient` class. These implementations +handle the lifecycle of a Docker container or other environment running the +ActionExecutor server. 
+ +There are currently four implementations: +* Docker (runs locally in a Docker container) +* Remote (runs via a custom HTTP API for creating, pausing, resuming, and stopping runtimes in a remote environment) +* Modal (uses the Modal API) +* Runloop (uses the Runloop API) + + ## Workflow Description 1. **Initialization**: @@ -76,9 +96,9 @@ Key features of the `ActionExecutor` class: ## Runtime Types -### EventStream Runtime +### Docker Runtime -The EventStream Runtime is designed for local execution using Docker containers: +The Docker Runtime is designed for local execution using Docker containers: - Creates and manages a Docker container for each session - Executes actions within the container diff --git a/openhands/runtime/__init__.py b/openhands/runtime/__init__.py index 16534daf6b..9235380daa 100644 --- a/openhands/runtime/__init__.py +++ b/openhands/runtime/__init__.py @@ -1,8 +1,8 @@ from openhands.core.logger import openhands_logger as logger -from openhands.runtime.impl.e2b.sandbox import E2BBox -from openhands.runtime.impl.eventstream.eventstream_runtime import ( - EventStreamRuntime, +from openhands.runtime.impl.docker.docker_runtime import ( + DockerRuntime, ) +from openhands.runtime.impl.e2b.sandbox import E2BBox from openhands.runtime.impl.modal.modal_runtime import ModalRuntime from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime @@ -10,8 +10,8 @@ from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime def get_runtime_cls(name: str): # Local imports to avoid circular imports - if name == 'eventstream': - return EventStreamRuntime + if name == 'eventstream' or name == 'docker': + return DockerRuntime elif name == 'e2b': return E2BBox elif name == 'remote': @@ -30,6 +30,6 @@ __all__ = [ 'RemoteRuntime', 'ModalRuntime', 'RunloopRuntime', - 'EventStreamRuntime', + 'DockerRuntime', 'get_runtime_cls', ] diff --git 
a/openhands/runtime/impl/action_execution/action_execution_client.py b/openhands/runtime/impl/action_execution/action_execution_client.py new file mode 100644 index 0000000000..00c847a230 --- /dev/null +++ b/openhands/runtime/impl/action_execution/action_execution_client.py @@ -0,0 +1,289 @@ +import os +import tempfile +import threading +from abc import abstractmethod +from pathlib import Path +from typing import Any +from zipfile import ZipFile + +import requests + +from openhands.core.config import AppConfig +from openhands.core.exceptions import ( + AgentRuntimeTimeoutError, +) +from openhands.events import EventStream +from openhands.events.action import ( + ActionConfirmationStatus, + BrowseInteractiveAction, + BrowseURLAction, + CmdRunAction, + FileEditAction, + FileReadAction, + FileWriteAction, + IPythonRunCellAction, +) +from openhands.events.action.action import Action +from openhands.events.observation import ( + ErrorObservation, + NullObservation, + Observation, + UserRejectObservation, +) +from openhands.events.serialization import event_to_dict, observation_from_dict +from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS +from openhands.runtime.base import Runtime +from openhands.runtime.plugins import PluginRequirement +from openhands.runtime.utils.request import send_request + + +class ActionExecutionClient(Runtime): + """Base class for runtimes that interact with the action execution server. + + This class contains shared logic between DockerRuntime and RemoteRuntime + for interacting with the HTTP server defined in action_execution_server.py. 
+ """ + + def __init__( + self, + config: AppConfig, + event_stream: EventStream, + sid: str = 'default', + plugins: list[PluginRequirement] | None = None, + env_vars: dict[str, str] | None = None, + status_callback: Any | None = None, + attach_to_existing: bool = False, + headless_mode: bool = True, + ): + self.session = requests.Session() + self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time + self._runtime_initialized: bool = False + self._vscode_token: str | None = None # initial dummy value + super().__init__( + config, + event_stream, + sid, + plugins, + env_vars, + status_callback, + attach_to_existing, + headless_mode, + ) + + @abstractmethod + def _get_action_execution_server_host(self) -> str: + pass + + def _send_action_server_request( + self, + method: str, + url: str, + **kwargs, + ) -> requests.Response: + """Send a request to the action execution server. + + Args: + method: HTTP method (GET, POST, etc.) + url: URL to send the request to + **kwargs: Additional arguments to pass to requests.request() + + Returns: + Response from the server + + Raises: + AgentRuntimeError: If the request fails + """ + return send_request(self.session, method, url, **kwargs) + + def check_if_alive(self) -> None: + with self._send_action_server_request( + 'GET', + f'{self._get_action_execution_server_host()}/alive', + timeout=5, + ): + pass + + def list_files(self, path: str | None = None) -> list[str]: + """List files in the sandbox. + + If path is None, list files in the sandbox's initial working directory (e.g., /workspace). 
+ """ + + try: + data = {} + if path is not None: + data['path'] = path + + with send_request( + self.session, + 'POST', + f'{self._get_action_execution_server_host()}/list_files', + json=data, + timeout=10, + ) as response: + response_json = response.json() + assert isinstance(response_json, list) + return response_json + except requests.Timeout: + raise TimeoutError('List files operation timed out') + + def copy_from(self, path: str) -> Path: + """Zip all files in the sandbox and return as a stream of bytes.""" + + try: + params = {'path': path} + with send_request( + self.session, + 'GET', + f'{self._get_action_execution_server_host()}/download_files', + params=params, + stream=True, + timeout=30, + ) as response: + temp_file = tempfile.NamedTemporaryFile(delete=False) + for chunk in response.iter_content(chunk_size=8192): + if chunk: # filter out keep-alive new chunks + temp_file.write(chunk) + return Path(temp_file.name) + except requests.Timeout: + raise TimeoutError('Copy operation timed out') + + def copy_to( + self, host_src: str, sandbox_dest: str, recursive: bool = False + ) -> None: + if not os.path.exists(host_src): + raise FileNotFoundError(f'Source file {host_src} does not exist') + + try: + if recursive: + with tempfile.NamedTemporaryFile( + suffix='.zip', delete=False + ) as temp_zip: + temp_zip_path = temp_zip.name + + with ZipFile(temp_zip_path, 'w') as zipf: + for root, _, files in os.walk(host_src): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath( + file_path, os.path.dirname(host_src) + ) + zipf.write(file_path, arcname) + + upload_data = {'file': open(temp_zip_path, 'rb')} + else: + upload_data = {'file': open(host_src, 'rb')} + + params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()} + + with self._send_action_server_request( + 'POST', + f'{self._get_action_execution_server_host()}/upload_file', + files=upload_data, + params=params, + timeout=300, + ) as response: + self.log( + 
'debug', + f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}', + ) + finally: + if recursive: + os.unlink(temp_zip_path) + self.log( + 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}' + ) + + def get_vscode_token(self) -> str: + if self.vscode_enabled and self._runtime_initialized: + if self._vscode_token is not None: # cached value + return self._vscode_token + with send_request( + self.session, + 'GET', + f'{self._get_action_execution_server_host()}/vscode/connection_token', + timeout=10, + ) as response: + response_json = response.json() + assert isinstance(response_json, dict) + if response_json['token'] is None: + return '' + self._vscode_token = response_json['token'] + return response_json['token'] + else: + return '' + + def send_action_for_execution(self, action: Action) -> Observation: + if isinstance(action, FileEditAction): + return self.edit(action) + + # set timeout to default if not set + if action.timeout is None: + action.timeout = self.config.sandbox.timeout + + with self.action_semaphore: + if not action.runnable: + return NullObservation('') + if ( + hasattr(action, 'confirmation_state') + and action.confirmation_state + == ActionConfirmationStatus.AWAITING_CONFIRMATION + ): + return NullObservation('') + action_type = action.action # type: ignore[attr-defined] + if action_type not in ACTION_TYPE_TO_CLASS: + raise ValueError(f'Action {action_type} does not exist.') + if not hasattr(self, action_type): + return ErrorObservation( + f'Action {action_type} is not supported in the current runtime.', + error_id='AGENT_ERROR$BAD_ACTION', + ) + if ( + getattr(action, 'confirmation_state', None) + == ActionConfirmationStatus.REJECTED + ): + return UserRejectObservation( + 'Action has been rejected by the user! Waiting for further user input.' 
+ ) + + assert action.timeout is not None + + try: + with send_request( + self.session, + 'POST', + f'{self._get_action_execution_server_host()}/execute_action', + json={'action': event_to_dict(action)}, + # wait a few more seconds to get the timeout error from client side + timeout=action.timeout + 5, + ) as response: + output = response.json() + obs = observation_from_dict(output) + obs._cause = action.id # type: ignore[attr-defined] + except requests.Timeout: + raise AgentRuntimeTimeoutError( + f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s' + ) + + return obs + + def run(self, action: CmdRunAction) -> Observation: + return self.send_action_for_execution(action) + + def run_ipython(self, action: IPythonRunCellAction) -> Observation: + return self.send_action_for_execution(action) + + def read(self, action: FileReadAction) -> Observation: + return self.send_action_for_execution(action) + + def write(self, action: FileWriteAction) -> Observation: + return self.send_action_for_execution(action) + + def browse(self, action: BrowseURLAction) -> Observation: + return self.send_action_for_execution(action) + + def browse_interactive(self, action: BrowseInteractiveAction) -> Observation: + return self.send_action_for_execution(action) + + def close(self) -> None: + self.session.close() diff --git a/openhands/runtime/impl/eventstream/containers.py b/openhands/runtime/impl/docker/containers.py similarity index 100% rename from openhands/runtime/impl/eventstream/containers.py rename to openhands/runtime/impl/docker/containers.py diff --git a/openhands/runtime/impl/eventstream/eventstream_runtime.py b/openhands/runtime/impl/docker/docker_runtime.py similarity index 59% rename from openhands/runtime/impl/eventstream/eventstream_runtime.py rename to openhands/runtime/impl/docker/docker_runtime.py index ae5bea2990..41882eb521 100644 --- a/openhands/runtime/impl/eventstream/eventstream_runtime.py +++ 
b/openhands/runtime/impl/docker/docker_runtime.py @@ -1,11 +1,6 @@ import atexit -import os -import tempfile -import threading from functools import lru_cache -from pathlib import Path from typing import Callable -from zipfile import ZipFile import docker import requests @@ -14,40 +9,20 @@ import tenacity from openhands.core.config import AppConfig from openhands.core.exceptions import ( AgentRuntimeDisconnectedError, - AgentRuntimeError, AgentRuntimeNotFoundError, AgentRuntimeNotReadyError, - AgentRuntimeTimeoutError, ) from openhands.core.logger import DEBUG from openhands.core.logger import openhands_logger as logger from openhands.events import EventStream -from openhands.events.action import ( - ActionConfirmationStatus, - BrowseInteractiveAction, - BrowseURLAction, - CmdRunAction, - FileEditAction, - FileReadAction, - FileWriteAction, - IPythonRunCellAction, -) -from openhands.events.action.action import Action -from openhands.events.observation import ( - ErrorObservation, - NullObservation, - Observation, - UserRejectObservation, -) -from openhands.events.serialization import event_to_dict, observation_from_dict -from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS -from openhands.runtime.base import Runtime from openhands.runtime.builder import DockerRuntimeBuilder -from openhands.runtime.impl.eventstream.containers import remove_all_containers +from openhands.runtime.impl.action_execution.action_execution_client import ( + ActionExecutionClient, +) +from openhands.runtime.impl.docker.containers import remove_all_containers from openhands.runtime.plugins import PluginRequirement from openhands.runtime.utils import find_available_tcp_port from openhands.runtime.utils.log_streamer import LogStreamer -from openhands.runtime.utils.request import send_request from openhands.runtime.utils.runtime_build import build_runtime_image from openhands.utils.async_utils import call_sync_from_async from openhands.utils.tenacity_stop import 
stop_if_should_exit @@ -62,7 +37,7 @@ def remove_all_runtime_containers(): _atexit_registered = False -class EventStreamRuntime(Runtime): +class DockerRuntime(ActionExecutionClient): """This runtime will subscribe the event stream. When receive an event, it will send the event to runtime-client which run inside the docker environment. @@ -74,30 +49,6 @@ class EventStreamRuntime(Runtime): env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None. """ - # Need to provide this method to allow inheritors to init the Runtime - # without initting the EventStreamRuntime. - def init_base_runtime( - self, - config: AppConfig, - event_stream: EventStream, - sid: str = 'default', - plugins: list[PluginRequirement] | None = None, - env_vars: dict[str, str] | None = None, - status_callback: Callable | None = None, - attach_to_existing: bool = False, - headless_mode: bool = True, - ): - super().__init__( - config, - event_stream, - sid, - plugins, - env_vars, - status_callback, - attach_to_existing, - headless_mode, - ) - def __init__( self, config: AppConfig, @@ -117,10 +68,8 @@ class EventStreamRuntime(Runtime): self.config = config self._host_port = 30000 # initial dummy value self._container_port = 30001 # initial dummy value - self._vscode_url: str | None = None # initial dummy value self._runtime_initialized: bool = False self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}' - self.session = requests.Session() self.status_callback = status_callback self.docker_client: docker.DockerClient = self._init_docker_client() @@ -128,14 +77,13 @@ class EventStreamRuntime(Runtime): self.runtime_container_image = self.config.sandbox.runtime_container_image self.container_name = CONTAINER_NAME_PREFIX + sid self.container = None - self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time self.runtime_builder = DockerRuntimeBuilder(self.docker_client) # Buffer for container logs self.log_streamer: LogStreamer 
| None = None - self.init_base_runtime( + super().__init__( config, event_stream, sid, @@ -153,6 +101,9 @@ class EventStreamRuntime(Runtime): f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}', ) + def _get_action_execution_server_host(self): + return self.api_url + async def connect(self): self.send_status_message('STATUS$STARTING_RUNTIME') try: @@ -377,26 +328,18 @@ class EventStreamRuntime(Runtime): if not self.log_streamer: raise AgentRuntimeNotReadyError('Runtime client is not ready.') - with send_request( - self.session, - 'GET', - f'{self.api_url}/alive', - timeout=5, - ): - pass + self.check_if_alive() def close(self, rm_all_containers: bool | None = None): - """Closes the EventStreamRuntime and associated objects + """Closes the DockerRuntime and associated objects Parameters: - rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix """ + super().close() if self.log_streamer: self.log_streamer.close() - if self.session: - self.session.close() - if rm_all_containers is None: rm_all_containers = self.config.sandbox.rm_all_containers @@ -407,178 +350,6 @@ class EventStreamRuntime(Runtime): ) remove_all_containers(close_prefix) - def run_action(self, action: Action) -> Observation: - if isinstance(action, FileEditAction): - return self.edit(action) - - # set timeout to default if not set - if action.timeout is None: - action.timeout = self.config.sandbox.timeout - - with self.action_semaphore: - if not action.runnable: - return NullObservation('') - if ( - hasattr(action, 'confirmation_state') - and action.confirmation_state - == ActionConfirmationStatus.AWAITING_CONFIRMATION - ): - return NullObservation('') - action_type = action.action # type: ignore[attr-defined] - if action_type not in ACTION_TYPE_TO_CLASS: - raise ValueError(f'Action {action_type} does not exist.') - if not hasattr(self, action_type): - return ErrorObservation( - f'Action {action_type} is 
not supported in the current runtime.', - error_id='AGENT_ERROR$BAD_ACTION', - ) - if ( - getattr(action, 'confirmation_state', None) - == ActionConfirmationStatus.REJECTED - ): - return UserRejectObservation( - 'Action has been rejected by the user! Waiting for further user input.' - ) - - assert action.timeout is not None - - try: - with send_request( - self.session, - 'POST', - f'{self.api_url}/execute_action', - json={'action': event_to_dict(action)}, - # wait a few more seconds to get the timeout error from client side - timeout=action.timeout + 5, - ) as response: - output = response.json() - obs = observation_from_dict(output) - obs._cause = action.id # type: ignore[attr-defined] - except requests.Timeout: - raise AgentRuntimeTimeoutError( - f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s' - ) - - return obs - - def run(self, action: CmdRunAction) -> Observation: - return self.run_action(action) - - def run_ipython(self, action: IPythonRunCellAction) -> Observation: - return self.run_action(action) - - def read(self, action: FileReadAction) -> Observation: - return self.run_action(action) - - def write(self, action: FileWriteAction) -> Observation: - return self.run_action(action) - - def browse(self, action: BrowseURLAction) -> Observation: - return self.run_action(action) - - def browse_interactive(self, action: BrowseInteractiveAction) -> Observation: - return self.run_action(action) - - # ==================================================================== - # Implement these methods (for file operations) in the subclass - # ==================================================================== - - def copy_to( - self, host_src: str, sandbox_dest: str, recursive: bool = False - ) -> None: - if not os.path.exists(host_src): - raise FileNotFoundError(f'Source file {host_src} does not exist') - - try: - if recursive: - # For recursive copy, create a zip file - with tempfile.NamedTemporaryFile( - suffix='.zip', 
delete=False - ) as temp_zip: - temp_zip_path = temp_zip.name - - with ZipFile(temp_zip_path, 'w') as zipf: - for root, _, files in os.walk(host_src): - for file in files: - file_path = os.path.join(root, file) - arcname = os.path.relpath( - file_path, os.path.dirname(host_src) - ) - zipf.write(file_path, arcname) - - upload_data = {'file': open(temp_zip_path, 'rb')} - else: - # For single file copy - upload_data = {'file': open(host_src, 'rb')} - - params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()} - - with send_request( - self.session, - 'POST', - f'{self.api_url}/upload_file', - files=upload_data, - params=params, - timeout=300, - ): - pass - - except requests.Timeout: - raise AgentRuntimeTimeoutError('Copy operation timed out') - except Exception as e: - raise AgentRuntimeError(f'Copy operation failed: {str(e)}') - finally: - if recursive: - os.unlink(temp_zip_path) - self.log( - 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}' - ) - - def list_files(self, path: str | None = None) -> list[str]: - """List files in the sandbox. - - If path is None, list files in the sandbox's initial working directory (e.g., /workspace). 
- """ - - try: - data = {} - if path is not None: - data['path'] = path - - with send_request( - self.session, - 'POST', - f'{self.api_url}/list_files', - json=data, - timeout=10, - ) as response: - response_json = response.json() - assert isinstance(response_json, list) - return response_json - except requests.Timeout: - raise TimeoutError('List files operation timed out') - - def copy_from(self, path: str) -> Path: - """Zip all files in the sandbox and return as a stream of bytes.""" - - try: - params = {'path': path} - with send_request( - self.session, - 'GET', - f'{self.api_url}/download_files', - params=params, - stream=True, - timeout=30, - ) as response: - temp_file = tempfile.NamedTemporaryFile(delete=False) - for chunk in response.iter_content(chunk_size=8192): - if chunk: # filter out keep-alive new chunks - temp_file.write(chunk) - return Path(temp_file.name) - except requests.Timeout: - raise TimeoutError('Copy operation timed out') - def _is_port_in_use_docker(self, port): containers = self.docker_client.containers.list() for container in containers: @@ -598,27 +369,9 @@ class EventStreamRuntime(Runtime): @property def vscode_url(self) -> str | None: - if self.vscode_enabled and self._runtime_initialized: - if ( - hasattr(self, '_vscode_url') and self._vscode_url is not None - ): # cached value - return self._vscode_url - - with send_request( - self.session, - 'GET', - f'{self.api_url}/vscode/connection_token', - timeout=10, - ) as response: - response_json = response.json() - assert isinstance(response_json, dict) - if response_json['token'] is None: - return None - self._vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}' - self.log( - 'debug', - f'VSCode URL: {self._vscode_url}', - ) - return self._vscode_url - else: + token = super().get_vscode_token() + print('got token', token) + if not token: return None + vscode_url = f'http://localhost:{self._host_port + 
1}/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}' + return vscode_url diff --git a/openhands/runtime/impl/modal/modal_runtime.py b/openhands/runtime/impl/modal/modal_runtime.py index 026e7c0b53..473c4ae97b 100644 --- a/openhands/runtime/impl/modal/modal_runtime.py +++ b/openhands/runtime/impl/modal/modal_runtime.py @@ -1,8 +1,7 @@ import os import tempfile -import threading from pathlib import Path -from typing import Callable, Generator +from typing import Callable import modal import requests @@ -10,9 +9,8 @@ import tenacity from openhands.core.config import AppConfig from openhands.events import EventStream -from openhands.runtime.impl.eventstream.eventstream_runtime import ( - EventStreamRuntime, - LogStreamer, +from openhands.runtime.impl.action_execution.action_execution_client import ( + ActionExecutionClient, ) from openhands.runtime.plugins import PluginRequirement from openhands.runtime.utils.command import get_remote_startup_command @@ -21,52 +19,13 @@ from openhands.runtime.utils.runtime_build import ( prep_build_folder, ) from openhands.utils.async_utils import call_sync_from_async +from openhands.utils.tenacity_stop import stop_if_should_exit # FIXME: this will not work in HA mode. We need a better way to track IDs MODAL_RUNTIME_IDS: dict[str, str] = {} -# Modal's log generator returns strings, but the upstream LogBuffer expects bytes. -def bytes_shim(string_generator) -> Generator[bytes, None, None]: - for line in string_generator: - yield line.encode('utf-8') - - -class ModalLogStreamer(LogStreamer): - """Streams Modal sandbox logs to stdout. - - This class provides a way to stream logs from a Modal sandbox directly to stdout - through the provided logging function. 
- """ - - def __init__( - self, - sandbox: modal.Sandbox, - logFn: Callable, - ): - self.log = logFn - self._stop_event = threading.Event() - self.log_generator = bytes_shim(sandbox.stderr) - - # Start the stdout streaming thread - self.stdout_thread = threading.Thread(target=self._stream_logs) - self.stdout_thread.daemon = True - self.stdout_thread.start() - - def _stream_logs(self): - """Stream logs from the Modal sandbox.""" - try: - for log_line in self.log_generator: - if self._stop_event.is_set(): - break - if log_line: - decoded_line = log_line.decode('utf-8').rstrip() - self.log('debug', f'[inside sandbox] {decoded_line}') - except Exception as e: - self.log('error', f'Error streaming modal logs: {e}') - - -class ModalRuntime(EventStreamRuntime): +class ModalRuntime(ActionExecutionClient): """This runtime will subscribe the event stream. When receive an event, it will send the event to runtime-client which run inside the Modal sandbox environment. @@ -116,14 +75,9 @@ class ModalRuntime(EventStreamRuntime): # This value is arbitrary as it's private to the container self.container_port = 3000 - self.session = requests.Session() self.status_callback = status_callback self.base_container_image_id = self.config.sandbox.base_container_image self.runtime_container_image_id = self.config.sandbox.runtime_container_image - self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time - - # Buffer for container logs - self.log_streamer: LogStreamer | None = None if self.config.sandbox.runtime_extra_deps: self.log( @@ -131,7 +85,7 @@ class ModalRuntime(EventStreamRuntime): f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}', ) - self.init_base_runtime( + super().__init__( config, event_stream, sid, @@ -170,7 +124,6 @@ class ModalRuntime(EventStreamRuntime): self.send_status_message('STATUS$CONTAINER_STARTED') - self.log_streamer = ModalLogStreamer(self.sandbox, self.log) if self.sandbox is None: 
raise Exception('Sandbox not initialized') tunnel = self.sandbox.tunnels()[self.container_port] @@ -187,6 +140,20 @@ class ModalRuntime(EventStreamRuntime): if not self.attach_to_existing: self.send_status_message(' ') + def _get_action_execution_server_host(self): + return self.api_url + + @tenacity.retry( + stop=tenacity.stop_after_delay(120) | stop_if_should_exit(), + retry=tenacity.retry_if_exception_type( + (ConnectionError, requests.exceptions.ConnectionError) + ), + reraise=True, + wait=tenacity.wait_fixed(2), + ) + def _wait_until_alive(self): + self.check_if_alive() + def _get_image_definition( self, base_container_image_id: str | None, @@ -292,11 +259,7 @@ echo 'export INPUTRC=/etc/inputrc' >> /etc/bash.bashrc def close(self): """Closes the ModalRuntime and associated objects.""" - if self.log_streamer: - self.log_streamer.close() - - if self.session: - self.session.close() + super().close() if not self.attach_to_existing and self.sandbox: self.sandbox.terminate() diff --git a/openhands/runtime/impl/remote/remote_runtime.py b/openhands/runtime/impl/remote/remote_runtime.py index 69b619b639..ba59f2281d 100644 --- a/openhands/runtime/impl/remote/remote_runtime.py +++ b/openhands/runtime/impl/remote/remote_runtime.py @@ -1,10 +1,6 @@ import os -import tempfile -import threading -from pathlib import Path from typing import Callable, Optional from urllib.parse import urlparse -from zipfile import ZipFile import requests import tenacity @@ -15,29 +11,13 @@ from openhands.core.exceptions import ( AgentRuntimeError, AgentRuntimeNotFoundError, AgentRuntimeNotReadyError, - AgentRuntimeTimeoutError, AgentRuntimeUnavailableError, ) from openhands.events import EventStream -from openhands.events.action import ( - BrowseInteractiveAction, - BrowseURLAction, - CmdRunAction, - FileEditAction, - FileReadAction, - FileWriteAction, - IPythonRunCellAction, -) -from openhands.events.action.action import Action -from openhands.events.observation import ( - ErrorObservation, - 
NullObservation, - Observation, -) -from openhands.events.serialization import event_to_dict, observation_from_dict -from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS -from openhands.runtime.base import Runtime from openhands.runtime.builder.remote import RemoteRuntimeBuilder +from openhands.runtime.impl.action_execution.action_execution_client import ( + ActionExecutionClient, +) from openhands.runtime.plugins import PluginRequirement from openhands.runtime.utils.command import get_remote_startup_command from openhands.runtime.utils.request import ( @@ -49,7 +29,7 @@ from openhands.utils.async_utils import call_sync_from_async from openhands.utils.tenacity_stop import stop_if_should_exit -class RemoteRuntime(Runtime): +class RemoteRuntime(ActionExecutionClient): """This runtime will connect to a remote oh-runtime-client.""" port: int = 60000 # default port for the remote runtime client @@ -65,10 +45,6 @@ class RemoteRuntime(Runtime): attach_to_existing: bool = False, headless_mode: bool = True, ): - # We need to set session and action_semaphore before the __init__ below, or we get odd errors - self.session = requests.Session() - self.action_semaphore = threading.Semaphore(1) - super().__init__( config, event_stream, @@ -98,7 +74,9 @@ class RemoteRuntime(Runtime): self.runtime_id: str | None = None self.runtime_url: str | None = None self._runtime_initialized: bool = False - self._vscode_url: str | None = None # initial dummy value + + def _get_action_execution_server_host(self): + return self.runtime_url async def connect(self): try: @@ -148,10 +126,9 @@ class RemoteRuntime(Runtime): def _check_existing_runtime(self) -> bool: try: - with self._send_request( + with self._send_runtime_api_request( 'GET', f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}', - is_retry=False, timeout=60, ) as response: data = response.json() @@ -179,10 +156,9 @@ class RemoteRuntime(Runtime): def _build_runtime(self): self.log('debug', f'Building 
RemoteRuntime config:\n{self.config}') - with self._send_request( + with self._send_runtime_api_request( 'GET', f'{self.config.sandbox.remote_runtime_api_url}/registry_prefix', - is_retry=False, timeout=60, ) as response: response_json = response.json() @@ -210,10 +186,9 @@ class RemoteRuntime(Runtime): force_rebuild=self.config.sandbox.force_rebuild_runtime, ) - with self._send_request( + with self._send_runtime_api_request( 'GET', f'{self.config.sandbox.remote_runtime_api_url}/image_exists', - is_retry=False, params={'image': self.container_image}, timeout=60, ) as response: @@ -252,10 +227,9 @@ class RemoteRuntime(Runtime): # Start the sandbox using the /start endpoint try: - with self._send_request( + with self._send_runtime_api_request( 'POST', f'{self.config.sandbox.remote_runtime_api_url}/start', - is_retry=False, json=start_request, timeout=60, ) as response: @@ -269,10 +243,9 @@ class RemoteRuntime(Runtime): raise AgentRuntimeUnavailableError() from e def _resume_runtime(self): - with self._send_request( + with self._send_runtime_api_request( 'POST', f'{self.config.sandbox.remote_runtime_api_url}/resume', - is_retry=False, json={'runtime_id': self.runtime_id}, timeout=60, ): @@ -290,34 +263,19 @@ class RemoteRuntime(Runtime): @property def vscode_url(self) -> str | None: - if self.vscode_enabled and self._runtime_initialized: - if ( - hasattr(self, '_vscode_url') and self._vscode_url is not None - ): # cached value - return self._vscode_url - - with self._send_request( - 'GET', - f'{self.runtime_url}/vscode/connection_token', - timeout=60, - ) as response: - response_json = response.json() - assert isinstance(response_json, dict) - if response_json['token'] is None: - return None - # parse runtime_url to get vscode_url - _parsed_url = urlparse(self.runtime_url) - assert isinstance(_parsed_url.scheme, str) and isinstance( - _parsed_url.netloc, str - ) - self._vscode_url = 
f'{_parsed_url.scheme}://vscode-{_parsed_url.netloc}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}' - self.log( - 'debug', - f'VSCode URL: {self._vscode_url}', - ) - return self._vscode_url - else: + token = super().get_vscode_token() + if not token: return None + _parsed_url = urlparse(self.runtime_url) + assert isinstance(_parsed_url.scheme, str) and isinstance( + _parsed_url.netloc, str + ) + vscode_url = f'{_parsed_url.scheme}://vscode-{_parsed_url.netloc}/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}' + self.log( + 'debug', + f'VSCode URL: {vscode_url}', + ) + return vscode_url def _wait_until_alive(self): retry_decorator = tenacity.retry( @@ -333,7 +291,7 @@ class RemoteRuntime(Runtime): def _wait_until_alive_impl(self): self.log('debug', f'Waiting for runtime to be alive at url: {self.runtime_url}') - with self._send_request( + with self._send_runtime_api_request( 'GET', f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}', timeout=60, @@ -350,12 +308,7 @@ class RemoteRuntime(Runtime): # Retry a period of time to give the cluster time to start the pod if pod_status == 'ready': try: - with self._send_request( - 'GET', - f'{self.runtime_url}/alive', - timeout=60, - ): # will raise exception if we don't get 200 back. 
- pass + self.check_if_alive() except requests.HTTPError as e: self.log( 'warning', f"Runtime /alive failed, but pod says it's ready: {e}" @@ -390,182 +343,39 @@ class RemoteRuntime(Runtime): def close(self, timeout: int = 10): if self.config.sandbox.keep_runtime_alive or self.attach_to_existing: - self.session.close() + super().close() return - if self.runtime_id and self.session: - try: - with self._send_request( - 'POST', - f'{self.config.sandbox.remote_runtime_api_url}/stop', - is_retry=False, - json={'runtime_id': self.runtime_id}, - timeout=timeout, - ): - self.log('debug', 'Runtime stopped.') - except Exception as e: - raise e - finally: - self.session.close() - - def run_action(self, action: Action, is_retry: bool = False) -> Observation: - if action.timeout is None: - action.timeout = self.config.sandbox.timeout - if isinstance(action, FileEditAction): - return self.edit(action) - with self.action_semaphore: - if not action.runnable: - return NullObservation('') - action_type = action.action # type: ignore[attr-defined] - if action_type not in ACTION_TYPE_TO_CLASS: - raise ValueError(f'Action {action_type} does not exist.') - if not hasattr(self, action_type): - return ErrorObservation( - f'[Runtime (ID={self.runtime_id})] Action {action_type} is not supported in the current runtime.', - error_id='AGENT_ERROR$BAD_ACTION', - ) - - assert action.timeout is not None - - try: - request_body = {'action': event_to_dict(action)} - self.log('debug', f'Request body: {request_body}') - with self._send_request( - 'POST', - f'{self.runtime_url}/execute_action', - is_retry=False, - json=request_body, - # wait a few more seconds to get the timeout error from client side - timeout=action.timeout + 5, - ) as response: - output = response.json() - obs = observation_from_dict(output) - obs._cause = action.id # type: ignore[attr-defined] - except requests.Timeout: - raise AgentRuntimeTimeoutError( - f'Runtime failed to return execute_action before the requested timeout of 
{action.timeout}s' - ) - return obs - - def _send_request(self, method, url, is_retry=False, **kwargs): - is_runtime_request = self.runtime_url and self.runtime_url in url try: - return send_request(self.session, method, url, **kwargs) + with self._send_runtime_api_request( + 'POST', + f'{self.config.sandbox.remote_runtime_api_url}/stop', + json={'runtime_id': self.runtime_id}, + timeout=timeout, + ): + self.log('debug', 'Runtime stopped.') + except Exception as e: + raise e + finally: + super().close() + + def _send_runtime_api_request(self, method, url, **kwargs): + return send_request(self.session, method, url, **kwargs) + + def _send_action_server_request(self, method, url, **kwargs): + try: + super()._send_action_server_request(method, url, **kwargs) except requests.Timeout: self.log('error', 'No response received within the timeout period.') raise except RequestHTTPError as e: - if is_runtime_request and e.response.status_code in (404, 502): + if e.response.status_code in (404, 502): raise AgentRuntimeDisconnectedError( f'{e.response.status_code} error while connecting to {self.runtime_url}' ) from e - elif is_runtime_request and e.response.status_code == 503: - if not is_retry: - self.log('warning', 'Runtime appears to be paused. Resuming...') - self._resume_runtime() - self._wait_until_alive() - return self._send_request(method, url, True, **kwargs) - else: - raise AgentRuntimeUnavailableError( - f'{e.response.status_code} error while connecting to {self.runtime_url}' - ) from e - + elif e.response.status_code == 503: + self.log('warning', 'Runtime appears to be paused. 
Resuming...') + self._resume_runtime() + self._wait_until_alive() + return super()._send_action_server_request(method, url, **kwargs) else: raise e - - def run(self, action: CmdRunAction) -> Observation: - return self.run_action(action) - - def run_ipython(self, action: IPythonRunCellAction) -> Observation: - return self.run_action(action) - - def read(self, action: FileReadAction) -> Observation: - return self.run_action(action) - - def write(self, action: FileWriteAction) -> Observation: - return self.run_action(action) - - def browse(self, action: BrowseURLAction) -> Observation: - return self.run_action(action) - - def browse_interactive(self, action: BrowseInteractiveAction) -> Observation: - return self.run_action(action) - - def copy_to( - self, host_src: str, sandbox_dest: str, recursive: bool = False - ) -> None: - if not os.path.exists(host_src): - raise FileNotFoundError(f'Source file {host_src} does not exist') - - try: - if recursive: - with tempfile.NamedTemporaryFile( - suffix='.zip', delete=False - ) as temp_zip: - temp_zip_path = temp_zip.name - - with ZipFile(temp_zip_path, 'w') as zipf: - for root, _, files in os.walk(host_src): - for file in files: - file_path = os.path.join(root, file) - arcname = os.path.relpath( - file_path, os.path.dirname(host_src) - ) - zipf.write(file_path, arcname) - - upload_data = {'file': open(temp_zip_path, 'rb')} - else: - upload_data = {'file': open(host_src, 'rb')} - - params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()} - - with self._send_request( - 'POST', - f'{self.runtime_url}/upload_file', - is_retry=False, - files=upload_data, - params=params, - timeout=300, - ) as response: - self.log( - 'debug', - f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. 
Response: {response.text}', - ) - finally: - if recursive: - os.unlink(temp_zip_path) - self.log( - 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}' - ) - - def list_files(self, path: str | None = None) -> list[str]: - data = {} - if path is not None: - data['path'] = path - - with self._send_request( - 'POST', - f'{self.runtime_url}/list_files', - is_retry=False, - json=data, - timeout=30, - ) as response: - response_json = response.json() - assert isinstance(response_json, list) - return response_json - - def copy_from(self, path: str) -> Path: - """Zip all files in the sandbox and return as a stream of bytes.""" - params = {'path': path} - with self._send_request( - 'GET', - f'{self.runtime_url}/download_files', - is_retry=False, - params=params, - stream=True, - timeout=30, - ) as response: - temp_file = tempfile.NamedTemporaryFile(delete=False) - for chunk in response.iter_content(chunk_size=8192): - if chunk: # filter out keep-alive new chunks - temp_file.write(chunk) - return Path(temp_file.name) diff --git a/openhands/runtime/impl/runloop/runloop_runtime.py b/openhands/runtime/impl/runloop/runloop_runtime.py index 368244a03c..2e51ea4093 100644 --- a/openhands/runtime/impl/runloop/runloop_runtime.py +++ b/openhands/runtime/impl/runloop/runloop_runtime.py @@ -1,82 +1,26 @@ import logging -import threading -import time from typing import Callable -import requests import tenacity from runloop_api_client import Runloop from runloop_api_client.types import DevboxView from runloop_api_client.types.shared_params import LaunchParameters from openhands.core.config import AppConfig -from openhands.core.exceptions import ( - AgentRuntimeNotReadyError, - AgentRuntimeUnavailableError, -) from openhands.core.logger import openhands_logger as logger from openhands.events import EventStream -from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime +from openhands.runtime.impl.action_execution.action_execution_client import ( 
+ ActionExecutionClient, +) from openhands.runtime.plugins import PluginRequirement from openhands.runtime.utils.command import get_remote_startup_command -from openhands.runtime.utils.log_streamer import LogStreamer -from openhands.runtime.utils.request import send_request from openhands.utils.tenacity_stop import stop_if_should_exit CONTAINER_NAME_PREFIX = 'openhands-runtime-' -class RunloopLogStreamer(LogStreamer): - """Streams Runloop devbox logs to stdout. - - This class provides a way to stream logs from a Runloop devbox directly to stdout - through the provided logging function. - """ - - def __init__( - self, - runloop_api_client: Runloop, - devbox_id: str, - logFn: Callable, - ): - self.runloop_api_client = runloop_api_client - self.devbox_id = devbox_id - self.log = logFn - self.log_index = 0 - self._stop_event = threading.Event() - - # Start the stdout streaming thread - self.stdout_thread = threading.Thread(target=self._stream_logs) - self.stdout_thread.daemon = True - self.stdout_thread.start() - - def _stream_logs(self): - """Stream logs from the Runloop devbox.""" - try: - while True: - raw_logs = self.runloop_api_client.devboxes.logs.list( - self.devbox_id - ).logs[self.log_index :] - logs = [ - log.message - for log in raw_logs - if log.message and log.cmd_id is None - ] - - self.log_index += len(raw_logs) - if self._stop_event.is_set(): - break - if logs: - for log_line in logs: - self.log('debug', f'[inside devbox] {log_line}') - - time.sleep(1) - except Exception as e: - self.log('error', f'Error streaming runloop logs: {e}') - - -class RunloopRuntime(EventStreamRuntime): - """The RunloopRuntime class is an EventStreamRuntime that utilizes Runloop Devbox as a runtime environment.""" +class RunloopRuntime(ActionExecutionClient): + """The RunloopRuntime class is an DockerRuntime that utilizes Runloop Devbox as a runtime environment.""" _sandbox_port: int = 4444 _vscode_port: int = 4445 @@ -98,10 +42,8 @@ class RunloopRuntime(EventStreamRuntime): 
self.runloop_api_client = Runloop( bearer_token=config.runloop_api_key, ) - self.session = requests.Session() self.container_name = CONTAINER_NAME_PREFIX + sid - self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time - self.init_base_runtime( + super().__init__( config, event_stream, sid, @@ -112,9 +54,11 @@ class RunloopRuntime(EventStreamRuntime): headless_mode, ) # Buffer for container logs - self.log_streamer: LogStreamer | None = None self._vscode_url: str | None = None + def _get_action_execution_server_host(self): + return self.api_url + @tenacity.retry( stop=tenacity.stop_after_attempt(120), wait=tenacity.wait_fixed(1), @@ -203,15 +147,11 @@ class RunloopRuntime(EventStreamRuntime): port=self._sandbox_port, ) - # Hook up logs - self.log_streamer = RunloopLogStreamer( - self.runloop_api_client, self.devbox.id, logger.info - ) self.api_url = tunnel.url logger.info(f'Container started. Server url: {self.api_url}') # End Runloop connect - # NOTE: Copied from EventStreamRuntime + # NOTE: Copied from DockerRuntime logger.info('Waiting for client to become ready...') self.send_status_message('STATUS$WAITING_FOR_CLIENT') self._wait_until_alive() @@ -230,27 +170,10 @@ class RunloopRuntime(EventStreamRuntime): reraise=(ConnectionRefusedError,), ) def _wait_until_alive(self): - if not self.log_streamer: - raise AgentRuntimeNotReadyError('Runtime client is not ready.') - response = send_request( - self.session, - 'GET', - f'{self.api_url}/alive', - timeout=5, - ) - if response.status_code == 200: - return - else: - msg = f'Action execution API is not alive. 
Response: {response}' - logger.error(msg) - raise AgentRuntimeUnavailableError(msg) + super().check_if_alive() def close(self, rm_all_containers: bool | None = True): - if self.log_streamer: - self.log_streamer.close() - - if self.session: - self.session.close() + super().close() if self.attach_to_existing: return @@ -260,42 +183,24 @@ class RunloopRuntime(EventStreamRuntime): @property def vscode_url(self) -> str | None: - if self.vscode_enabled and self.devbox and self.devbox.status == 'running': - if self._vscode_url is not None: - return self._vscode_url - - try: - with send_request( - self.session, - 'GET', - f'{self.api_url}/vscode/connection_token', - timeout=10, - ) as response: - response_json = response.json() - assert isinstance(response_json, dict) - if response_json['token'] is None: - return None - token = response_json['token'] - - self._vscode_url = ( - self.runloop_api_client.devboxes.create_tunnel( - id=self.devbox.id, - port=self._vscode_port, - ).url - + f'/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}' - ) - - self.log( - 'debug', - f'VSCode URL: {self._vscode_url}', - ) - - return self._vscode_url - except Exception as e: - self.log( - 'error', - f'Failed to create vscode tunnel {e}', - ) - return None - else: + if self._vscode_url is not None: # cached value + return self._vscode_url + token = super().get_vscode_token() + if not token: return None + if not self.devbox: + return None + self._vscode_url = ( + self.runloop_api_client.devboxes.create_tunnel( + id=self.devbox.id, + port=self._vscode_port, + ).url + + f'/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}' + ) + + self.log( + 'debug', + f'VSCode URL: {self._vscode_url}', + ) + + return self._vscode_url diff --git a/openhands/runtime/plugins/agent_skills/utils/config.py b/openhands/runtime/plugins/agent_skills/utils/config.py index f0084c5403..2b0f0f4ead 100644 --- a/openhands/runtime/plugins/agent_skills/utils/config.py +++ 
b/openhands/runtime/plugins/agent_skills/utils/config.py @@ -5,9 +5,9 @@ from openai import OpenAI # ================================================================================================== # OPENAI -# TODO: Move this to EventStream Actions when EventStreamRuntime is fully implemented +# TODO: Move this to EventStream Actions when DockerRuntime is fully implemented # NOTE: we need to get env vars inside functions because they will be set in IPython -# AFTER the agentskills is imported (the case for EventStreamRuntime) +# AFTER the agentskills is imported (the case for DockerRuntime) # ================================================================================================== def _get_openai_api_key(): return os.getenv('OPENAI_API_KEY', os.getenv('SANDBOX_ENV_OPENAI_API_KEY', '')) diff --git a/tests/runtime/conftest.py b/tests/runtime/conftest.py index aa08a2cb73..062fc2ed9f 100644 --- a/tests/runtime/conftest.py +++ b/tests/runtime/conftest.py @@ -12,7 +12,7 @@ from openhands.core.config import load_app_config from openhands.core.logger import openhands_logger as logger from openhands.events import EventStream from openhands.runtime.base import Runtime -from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime +from openhands.runtime.impl.docker.docker_runtime import DockerRuntime from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime from openhands.runtime.plugins import AgentSkillsRequirement, JupyterRequirement @@ -20,7 +20,7 @@ from openhands.storage import get_file_store from openhands.utils.async_utils import call_async_from_sync TEST_IN_CI = os.getenv('TEST_IN_CI', 'False').lower() in ['true', '1', 'yes'] -TEST_RUNTIME = os.getenv('TEST_RUNTIME', 'eventstream').lower() +TEST_RUNTIME = os.getenv('TEST_RUNTIME', 'docker').lower() RUN_AS_OPENHANDS = os.getenv('RUN_AS_OPENHANDS', 'True').lower() in ['true', '1', 'yes'] 
test_mount_path = '' project_dir = os.path.dirname( @@ -62,7 +62,7 @@ def _remove_folder(folder: str) -> bool: def _close_test_runtime(runtime: Runtime) -> None: - if isinstance(runtime, EventStreamRuntime): + if isinstance(runtime, DockerRuntime): runtime.close(rm_all_containers=False) else: runtime.close() @@ -129,8 +129,8 @@ def temp_dir(tmp_path_factory: TempPathFactory, request) -> str: # Depending on TEST_RUNTIME, feed the appropriate box class(es) to the test. def get_runtime_classes() -> list[type[Runtime]]: runtime = TEST_RUNTIME - if runtime.lower() == 'eventstream': - return [EventStreamRuntime] + if runtime.lower() == 'docker' or runtime.lower() == 'eventstream': + return [DockerRuntime] elif runtime.lower() == 'remote': return [RemoteRuntime] elif runtime.lower() == 'runloop': @@ -173,7 +173,7 @@ def runtime_cls(request): # TODO: We will change this to `run_as_user` when `ServerRuntime` is deprecated. -# since `EventStreamRuntime` supports running as an arbitrary user. +# since `DockerRuntime` supports running as an arbitrary user. @pytest.fixture(scope='module', params=get_run_as_openhands()) def run_as_openhands(request): time.sleep(1)