Factor out ActionExecutionClient (#5796)

This commit is contained in:
Robert Brennan 2024-12-30 10:32:13 -05:00 committed by GitHub
parent 37363a0a8d
commit 0e4e1b3316
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 475 additions and 735 deletions

View File

@ -63,7 +63,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -43,7 +43,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-slim',

View File

@ -50,7 +50,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.11-bookworm',

View File

@ -61,7 +61,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image=BIOCODER_BENCH_CONTAINER_IMAGE,

View File

@ -74,7 +74,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -39,7 +39,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -124,7 +124,7 @@ def get_config(
default_agent=metadata.agent_class,
run_as_openhands=False,
max_iterations=metadata.max_iterations,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
sandbox=SandboxConfig(
base_container_image=base_container_image,
enable_auto_lint=True,

View File

@ -65,7 +65,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -50,7 +50,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -43,7 +43,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -64,7 +64,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -85,7 +85,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -48,7 +48,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='xingyaoww/od-eval-logic-reasoning:v1.0',

View File

@ -58,7 +58,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='xingyaoww/od-eval-miniwob:v1.0',

View File

@ -106,7 +106,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='xingyaoww/od-eval-mint:v1.0',

View File

@ -80,7 +80,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='public.ecr.aws/i5g0m1f6/ml-bench',

View File

@ -62,7 +62,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
max_budget_per_task=4,
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(

View File

@ -76,7 +76,7 @@ def get_config(instance: pd.Series) -> AppConfig:
)
config = AppConfig(
run_as_openhands=False,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
sandbox=SandboxConfig(
base_container_image=base_container_image,
use_host_network=False,

View File

@ -121,7 +121,7 @@ def get_config(
default_agent=metadata.agent_class,
run_as_openhands=False,
max_iterations=metadata.max_iterations,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
sandbox=SandboxConfig(
base_container_image=base_container_image,
enable_auto_lint=True,

View File

@ -44,7 +44,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -53,7 +53,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime='eventstream',
runtime='docker',
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
base_container_image='python:3.12-bookworm',

View File

@ -42,7 +42,7 @@ def get_config(
config = AppConfig(
default_agent=metadata.agent_class,
run_as_openhands=False,
runtime=os.environ.get('RUNTIME', 'eventstream'),
runtime=os.environ.get('RUNTIME', 'docker'),
max_iterations=metadata.max_iterations,
sandbox=SandboxConfig(
# use default base_container_image

View File

@ -49,7 +49,7 @@ class AppConfig:
default_agent: str = OH_DEFAULT_AGENT
sandbox: SandboxConfig = field(default_factory=SandboxConfig)
security: SecurityConfig = field(default_factory=SecurityConfig)
runtime: str = 'eventstream'
runtime: str = 'docker'
file_store: str = 'memory'
file_store_path: str = '/tmp/file_store'
trajectories_path: str | None = None

View File

@ -182,7 +182,7 @@ async def process_issue(
config = AppConfig(
default_agent='CodeActAgent',
runtime='eventstream',
runtime='docker',
max_budget_per_task=4,
max_iterations=max_iterations,
sandbox=SandboxConfig(

View File

@ -3,13 +3,13 @@
## Introduction
The OpenHands Runtime folder contains the core components responsible for executing actions and managing the runtime environment for the OpenHands project. This README provides an overview of the main components and their interactions.
You can learn more about how the runtime works in the [EventStream Runtime](https://docs.all-hands.dev/modules/usage/architecture/runtime) documentation.
You can learn more about how the runtime works in the [Docker Runtime](https://docs.all-hands.dev/modules/usage/architecture/runtime) documentation.
## Main Components
### 1. impl/*runtime.py
### 1. base.py
The `impl/*runtime.py` file defines the `Runtime` class, which serves as the primary [interface](./base.py) for agent interactions with the external environment. It handles various operations including:
The `base.py` file defines the `Runtime` class, which serves as the primary [interface](./base.py) for agent interactions with the external environment. It handles various operations including:
- Bash sandbox execution
- Browser interactions
@ -23,9 +23,16 @@ Key features of the `Runtime` class:
- Action execution methods for different types of actions (run, read, write, browse, etc.)
- Abstract methods for file operations (to be implemented by subclasses)
### 2. action_execution_server.py
### 2. impl/action_execution/action_execution_client.py
The `action_execution_client.py` file contains the `ActionExecutionClient` class, which implements the Runtime interface. It is an abstract implementation, meaning
it still needs to be extended by a concrete implementation to be used.
The `action_executor_server.py` file contains the `ActionExecutor` class, which is responsible for executing actions received from the OpenHands backend and producing observations. This client runs inside a Docker sandbox.
This client interacts with an action_execution_server (defined below) via HTTP
calls to actually perform runtime actions.
### 3. action_execution_server.py
The `action_executor_server.py` file contains the `ActionExecutor` class, which is responsible for executing actions received via the `/execute_action` HTTP endpoint. It returns observations in the HTTP response.
Key features of the `ActionExecutor` class:
- Initialization of user environment and bash shell
@ -33,6 +40,19 @@ Key features of the `ActionExecutor` class:
- Execution of various action types (bash commands, IPython cells, file operations, browsing)
- Integration with BrowserEnv for web interactions
### 4. Other Implementations
The `./impl/` directory contains a few different Runtime implementations, all of
which extend the `ActionExecutionClient` class. These implementations
handle the lifecycle of a Docker container or other environment running the
ActionExecutor server.
There are currently four implementations:
* Docker (runs locally in a Docker container)
* Remote (runs via a custom HTTP API for creating, pausing, resuming, and stopping runtimes in a remote environment)
* Modal (uses the Modal API)
* Runloop (uses the Runloop API)
## Workflow Description
1. **Initialization**:
@ -76,9 +96,9 @@ Key features of the `ActionExecutor` class:
## Runtime Types
### EventStream Runtime
### Docker Runtime
The EventStream Runtime is designed for local execution using Docker containers:
The Docker Runtime is designed for local execution using Docker containers:
- Creates and manages a Docker container for each session
- Executes actions within the container

View File

@ -1,8 +1,8 @@
from openhands.core.logger import openhands_logger as logger
from openhands.runtime.impl.e2b.sandbox import E2BBox
from openhands.runtime.impl.eventstream.eventstream_runtime import (
EventStreamRuntime,
from openhands.runtime.impl.docker.docker_runtime import (
DockerRuntime,
)
from openhands.runtime.impl.e2b.sandbox import E2BBox
from openhands.runtime.impl.modal.modal_runtime import ModalRuntime
from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime
from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime
@ -10,8 +10,8 @@ from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime
def get_runtime_cls(name: str):
# Local imports to avoid circular imports
if name == 'eventstream':
return EventStreamRuntime
if name == 'eventstream' or name == 'docker':
return DockerRuntime
elif name == 'e2b':
return E2BBox
elif name == 'remote':
@ -30,6 +30,6 @@ __all__ = [
'RemoteRuntime',
'ModalRuntime',
'RunloopRuntime',
'EventStreamRuntime',
'DockerRuntime',
'get_runtime_cls',
]

View File

@ -0,0 +1,289 @@
import os
import tempfile
import threading
from abc import abstractmethod
from pathlib import Path
from typing import Any
from zipfile import ZipFile
import requests
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeTimeoutError,
)
from openhands.events import EventStream
from openhands.events.action import (
ActionConfirmationStatus,
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileEditAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.action.action import Action
from openhands.events.observation import (
ErrorObservation,
NullObservation,
Observation,
UserRejectObservation,
)
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.request import send_request
class ActionExecutionClient(Runtime):
"""Base class for runtimes that interact with the action execution server.
This class contains shared logic between DockerRuntime and RemoteRuntime
for interacting with the HTTP server defined in action_execution_server.py.
"""
def __init__(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_callback: Any | None = None,
attach_to_existing: bool = False,
headless_mode: bool = True,
):
self.session = requests.Session()
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self._runtime_initialized: bool = False
self._vscode_token: str | None = None # initial dummy value
super().__init__(
config,
event_stream,
sid,
plugins,
env_vars,
status_callback,
attach_to_existing,
headless_mode,
)
@abstractmethod
def _get_action_execution_server_host(self) -> str:
pass
def _send_action_server_request(
self,
method: str,
url: str,
**kwargs,
) -> requests.Response:
"""Send a request to the action execution server.
Args:
method: HTTP method (GET, POST, etc.)
url: URL to send the request to
**kwargs: Additional arguments to pass to requests.request()
Returns:
Response from the server
Raises:
AgentRuntimeError: If the request fails
"""
return send_request(self.session, method, url, **kwargs)
def check_if_alive(self) -> None:
with self._send_action_server_request(
'GET',
f'{self._get_action_execution_server_host()}/alive',
timeout=5,
):
pass
def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox.
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
try:
data = {}
if path is not None:
data['path'] = path
with send_request(
self.session,
'POST',
f'{self._get_action_execution_server_host()}/list_files',
json=data,
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, list)
return response_json
except requests.Timeout:
raise TimeoutError('List files operation timed out')
def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
try:
params = {'path': path}
with send_request(
self.session,
'GET',
f'{self._get_action_execution_server_host()}/download_files',
params=params,
stream=True,
timeout=30,
) as response:
temp_file = tempfile.NamedTemporaryFile(delete=False)
for chunk in response.iter_content(chunk_size=8192):
if chunk: # filter out keep-alive new chunks
temp_file.write(chunk)
return Path(temp_file.name)
except requests.Timeout:
raise TimeoutError('Copy operation timed out')
def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
try:
if recursive:
with tempfile.NamedTemporaryFile(
suffix='.zip', delete=False
) as temp_zip:
temp_zip_path = temp_zip.name
with ZipFile(temp_zip_path, 'w') as zipf:
for root, _, files in os.walk(host_src):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(
file_path, os.path.dirname(host_src)
)
zipf.write(file_path, arcname)
upload_data = {'file': open(temp_zip_path, 'rb')}
else:
upload_data = {'file': open(host_src, 'rb')}
params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
with self._send_action_server_request(
'POST',
f'{self._get_action_execution_server_host()}/upload_file',
files=upload_data,
params=params,
timeout=300,
) as response:
self.log(
'debug',
f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
)
finally:
if recursive:
os.unlink(temp_zip_path)
self.log(
'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
)
def get_vscode_token(self) -> str:
if self.vscode_enabled and self._runtime_initialized:
if self._vscode_token is not None: # cached value
return self._vscode_token
with send_request(
self.session,
'GET',
f'{self._get_action_execution_server_host()}/vscode/connection_token',
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, dict)
if response_json['token'] is None:
return ''
self._vscode_token = response_json['token']
return response_json['token']
else:
return ''
def send_action_for_execution(self, action: Action) -> Observation:
if isinstance(action, FileEditAction):
return self.edit(action)
# set timeout to default if not set
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
with self.action_semaphore:
if not action.runnable:
return NullObservation('')
if (
hasattr(action, 'confirmation_state')
and action.confirmation_state
== ActionConfirmationStatus.AWAITING_CONFIRMATION
):
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
raise ValueError(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'Action {action_type} is not supported in the current runtime.',
error_id='AGENT_ERROR$BAD_ACTION',
)
if (
getattr(action, 'confirmation_state', None)
== ActionConfirmationStatus.REJECTED
):
return UserRejectObservation(
'Action has been rejected by the user! Waiting for further user input.'
)
assert action.timeout is not None
try:
with send_request(
self.session,
'POST',
f'{self._get_action_execution_server_host()}/execute_action',
json={'action': event_to_dict(action)},
# wait a few more seconds to get the timeout error from client side
timeout=action.timeout + 5,
) as response:
output = response.json()
obs = observation_from_dict(output)
obs._cause = action.id # type: ignore[attr-defined]
except requests.Timeout:
raise AgentRuntimeTimeoutError(
f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
)
return obs
def run(self, action: CmdRunAction) -> Observation:
return self.send_action_for_execution(action)
def run_ipython(self, action: IPythonRunCellAction) -> Observation:
return self.send_action_for_execution(action)
def read(self, action: FileReadAction) -> Observation:
return self.send_action_for_execution(action)
def write(self, action: FileWriteAction) -> Observation:
return self.send_action_for_execution(action)
def browse(self, action: BrowseURLAction) -> Observation:
return self.send_action_for_execution(action)
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return self.send_action_for_execution(action)
def close(self) -> None:
self.session.close()

View File

@ -1,11 +1,6 @@
import atexit
import os
import tempfile
import threading
from functools import lru_cache
from pathlib import Path
from typing import Callable
from zipfile import ZipFile
import docker
import requests
@ -14,40 +9,20 @@ import tenacity
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeDisconnectedError,
AgentRuntimeError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeTimeoutError,
)
from openhands.core.logger import DEBUG
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import (
ActionConfirmationStatus,
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileEditAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.action.action import Action
from openhands.events.observation import (
ErrorObservation,
NullObservation,
Observation,
UserRejectObservation,
)
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.impl.action_execution.action_execution_client import (
ActionExecutionClient,
)
from openhands.runtime.impl.docker.containers import remove_all_containers
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit
@ -62,7 +37,7 @@ def remove_all_runtime_containers():
_atexit_registered = False
class EventStreamRuntime(Runtime):
class DockerRuntime(ActionExecutionClient):
"""This runtime will subscribe the event stream.
When receive an event, it will send the event to runtime-client which run inside the docker environment.
@ -74,30 +49,6 @@ class EventStreamRuntime(Runtime):
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
"""
# Need to provide this method to allow inheritors to init the Runtime
# without initting the EventStreamRuntime.
def init_base_runtime(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_callback: Callable | None = None,
attach_to_existing: bool = False,
headless_mode: bool = True,
):
super().__init__(
config,
event_stream,
sid,
plugins,
env_vars,
status_callback,
attach_to_existing,
headless_mode,
)
def __init__(
self,
config: AppConfig,
@ -117,10 +68,8 @@ class EventStreamRuntime(Runtime):
self.config = config
self._host_port = 30000 # initial dummy value
self._container_port = 30001 # initial dummy value
self._vscode_url: str | None = None # initial dummy value
self._runtime_initialized: bool = False
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.session = requests.Session()
self.status_callback = status_callback
self.docker_client: docker.DockerClient = self._init_docker_client()
@ -128,14 +77,13 @@ class EventStreamRuntime(Runtime):
self.runtime_container_image = self.config.sandbox.runtime_container_image
self.container_name = CONTAINER_NAME_PREFIX + sid
self.container = None
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
# Buffer for container logs
self.log_streamer: LogStreamer | None = None
self.init_base_runtime(
super().__init__(
config,
event_stream,
sid,
@ -153,6 +101,9 @@ class EventStreamRuntime(Runtime):
f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
)
def _get_action_execution_server_host(self):
return self.api_url
async def connect(self):
self.send_status_message('STATUS$STARTING_RUNTIME')
try:
@ -377,26 +328,18 @@ class EventStreamRuntime(Runtime):
if not self.log_streamer:
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
with send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
):
pass
self.check_if_alive()
def close(self, rm_all_containers: bool | None = None):
"""Closes the EventStreamRuntime and associated objects
"""Closes the DockerRuntime and associated objects
Parameters:
- rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
"""
super().close()
if self.log_streamer:
self.log_streamer.close()
if self.session:
self.session.close()
if rm_all_containers is None:
rm_all_containers = self.config.sandbox.rm_all_containers
@ -407,178 +350,6 @@ class EventStreamRuntime(Runtime):
)
remove_all_containers(close_prefix)
def run_action(self, action: Action) -> Observation:
if isinstance(action, FileEditAction):
return self.edit(action)
# set timeout to default if not set
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
with self.action_semaphore:
if not action.runnable:
return NullObservation('')
if (
hasattr(action, 'confirmation_state')
and action.confirmation_state
== ActionConfirmationStatus.AWAITING_CONFIRMATION
):
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
raise ValueError(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'Action {action_type} is not supported in the current runtime.',
error_id='AGENT_ERROR$BAD_ACTION',
)
if (
getattr(action, 'confirmation_state', None)
== ActionConfirmationStatus.REJECTED
):
return UserRejectObservation(
'Action has been rejected by the user! Waiting for further user input.'
)
assert action.timeout is not None
try:
with send_request(
self.session,
'POST',
f'{self.api_url}/execute_action',
json={'action': event_to_dict(action)},
# wait a few more seconds to get the timeout error from client side
timeout=action.timeout + 5,
) as response:
output = response.json()
obs = observation_from_dict(output)
obs._cause = action.id # type: ignore[attr-defined]
except requests.Timeout:
raise AgentRuntimeTimeoutError(
f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
)
return obs
def run(self, action: CmdRunAction) -> Observation:
return self.run_action(action)
def run_ipython(self, action: IPythonRunCellAction) -> Observation:
return self.run_action(action)
def read(self, action: FileReadAction) -> Observation:
return self.run_action(action)
def write(self, action: FileWriteAction) -> Observation:
return self.run_action(action)
def browse(self, action: BrowseURLAction) -> Observation:
return self.run_action(action)
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return self.run_action(action)
# ====================================================================
# Implement these methods (for file operations) in the subclass
# ====================================================================
def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
try:
if recursive:
# For recursive copy, create a zip file
with tempfile.NamedTemporaryFile(
suffix='.zip', delete=False
) as temp_zip:
temp_zip_path = temp_zip.name
with ZipFile(temp_zip_path, 'w') as zipf:
for root, _, files in os.walk(host_src):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(
file_path, os.path.dirname(host_src)
)
zipf.write(file_path, arcname)
upload_data = {'file': open(temp_zip_path, 'rb')}
else:
# For single file copy
upload_data = {'file': open(host_src, 'rb')}
params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
with send_request(
self.session,
'POST',
f'{self.api_url}/upload_file',
files=upload_data,
params=params,
timeout=300,
):
pass
except requests.Timeout:
raise AgentRuntimeTimeoutError('Copy operation timed out')
except Exception as e:
raise AgentRuntimeError(f'Copy operation failed: {str(e)}')
finally:
if recursive:
os.unlink(temp_zip_path)
self.log(
'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
)
def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox.
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
try:
data = {}
if path is not None:
data['path'] = path
with send_request(
self.session,
'POST',
f'{self.api_url}/list_files',
json=data,
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, list)
return response_json
except requests.Timeout:
raise TimeoutError('List files operation timed out')
def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
try:
params = {'path': path}
with send_request(
self.session,
'GET',
f'{self.api_url}/download_files',
params=params,
stream=True,
timeout=30,
) as response:
temp_file = tempfile.NamedTemporaryFile(delete=False)
for chunk in response.iter_content(chunk_size=8192):
if chunk: # filter out keep-alive new chunks
temp_file.write(chunk)
return Path(temp_file.name)
except requests.Timeout:
raise TimeoutError('Copy operation timed out')
def _is_port_in_use_docker(self, port):
containers = self.docker_client.containers.list()
for container in containers:
@ -598,27 +369,9 @@ class EventStreamRuntime(Runtime):
@property
def vscode_url(self) -> str | None:
if self.vscode_enabled and self._runtime_initialized:
if (
hasattr(self, '_vscode_url') and self._vscode_url is not None
): # cached value
return self._vscode_url
with send_request(
self.session,
'GET',
f'{self.api_url}/vscode/connection_token',
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, dict)
if response_json['token'] is None:
return None
self._vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self.log(
'debug',
f'VSCode URL: {self._vscode_url}',
)
return self._vscode_url
else:
token = super().get_vscode_token()
print('got token', token)
if not token:
return None
vscode_url = f'http://localhost:{self._host_port + 1}/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}'
return vscode_url

View File

@ -1,8 +1,7 @@
import os
import tempfile
import threading
from pathlib import Path
from typing import Callable, Generator
from typing import Callable
import modal
import requests
@ -10,9 +9,8 @@ import tenacity
from openhands.core.config import AppConfig
from openhands.events import EventStream
from openhands.runtime.impl.eventstream.eventstream_runtime import (
EventStreamRuntime,
LogStreamer,
from openhands.runtime.impl.action_execution.action_execution_client import (
ActionExecutionClient,
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
@ -21,52 +19,13 @@ from openhands.runtime.utils.runtime_build import (
prep_build_folder,
)
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit
# FIXME: this will not work in HA mode. We need a better way to track IDs
MODAL_RUNTIME_IDS: dict[str, str] = {}
# Modal's log generator returns strings, but the upstream LogBuffer expects bytes.
def bytes_shim(string_generator) -> Generator[bytes, None, None]:
for line in string_generator:
yield line.encode('utf-8')
class ModalLogStreamer(LogStreamer):
"""Streams Modal sandbox logs to stdout.
This class provides a way to stream logs from a Modal sandbox directly to stdout
through the provided logging function.
"""
def __init__(
self,
sandbox: modal.Sandbox,
logFn: Callable,
):
self.log = logFn
self._stop_event = threading.Event()
self.log_generator = bytes_shim(sandbox.stderr)
# Start the stdout streaming thread
self.stdout_thread = threading.Thread(target=self._stream_logs)
self.stdout_thread.daemon = True
self.stdout_thread.start()
def _stream_logs(self):
"""Stream logs from the Modal sandbox."""
try:
for log_line in self.log_generator:
if self._stop_event.is_set():
break
if log_line:
decoded_line = log_line.decode('utf-8').rstrip()
self.log('debug', f'[inside sandbox] {decoded_line}')
except Exception as e:
self.log('error', f'Error streaming modal logs: {e}')
class ModalRuntime(EventStreamRuntime):
class ModalRuntime(ActionExecutionClient):
"""This runtime will subscribe the event stream.
When receive an event, it will send the event to runtime-client which run inside the Modal sandbox environment.
@ -116,14 +75,9 @@ class ModalRuntime(EventStreamRuntime):
# This value is arbitrary as it's private to the container
self.container_port = 3000
self.session = requests.Session()
self.status_callback = status_callback
self.base_container_image_id = self.config.sandbox.base_container_image
self.runtime_container_image_id = self.config.sandbox.runtime_container_image
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
# Buffer for container logs
self.log_streamer: LogStreamer | None = None
if self.config.sandbox.runtime_extra_deps:
self.log(
@ -131,7 +85,7 @@ class ModalRuntime(EventStreamRuntime):
f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
)
self.init_base_runtime(
super().__init__(
config,
event_stream,
sid,
@ -170,7 +124,6 @@ class ModalRuntime(EventStreamRuntime):
self.send_status_message('STATUS$CONTAINER_STARTED')
self.log_streamer = ModalLogStreamer(self.sandbox, self.log)
if self.sandbox is None:
raise Exception('Sandbox not initialized')
tunnel = self.sandbox.tunnels()[self.container_port]
@ -187,6 +140,20 @@ class ModalRuntime(EventStreamRuntime):
if not self.attach_to_existing:
self.send_status_message(' ')
def _get_action_execution_server_host(self):
return self.api_url
@tenacity.retry(
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
retry=tenacity.retry_if_exception_type(
(ConnectionError, requests.exceptions.ConnectionError)
),
reraise=True,
wait=tenacity.wait_fixed(2),
)
def _wait_until_alive(self):
self.check_if_alive()
def _get_image_definition(
self,
base_container_image_id: str | None,
@ -292,11 +259,7 @@ echo 'export INPUTRC=/etc/inputrc' >> /etc/bash.bashrc
def close(self):
"""Closes the ModalRuntime and associated objects."""
if self.log_streamer:
self.log_streamer.close()
if self.session:
self.session.close()
super().close()
if not self.attach_to_existing and self.sandbox:
self.sandbox.terminate()

View File

@ -1,10 +1,6 @@
import os
import tempfile
import threading
from pathlib import Path
from typing import Callable, Optional
from urllib.parse import urlparse
from zipfile import ZipFile
import requests
import tenacity
@ -15,29 +11,13 @@ from openhands.core.exceptions import (
AgentRuntimeError,
AgentRuntimeNotFoundError,
AgentRuntimeNotReadyError,
AgentRuntimeTimeoutError,
AgentRuntimeUnavailableError,
)
from openhands.events import EventStream
from openhands.events.action import (
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileEditAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.action.action import Action
from openhands.events.observation import (
ErrorObservation,
NullObservation,
Observation,
)
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.base import Runtime
from openhands.runtime.builder.remote import RemoteRuntimeBuilder
from openhands.runtime.impl.action_execution.action_execution_client import (
ActionExecutionClient,
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.request import (
@ -49,7 +29,7 @@ from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit
class RemoteRuntime(Runtime):
class RemoteRuntime(ActionExecutionClient):
"""This runtime will connect to a remote oh-runtime-client."""
port: int = 60000 # default port for the remote runtime client
@ -65,10 +45,6 @@ class RemoteRuntime(Runtime):
attach_to_existing: bool = False,
headless_mode: bool = True,
):
# We need to set session and action_semaphore before the __init__ below, or we get odd errors
self.session = requests.Session()
self.action_semaphore = threading.Semaphore(1)
super().__init__(
config,
event_stream,
@ -98,7 +74,9 @@ class RemoteRuntime(Runtime):
self.runtime_id: str | None = None
self.runtime_url: str | None = None
self._runtime_initialized: bool = False
self._vscode_url: str | None = None # initial dummy value
def _get_action_execution_server_host(self):
return self.runtime_url
async def connect(self):
try:
@ -148,10 +126,9 @@ class RemoteRuntime(Runtime):
def _check_existing_runtime(self) -> bool:
try:
with self._send_request(
with self._send_runtime_api_request(
'GET',
f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
is_retry=False,
timeout=60,
) as response:
data = response.json()
@ -179,10 +156,9 @@ class RemoteRuntime(Runtime):
def _build_runtime(self):
self.log('debug', f'Building RemoteRuntime config:\n{self.config}')
with self._send_request(
with self._send_runtime_api_request(
'GET',
f'{self.config.sandbox.remote_runtime_api_url}/registry_prefix',
is_retry=False,
timeout=60,
) as response:
response_json = response.json()
@ -210,10 +186,9 @@ class RemoteRuntime(Runtime):
force_rebuild=self.config.sandbox.force_rebuild_runtime,
)
with self._send_request(
with self._send_runtime_api_request(
'GET',
f'{self.config.sandbox.remote_runtime_api_url}/image_exists',
is_retry=False,
params={'image': self.container_image},
timeout=60,
) as response:
@ -252,10 +227,9 @@ class RemoteRuntime(Runtime):
# Start the sandbox using the /start endpoint
try:
with self._send_request(
with self._send_runtime_api_request(
'POST',
f'{self.config.sandbox.remote_runtime_api_url}/start',
is_retry=False,
json=start_request,
timeout=60,
) as response:
@ -269,10 +243,9 @@ class RemoteRuntime(Runtime):
raise AgentRuntimeUnavailableError() from e
def _resume_runtime(self):
with self._send_request(
with self._send_runtime_api_request(
'POST',
f'{self.config.sandbox.remote_runtime_api_url}/resume',
is_retry=False,
json={'runtime_id': self.runtime_id},
timeout=60,
):
@ -290,34 +263,19 @@ class RemoteRuntime(Runtime):
@property
def vscode_url(self) -> str | None:
if self.vscode_enabled and self._runtime_initialized:
if (
hasattr(self, '_vscode_url') and self._vscode_url is not None
): # cached value
return self._vscode_url
with self._send_request(
'GET',
f'{self.runtime_url}/vscode/connection_token',
timeout=60,
) as response:
response_json = response.json()
assert isinstance(response_json, dict)
if response_json['token'] is None:
return None
# parse runtime_url to get vscode_url
_parsed_url = urlparse(self.runtime_url)
assert isinstance(_parsed_url.scheme, str) and isinstance(
_parsed_url.netloc, str
)
self._vscode_url = f'{_parsed_url.scheme}://vscode-{_parsed_url.netloc}/?tkn={response_json["token"]}&folder={self.config.workspace_mount_path_in_sandbox}'
self.log(
'debug',
f'VSCode URL: {self._vscode_url}',
)
return self._vscode_url
else:
token = super().get_vscode_token()
if not token:
return None
_parsed_url = urlparse(self.runtime_url)
assert isinstance(_parsed_url.scheme, str) and isinstance(
_parsed_url.netloc, str
)
vscode_url = f'{_parsed_url.scheme}://vscode-{_parsed_url.netloc}/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}'
self.log(
'debug',
f'VSCode URL: {vscode_url}',
)
return vscode_url
def _wait_until_alive(self):
retry_decorator = tenacity.retry(
@ -333,7 +291,7 @@ class RemoteRuntime(Runtime):
def _wait_until_alive_impl(self):
self.log('debug', f'Waiting for runtime to be alive at url: {self.runtime_url}')
with self._send_request(
with self._send_runtime_api_request(
'GET',
f'{self.config.sandbox.remote_runtime_api_url}/sessions/{self.sid}',
timeout=60,
@ -350,12 +308,7 @@ class RemoteRuntime(Runtime):
# Retry a period of time to give the cluster time to start the pod
if pod_status == 'ready':
try:
with self._send_request(
'GET',
f'{self.runtime_url}/alive',
timeout=60,
): # will raise exception if we don't get 200 back.
pass
self.check_if_alive()
except requests.HTTPError as e:
self.log(
'warning', f"Runtime /alive failed, but pod says it's ready: {e}"
@ -390,182 +343,39 @@ class RemoteRuntime(Runtime):
def close(self, timeout: int = 10):
if self.config.sandbox.keep_runtime_alive or self.attach_to_existing:
self.session.close()
super().close()
return
if self.runtime_id and self.session:
try:
with self._send_request(
'POST',
f'{self.config.sandbox.remote_runtime_api_url}/stop',
is_retry=False,
json={'runtime_id': self.runtime_id},
timeout=timeout,
):
self.log('debug', 'Runtime stopped.')
except Exception as e:
raise e
finally:
self.session.close()
def run_action(self, action: Action, is_retry: bool = False) -> Observation:
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
if isinstance(action, FileEditAction):
return self.edit(action)
with self.action_semaphore:
if not action.runnable:
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
raise ValueError(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'[Runtime (ID={self.runtime_id})] Action {action_type} is not supported in the current runtime.',
error_id='AGENT_ERROR$BAD_ACTION',
)
assert action.timeout is not None
try:
request_body = {'action': event_to_dict(action)}
self.log('debug', f'Request body: {request_body}')
with self._send_request(
'POST',
f'{self.runtime_url}/execute_action',
is_retry=False,
json=request_body,
# wait a few more seconds to get the timeout error from client side
timeout=action.timeout + 5,
) as response:
output = response.json()
obs = observation_from_dict(output)
obs._cause = action.id # type: ignore[attr-defined]
except requests.Timeout:
raise AgentRuntimeTimeoutError(
f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
)
return obs
def _send_request(self, method, url, is_retry=False, **kwargs):
is_runtime_request = self.runtime_url and self.runtime_url in url
try:
return send_request(self.session, method, url, **kwargs)
with self._send_runtime_api_request(
'POST',
f'{self.config.sandbox.remote_runtime_api_url}/stop',
json={'runtime_id': self.runtime_id},
timeout=timeout,
):
self.log('debug', 'Runtime stopped.')
except Exception as e:
raise e
finally:
super().close()
def _send_runtime_api_request(self, method, url, **kwargs):
return send_request(self.session, method, url, **kwargs)
def _send_action_server_request(self, method, url, **kwargs):
try:
super()._send_action_server_request(method, url, **kwargs)
except requests.Timeout:
self.log('error', 'No response received within the timeout period.')
raise
except RequestHTTPError as e:
if is_runtime_request and e.response.status_code in (404, 502):
if e.response.status_code in (404, 502):
raise AgentRuntimeDisconnectedError(
f'{e.response.status_code} error while connecting to {self.runtime_url}'
) from e
elif is_runtime_request and e.response.status_code == 503:
if not is_retry:
self.log('warning', 'Runtime appears to be paused. Resuming...')
self._resume_runtime()
self._wait_until_alive()
return self._send_request(method, url, True, **kwargs)
else:
raise AgentRuntimeUnavailableError(
f'{e.response.status_code} error while connecting to {self.runtime_url}'
) from e
elif e.response.status_code == 503:
self.log('warning', 'Runtime appears to be paused. Resuming...')
self._resume_runtime()
self._wait_until_alive()
return super()._send_action_server_request(method, url, **kwargs)
else:
raise e
def run(self, action: CmdRunAction) -> Observation:
return self.run_action(action)
def run_ipython(self, action: IPythonRunCellAction) -> Observation:
return self.run_action(action)
def read(self, action: FileReadAction) -> Observation:
return self.run_action(action)
def write(self, action: FileWriteAction) -> Observation:
return self.run_action(action)
def browse(self, action: BrowseURLAction) -> Observation:
return self.run_action(action)
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return self.run_action(action)
def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
try:
if recursive:
with tempfile.NamedTemporaryFile(
suffix='.zip', delete=False
) as temp_zip:
temp_zip_path = temp_zip.name
with ZipFile(temp_zip_path, 'w') as zipf:
for root, _, files in os.walk(host_src):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(
file_path, os.path.dirname(host_src)
)
zipf.write(file_path, arcname)
upload_data = {'file': open(temp_zip_path, 'rb')}
else:
upload_data = {'file': open(host_src, 'rb')}
params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
with self._send_request(
'POST',
f'{self.runtime_url}/upload_file',
is_retry=False,
files=upload_data,
params=params,
timeout=300,
) as response:
self.log(
'debug',
f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. Response: {response.text}',
)
finally:
if recursive:
os.unlink(temp_zip_path)
self.log(
'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
)
def list_files(self, path: str | None = None) -> list[str]:
data = {}
if path is not None:
data['path'] = path
with self._send_request(
'POST',
f'{self.runtime_url}/list_files',
is_retry=False,
json=data,
timeout=30,
) as response:
response_json = response.json()
assert isinstance(response_json, list)
return response_json
def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return as a stream of bytes."""
params = {'path': path}
with self._send_request(
'GET',
f'{self.runtime_url}/download_files',
is_retry=False,
params=params,
stream=True,
timeout=30,
) as response:
temp_file = tempfile.NamedTemporaryFile(delete=False)
for chunk in response.iter_content(chunk_size=8192):
if chunk: # filter out keep-alive new chunks
temp_file.write(chunk)
return Path(temp_file.name)

View File

@ -1,82 +1,26 @@
import logging
import threading
import time
from typing import Callable
import requests
import tenacity
from runloop_api_client import Runloop
from runloop_api_client.types import DevboxView
from runloop_api_client.types.shared_params import LaunchParameters
from openhands.core.config import AppConfig
from openhands.core.exceptions import (
AgentRuntimeNotReadyError,
AgentRuntimeUnavailableError,
)
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
from openhands.runtime.impl.action_execution.action_execution_client import (
ActionExecutionClient,
)
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.request import send_request
from openhands.utils.tenacity_stop import stop_if_should_exit
CONTAINER_NAME_PREFIX = 'openhands-runtime-'
class RunloopLogStreamer(LogStreamer):
"""Streams Runloop devbox logs to stdout.
This class provides a way to stream logs from a Runloop devbox directly to stdout
through the provided logging function.
"""
def __init__(
self,
runloop_api_client: Runloop,
devbox_id: str,
logFn: Callable,
):
self.runloop_api_client = runloop_api_client
self.devbox_id = devbox_id
self.log = logFn
self.log_index = 0
self._stop_event = threading.Event()
# Start the stdout streaming thread
self.stdout_thread = threading.Thread(target=self._stream_logs)
self.stdout_thread.daemon = True
self.stdout_thread.start()
def _stream_logs(self):
"""Stream logs from the Runloop devbox."""
try:
while True:
raw_logs = self.runloop_api_client.devboxes.logs.list(
self.devbox_id
).logs[self.log_index :]
logs = [
log.message
for log in raw_logs
if log.message and log.cmd_id is None
]
self.log_index += len(raw_logs)
if self._stop_event.is_set():
break
if logs:
for log_line in logs:
self.log('debug', f'[inside devbox] {log_line}')
time.sleep(1)
except Exception as e:
self.log('error', f'Error streaming runloop logs: {e}')
class RunloopRuntime(EventStreamRuntime):
"""The RunloopRuntime class is an EventStreamRuntime that utilizes Runloop Devbox as a runtime environment."""
class RunloopRuntime(ActionExecutionClient):
"""The RunloopRuntime class is an DockerRuntime that utilizes Runloop Devbox as a runtime environment."""
_sandbox_port: int = 4444
_vscode_port: int = 4445
@ -98,10 +42,8 @@ class RunloopRuntime(EventStreamRuntime):
self.runloop_api_client = Runloop(
bearer_token=config.runloop_api_key,
)
self.session = requests.Session()
self.container_name = CONTAINER_NAME_PREFIX + sid
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self.init_base_runtime(
super().__init__(
config,
event_stream,
sid,
@ -112,9 +54,11 @@ class RunloopRuntime(EventStreamRuntime):
headless_mode,
)
# Buffer for container logs
self.log_streamer: LogStreamer | None = None
self._vscode_url: str | None = None
def _get_action_execution_server_host(self):
return self.api_url
@tenacity.retry(
stop=tenacity.stop_after_attempt(120),
wait=tenacity.wait_fixed(1),
@ -203,15 +147,11 @@ class RunloopRuntime(EventStreamRuntime):
port=self._sandbox_port,
)
# Hook up logs
self.log_streamer = RunloopLogStreamer(
self.runloop_api_client, self.devbox.id, logger.info
)
self.api_url = tunnel.url
logger.info(f'Container started. Server url: {self.api_url}')
# End Runloop connect
# NOTE: Copied from EventStreamRuntime
# NOTE: Copied from DockerRuntime
logger.info('Waiting for client to become ready...')
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
self._wait_until_alive()
@ -230,27 +170,10 @@ class RunloopRuntime(EventStreamRuntime):
reraise=(ConnectionRefusedError,),
)
def _wait_until_alive(self):
if not self.log_streamer:
raise AgentRuntimeNotReadyError('Runtime client is not ready.')
response = send_request(
self.session,
'GET',
f'{self.api_url}/alive',
timeout=5,
)
if response.status_code == 200:
return
else:
msg = f'Action execution API is not alive. Response: {response}'
logger.error(msg)
raise AgentRuntimeUnavailableError(msg)
super().check_if_alive()
def close(self, rm_all_containers: bool | None = True):
if self.log_streamer:
self.log_streamer.close()
if self.session:
self.session.close()
super().close()
if self.attach_to_existing:
return
@ -260,42 +183,24 @@ class RunloopRuntime(EventStreamRuntime):
@property
def vscode_url(self) -> str | None:
if self.vscode_enabled and self.devbox and self.devbox.status == 'running':
if self._vscode_url is not None:
return self._vscode_url
try:
with send_request(
self.session,
'GET',
f'{self.api_url}/vscode/connection_token',
timeout=10,
) as response:
response_json = response.json()
assert isinstance(response_json, dict)
if response_json['token'] is None:
return None
token = response_json['token']
self._vscode_url = (
self.runloop_api_client.devboxes.create_tunnel(
id=self.devbox.id,
port=self._vscode_port,
).url
+ f'/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}'
)
self.log(
'debug',
f'VSCode URL: {self._vscode_url}',
)
return self._vscode_url
except Exception as e:
self.log(
'error',
f'Failed to create vscode tunnel {e}',
)
return None
else:
if self._vscode_url is not None: # cached value
return self._vscode_url
token = super().get_vscode_token()
if not token:
return None
if not self.devbox:
return None
self._vscode_url = (
self.runloop_api_client.devboxes.create_tunnel(
id=self.devbox.id,
port=self._vscode_port,
).url
+ f'/?tkn={token}&folder={self.config.workspace_mount_path_in_sandbox}'
)
self.log(
'debug',
f'VSCode URL: {self._vscode_url}',
)
return self._vscode_url

View File

@ -5,9 +5,9 @@ from openai import OpenAI
# ==================================================================================================
# OPENAI
# TODO: Move this to EventStream Actions when EventStreamRuntime is fully implemented
# TODO: Move this to EventStream Actions when DockerRuntime is fully implemented
# NOTE: we need to get env vars inside functions because they will be set in IPython
# AFTER the agentskills is imported (the case for EventStreamRuntime)
# AFTER the agentskills is imported (the case for DockerRuntime)
# ==================================================================================================
def _get_openai_api_key():
return os.getenv('OPENAI_API_KEY', os.getenv('SANDBOX_ENV_OPENAI_API_KEY', ''))

View File

@ -12,7 +12,7 @@ from openhands.core.config import load_app_config
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.base import Runtime
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
from openhands.runtime.impl.docker.docker_runtime import DockerRuntime
from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime
from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime
from openhands.runtime.plugins import AgentSkillsRequirement, JupyterRequirement
@ -20,7 +20,7 @@ from openhands.storage import get_file_store
from openhands.utils.async_utils import call_async_from_sync
TEST_IN_CI = os.getenv('TEST_IN_CI', 'False').lower() in ['true', '1', 'yes']
TEST_RUNTIME = os.getenv('TEST_RUNTIME', 'eventstream').lower()
TEST_RUNTIME = os.getenv('TEST_RUNTIME', 'docker').lower()
RUN_AS_OPENHANDS = os.getenv('RUN_AS_OPENHANDS', 'True').lower() in ['true', '1', 'yes']
test_mount_path = ''
project_dir = os.path.dirname(
@ -62,7 +62,7 @@ def _remove_folder(folder: str) -> bool:
def _close_test_runtime(runtime: Runtime) -> None:
if isinstance(runtime, EventStreamRuntime):
if isinstance(runtime, DockerRuntime):
runtime.close(rm_all_containers=False)
else:
runtime.close()
@ -129,8 +129,8 @@ def temp_dir(tmp_path_factory: TempPathFactory, request) -> str:
# Depending on TEST_RUNTIME, feed the appropriate box class(es) to the test.
def get_runtime_classes() -> list[type[Runtime]]:
runtime = TEST_RUNTIME
if runtime.lower() == 'eventstream':
return [EventStreamRuntime]
if runtime.lower() == 'docker' or runtime.lower() == 'eventstream':
return [DockerRuntime]
elif runtime.lower() == 'remote':
return [RemoteRuntime]
elif runtime.lower() == 'runloop':
@ -173,7 +173,7 @@ def runtime_cls(request):
# TODO: We will change this to `run_as_user` when `ServerRuntime` is deprecated.
# since `EventStreamRuntime` supports running as an arbitrary user.
# since `DockerRuntime` supports running as an arbitrary user.
@pytest.fixture(scope='module', params=get_run_as_openhands())
def run_as_openhands(request):
time.sleep(1)