feat(runtime): upgrade E2B runtime to v2.0 with full implementation (#10832)

This commit is contained in:
Ruilin Zhou 2025-09-07 21:32:08 -07:00 committed by GitHub
parent d5d5e265f8
commit 9960d11d08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 444 additions and 75 deletions

View File

@ -863,7 +863,7 @@ fi
# If the instructions file is not found in the workspace root, try to load it from the repo root
self.log(
'debug',
f'.openhands_instructions not present, trying to load from repository {microagents_dir=}',
f'.openhands_instructions not present, trying to load from repository microagents_dir={microagents_dir}',
)
obs = self.read(
FileReadAction(path=str(repo_root / '.openhands_instructions'))

32
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand.
[[package]]
name = "aiofiles"
@ -2257,15 +2257,15 @@ files = [
[[package]]
name = "e2b"
version = "1.7.0"
version = "2.0.0"
description = "E2B SDK that give agents cloud environments"
optional = true
python-versions = "<4.0,>=3.9"
groups = ["main"]
markers = "extra == \"third-party-runtimes\""
files = [
{file = "e2b-1.7.0-py3-none-any.whl", hash = "sha256:6bd3d935249fcf5684494a97178d4d58446b4ed4018ac09087e4000046e82aab"},
{file = "e2b-1.7.0.tar.gz", hash = "sha256:7783408c2cdf7aee9b088d31759364f2b13b21100cc4e132ba36fd84cfc72e31"},
{file = "e2b-2.0.0-py3-none-any.whl", hash = "sha256:a6621b905cb2a883a9c520736ae98343a6184fc90c29b4f2f079d720294a0df0"},
{file = "e2b-2.0.0.tar.gz", hash = "sha256:4d033d937b0a09b8428e73233321a913cbaef8e7299fc731579c656e9d53a144"},
]
[package.dependencies]
@ -2273,10 +2273,28 @@ attrs = ">=23.2.0"
httpcore = ">=1.0.5,<2.0.0"
httpx = ">=0.27.0,<1.0.0"
packaging = ">=24.1"
protobuf = ">=5.29.4,<6.0.0"
protobuf = ">=4.21.0"
python-dateutil = ">=2.8.2"
typing-extensions = ">=4.1.0"
[[package]]
name = "e2b-code-interpreter"
version = "2.0.0"
description = "E2B Code Interpreter - Stateful code execution"
optional = true
python-versions = "<4.0,>=3.9"
groups = ["main"]
markers = "extra == \"third-party-runtimes\""
files = [
{file = "e2b_code_interpreter-2.0.0-py3-none-any.whl", hash = "sha256:273642d4dd78f09327fb1553fe4f7ddcf17892b78f98236e038d29985e42dca5"},
{file = "e2b_code_interpreter-2.0.0.tar.gz", hash = "sha256:19136916be8de60bfd0a678742501d1d0335442bb6e86405c7dd6f98059b73c4"},
]
[package.dependencies]
attrs = ">=21.3.0"
e2b = ">=2.0.0,<3.0.0"
httpx = ">=0.20.0,<1.0.0"
[[package]]
name = "english-words"
version = "2.0.1"
@ -11845,9 +11863,9 @@ cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\
cffi = ["cffi (>=1.11)"]
[extras]
third-party-runtimes = ["daytona", "e2b", "modal", "runloop-api-client"]
third-party-runtimes = ["daytona", "e2b-code-interpreter", "modal", "runloop-api-client"]
[metadata]
lock-version = "2.1"
python-versions = "^3.12,<3.14"
content-hash = "a0ae2cee596dde71f89c06e9669efda58ee8f8f019fad3dbe9df068005c32904"
content-hash = "5135db5c5c744f7b2aab0ccb6921343d2268d8ef950e024ddc3bce25c597140a"

View File

@ -96,14 +96,14 @@ memory-profiler = "^0.61.0"
jupyter_kernel_gateway = "*"
# Third-party runtime dependencies (optional)
e2b = { version = ">=1.0.5,<1.8.0", optional = true }
modal = { version = ">=0.66.26,<1.2.0", optional = true }
runloop-api-client = { version = "0.50.0", optional = true }
daytona = { version = "0.24.2", optional = true }
httpx-aiohttp = "^0.1.8"
e2b-code-interpreter = { version = "^2.0.0", optional = true }
[tool.poetry.extras]
third_party_runtimes = [ "e2b", "modal", "runloop-api-client", "daytona" ]
third_party_runtimes = [ "e2b-code-interpreter", "modal", "runloop-api-client", "daytona" ]
[tool.poetry.group.dev]
optional = true

View File

@ -1,32 +1,50 @@
import os
from typing import Callable
from openhands.core.config import OpenHandsConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events.action import (
BrowseURLAction,
BrowseInteractiveAction,
CmdRunAction,
FileEditAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.observation import (
BrowserOutputObservation,
CmdOutputObservation,
ErrorObservation,
FileEditObservation,
FileReadObservation,
FileWriteObservation,
IPythonRunCellObservation,
Observation,
)
from openhands.events.stream import EventStream
from openhands.integrations.provider import PROVIDER_TOKEN_TYPE
from openhands.llm.llm_registry import LLMRegistry
from openhands.runtime.impl.action_execution.action_execution_client import (
ActionExecutionClient,
)
from third_party.runtime.impl.e2b.filestore import E2BFileStore
from third_party.runtime.impl.e2b.sandbox import E2BSandbox
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.runtime_status import RuntimeStatus
from openhands.runtime.utils.files import insert_lines, read_lines
from openhands.utils.async_utils import call_sync_from_async
from third_party.runtime.impl.e2b.filestore import E2BFileStore
from third_party.runtime.impl.e2b.sandbox import E2BBox, E2BSandbox
class E2BRuntime(ActionExecutionClient):
# Class-level cache for sandbox IDs
_sandbox_id_cache: dict[str, str] = {}
def __init__(
self,
config: OpenHandsConfig,
event_stream: EventStream,
llm_registry: LLMRegistry,
sid: str = "default",
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
@ -37,42 +55,348 @@ class E2BRuntime(ActionExecutionClient):
git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
sandbox: E2BSandbox | None = None,
):
if config.workspace_base is not None:
logger.warning(
"Setting workspace_base is not supported in the E2B runtime. "
"E2B provides its own isolated filesystem."
)
super().__init__(
config,
event_stream,
sid,
plugins,
env_vars,
status_callback,
attach_to_existing,
headless_mode,
user_id,
git_provider_tokens,
config=config,
event_stream=event_stream,
llm_registry=llm_registry,
sid=sid,
plugins=plugins,
env_vars=env_vars,
status_callback=status_callback,
attach_to_existing=attach_to_existing,
headless_mode=headless_mode,
user_id=user_id,
git_provider_tokens=git_provider_tokens,
)
if sandbox is None:
self.sandbox = E2BSandbox(config.sandbox)
if not isinstance(self.sandbox, E2BSandbox):
raise ValueError("E2BRuntime requires an E2BSandbox")
self.file_store = E2BFileStore(self.sandbox.filesystem)
self.sandbox = sandbox
self.file_store = None
self.api_url = None
self._action_server_port = 8000
self._runtime_initialized = False
@property
def action_execution_server_url(self) -> str:
"""Return the URL of the action execution server."""
if not self.api_url:
raise RuntimeError("E2B runtime not connected. Call connect() first.")
return self.api_url
async def connect(self) -> None:
"""Initialize E2B sandbox and start action execution server."""
self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
try:
if self.attach_to_existing and self.sandbox is None:
try:
cached_sandbox_id = self.__class__._sandbox_id_cache.get(self.sid)
if cached_sandbox_id:
try:
self.sandbox = E2BBox(self.config.sandbox, sandbox_id=cached_sandbox_id)
logger.info(f"Successfully attached to existing E2B sandbox: {cached_sandbox_id}")
except Exception as e:
logger.warning(f"Failed to connect to cached sandbox {cached_sandbox_id}: {e}")
del self.__class__._sandbox_id_cache[self.sid]
self.sandbox = None
except Exception as e:
logger.warning(f"Failed to attach to existing sandbox: {e}. Will create a new one.")
# Create E2B sandbox if not provided
if self.sandbox is None:
try:
self.sandbox = E2BSandbox(self.config.sandbox)
sandbox_id = self.sandbox.sandbox.sandbox_id
logger.info(f"E2B sandbox created with ID: {sandbox_id}")
self.__class__._sandbox_id_cache[self.sid] = sandbox_id
except Exception as e:
logger.error(f"Failed to create E2B sandbox: {e}")
raise
if not isinstance(self.sandbox, (E2BSandbox, E2BBox)):
raise ValueError("E2BRuntime requires an E2BSandbox or E2BBox")
self.file_store = E2BFileStore(self.sandbox.filesystem)
# E2B doesn't use action execution server - set dummy URL
self.api_url = "direct://e2b-sandbox"
workspace_dir = self.config.workspace_mount_path_in_sandbox
if workspace_dir:
try:
exit_code, output = self.sandbox.execute(f"sudo mkdir -p {workspace_dir}")
if exit_code == 0:
self.sandbox.execute(f"sudo chmod 777 {workspace_dir}")
logger.info(f"Created workspace directory: {workspace_dir}")
else:
logger.warning(f"Failed to create workspace directory: {output}")
except Exception as e:
logger.warning(f"Failed to create workspace directory: {e}")
await call_sync_from_async(self.setup_initial_env)
self._runtime_initialized = True
self.set_runtime_status(RuntimeStatus.READY)
logger.info("E2B runtime connected successfully")
except Exception as e:
logger.error(f"Failed to connect E2B runtime: {e}")
self.set_runtime_status(RuntimeStatus.FAILED)
raise
async def close(self) -> None:
"""Close the E2B runtime."""
if self._runtime_closed:
return
self._runtime_closed = True
if self.sandbox:
try:
if not self.attach_to_existing:
self.sandbox.close()
if self.sid in self.__class__._sandbox_id_cache:
del self.__class__._sandbox_id_cache[self.sid]
logger.info("E2B sandbox closed and removed from cache")
else:
logger.info("E2B runtime connection closed, sandbox kept running for reuse")
except Exception as e:
logger.error(f"Error closing E2B sandbox: {e}")
parent_close = super().close()
if parent_close is not None:
await parent_close
def run(self, action: CmdRunAction) -> Observation:
"""Execute command using E2B's native execute method."""
if self.sandbox is None:
return ErrorObservation("E2B sandbox not initialized")
try:
timeout = action.timeout if action.timeout else self.config.sandbox.timeout
exit_code, output = self.sandbox.execute(action.command, timeout=timeout)
return CmdOutputObservation(
content=output,
command=action.command,
exit_code=exit_code
)
except Exception as e:
return ErrorObservation(f"Failed to execute command: {e}")
def run_ipython(self, action: IPythonRunCellAction) -> Observation:
"""Execute IPython code using E2B's code interpreter."""
if self.sandbox is None:
return ErrorObservation("E2B sandbox not initialized")
try:
result = self.sandbox.sandbox.run_code(action.code)
outputs = []
if hasattr(result, 'results') and result.results:
for r in result.results:
if hasattr(r, 'text') and r.text:
outputs.append(r.text)
elif hasattr(r, 'html') and r.html:
outputs.append(r.html)
elif hasattr(r, 'png') and r.png:
outputs.append(f"[Image data: {len(r.png)} bytes]")
if hasattr(result, 'error') and result.error:
return ErrorObservation(f"IPython error: {result.error}")
return IPythonRunCellObservation(
content='\n'.join(outputs) if outputs else '',
code=action.code
)
except Exception as e:
return ErrorObservation(f"Failed to execute IPython code: {e}")
def read(self, action: FileReadAction) -> Observation:
content = self.file_store.read(action.path)
lines = read_lines(content.split("\n"), action.start, action.end)
code_view = "".join(lines)
return FileReadObservation(code_view, path=action.path)
if self.file_store is None:
return ErrorObservation("E2B file store not initialized. Call connect() first.")
try:
content = self.file_store.read(action.path)
lines = read_lines(content.split("\n"), action.start, action.end)
code_view = "".join(lines)
return FileReadObservation(code_view, path=action.path)
except Exception as e:
return ErrorObservation(f"Failed to read file: {e}")
def write(self, action: FileWriteAction) -> Observation:
if action.start == 0 and action.end == -1:
self.file_store.write(action.path, action.content)
return FileWriteObservation(content="", path=action.path)
files = self.file_store.list(action.path)
if action.path in files:
all_lines = self.file_store.read(action.path).split("\n")
new_file = insert_lines(
action.content.split("\n"), all_lines, action.start, action.end
if self.file_store is None:
return ErrorObservation("E2B file store not initialized. Call connect() first.")
try:
if action.start == 0 and action.end == -1:
self.file_store.write(action.path, action.content)
return FileWriteObservation(content="", path=action.path)
files = self.file_store.list(action.path)
if action.path in files:
all_lines = self.file_store.read(action.path).split("\n")
new_file = insert_lines(
action.content.split("\n"), all_lines, action.start, action.end
)
self.file_store.write(action.path, "".join(new_file))
return FileWriteObservation("", path=action.path)
else:
# Create a new file
self.file_store.write(action.path, action.content)
return FileWriteObservation(content="", path=action.path)
except Exception as e:
return ErrorObservation(f"Failed to write file: {e}")
def edit(self, action: FileEditAction) -> Observation:
"""Edit a file using E2B's file system."""
if self.file_store is None:
return ErrorObservation("E2B file store not initialized. Call connect() first.")
try:
if action.path in self.file_store.list(action.path):
content = self.file_store.read(action.path)
else:
return ErrorObservation(f"File {action.path} not found")
lines = content.split('\n')
if action.start < 0 or action.end > len(lines):
return ErrorObservation(f"Invalid line range: {action.start}-{action.end}")
new_lines = lines[:action.start] + action.content.split('\n') + lines[action.end:]
new_content = '\n'.join(new_lines)
self.file_store.write(action.path, new_content)
return FileEditObservation(
content='',
path=action.path,
old_content='\n'.join(lines[action.start:action.end]),
start=action.start,
end=action.end
)
self.file_store.write(action.path, "".join(new_file))
return FileWriteObservation("", path=action.path)
else:
# FIXME: we should create a new file here
return ErrorObservation(f"File not found: {action.path}")
except Exception as e:
return ErrorObservation(f"Failed to edit file: {e}")
def browse(self, action: BrowseURLAction) -> Observation:
"""Browse a URL using E2B's browser capabilities."""
if self.sandbox is None:
return ErrorObservation("E2B sandbox not initialized")
try:
exit_code, output = self.sandbox.execute(f"curl -s -L '{action.url}'")
if exit_code != 0:
exit_code, output = self.sandbox.execute(f"wget -qO- '{action.url}'")
if exit_code != 0:
return ErrorObservation(f"Failed to fetch URL: {output}")
return BrowserOutputObservation(
content=output,
url=action.url,
screenshot=None,
error=None if exit_code == 0 else output
)
except Exception as e:
return ErrorObservation(f"Failed to browse URL: {e}")
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
"""Interactive browsing is not supported in E2B."""
return ErrorObservation(
"Interactive browsing is not supported in E2B runtime. "
"Use browse() for simple URL fetching or consider using a different runtime."
)
def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox."""
if self.sandbox is None:
logger.warning("Cannot list files: E2B sandbox not initialized")
return []
if path is None:
path = self.config.workspace_mount_path_in_sandbox or '/workspace'
try:
exit_code, output = self.sandbox.execute(f"find {path} -maxdepth 1 -type f -o -type d")
if exit_code == 0:
files = [line.strip() for line in output.strip().split('\n') if line.strip()]
return [f.replace(path + '/', '') if f.startswith(path + '/') else f for f in files]
else:
logger.warning(f"Failed to list files in {path}: {output}")
return []
except Exception as e:
logger.warning(f"Error listing files: {e}")
return []
def add_env_vars(self, env_vars: dict[str, str]) -> None:
"""Add environment variables to the E2B sandbox."""
if self.sandbox is None:
logger.warning("Cannot add env vars: E2B sandbox not initialized")
return
if not hasattr(self, '_env_vars'):
self._env_vars = {}
self._env_vars.update(env_vars)
for key, value in env_vars.items():
try:
escaped_value = value.replace("'", "'\"'\"'")
cmd = f"export {key}='{escaped_value}'"
self.sandbox.execute(cmd)
logger.debug(f"Set env var: {key}")
except Exception as e:
logger.warning(f"Failed to set env var {key}: {e}")
def get_working_directory(self) -> str:
"""Get the current working directory."""
if self.sandbox is None:
return self.config.workspace_mount_path_in_sandbox or '/workspace'
try:
exit_code, output = self.sandbox.execute("pwd")
if exit_code == 0:
return output.strip()
except Exception:
pass
return self.config.workspace_mount_path_in_sandbox or '/workspace'
def get_mcp_config(self, extra_stdio_servers: list | None = None) -> dict:
"""Get MCP configuration for E2B runtime."""
return {
'stdio_servers': extra_stdio_servers or []
}
def check_if_alive(self) -> None:
"""Check if the E2B sandbox is alive."""
if self.sandbox is None:
raise RuntimeError("E2B sandbox not initialized")
return
def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False) -> None:
"""Copy files to the E2B sandbox."""
if self.sandbox is None:
raise RuntimeError("E2B sandbox not initialized")
self.sandbox.copy_to(host_src, sandbox_dest, recursive)
def get_vscode_token(self) -> str:
"""E2B doesn't support VSCode integration."""
return ""
@classmethod
def setup(cls, config: OpenHandsConfig, headless_mode: bool = False) -> None:
"""Set up the E2B runtime environment."""
logger.info("E2B runtime setup called")
pass
@classmethod
def teardown(cls, config: OpenHandsConfig) -> None:
"""Tear down the E2B runtime environment."""
logger.info("E2B runtime teardown called")
pass

View File

@ -3,7 +3,7 @@ import os
import tarfile
from glob import glob
from e2b import Sandbox as E2BSandbox
from e2b_code_interpreter import Sandbox
from e2b.exceptions import TimeoutException
from openhands.core.config import SandboxConfig
@ -19,7 +19,7 @@ class E2BBox:
def __init__(
self,
config: SandboxConfig,
template: str = "openhands",
sandbox_id: str | None = None,
):
self.config = copy.deepcopy(config)
self.initialize_plugins: bool = config.initialize_plugins
@ -30,20 +30,41 @@ class E2BBox:
raise ValueError(
"E2B_API_KEY environment variable is required for E2B runtime"
)
# Read custom E2B domain if provided
e2b_domain = os.getenv("E2B_DOMAIN")
if e2b_domain:
logger.info(f'Using custom E2B domain: {e2b_domain}')
self.sandbox = E2BSandbox(
api_key=e2b_api_key,
template=template,
# It's possible to stream stdout and stderr from sandbox and from each process
on_stderr=lambda x: logger.debug(f"E2B sandbox stderr: {x}"),
on_stdout=lambda x: logger.debug(f"E2B sandbox stdout: {x}"),
cwd=self._cwd, # Default workdir inside sandbox
)
logger.debug(f'Started E2B sandbox with ID "{self.sandbox.id}"')
# E2B v2 requires using create() method or connect to existing
try:
# Configure E2B client with custom domain if provided
create_kwargs = {}
connect_kwargs = {}
if e2b_domain:
# Set up custom domain configuration
# Note: This depends on E2B SDK version and may need adjustment
os.environ['E2B_API_URL'] = f'https://{e2b_domain}'
logger.info(f'Set E2B_API_URL to https://{e2b_domain}')
if sandbox_id:
# Connect to existing sandbox
self.sandbox = Sandbox.connect(sandbox_id, **connect_kwargs)
logger.info(f'Connected to existing E2B sandbox with ID "{sandbox_id}"')
else:
# Create new sandbox (e2b-code-interpreter doesn't need template)
self.sandbox = Sandbox.create(**create_kwargs)
sandbox_id = self.sandbox.sandbox_id
logger.info(f'Created E2B sandbox with ID "{sandbox_id}"')
except Exception as e:
logger.error(f"Failed to create/connect E2B sandbox: {e}")
raise
@property
def filesystem(self):
return self.sandbox.filesystem
# E2B v2 uses 'files' instead of 'filesystem'
return getattr(self.sandbox, 'files', None) or getattr(self.sandbox, 'filesystem', None)
def _archive(self, host_src: str, recursive: bool = False):
if recursive:
@ -70,21 +91,23 @@ class E2BBox:
def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]:
timeout = timeout if timeout is not None else self.config.timeout
process = self.sandbox.process.start(cmd, env_vars=self._env)
# E2B code-interpreter uses commands.run()
try:
process_output = process.wait(timeout=timeout)
result = self.sandbox.commands.run(cmd)
output = ""
if hasattr(result, 'stdout') and result.stdout:
output += result.stdout
if hasattr(result, 'stderr') and result.stderr:
output += result.stderr
exit_code = getattr(result, 'exit_code', 0) or 0
return exit_code, output
except TimeoutException:
logger.debug("Command timed out, killing process...")
process.kill()
logger.debug("Command timed out")
return -1, f'Command: "{cmd}" timed out'
logs = [m.line for m in process_output.messages]
logs_str = "\n".join(logs)
if process.exit_code is None:
return -1, logs_str
assert process_output.exit_code is not None
return process_output.exit_code, logs_str
except Exception as e:
logger.error(f"Command execution failed: {e}")
return -1, str(e)
def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
"""Copies a local file or directory to the sandbox."""
@ -98,24 +121,28 @@ class E2BBox:
uploaded_path = self.sandbox.upload_file(tar_file)
# Check if sandbox_dest exists. If not, create it.
process = self.sandbox.process.start_and_wait(f"test -d {sandbox_dest}")
if process.exit_code != 0:
self.sandbox.filesystem.make_dir(sandbox_dest)
exit_code, _ = self.execute(f"test -d {sandbox_dest}")
if exit_code != 0:
self.execute(f"mkdir -p {sandbox_dest}")
# Extract the archive into the destination and delete the archive
process = self.sandbox.process.start_and_wait(
exit_code, output = self.execute(
f"sudo tar -xf {uploaded_path} -C {sandbox_dest} && sudo rm {uploaded_path}"
)
if process.exit_code != 0:
if exit_code != 0:
raise Exception(
f"Failed to extract {uploaded_path} to {sandbox_dest}: {process.stderr}"
f"Failed to extract {uploaded_path} to {sandbox_dest}: {output}"
)
# Delete the local archive
os.remove(tar_filename)
def close(self):
self.sandbox.close()
# E2B v2 uses kill() instead of close()
if hasattr(self.sandbox, 'kill'):
self.sandbox.kill()
elif hasattr(self.sandbox, 'close'):
self.sandbox.close()
def get_working_directory(self):
return self.sandbox.cwd