Add websocket runtime and od-client-runtime (#2603)

* add draft code

* add some sandbox draft code

* Export WebSocketBox and fix add_to_env async

* fix

* test execute

* add runtime draft

* add draft od-runtime-client

* refactor useless code

* format

* resume runtime

* resume runtime

* remove background command

* remove uselss action and init function

* add EventStreamRuntime test

* add echo server test

* temporarily build websocket everytimes

* remove websocket sandbox deprecated

* refactor code

* fix bug, add test

* fix bug

* remove test draft code

* refactor code, remove async

* rename file and directory

* add init plugin and runtime tools function

* add docker luanch

* fix plugin initialization

* remove test scropt

* add mock test code

* apply suggestions

---------

Co-authored-by: Boxuan Li <liboxuan@connect.hku.hk>
This commit is contained in:
Yufan Song
2024-07-08 10:44:37 -07:00
committed by GitHub
parent e6ebb4307e
commit 9fbfa0650e
5 changed files with 597 additions and 0 deletions

View File

@@ -0,0 +1,212 @@
import asyncio
import os
import websockets
import pexpect
import json
import shutil
from typing import Any
from websockets.exceptions import ConnectionClosed
from opendevin.events.serialization import event_to_dict, event_from_dict
from opendevin.events.observation import Observation
from opendevin.runtime.plugins import PluginRequirement
from opendevin.events.action import (
Action,
CmdRunAction,
IPythonRunCellAction,
)
from opendevin.events.observation import (
CmdOutputObservation,
ErrorObservation,
Observation,
IPythonRunCellObservation
)
from opendevin.runtime.plugins import (
AgentSkillsRequirement,
JupyterRequirement,
PluginRequirement,
)
class RuntimeClient():
# This runtime will listen to the websocket
# When receive an event, it will run the action and send the observation back to the websocket
def __init__(self) -> None:
self.init_shell()
self.init_websocket()
def init_websocket(self) -> None:
server = websockets.serve(self.listen, "0.0.0.0", 8080)
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
def init_shell(self) -> None:
# TODO: we need to figure a way to support different users. Maybe the runtime cli should be run as root
self.shell = pexpect.spawn('/bin/bash', encoding='utf-8')
self.shell.expect(r'[$#] ')
async def listen(self, websocket):
try:
async for message in websocket:
event_str = json.loads(message)
event = event_from_dict(event_str)
if isinstance(event, Action):
observation = self.run_action(event)
await websocket.send(json.dumps(event_to_dict(observation)))
except ConnectionClosed:
print("Connection closed")
def run_action(self, action) -> Observation:
# Should only receive Action CmdRunAction and IPythonRunCellAction
action_type = action.action # type: ignore[attr-defined]
observation = getattr(self, action_type)(action)
# TODO: see comments in https://github.com/OpenDevin/OpenDevin/pull/2603#discussion_r1668994137
observation._parent = action.id # type: ignore[attr-defined]
return observation
def run(self, action: CmdRunAction) -> Observation:
return self._run_command(action.command)
def _run_command(self, command: str) -> Observation:
try:
output, exit_code = self.execute(command)
return CmdOutputObservation(
command_id=-1, content=str(output), command=command, exit_code=exit_code
)
except UnicodeDecodeError:
return ErrorObservation('Command output could not be decoded as utf-8')
def execute(self, command):
print(f"Received command: {command}")
self.shell.sendline(command)
self.shell.expect(r'[$#] ')
output = self.shell.before.strip().split('\r\n', 1)[1].strip()
exit_code = output[-1].strip()
return output, exit_code
def run_ipython(self, action: IPythonRunCellAction) -> Observation:
obs = self._run_command(
("cat > /tmp/opendevin_jupyter_temp.py <<'EOL'\n" f'{action.code}\n' 'EOL'),
)
# run the code
obs = self._run_command('cat /tmp/opendevin_jupyter_temp.py | execute_cli')
output = obs.content
if 'pip install' in action.code:
print(output)
package_names = action.code.split(' ', 2)[-1]
is_single_package = ' ' not in package_names
if 'Successfully installed' in output:
restart_kernel = 'import IPython\nIPython.Application.instance().kernel.do_shutdown(True)'
if (
'Note: you may need to restart the kernel to use updated packages.'
in output
):
self._run_command(
(
"cat > /tmp/opendevin_jupyter_temp.py <<'EOL'\n"
f'{restart_kernel}\n'
'EOL'
)
)
obs = self._run_command(
'cat /tmp/opendevin_jupyter_temp.py | execute_cli'
)
output = '[Package installed successfully]'
if "{'status': 'ok', 'restart': True}" != obs.content.strip():
print(obs.content)
output += (
'\n[But failed to restart the kernel to load the package]'
)
else:
output += (
'\n[Kernel restarted successfully to load the package]'
)
# re-init the kernel after restart
if action.kernel_init_code:
obs = self._run_command(
(
f"cat > /tmp/opendevin_jupyter_init.py <<'EOL'\n"
f'{action.kernel_init_code}\n'
'EOL'
),
)
obs = self._run_command(
'cat /tmp/opendevin_jupyter_init.py | execute_cli',
)
elif (
is_single_package
and f'Requirement already satisfied: {package_names}' in output
):
output = '[Package already installed]'
return IPythonRunCellObservation(content=output, code=action.code)
def close(self):
self.shell.close()
############################################################################
# Initialization work inside sandbox image
############################################################################
# init_runtime_tools do in EventStreamRuntime
def init_sandbox_plugins(self, requirements: list[PluginRequirement]) -> None:
# TODO:: test after settle donw the way to move code into sandbox
for requirement in requirements:
self._source_bashrc()
shutil.copytree(requirement.host_src, requirement.sandbox_dest)
# Execute the bash script
abs_path_to_bash_script = os.path.join(
requirement.sandbox_dest, requirement.bash_script_path
)
print(
f'Initializing plugin [{requirement.name}] by executing [{abs_path_to_bash_script}] in the sandbox.'
)
output, exit_code = self.execute(abs_path_to_bash_script)
if exit_code != 0:
raise RuntimeError(
f'Failed to initialize plugin {requirement.name} with exit code {exit_code} and output: {output}'
)
print(f'Plugin {requirement.name} initialized successfully.')
if len(requirements) > 0:
self._source_bashrc()
def _source_bashrc(self):
output, exit_code = self.execute(
'source /opendevin/bash.bashrc && source ~/.bashrc'
)
print("Yufan:",exit_code, output)
# if exit_code != 0:
# raise RuntimeError(
# f'Failed to source /opendevin/bash.bashrc and ~/.bashrc with exit code {exit_code} and output: {output}'
# )
print('Sourced /opendevin/bash.bashrc and ~/.bashrc successfully')
def test_run_commond():
client = RuntimeClient()
command = CmdRunAction(command="ls -l")
obs = client.run_action(command)
print(obs)
def test_shell(message):
shell = pexpect.spawn('/bin/bash', encoding='utf-8')
shell.expect(r'[$#] ')
print(f"Received command: {message}")
shell.sendline(message)
shell.expect(r'[$#] ')
output = shell.before.strip().split('\r\n', 1)[1].strip()
shell.close()
if __name__ == "__main__":
# print(test_shell("ls -l"))
client = RuntimeClient()
# test_run_commond()
# client.init_sandbox_plugins([AgentSkillsRequirement,JupyterRequirement])

View File

@@ -0,0 +1,62 @@
# client.py, this file is used in development to test the runtime client. It is not used in production.
import asyncio
import websockets
# Function for sending commands to the server in EventStreamRuntime
class EventStreamRuntime:
uri = 'ws://localhost:8080'
def __init__(self):
self.websocket = None
def _init_websocket(self):
self.websocket = None
# TODO: need to initialization globally only once
# self.loop = asyncio.new_event_loop()
# asyncio.set_event_loop(self.loop)
# self.loop.run_until_complete(self._init_websocket_connect())
async def execute(self, command):
self.websocket = await websockets.connect(self.uri)
print(f"Sending command: {command}")
await self.websocket.send(command)
print("Command sent, waiting for response...")
try:
output = await asyncio.wait_for(self.websocket.recv(), timeout=10)
print("Received output")
print(output)
except asyncio.TimeoutError:
print("No response received within the timeout period.")
await self.websocket.close()
# Function for testing sending commands to the server
async def send_command():
uri = "ws://localhost:8080"
while True:
try:
async with websockets.connect(uri) as websocket:
while True:
command = input("Enter the command to execute in the Docker container (type 'exit' to quit): ")
if command.lower() == 'exit':
return
await websocket.send(command)
response = await websocket.recv()
exit_code = response[-1].strip()\
# command_output = '\n'.join(response[1:-1]).strip()
# print("Yufan:", command_output)
print("Exit Code:", exit_code)
print(response)
except (websockets.exceptions.ConnectionClosed, OSError) as e:
print(f"Connection closed, retrying... ({str(e)})")
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(send_command())
# if __name__ == "__main__":
# runtime = EventStreamRuntime()
# asyncio.run(runtime.execute('ls -l'))
# asyncio.run(runtime.execute('pwd'))

View File

@@ -0,0 +1,31 @@
# echo_server.py, this file is used in development to test the runtime client. It is not used in production.
import asyncio
import websockets
import pexpect
from websockets.exceptions import ConnectionClosed
import json
def is_valid_json(s):
try:
json.loads(s)
except json.JSONDecodeError:
return False
return True
# Function for testing websocket echo
async def echo(websocket, path):
async for message in websocket:
if is_valid_json(message):
event = json.loads(message)
print("Received:", event)
response = json.dumps(event)
await websocket.send(response)
else:
print("Received:", message)
response = f"Echo: {message}"
await websocket.send(response)
start_server = websockets.serve(echo, "0.0.0.0", 8080)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@@ -0,0 +1,33 @@
# execute_server.py, this file is used in development to test the runtime client. It is not used in production.
import asyncio
import websockets
import pexpect
from websockets.exceptions import ConnectionClosed
import json
# Function for testing execution of shell commands
async def execute_command(websocket, path):
shell = pexpect.spawn('/bin/bash', encoding='utf-8')
shell.expect(r'[$#] ')
try:
async for message in websocket:
try:
print(f"Received command: {message}")
shell.sendline(message)
shell.expect(r'[$#] ')
output = shell.before.strip().split('\r\n', 1)[1].strip()
print("Yufan:",output)
await websocket.send(output)
except Exception as e:
await websocket.send(f"Error: {str(e)}")
except ConnectionClosed:
print("Connection closed")
finally:
shell.close()
start_server = websockets.serve(execute_command, "0.0.0.0", 8080)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View File

@@ -0,0 +1,259 @@
from typing import Any
import asyncio
import json
import websockets
import docker
import uuid
from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
from opendevin.events.action.action import Action
from opendevin.events.event import Event
from opendevin.events.observation import Observation
from opendevin.events.stream import EventStream
from opendevin.events.serialization import event_to_dict, observation_from_dict
from opendevin.runtime.runtime import Runtime
from opendevin.runtime.server.browse import browse
from opendevin.runtime.server.files import read_file, write_file
from opendevin.runtime.plugins import PluginRequirement
from opendevin.core.config import config
from opendevin.events.observation import (
ErrorObservation,
NullObservation,
Observation,
)
from opendevin.events.action import (
AgentRecallAction,
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
import asyncio
from opendevin.events import EventSource, EventStream, EventStreamSubscriber
class EventStreamRuntime(Runtime):
# This runtime will subscribe the event stream
# When receive an event, it will send the event to od-runtime-client which run inside the docker environment
# websocket uri
uri = 'ws://localhost:8080'
container_name_prefix = 'opendevin-sandbox-'
docker_client: docker.DockerClient
def __init__(self, event_stream: EventStream, sid: str = 'default',container_image: str | None = None):
# We don't need sandbox in this runtime, because it's equal to a websocket sandbox
self._init_event_stream(event_stream)
self._init_websocket()
self._init_docker(sid,container_image)
def _init_docker(self,sid,container_image):
self.container_image = container_image
# (
# config.sandbox_container_image
# if container_image is None
# else container_image
# )
self.instance_id = (
sid + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
)
self.container_name = self.container_name_prefix + self.instance_id
try:
self.docker_client = docker.from_env()
self._init_sandbox()
except Exception as ex:
print(
"Launch docker client failed. Please make sure you have installed docker and started the docker daemon."
)
raise ex
def _init_event_stream(self,event_stream: EventStream):
self.event_stream = event_stream
self.event_stream.subscribe(EventStreamSubscriber.RUNTIME, self.on_event)
def _init_websocket(self):
self.websocket = None
# TODO: need to initialization globally only once
# self.loop = asyncio.new_event_loop()
# asyncio.set_event_loop(self.loop)
# self.loop.run_until_complete(self._init_websocket_connect())
async def _init_websocket_connect(self):
self.websocket = await websockets.connect(self.uri)
def _init_sandbox(self):
try:
# start the container
mount_dir = config.workspace_mount_path
self.container = self.docker_client.containers.run(
self.container_image,
command='tail -f /dev/null',
# TODO: test it in mac and linux
# network_mode='host',
working_dir=self.sandbox_workspace_dir,
name=self.container_name,
detach=True,
volumes={mount_dir: {'bind': self.sandbox_workspace_dir, 'mode': 'rw'}},
)
print('Container started')
except Exception as e:
print('Failed to start container')
raise e
@property
def sandbox_workspace_dir(self):
return config.workspace_mount_path_in_sandbox
def close(self):
containers = self.docker_client.containers.list(all=True)
for container in containers:
try:
if container.name.startswith(self.container_name_prefix):
container.remove(force=True)
except docker.errors.NotFound:
pass
self.docker_client.close()
async def on_event(self, event: Event) -> None:
print("EventStreamRuntime: on_event triggered")
if isinstance(event, Action):
observation = await self.run_action(event)
print("EventStreamRuntime: observation", observation)
# observation._cause = event.id # type: ignore[attr-defined]
source = event.source if event.source else EventSource.AGENT
await self.event_stream.add_event(observation, source)
async def run_action(self, action: Action) -> Observation:
"""
Run an action and return the resulting observation.
If the action is not runnable in any runtime, a NullObservation is returned.
If the action is not supported by the current runtime, an ErrorObservation is returned.
We will filter some action and execute in runtime. Pass others into od-runtime-client
"""
if not action.runnable:
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
return ErrorObservation(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'Action {action_type} is not supported in the current runtime.'
)
observation = await getattr(self, action_type)(action)
# TODO: fix ID problem, see comments https://github.com/OpenDevin/OpenDevin/pull/2603#discussion_r1668994137
observation._parent = action.id # type: ignore[attr-defined]
return observation
async def run(self, action: CmdRunAction) -> Observation:
return await self._run_command(action)
async def _run_command(
self, action: Action, _stream: bool = False, timeout: int | None = None
) -> Observation:
# Send action into websocket and get the result
# TODO: need to initialization globally only once
self.websocket = await websockets.connect(self.uri)
if self.websocket is None:
raise Exception("WebSocket is not connected.")
try:
await self.websocket.send(json.dumps(event_to_dict(action)))
output = await asyncio.wait_for(self.websocket.recv(), timeout=timeout)
output = json.loads(output)
print("Received output: ", output)
except asyncio.TimeoutError:
print("No response received within the timeout period.")
await self.websocket.close()
return observation_from_dict(output)
async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
return await self._run_command(action)
############################################################################
# Keep the same with other runtimes
############################################################################
def get_working_directory(self):
# TODO: should we get this from od-runtime-client
return config.workspace_base
async def read(self, action: FileReadAction) -> Observation:
working_dir = self.get_working_directory()
return await read_file(action.path, working_dir, action.start, action.end)
async def write(self, action: FileWriteAction) -> Observation:
working_dir = self.get_working_directory()
return await write_file(
action.path, working_dir, action.content, action.start, action.end
)
async def browse(self, action: BrowseURLAction) -> Observation:
return await browse(action, self.browser)
async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return await browse(action, self.browser)
async def recall(self, action: AgentRecallAction) -> Observation:
return NullObservation('')
############################################################################
# Initialization work inside sandbox image
############################################################################
# init_runtime_tools direcctly do as what Runtime do
# Do in the od_runtime_client
# Overwrite the init_sandbox_plugins
def init_sandbox_plugins(self, plugins: list[PluginRequirement]) -> None:
pass
def test_run_command():
sid = "test"
cli_session = 'main' + ('_' + sid if sid else '')
event_stream = EventStream(cli_session)
runtime = EventStreamRuntime(event_stream)
asyncio.run(runtime._run_command(CmdRunAction('ls -l')))
async def test_event_stream():
sid = "test"
cli_session = 'main' + ('_' + sid if sid else '')
event_stream = EventStream(cli_session)
runtime = EventStreamRuntime(event_stream)
# Test run command
action_cmd = CmdRunAction(command='ls -l')
print(await runtime.run_action(action_cmd))
# Test run ipython
test_code = "print('Hello, `World`!\n')"
action_opython = IPythonRunCellAction(code=test_code)
print(await runtime.run_action(action_opython))
# Test read file
action_read = FileReadAction(path='hello.sh')
print(await runtime.run_action(action_read))
# Test write file
action_write = FileWriteAction(content='echo "Hello, World!"', path='hello.sh')
print(await runtime.run_action(action_write))
# Test browse
action_browse = BrowseURLAction(url='https://google.com')
print(await runtime.run_action(action_browse))
# Test recall
action_recall = AgentRecallAction(query='who am I?')
print(await runtime.run_action(action_recall))
def test_docker_launch():
sid = "test"
cli_session = 'main' + ('_' + sid if sid else '')
event_stream = EventStream(cli_session)
runtime = EventStreamRuntime(event_stream,sid,"ghcr.io/opendevin/sandbox:main")
runtime.close()
if __name__ == "__main__":
asyncio.run(test_event_stream())