Mirror of https://github.com/OpenHands/OpenHands.git, synced 2025-12-26 05:48:36 +08:00
feat(sandbox): Implementation of Sandbox Plugin to Support Jupyter (#1255)
* initialize plugin definition
* simplify mixin
* further improve plugin mixin
* add cache dir for pip
* support cleaning up the cache
* add script to set up Jupyter and the execution server
* integrate JupyterRequirement into ssh_box
* source bashrc at the end of plugin load
* add execute_cli, which accepts code via stdin
* make JUPYTER_EXEC_SERVER_PORT configurable via env var
* increase background cmd sleep time
* Update opendevin/sandbox/plugins/mixin.py (Co-authored-by: Robert Brennan <accounts@rbren.io>)
* add mixin to base class
* make JupyterRequirement a dataclass
* source plugins only when there are >0 requirements
* add `sandbox_plugins` for each agent and have the controller take care of it
* update build.sh to make logs available in /opendevin/logs
* switch to using config for lib and cache dir
* fix permission issue with /workspace
* use Python to implement execute_cli to avoid stdin escape issues
* wait until Jupyter is available
* support plugins via copying instead of mounting

---------
Co-authored-by: Robert Brennan <accounts@rbren.io>
This commit is contained in: parent 220dac926e, commit fc5e075ea0
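In outline: an agent declares a list of PluginRequirements; the AgentController hands them to the ActionManager, whose sandbox copies each plugin's files in and runs its setup script. A condensed sketch of that flow, assembled from the diffs below rather than quoted verbatim:

from dataclasses import dataclass
from typing import List


@dataclass
class PluginRequirement:
    """What a plugin ships: files to copy in, plus a setup script to run."""
    name: str
    host_src: str          # folder/files on the host to copy into the sandbox
    sandbox_dest: str      # destination directory inside the sandbox
    bash_script_path: str  # setup script, relative to sandbox_dest


def init_plugins(sandbox, requirements: List[PluginRequirement]) -> None:
    # Condensed from PluginMixin.init_plugins below; logging trimmed.
    for req in requirements:
        sandbox.copy_to(req.host_src, req.sandbox_dest, recursive=True)
        exit_code, output = sandbox.execute(f'{req.sandbox_dest}/{req.bash_script_path}')
        if exit_code != 0:
            raise RuntimeError(f'Failed to initialize plugin {req.name}: {output}')
    if requirements:
        # pick up PATH and env changes written by the setup scripts
        sandbox.execute('source ~/.bashrc')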
Makefile (8 lines changed)
@@ -51,7 +51,7 @@ check-system:
		echo "$(RED)Unsupported system detected. Please use macOS, Linux, or Windows Subsystem for Linux (WSL).$(RESET)"; \
		exit 1; \
	fi

check-python:
	@echo "$(YELLOW)Checking Python installation...$(RESET)"
	@if command -v python3.11 > /dev/null; then \
@@ -218,6 +218,12 @@ setup-config-prompts:
	workspace_dir=$${workspace_dir:-$(DEFAULT_WORKSPACE_DIR)}; \
	echo "WORKSPACE_BASE=\"$$workspace_dir\"" >> $(CONFIG_FILE).tmp

+# Clean up all caches
+clean:
+	@echo "$(YELLOW)Cleaning up caches...$(RESET)"
+	@rm -rf opendevin/.cache
+	@echo "$(GREEN)Caches cleaned up successfully.$(RESET)"
+
# Help
help:
	@echo "$(BLUE)Usage: make [target]$(RESET)"
agenthub/codeact_agent/codeact_agent.py
@@ -15,6 +15,7 @@ from opendevin.observation import (
)
from opendevin.parse_commands import parse_command_file
from opendevin.state import State
+from opendevin.sandbox.plugins import PluginRequirement, JupyterRequirement

COMMAND_DOCS = parse_command_file()
COMMAND_SEGMENT = (
@@ -69,6 +70,8 @@ class CodeActAgent(Agent):
    The agent works by passing the model a list of action-observation pairs and prompting the model to take the next step.
    """

+    sandbox_plugins: List[PluginRequirement] = [JupyterRequirement()]
+
    def __init__(
        self,
        llm: LLM,
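For context, opting an agent into a sandbox plugin is just a class attribute. A minimal sketch (the agent class is hypothetical, and the abstract methods an Agent subclass must implement are omitted):

from typing import List

from opendevin.agent import Agent
from opendevin.sandbox.plugins import PluginRequirement, JupyterRequirement


class MyNotebookAgent(Agent):  # hypothetical agent, for illustration only
    # The AgentController reads this attribute and calls
    # action_manager.init_sandbox_plugins(agent.sandbox_plugins) at startup.
    sandbox_plugins: List[PluginRequirement] = [JupyterRequirement()]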
opendevin/agent.py
@@ -6,6 +6,7 @@ if TYPE_CHECKING:
    from opendevin.state import State
    from opendevin.llm.llm import LLM
from opendevin.exceptions import AgentAlreadyRegisteredError, AgentNotRegisteredError
+from opendevin.sandbox.plugins import PluginRequirement


class Agent(ABC):
@@ -17,6 +18,7 @@ class Agent(ABC):
    """

    _registry: Dict[str, Type['Agent']] = {}
+    sandbox_plugins: List[PluginRequirement] = []

    def __init__(
        self,
opendevin/config.py
@@ -1,7 +1,7 @@
import os
-
import argparse
import toml
+import pathlib
from dotenv import load_dotenv

from opendevin.schema import ConfigType
@@ -18,6 +18,7 @@ DEFAULT_CONFIG: dict = {
    ConfigType.WORKSPACE_MOUNT_PATH: None,
    ConfigType.WORKSPACE_MOUNT_PATH_IN_SANDBOX: '/workspace',
    ConfigType.WORKSPACE_MOUNT_REWRITE: None,
+    ConfigType.CACHE_DIR: os.path.join(os.path.dirname(os.path.abspath(__file__)), '.cache'),
    ConfigType.LLM_MODEL: 'gpt-3.5-turbo-1106',
    ConfigType.SANDBOX_CONTAINER_IMAGE: 'ghcr.io/opendevin/sandbox',
    ConfigType.RUN_AS_DEVIN: 'true',
@@ -145,3 +146,8 @@ def get(key: str, required: bool = False):
    if not value and required:
        raise KeyError(f"Please set '{key}' in `config.toml` or `.env`.")
    return value
+
+
+_cache_dir = config.get('CACHE_DIR')
+if _cache_dir:
+    pathlib.Path(_cache_dir).mkdir(parents=True, exist_ok=True)
opendevin/controller/action_manager.py
@@ -14,6 +14,7 @@ from opendevin.observation import (
    AgentErrorObservation,
    NullObservation,
)
+from opendevin.sandbox.plugins import PluginRequirement


class ActionManager:
@@ -41,6 +42,9 @@ class ActionManager:
        else:
            raise ValueError(f'Invalid sandbox type: {sandbox_type}')

+    def init_sandbox_plugins(self, plugins: List[PluginRequirement]):
+        self.sandbox.init_plugins(plugins)
+
    async def run_action(self, action: Action, agent_controller) -> Observation:
        observation: Observation = NullObservation('')
        if not action.executable:
opendevin/controller/agent_controller.py
@@ -53,6 +53,8 @@ class AgentController:
        self.action_manager = ActionManager(self.id, container_image)
        self.max_chars = max_chars
        self.callbacks = callbacks
+        # Initialize agent-required plugins for sandbox (if any)
+        self.action_manager.init_sandbox_plugins(agent.sandbox_plugins)

    def update_state_for_step(self, i):
        if self.state is None:
opendevin/sandbox/docker/exec_box.py
@@ -4,6 +4,8 @@ import os
import sys
import time
import uuid
+import tarfile
+from glob import glob
from collections import namedtuple
from typing import Dict, List, Tuple
@@ -122,6 +124,37 @@ class DockerExecBox(Sandbox):
            return -1, f'Command: "{cmd}" timed out'
        return exit_code, logs.decode('utf-8')

+    def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
+        # mkdir -p sandbox_dest if it doesn't exist
+        exit_code, logs = self.container.exec_run(
+            ['/bin/bash', '-c', f'mkdir -p {sandbox_dest}'],
+            workdir=SANDBOX_WORKSPACE_DIR,
+        )
+        if exit_code != 0:
+            raise Exception(
+                f'Failed to create directory {sandbox_dest} in sandbox: {logs}')
+
+        if recursive:
+            assert os.path.isdir(host_src), 'Source must be a directory when recursive is True'
+            files = glob(host_src + '/**/*', recursive=True)
+            srcname = os.path.basename(host_src)
+            tar_filename = os.path.join(os.path.dirname(host_src), srcname + '.tar')
+            with tarfile.open(tar_filename, mode='w') as tar:
+                for file in files:
+                    tar.add(file, arcname=os.path.relpath(file, os.path.dirname(host_src)))
+        else:
+            assert os.path.isfile(host_src), 'Source must be a file when recursive is False'
+            srcname = os.path.basename(host_src)
+            tar_filename = os.path.join(os.path.dirname(host_src), srcname + '.tar')
+            with tarfile.open(tar_filename, mode='w') as tar:
+                tar.add(host_src, arcname=srcname)
+
+        with open(tar_filename, 'rb') as f:
+            data = f.read()
+
+        self.container.put_archive(os.path.dirname(sandbox_dest), data)
+        os.remove(tar_filename)
+
    def execute_in_background(self, cmd: str) -> Process:
        result = self.container.exec_run(
            self.get_exec_cmd(cmd), socket=True, workdir=SANDBOX_WORKSPACE_DIR
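Docker's put_archive only accepts tar archives, which is why copy_to wraps everything in one first. A standalone sketch of the same pattern using docker-py, with an in-memory tar instead of the temporary .tar file (container name and host file are hypothetical):

import io
import tarfile

import docker  # docker-py, the same client library the sandbox uses

client = docker.from_env()
container = client.containers.get('opendevin-sandbox')  # hypothetical name

buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w') as tar:
    tar.add('setup.sh', arcname='setup.sh')  # hypothetical host file
buf.seek(0)

# As in copy_to above, the archive is extracted into the parent of the
# destination path: put_archive(os.path.dirname(sandbox_dest), data).
container.put_archive('/opendevin/plugins', buf.read())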
opendevin/sandbox/docker/local_box.py
@@ -39,6 +39,25 @@ class LocalBox(Sandbox):
        except subprocess.TimeoutExpired:
            return -1, 'Command timed out'

+    def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
+        # mkdir -p sandbox_dest if it doesn't exist
+        res = subprocess.run(f'mkdir -p {sandbox_dest}', shell=True, text=True, cwd=config.get('WORKSPACE_BASE'))
+        if res.returncode != 0:
+            raise RuntimeError(f'Failed to create directory {sandbox_dest} in sandbox')
+
+        if recursive:
+            res = subprocess.run(
+                f'cp -r {host_src} {sandbox_dest}', shell=True, text=True, cwd=config.get('WORKSPACE_BASE')
+            )
+            if res.returncode != 0:
+                raise RuntimeError(f'Failed to copy {host_src} to {sandbox_dest} in sandbox')
+        else:
+            res = subprocess.run(
+                f'cp {host_src} {sandbox_dest}', shell=True, text=True, cwd=config.get('WORKSPACE_BASE')
+            )
+            if res.returncode != 0:
+                raise RuntimeError(f'Failed to copy {host_src} to {sandbox_dest} in sandbox')
+
    def execute_in_background(self, cmd: str) -> Process:
        process = subprocess.Popen(
            cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
opendevin/sandbox/docker/ssh_box.py
@@ -4,6 +4,8 @@ import platform
import sys
import time
import uuid
+import tarfile
+from glob import glob
from collections import namedtuple
from typing import Dict, List, Tuple, Union
@@ -15,6 +17,7 @@ from opendevin.logger import opendevin_logger as logger
from opendevin.sandbox.sandbox import Sandbox
from opendevin.sandbox.process import Process
from opendevin.sandbox.docker.process import DockerProcess
+from opendevin.sandbox.plugins.jupyter import JupyterRequirement
from opendevin.schema import ConfigType
from opendevin.utils import find_available_tcp_port
from opendevin.exceptions import SandboxInvalidBackgroundCommandError
@@ -58,10 +61,10 @@ class DockerSSHBox(Sandbox):
    background_commands: Dict[int, Process] = {}

    def __init__(
        self,
        container_image: str | None = None,
        timeout: int = 120,
        sid: str | None = None,
    ):
        # Initialize docker client. Throws an exception if Docker is not reachable.
        try:
@@ -137,6 +140,22 @@ class DockerSSHBox(Sandbox):
            )
            if exit_code != 0:
                raise Exception(f'Failed to set password in sandbox: {logs}')
+
+            # chown the home directory
+            exit_code, logs = self.container.exec_run(
+                ['/bin/bash', '-c', 'chown opendevin:root /home/opendevin'],
+                workdir=SANDBOX_WORKSPACE_DIR,
+            )
+            if exit_code != 0:
+                raise Exception(
+                    f'Failed to chown home directory for opendevin in sandbox: {logs}')
+            exit_code, logs = self.container.exec_run(
+                ['/bin/bash', '-c', f'chown opendevin:root {SANDBOX_WORKSPACE_DIR}'],
+                workdir=SANDBOX_WORKSPACE_DIR,
+            )
+            if exit_code != 0:
+                raise Exception(
+                    f'Failed to chown workspace directory for opendevin in sandbox: {logs}')
        else:
            exit_code, logs = self.container.exec_run(
                # change password for root
@@ -208,6 +227,37 @@
        exit_code = int(exit_code.lstrip('echo $?').strip())
        return exit_code, command_output

+    def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
+        # mkdir -p sandbox_dest if it doesn't exist
+        exit_code, logs = self.container.exec_run(
+            ['/bin/bash', '-c', f'mkdir -p {sandbox_dest}'],
+            workdir=SANDBOX_WORKSPACE_DIR,
+        )
+        if exit_code != 0:
+            raise Exception(
+                f'Failed to create directory {sandbox_dest} in sandbox: {logs}')
+
+        if recursive:
+            assert os.path.isdir(host_src), 'Source must be a directory when recursive is True'
+            files = glob(host_src + '/**/*', recursive=True)
+            srcname = os.path.basename(host_src)
+            tar_filename = os.path.join(os.path.dirname(host_src), srcname + '.tar')
+            with tarfile.open(tar_filename, mode='w') as tar:
+                for file in files:
+                    tar.add(file, arcname=os.path.relpath(file, os.path.dirname(host_src)))
+        else:
+            assert os.path.isfile(host_src), 'Source must be a file when recursive is False'
+            srcname = os.path.basename(host_src)
+            tar_filename = os.path.join(os.path.dirname(host_src), srcname + '.tar')
+            with tarfile.open(tar_filename, mode='w') as tar:
+                tar.add(host_src, arcname=srcname)
+
+        with open(tar_filename, 'rb') as f:
+            data = f.read()
+
+        self.container.put_archive(os.path.dirname(sandbox_dest), data)
+        os.remove(tar_filename)
+
    def execute_in_background(self, cmd: str) -> Process:
        result = self.container.exec_run(
            self.get_exec_cmd(cmd), socket=True, workdir=SANDBOX_WORKSPACE_DIR
@@ -307,6 +357,11 @@
                    'bind': SANDBOX_WORKSPACE_DIR,
                    'mode': 'rw'
                },
+                # mount cache directory to /home/opendevin/.cache for pip cache reuse
+                config.get('CACHE_DIR'): {
+                    'bind': '/home/opendevin/.cache' if RUN_AS_DEVIN else '/root/.cache',
+                    'mode': 'rw'
+                },
            },
        )
        logger.info('Container started')
@@ -355,8 +410,11 @@ if __name__ == '__main__':
    logger.info(
        "Interactive Docker container started. Type 'exit' or use Ctrl+C to exit.")

+    # Initialize required plugins
+    ssh_box.init_plugins([JupyterRequirement()])
+
    bg_cmd = ssh_box.execute_in_background(
-        "while true; do echo 'dot ' && sleep 1; done"
+        "while true; do echo 'dot ' && sleep 10; done"
    )

    sys.stdout.flush()
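Putting it together, a hedged sketch of driving the Jupyter plugin through a DockerSSHBox, modeled on the __main__ block above (the piped snippet is illustrative):

from opendevin.sandbox.docker.ssh_box import DockerSSHBox
from opendevin.sandbox.plugins import JupyterRequirement

ssh_box = DockerSSHBox()
# Copies sandbox/plugins/jupyter to /opendevin/plugins/jupyter, runs
# setup.sh there, then sources ~/.bashrc (see PluginMixin below).
ssh_box.init_plugins([JupyterRequirement()])

# setup.sh put execute_cli on PATH; it reads Python code from stdin and
# forwards it to the execution server.
exit_code, output = ssh_box.execute("echo 'print(1 + 1)' | execute_cli")
print(exit_code, output)  # expect 0 and '2' once the kernel is ready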
opendevin/sandbox/e2b/sandbox.py
@@ -61,6 +61,10 @@ class E2BBox(Sandbox):
        assert process_output.exit_code is not None
        return process_output.exit_code, logs_str

+    def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
+        # FIXME
+        raise NotImplementedError('Copying files to E2B sandbox is not implemented yet')
+
    def execute_in_background(self, cmd: str) -> Process:
        process = self.sandbox.process.start(cmd)
        e2b_process = E2BProcess(process, cmd)
opendevin/sandbox/plugins/__init__.py (new file, 7 lines)
@@ -0,0 +1,7 @@
from .mixin import PluginMixin
from .requirement import PluginRequirement

# Requirements
from .jupyter import JupyterRequirement

__all__ = ['PluginMixin', 'PluginRequirement', 'JupyterRequirement']
opendevin/sandbox/plugins/jupyter/__init__.py (new file, 11 lines)
@@ -0,0 +1,11 @@
import os
from dataclasses import dataclass
from opendevin.sandbox.plugins.requirement import PluginRequirement


@dataclass
class JupyterRequirement(PluginRequirement):
    name: str = 'jupyter'
    host_src: str = os.path.dirname(os.path.abspath(__file__))  # The directory of this file (sandbox/plugins/jupyter)
    sandbox_dest: str = '/opendevin/plugins/jupyter'
    bash_script_path: str = 'setup.sh'
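With every field defaulted, the requirement can be instantiated with no arguments; the resolved values look like this (the host path depends on where the repo is checked out):

from opendevin.sandbox.plugins import JupyterRequirement

req = JupyterRequirement()
print(req.name)              # 'jupyter'
print(req.host_src)          # .../opendevin/sandbox/plugins/jupyter on the host
print(req.sandbox_dest)      # '/opendevin/plugins/jupyter'
print(req.bash_script_path)  # 'setup.sh', resolved relative to sandbox_dest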
opendevin/sandbox/plugins/jupyter/execute_cli (new executable file, 25 lines)
@@ -0,0 +1,25 @@
#!/usr/bin/env python3
import os
import sys
import time
import requests

# Read the Python code from STDIN
code = sys.stdin.read()

# Set the default kernel ID
kernel_id = 'default'

# Try up to 5 times until success
PORT = os.environ.get('JUPYTER_EXEC_SERVER_PORT')
POST_URL = f'http://localhost:{PORT}/execute'

for i in range(5):
    response = requests.post(POST_URL, json={'kernel_id': kernel_id, 'code': code})
    # if "500: Internal Server Error" is not in the response, break the loop
    if '500: Internal Server Error' not in response.text:
        break
    time.sleep(1)

# Print the response
print(str(response.text))
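Because execute_cli takes its code from stdin, callers never need to escape quotes into a shell string. A small sketch of invoking it from Python (the absolute path assumes the sandbox layout above, and JUPYTER_EXEC_SERVER_PORT must already be set, as setup.sh arranges):

import subprocess

result = subprocess.run(
    ['/opendevin/plugins/jupyter/execute_cli'],  # also on PATH inside the sandbox
    input='print(1 + 1)',   # the snippet travels via stdin, no shell escaping
    capture_output=True,
    text=True,
)
print(result.stdout)  # '2' when the kernel gateway is up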
opendevin/sandbox/plugins/jupyter/execute_server (new executable file, 270 lines)
@@ -0,0 +1,270 @@
#!/usr/bin/env python3

import os
import re
import asyncio
import tornado
import logging

from tornado.escape import json_encode, json_decode, url_escape
from tornado.websocket import websocket_connect
from tornado.ioloop import PeriodicCallback
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from uuid import uuid4

logging.basicConfig(level=logging.INFO)


def strip_ansi(o: str) -> str:
    """
    Removes ANSI escape sequences from `o`, as defined by ECMA-048 in
    http://www.ecma-international.org/publications/files/ECMA-ST/Ecma-048.pdf

    # https://github.com/ewen-lbh/python-strip-ansi/blob/master/strip_ansi/__init__.py

    >>> strip_ansi("\\033[33mLorem ipsum\\033[0m")
    'Lorem ipsum'

    >>> strip_ansi("Lorem \\033[38;25mIpsum\\033[0m sit\\namet.")
    'Lorem Ipsum sit\\namet.'

    >>> strip_ansi("")
    ''

    >>> strip_ansi("\\x1b[0m")
    ''

    >>> strip_ansi("Lorem")
    'Lorem'

    >>> strip_ansi('\\x1b[38;5;32mLorem ipsum\\x1b[0m')
    'Lorem ipsum'

    >>> strip_ansi('\\x1b[1m\\x1b[46m\\x1b[31mLorem dolor sit ipsum\\x1b[0m')
    'Lorem dolor sit ipsum'
    """
    # pattern = re.compile(r'/(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]/')
    pattern = re.compile(r'\x1B\[\d+(;\d+){0,2}m')
    stripped = pattern.sub('', o)
    return stripped


class JupyterKernel:
    def __init__(
        self,
        url_suffix,
        convid,
        lang='python'
    ):
        self.base_url = f'http://{url_suffix}'
        self.base_ws_url = f'ws://{url_suffix}'
        self.lang = lang
        self.kernel_id = None
        self.ws = None
        self.convid = convid
        logging.info(f'Jupyter kernel created for conversation {convid} at {url_suffix}')

        self.heartbeat_interval = 10000  # 10 seconds
        self.heartbeat_callback = None

    async def initialize(self):
        await self.execute(r'%colors nocolor')
        # pre-defined tools
        self.tools_to_run = [
            # TODO: You can add code for your pre-defined tools here
        ]
        for tool in self.tools_to_run:
            # logging.info(f'Tool initialized:\n{tool}')
            await self.execute(tool)

    async def _send_heartbeat(self):
        if not self.ws:
            return
        try:
            self.ws.ping()
            # logging.info('Heartbeat sent...')
        except tornado.iostream.StreamClosedError:
            # logging.info('Heartbeat failed, reconnecting...')
            try:
                await self._connect()
            except ConnectionRefusedError:
                logging.info('ConnectionRefusedError: Failed to reconnect to kernel websocket - Is the kernel still running?')

    async def _connect(self):
        if self.ws:
            self.ws.close()
            self.ws = None

        client = AsyncHTTPClient()
        if not self.kernel_id:
            n_tries = 5
            while n_tries > 0:
                try:
                    response = await client.fetch(
                        '{}/api/kernels'.format(self.base_url),
                        method='POST',
                        body=json_encode({'name': self.lang}),
                    )
                    kernel = json_decode(response.body)
                    self.kernel_id = kernel['id']
                    break
                except Exception:
                    # kernels are not ready yet
                    n_tries -= 1
                    await asyncio.sleep(1)

            if n_tries == 0:
                raise ConnectionRefusedError('Failed to connect to kernel')

        ws_req = HTTPRequest(
            url='{}/api/kernels/{}/channels'.format(
                self.base_ws_url, url_escape(self.kernel_id)
            )
        )
        self.ws = await websocket_connect(ws_req)
        logging.info('Connected to kernel websocket')

        # Setup heartbeat
        if self.heartbeat_callback:
            self.heartbeat_callback.stop()
        self.heartbeat_callback = PeriodicCallback(self._send_heartbeat, self.heartbeat_interval)
        self.heartbeat_callback.start()

    async def execute(self, code, timeout=60):
        if not self.ws:
            await self._connect()

        msg_id = uuid4().hex
        self.ws.write_message(
            json_encode(
                {
                    'header': {
                        'username': '',
                        'version': '5.0',
                        'session': '',
                        'msg_id': msg_id,
                        'msg_type': 'execute_request',
                    },
                    'parent_header': {},
                    'channel': 'shell',
                    'content': {
                        'code': code,
                        'silent': False,
                        'store_history': False,
                        'user_expressions': {},
                        'allow_stdin': False,
                    },
                    'metadata': {},
                    'buffers': {},
                }
            )
        )

        outputs = []

        async def wait_for_messages():
            execution_done = False
            while not execution_done:
                msg = await self.ws.read_message()
                msg = json_decode(msg)
                msg_type = msg['msg_type']
                parent_msg_id = msg['parent_header'].get('msg_id', None)

                if parent_msg_id != msg_id:
                    continue

                if os.environ.get('DEBUG', False):
                    logging.info(f"MSG TYPE: {msg_type.upper()} DONE:{execution_done}\nCONTENT: {msg['content']}")

                if msg_type == 'error':
                    traceback = '\n'.join(msg['content']['traceback'])
                    outputs.append(traceback)
                    execution_done = True
                elif msg_type == 'stream':
                    outputs.append(msg['content']['text'])
                elif msg_type in ['execute_result', 'display_data']:
                    outputs.append(msg['content']['data']['text/plain'])
                    if 'image/png' in msg['content']['data']:
                        # use markdown to display image (in case of large image)
                        # outputs.append(f"\n<img src=\'data:image/png;base64,{msg['content']['data']['image/png']}\'/>\n")
                        outputs.append(f"\n![image](data:image/png;base64,{msg['content']['data']['image/png']})\n")

                elif msg_type == 'execute_reply':
                    execution_done = True
            return execution_done

        async def interrupt_kernel():
            client = AsyncHTTPClient()
            interrupt_response = await client.fetch(
                f'{self.base_url}/api/kernels/{self.kernel_id}/interrupt',
                method='POST',
                body=json_encode({'kernel_id': self.kernel_id}),
            )
            logging.info(f'Kernel interrupted: {interrupt_response}')

        try:
            execution_done = await asyncio.wait_for(wait_for_messages(), timeout)
        except asyncio.TimeoutError:
            await interrupt_kernel()
            return f'[Execution timed out ({timeout} seconds).]'

        if not outputs and execution_done:
            ret = '[Code executed successfully with no output]'
        else:
            ret = ''.join(outputs)

        # Remove ANSI
        ret = strip_ansi(ret)

        if os.environ.get('DEBUG', False):
            logging.info(f'OUTPUT:\n{ret}')
        return ret

    async def shutdown_async(self):
        if self.kernel_id:
            client = AsyncHTTPClient()
            await client.fetch(
                '{}/api/kernels/{}'.format(self.base_url, self.kernel_id),
                method='DELETE',
            )
            self.kernel_id = None
            if self.ws:
                self.ws.close()
                self.ws = None


class ExecuteHandler(tornado.web.RequestHandler):
    def initialize(self, jupyter_kernel):
        self.jupyter_kernel = jupyter_kernel

    async def post(self):
        data = json_decode(self.request.body)
        code = data.get('code')

        if not code:
            self.set_status(400)
            self.write('Missing code')
            return

        output = await self.jupyter_kernel.execute(code)

        self.write(output)


def make_app():
    jupyter_kernel = JupyterKernel(
        f"localhost:{os.environ.get('JUPYTER_GATEWAY_PORT')}",
        os.environ.get('JUPYTER_GATEWAY_KERNEL_ID')
    )
    asyncio.get_event_loop().run_until_complete(jupyter_kernel.initialize())

    return tornado.web.Application([
        (r'/execute', ExecuteHandler, {'jupyter_kernel': jupyter_kernel}),
    ])


if __name__ == '__main__':
    app = make_app()
    app.listen(os.environ.get('JUPYTER_EXEC_SERVER_PORT'))
    tornado.ioloop.IOLoop.current().start()
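The /execute endpoint can also be exercised directly; a minimal sketch (port 18889 follows setup.sh below; only 'code' is required by the handler, though execute_cli also sends 'kernel_id'):

import requests

resp = requests.post(
    'http://localhost:18889/execute',
    json={'kernel_id': 'default', 'code': 'import sys; print(sys.version)'},
)
print(resp.text)  # stream/execute_result output captured from the kernel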
opendevin/sandbox/plugins/jupyter/setup.sh (new executable file, 51 lines)
@@ -0,0 +1,51 @@
#!/bin/bash

set -e

# Add /opendevin/plugins/jupyter to PATH to make `execute_cli` available
echo 'export PATH=$PATH:/opendevin/plugins/jupyter' >> ~/.bashrc
export PATH=/opendevin/plugins/jupyter:$PATH

# if user name is `opendevin`, add '/home/opendevin/.local/bin' to PATH
if [ "$USER" = "opendevin" ]; then
    echo 'export PATH=$PATH:/home/opendevin/.local/bin' >> ~/.bashrc
    export PATH=$PATH:/home/opendevin/.local/bin
fi
# if user name is `root`, add '/root/.local/bin' to PATH
if [ "$USER" = "root" ]; then
    echo 'export PATH=$PATH:/root/.local/bin' >> ~/.bashrc
    export PATH=$PATH:/root/.local/bin
fi

# Install dependencies
pip install jupyterlab notebook jupyter_kernel_gateway

# Create logs directory
sudo mkdir -p /opendevin/logs && sudo chmod 777 /opendevin/logs

# Run background process to start jupyter kernel gateway
export JUPYTER_GATEWAY_PORT=18888
jupyter kernelgateway --KernelGatewayApp.ip=0.0.0.0 --KernelGatewayApp.port=$JUPYTER_GATEWAY_PORT > /opendevin/logs/jupyter_kernel_gateway.log 2>&1 &
export JUPYTER_GATEWAY_PID=$!
echo "export JUPYTER_GATEWAY_PID=$JUPYTER_GATEWAY_PID" >> ~/.bashrc
export JUPYTER_GATEWAY_KERNEL_ID="default"
echo "export JUPYTER_GATEWAY_KERNEL_ID=$JUPYTER_GATEWAY_KERNEL_ID" >> ~/.bashrc
echo "JupyterKernelGateway started with PID: $JUPYTER_GATEWAY_PID"

# Start the execute server
export JUPYTER_EXEC_SERVER_PORT=18889
echo "export JUPYTER_EXEC_SERVER_PORT=$JUPYTER_EXEC_SERVER_PORT" >> ~/.bashrc
/opendevin/plugins/jupyter/execute_server > /opendevin/logs/jupyter_execute_server.log 2>&1 &
export JUPYTER_EXEC_SERVER_PID=$!
echo "export JUPYTER_EXEC_SERVER_PID=$JUPYTER_EXEC_SERVER_PID" >> ~/.bashrc
echo "Execution server started with PID: $JUPYTER_EXEC_SERVER_PID"

# Wait until /opendevin/logs/jupyter_kernel_gateway.log contains "is available"
while ! grep -q "is available" /opendevin/logs/jupyter_kernel_gateway.log; do
    sleep 1
done
# Wait until /opendevin/logs/jupyter_execute_server.log contains "Jupyter kernel created for conversation"
while ! grep -q "Jupyter kernel created for conversation" /opendevin/logs/jupyter_execute_server.log; do
    sleep 1
done
echo "Jupyter kernel ready."
opendevin/sandbox/plugins/mixin.py (new file, 39 lines)
@@ -0,0 +1,39 @@
import os
from typing import List, Protocol, Tuple
from opendevin.logger import opendevin_logger as logger
from opendevin.sandbox.plugins.requirement import PluginRequirement


class SandboxProtocol(Protocol):
    # https://stackoverflow.com/questions/51930339/how-do-i-correctly-add-type-hints-to-mixin-classes

    def execute(self, cmd: str) -> Tuple[int, str]:
        ...

    def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
        ...


class PluginMixin:
    """Mixin for Sandbox to support plugins."""

    def init_plugins(self: SandboxProtocol, requirements: List[PluginRequirement]):
        """Load plugins into the sandbox."""
        for requirement in requirements:
            # copy over the files
            self.copy_to(requirement.host_src, requirement.sandbox_dest, recursive=True)
            logger.info(f'Copied files from [{requirement.host_src}] to [{requirement.sandbox_dest}] inside sandbox.')

            # Execute the bash script
            abs_path_to_bash_script = os.path.join(requirement.sandbox_dest, requirement.bash_script_path)
            logger.info(f'Initializing plugin [{requirement.name}] by executing [{abs_path_to_bash_script}] in the sandbox.')
            exit_code, output = self.execute(abs_path_to_bash_script)
            if exit_code != 0:
                raise RuntimeError(f'Failed to initialize plugin {requirement.name} with exit code {exit_code} and output {output}')
            logger.info(f'Plugin {requirement.name} initialized successfully:\n{output}')

        if len(requirements) > 0:
            exit_code, output = self.execute('source ~/.bashrc')
            if exit_code != 0:
                raise RuntimeError(f'Failed to source ~/.bashrc with exit code {exit_code} and output {output}')
            logger.info('Sourced ~/.bashrc successfully')
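Defining a new plugin therefore means shipping a directory with a setup.sh and subclassing PluginRequirement. A hypothetical example (name, directory, and script are invented for illustration):

import os
from dataclasses import dataclass

from opendevin.sandbox.plugins.requirement import PluginRequirement


@dataclass
class GraphvizRequirement(PluginRequirement):  # hypothetical plugin
    name: str = 'graphviz'
    # Directory holding setup.sh plus any helper scripts to copy in.
    host_src: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'graphviz')
    sandbox_dest: str = '/opendevin/plugins/graphviz'
    # Relative to sandbox_dest; PluginMixin joins them and executes the script.
    bash_script_path: str = 'setup.sh'

PluginMixin.init_plugins above then handles the copy, setup, and bashrc sourcing with no further wiring.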
opendevin/sandbox/plugins/requirement.py (new file, 12 lines)
@@ -0,0 +1,12 @@
from dataclasses import dataclass


@dataclass
class PluginRequirement:
    """Requirement for a plugin."""
    name: str
    # FOLDER/FILES to be copied to the sandbox
    host_src: str
    sandbox_dest: str
    # NOTE: bash_script_path should be relative to the `sandbox_dest` path
    bash_script_path: str
opendevin/sandbox/sandbox.py
@@ -3,9 +3,10 @@ from typing import Dict
from typing import Tuple

from opendevin.sandbox.process import Process
+from opendevin.sandbox.plugins.mixin import PluginMixin


-class Sandbox(ABC):
+class Sandbox(ABC, PluginMixin):
    background_commands: Dict[int, Process] = {}

    @abstractmethod
@@ -27,3 +28,7 @@ class Sandbox(ABC):
    @abstractmethod
    def close(self):
        pass
+
+    @abstractmethod
+    def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
+        pass
opendevin/schema/config.py
@@ -8,6 +8,7 @@ class ConfigType(str, Enum):
    WORKSPACE_MOUNT_PATH = 'WORKSPACE_MOUNT_PATH'
    WORKSPACE_MOUNT_REWRITE = 'WORKSPACE_MOUNT_REWRITE'
    WORKSPACE_MOUNT_PATH_IN_SANDBOX = 'WORKSPACE_MOUNT_PATH_IN_SANDBOX'
+    CACHE_DIR = 'CACHE_DIR'
    LLM_MODEL = 'LLM_MODEL'
    SANDBOX_CONTAINER_IMAGE = 'SANDBOX_CONTAINER_IMAGE'
    RUN_AS_DEVIN = 'RUN_AS_DEVIN'