Use Docker SDK for sandbox, integrate into CommandManager (#93)

* refactor command manager to use docker and move to docker sdk

* fix read and write actions

* actually run background cmd

* use bash for running cmds and fix logs

* keep logs in buffer file

* fix up background logs

* consolidate requirements

* fix docker imports

* add fixme

* add remove fixme

* fix sandbox.py path in README

* fix typo annotation and prompt

---------

Co-authored-by: Xingyao Wang <xingyao6@illinois.edu>
This commit is contained in:
Robert Brennan 2024-03-22 09:54:00 -04:00 committed by GitHub
parent 4fda533b91
commit 3b2ed14ae7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 227 additions and 215 deletions

View File

@ -6,7 +6,7 @@ from termcolor import colored
from typing import List, Dict
from opendevin.agent import Agent, Message, Role
from opendevin.sandbox.docker import DockerInteractive
from opendevin.sandbox.sandbox import DockerInteractive
assert (
"OPENAI_API_KEY" in os.environ

View File

@ -1,8 +0,0 @@
langchain
langchain-openai
langchain-community
llama-index
llama-index-vector-stores-chroma
chromadb
litellm
termcolor

View File

@ -8,7 +8,7 @@ Run the docker-based sandbox interactive:
```bash
mkdir workspace
python3 opendevin/sandbox/docker.py -d workspace
python3 opendevin/sandbox/sandbox.py -d workspace
```
It will map `./workspace` into the docker container with the folder permission correctly adjusted for current user.

View File

@ -5,11 +5,11 @@ def print_callback(event):
print(event, flush=True)
class AgentController:
def __init__(self, agent, max_iterations=100, callbacks=[]):
def __init__(self, agent, workdir, max_iterations=100, callbacks=[]):
self.agent = agent
self.max_iterations = max_iterations
self.background_commands = []
self.command_manager = CommandManager()
self.command_manager = CommandManager(workdir)
self.callbacks = callbacks
self.callbacks.append(self.agent.add_event)
self.callbacks.append(print_callback)

View File

@ -1,4 +1,7 @@
def read(file_path):
import os
def read(base_path, file_path):
file_path = os.path.join(base_path, file_path)
with open(file_path, 'r') as file:
return file.read()

View File

@ -1,4 +1,7 @@
def write(path, contents):
import os
def write(base_path, path, contents):
path = os.path.join(base_path, path)
with open(path, 'w') as file:
file.write(contents)
return ""

View File

@ -3,36 +3,25 @@ import select
from typing import List
from opendevin.lib.event import Event
from opendevin.sandbox.sandbox import DockerInteractive
class BackgroundCommand:
def __init__(self, id: int, command: str, process: subprocess.Popen):
def __init__(self, id: int, command: str, dir: str):
self.command = command
self.id = id
self.process = process
def _get_log_from_stream(self, stream):
logs = ""
while True:
readable, _, _ = select.select([stream], [], [], .1)
if not readable:
break
next = stream.readline()
if next == '':
break
logs += next
if logs == "": return
return logs
self.shell = DockerInteractive(id=str(id), workspace_dir=dir)
self.shell.execute_in_background(command)
def get_logs(self):
stdout = self._get_log_from_stream(self.process.stdout)
stderr = self._get_log_from_stream(self.process.stderr)
exit_code = self.process.poll()
return stdout, stderr, exit_code
# TODO: get an exit code if process is exited
return self.shell.read_logs()
class CommandManager:
def __init__(self):
def __init__(self, dir):
self.cur_id = 0
self.directory = dir
self.background_commands = {}
self.shell = DockerInteractive(id="default", workspace_dir=dir)
def run_command(self, command: str, background=False) -> str:
if background:
@ -41,49 +30,29 @@ class CommandManager:
return self.run_immediately(command)
def run_immediately(self, command: str) -> str:
result = subprocess.run(["/bin/bash", "-c", command], capture_output=True, text=True)
output = result.stdout + result.stderr
exit_code = result.returncode
exit_code, output = self.shell.execute(command)
if exit_code != 0:
raise ValueError('Command failed with exit code ' + str(exit_code) + ': ' + output)
return output
def run_background(self, command: str) -> str:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True)
bg_cmd = BackgroundCommand(self.cur_id, command, process)
bg_cmd = BackgroundCommand(self.cur_id, command, self.directory)
self.cur_id += 1
self.background_commands[bg_cmd.id] = bg_cmd
return "Background command started. To stop it, send a `kill` action with id " + str(bg_cmd.id)
def kill_command(self, id: int) -> str:
# TODO: get log events before killing
self.background_commands[id].processs.kill()
self.background_commands[id].shell.close()
del self.background_commands[id]
def get_background_events(self) -> List[Event]:
events = []
for id, cmd in self.background_commands.items():
stdout, stderr, exit_code = cmd.get_logs()
if stdout is not None:
events.append(Event('output', {
'output': stdout,
'stream': 'stdout',
'id': id,
'command': cmd.command,
}))
if stderr is not None:
events.append(Event('output', {
'output': stderr,
'stream': 'stderr',
'id': id,
'command': cmd.command,
}))
if exit_code is not None:
events.append(Event('output', {
'exit_code': exit_code,
'output': 'Background command %d exited with code %d' % (idx, exit_code),
'id': id,
'command': cmd.command,
}))
del self.background_commands[id]
output = cmd.get_logs()
events.append(Event('output', {
'output': output,
'id': id,
'command': cmd.command,
}))
return events

View File

@ -39,11 +39,11 @@ class Event:
return actions.browse(url)
elif self.action == 'read':
path = self.args['path']
return actions.read(path)
return actions.read(agent_controller.command_manager.directory, path)
elif self.action == 'write':
path = self.args['path']
contents = self.args['contents']
return actions.write(path, contents)
return actions.write(agent_controller.command_manager.directory, path, contents)
elif self.action == 'recall':
return agent_controller.agent.search_memory(self.args['query'])
else:

View File

@ -20,5 +20,5 @@ if __name__ == "__main__":
model_name=args.model_name
)
controller = AgentController(agent)
controller.start_loop()
controller = AgentController(agent, args.directory)
controller.start_loop()

View File

@ -1,145 +0,0 @@
import os
import pty
import sys
import uuid
import time
import shlex
import select
import subprocess
from typing import List
from collections import namedtuple
InputType = namedtuple("InputDtype", ["content"])
OutputType = namedtuple("OutputDtype", ["content"])
class DockerInteractive:
CONTAINER_IMAGE = "opendevin/sandbox:latest"
def __init__(
self,
workspace_dir: str = None,
container_image: str = None,
timeout: int = 5
):
self.instance_id: str = uuid.uuid4()
if workspace_dir is not None:
assert os.path.exists(workspace_dir), f"Directory {workspace_dir} does not exist."
# expand to absolute path
workspace_dir = os.path.abspath(workspace_dir)
else:
workspace_dir = os.getcwd()
print(f"workspace unspecified, using current directory: {workspace_dir}")
# TODO: this timeout is actually essential - need a better way to set it
# if it is too short, the container may still waiting for previous
# command to finish (e.g. apt-get update)
# if it is too long, the user may have to wait for a unnecessary long time
self.timeout: int = timeout
if container_image is None:
container_image = self.CONTAINER_IMAGE
uid = os.getuid()
cmd = (
f"docker run -it --rm --name sandbox-{self.instance_id} "
f"-v {workspace_dir}:/workspace "
f"-w /workspace "
f"--network=host "
f"{container_image} "
f"/bin/bash -c 'useradd --shell /bin/bash -u {uid} -o -c \"\" -m devin && su devin'"
)
# print(f"Starting Docker container with command: {cmd}")
self.master_fd, self.slave_fd = pty.openpty()
self.container = subprocess.Popen(
shlex.split(cmd),
stdin=self.slave_fd,
stdout=self.slave_fd,
stderr=self.slave_fd,
text=True,
close_fds=True,
)
time.sleep(1) # wait for the container to start
# TODO: use a more robust way to check if the container is ready
self.history: List[InputType | OutputType] = [
OutputType(self._wait_and_read_output())
]
def _wait_and_read_output(self, user_input: str = None) -> str:
output_str = ""
while True:
readable, _, _ = select.select([self.master_fd], [], [], self.timeout)
if readable:
output = os.read(self.master_fd, 1024).decode()
if not output:
break
output_str += output
else:
break
if user_input:
output_str = output_str.lstrip(user_input).lstrip()
return output_str
def execute(self, cmd: str) -> str:
os.write(self.master_fd, (cmd + "\n").encode())
self.history.append(InputType(cmd))
output = self._wait_and_read_output(cmd)
self.history.append(OutputType(output))
return output
def close(self):
if hasattr(self, "master_fd") and self.master_fd is not None:
os.close(self.master_fd)
self.master_fd = None
if hasattr(self, "container") and self.container is not None:
self.container.terminate()
try:
self.container.wait(timeout=5)
print("Container stopped.")
except subprocess.TimeoutExpired:
self.container.kill()
print("Container killed.")
self.container = None
def __del__(self):
self.close()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Interactive Docker container")
parser.add_argument(
"-d",
"--directory",
type=str,
default=None,
help="The directory to mount as the workspace in the Docker container.",
)
args = parser.parse_args()
docker_interactive = DockerInteractive(
workspace_dir=args.directory,
container_image="opendevin/sandbox:latest",
)
print("Interactive Docker container started. Type 'exit' or use Ctrl+C to exit.")
for item in docker_interactive.history:
print(item.content, end="")
sys.stdout.flush()
try:
while True:
try:
user_input = input()
except EOFError:
print("\nExiting...")
break
if user_input.lower() == "exit":
print(f"Exiting...")
break
output = docker_interactive.execute(user_input)
print(output, end="")
sys.stdout.flush()
except KeyboardInterrupt:
print("\nExiting...")
docker_interactive.close()

View File

@ -0,0 +1,182 @@
import os
import pty
import sys
import uuid
import time
import shlex
import select
import subprocess
import docker
import time
from typing import List, Tuple
from collections import namedtuple
InputType = namedtuple("InputDtype", ["content"])
OutputType = namedtuple("OutputDtype", ["content"])
CONTAINER_IMAGE = os.getenv("SANDBOX_CONTAINER_IMAGE", "opendevin/sandbox:latest")
class DockerInteractive:
def __init__(
self,
workspace_dir: str = None,
container_image: str = None,
timeout: int = 120,
id: str = None
):
if id is not None:
self.instance_id: str = id
else:
self.instance_id: str = uuid.uuid4()
if workspace_dir is not None:
assert os.path.exists(workspace_dir), f"Directory {workspace_dir} does not exist."
# expand to absolute path
self.workspace_dir = os.path.abspath(workspace_dir)
else:
self.workspace_dir = os.getcwd()
print(f"workspace unspecified, using current directory: {workspace_dir}")
# TODO: this timeout is actually essential - need a better way to set it
# if it is too short, the container may still waiting for previous
# command to finish (e.g. apt-get update)
# if it is too long, the user may have to wait for a unnecessary long time
self.timeout: int = timeout
if container_image is None:
self.container_image = CONTAINER_IMAGE
else:
self.container_image = container_image
self.container_name = f"sandbox-{self.instance_id}"
self.restart_docker_container()
uid = os.getuid()
self.execute('useradd --shell /bin/bash -u {uid} -o -c \"\" -m devin && su devin')
def read_logs(self) -> str:
if not hasattr(self, "log_generator"):
return ""
logs = ""
while True:
ready_to_read, _, _ = select.select([self.log_generator], [], [], .1)
if ready_to_read:
data = self.log_generator.read(4096)
if not data:
break
# FIXME: we're occasionally seeing some escape characters like `\x02` and `\x00` in the logs...
chunk = data.decode('utf-8')
logs += chunk
else:
break
return logs
def execute(self, cmd: str) -> Tuple[int, str]:
exit_code, logs = self.container.exec_run(['/bin/bash', '-c', cmd], workdir="/workspace")
return exit_code, logs.decode('utf-8')
def execute_in_background(self, cmd: str) -> None:
self.log_time = time.time()
result = self.container.exec_run(['/bin/bash', '-c', cmd], socket=True, workdir="/workspace")
self.log_generator = result.output # socket.SocketIO
self.log_generator._sock.setblocking(0)
def close(self):
self.stop_docker_container()
def stop_docker_container(self):
docker_client = docker.from_env()
try:
container = docker_client.containers.get(self.container_name)
container.stop()
container.remove()
elapsed = 0
while container.status != "exited":
time.sleep(1)
elapsed += 1
if elapsed > self.timeout:
break
container = docker_client.containers.get(self.container_name)
except docker.errors.NotFound:
pass
def restart_docker_container(self):
self.stop_docker_container()
docker_client = docker.from_env()
try:
self.container = docker_client.containers.run(
self.container_image,
command="tail -f /dev/null",
network_mode='host',
working_dir="/workspace",
name=self.container_name,
detach=True,
volumes={self.workspace_dir: {"bind": "/workspace", "mode": "rw"}})
except Exception as e:
print(f"Failed to start container: {e}")
raise e
# wait for container to be ready
elapsed = 0
while self.container.status != "running":
if self.container.status == "exited":
print("container exited")
print("container logs:")
print(self.container.logs())
break
time.sleep(1)
elapsed += 1
self.container = docker_client.containers.get(self.container_name)
if elapsed > self.timeout:
break
if self.container.status != "running":
raise Exception("Failed to start container")
def __del__(self):
# FIXME: this fails because python is already shutting down. How can we clean up?
# self.container.remove(force=True)
pass
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Interactive Docker container")
parser.add_argument(
"-d",
"--directory",
type=str,
default=None,
help="The directory to mount as the workspace in the Docker container.",
)
args = parser.parse_args()
docker_interactive = DockerInteractive(
workspace_dir=args.directory,
)
print("Interactive Docker container started. Type 'exit' or use Ctrl+C to exit.")
bg = DockerInteractive(
workspace_dir=args.directory,
)
bg.execute_in_background("while true; do echo 'dot ' && sleep 1; done")
sys.stdout.flush()
try:
while True:
try:
user_input = input(">>> ")
except EOFError:
print("\nExiting...")
break
if user_input.lower() == "exit":
print(f"Exiting...")
break
exit_code, output = docker_interactive.execute(user_input)
print("exit code:", exit_code)
print(output + "\n", end="")
logs = bg.read_logs()
print("background logs:", logs, "\n")
sys.stdout.flush()
except KeyboardInterrupt:
print("\nExiting...")
docker_interactive.close()

View File

@ -3,3 +3,14 @@ pandas
litellm
termcolor
seaborn
docker
fastapi
uvicorn[standard]
# for agenthub/lanchangs_agent
langchain
langchain-openai
langchain-community
llama-index
llama-index-vector-stores-chroma
chromadb

View File

@ -1,3 +0,0 @@
fastapi
uvicorn[standard]
docker