From e89cc8f19bdc69df221626397c120378f1009afc Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 16 May 2024 22:37:49 +0800 Subject: [PATCH] Feat: add stream output to exec_run (#1625) * Feat: add stream output to exec_run * Using command timeout to control the exec_box's timeout. * add bash -c to source command to compatible for sh. Signed-off-by: ifuryst * Feat: add stream output to SSHBox execute Signed-off-by: ifuryst * fix the test case fail. Signed-off-by: ifuryst * fix the test case import wrong path for method. Signed-off-by: ifuryst --------- Signed-off-by: ifuryst --- evaluation/regression/run_tests.py | 12 ++- opendevin/core/schema/__init__.py | 3 + opendevin/core/schema/stream.py | 27 ++++++ opendevin/runtime/docker/exec_box.py | 120 +++++++++++++++++++------- opendevin/runtime/docker/local_box.py | 5 +- opendevin/runtime/docker/ssh_box.py | 82 ++++++++++++++++-- opendevin/runtime/e2b/sandbox.py | 5 +- opendevin/runtime/plugins/mixin.py | 30 +++++-- opendevin/runtime/sandbox.py | 5 +- opendevin/runtime/server/runtime.py | 2 +- opendevin/server/session/manager.py | 5 +- tests/test_fileops.py | 2 +- tests/unit/test_sandbox.py | 2 +- 13 files changed, 249 insertions(+), 51 deletions(-) create mode 100644 opendevin/core/schema/stream.py diff --git a/evaluation/regression/run_tests.py b/evaluation/regression/run_tests.py index 245dbcc3b7..705739b1c9 100644 --- a/evaluation/regression/run_tests.py +++ b/evaluation/regression/run_tests.py @@ -13,9 +13,15 @@ if __name__ == '__main__': python script_name.py [--OPENAI_API_KEY=] [--model=] """ - parser = argparse.ArgumentParser(description='This script runs pytest with specific arguments and configuration.') - parser.add_argument('--OPENAI_API_KEY', type=str, required=True, help='Your OpenAI API key') - parser.add_argument('--model', type=str, required=True, help='The model name to use') + parser = argparse.ArgumentParser( + description='This script runs pytest with specific arguments and configuration.' + ) + parser.add_argument( + '--OPENAI_API_KEY', type=str, required=True, help='Your OpenAI API key' + ) + parser.add_argument( + '--model', type=str, required=True, help='The model name to use' + ) parser_args = parser.parse_args() config.config['OPENAI_API_KEY'] = parser_args.OPENAI_API_KEY diff --git a/opendevin/core/schema/__init__.py b/opendevin/core/schema/__init__.py index 2fa4e1aa3e..416aa0736b 100644 --- a/opendevin/core/schema/__init__.py +++ b/opendevin/core/schema/__init__.py @@ -2,10 +2,13 @@ from .action import ActionType from .agent import AgentState from .config import ConfigType from .observation import ObservationType +from .stream import CancellableStream, StreamMixin __all__ = [ 'ActionType', 'ObservationType', 'ConfigType', 'AgentState', + 'CancellableStream', + 'StreamMixin', ] diff --git a/opendevin/core/schema/stream.py b/opendevin/core/schema/stream.py new file mode 100644 index 0000000000..29c9c7e463 --- /dev/null +++ b/opendevin/core/schema/stream.py @@ -0,0 +1,27 @@ +from abc import ABC, abstractmethod +from typing import Union + + +class StreamMixin: + def __init__(self, generator): + self.generator = generator + self.closed = False + + def __iter__(self): + return self + + def __next__(self): + if self.closed: + raise StopIteration + else: + return next(self.generator) + + +class CancellableStream(StreamMixin, ABC): + @abstractmethod + def close(self): + pass + + @abstractmethod + def exit_code(self) -> Union[int, None]: + pass diff --git a/opendevin/runtime/docker/exec_box.py b/opendevin/runtime/docker/exec_box.py index 545a61be7a..dd283bf875 100644 --- a/opendevin/runtime/docker/exec_box.py +++ b/opendevin/runtime/docker/exec_box.py @@ -1,6 +1,6 @@ import atexit -import concurrent.futures import os +import shlex import sys import tarfile import time @@ -14,6 +14,7 @@ from opendevin.const.guide_url import TROUBLESHOOTING_URL from opendevin.core.config import config from opendevin.core.exceptions import SandboxInvalidBackgroundCommandError from opendevin.core.logger import opendevin_logger as logger +from opendevin.core.schema import CancellableStream from opendevin.runtime.docker.process import DockerProcess, Process from opendevin.runtime.sandbox import Sandbox @@ -22,6 +23,76 @@ InputType = namedtuple('InputType', ['content']) OutputType = namedtuple('OutputType', ['content']) +ExecResult = namedtuple('ExecResult', 'exit_code,output') +""" A result of Container.exec_run with the properties ``exit_code`` and + ``output``. """ + + +class DockerExecCancellableStream(CancellableStream): + # Reference: https://github.com/docker/docker-py/issues/1989 + def __init__(self, _client, _id, _output): + super().__init__(self.read_output()) + self._id = _id + self._client = _client + self._output = _output + + def close(self): + self.closed = True + + def exit_code(self): + return self.inspect()['ExitCode'] + + def inspect(self): + return self._client.api.exec_inspect(self._id) + + def read_output(self): + for chunk in self._output: + yield chunk.decode('utf-8') + + +def container_exec_run( + container, + cmd, + stdout=True, + stderr=True, + stdin=False, + tty=False, + privileged=False, + user='', + detach=False, + stream=False, + socket=False, + environment=None, + workdir=None, +) -> ExecResult: + exec_id = container.client.api.exec_create( + container.id, + cmd, + stdout=stdout, + stderr=stderr, + stdin=stdin, + tty=tty, + privileged=privileged, + user=user, + environment=environment, + workdir=workdir, + )['Id'] + + output = container.client.api.exec_start( + exec_id, detach=detach, tty=tty, stream=stream, socket=socket + ) + + if stream: + return ExecResult( + None, DockerExecCancellableStream(container.client, exec_id, output) + ) + + if socket: + return ExecResult(None, output) + + return ExecResult(container.client.api.exec_inspect(exec_id)['ExitCode'], output) + + class DockerExecBox(Sandbox): instance_id: str container_image: str @@ -106,38 +177,27 @@ class DockerExecBox(Sandbox): bg_cmd = self.background_commands[id] return bg_cmd.read_logs() - def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]: + def execute( + self, cmd: str, stream: bool = False, timeout: int | None = None + ) -> tuple[int, str | CancellableStream]: timeout = timeout if timeout is not None else self.timeout + wrapper = f'timeout {self.timeout}s bash -c {shlex.quote(cmd)}' + _exit_code, _output = container_exec_run( + self.container, + wrapper, + stream=stream, + workdir=self.sandbox_workspace_dir, + environment=self._env, + ) - # TODO: each execute is not stateful! We need to keep track of the current working directory - def run_command(container, command): - return container.exec_run( - command, workdir=self.sandbox_workspace_dir, environment=self._env - ) + if stream: + return _exit_code, _output - # Use ThreadPoolExecutor to control command and set timeout - with concurrent.futures.ThreadPoolExecutor() as executor: - future = executor.submit( - run_command, self.container, self.get_exec_cmd(cmd) - ) - try: - exit_code, logs = future.result(timeout=timeout) - except concurrent.futures.TimeoutError: - logger.exception( - 'Command timed out, killing process...', exc_info=False - ) - pid = self.get_pid(cmd) - if pid is not None: - self.container.exec_run( - f'kill -9 {pid}', - workdir=self.sandbox_workspace_dir, - environment=self._env, - ) - return -1, f'Command: "{cmd}" timed out' - logs_out = logs.decode('utf-8') - if logs_out.endswith('\n'): - logs_out = logs_out[:-1] - return exit_code, logs_out + print(_output) + _output = _output.decode('utf-8') + if _output.endswith('\n'): + _output = _output[:-1] + return _exit_code, _output def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False): # mkdir -p sandbox_dest if it doesn't exist diff --git a/opendevin/runtime/docker/local_box.py b/opendevin/runtime/docker/local_box.py index 44bf469fe0..6214e264cd 100644 --- a/opendevin/runtime/docker/local_box.py +++ b/opendevin/runtime/docker/local_box.py @@ -5,6 +5,7 @@ import sys from opendevin.core.config import config from opendevin.core.logger import opendevin_logger as logger +from opendevin.core.schema import CancellableStream from opendevin.runtime.docker.process import DockerProcess, Process from opendevin.runtime.sandbox import Sandbox @@ -33,7 +34,9 @@ class LocalBox(Sandbox): atexit.register(self.cleanup) super().__init__() - def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]: + def execute( + self, cmd: str, stream: bool = False, timeout: int | None = None + ) -> tuple[int, str | CancellableStream]: timeout = timeout if timeout is not None else self.timeout try: completed_process = subprocess.run( diff --git a/opendevin/runtime/docker/ssh_box.py b/opendevin/runtime/docker/ssh_box.py index 3609917b8f..d457c4167f 100644 --- a/opendevin/runtime/docker/ssh_box.py +++ b/opendevin/runtime/docker/ssh_box.py @@ -1,6 +1,7 @@ import atexit import json import os +import re import sys import tarfile import tempfile @@ -10,12 +11,13 @@ from collections import namedtuple from glob import glob import docker -from pexpect import pxssh +from pexpect import exceptions, pxssh from opendevin.const.guide_url import TROUBLESHOOTING_URL from opendevin.core.config import config from opendevin.core.exceptions import SandboxInvalidBackgroundCommandError from opendevin.core.logger import opendevin_logger as logger +from opendevin.core.schema import CancellableStream from opendevin.runtime.docker.process import DockerProcess, Process from opendevin.runtime.plugins import ( JupyterRequirement, @@ -29,6 +31,71 @@ InputType = namedtuple('InputType', ['content']) OutputType = namedtuple('OutputType', ['content']) +class SSHExecCancellableStream(CancellableStream): + def __init__(self, ssh, cmd, timeout): + super().__init__(self.read_output()) + self.ssh = ssh + self.cmd = cmd + self.timeout = timeout + + def close(self): + self.closed = True + + def exit_code(self): + self.ssh.sendline('echo $?') + success = self.ssh.prompt(timeout=self.timeout) + if not success: + return -1 + + _exit_code = self.ssh.before.strip() + return int(_exit_code) + + def read_output(self): + st = time.time() + buf = '' + crlf = '\r\n' + lf = '\n' + prompt_len = len(self.ssh.PROMPT) + while True: + try: + if self.closed: + break + _output = self.ssh.read_nonblocking(timeout=1) + if not _output: + continue + + buf += _output + + if len(buf) < prompt_len: + continue + + match = re.search(self.ssh.PROMPT, buf) + if match: + idx, _ = match.span() + yield buf[:idx].replace(crlf, lf) + buf = '' + break + + res = buf[:-prompt_len] + if len(res) == 0 or res.find(crlf) == -1: + continue + buf = buf[-prompt_len:] + yield res.replace(crlf, lf) + except exceptions.TIMEOUT: + if time.time() - st < self.timeout: + match = re.search(self.ssh.PROMPT, buf) + if match: + idx, _ = match.span() + yield buf[:idx].replace(crlf, lf) + break + continue + else: + yield buf.replace(crlf, lf) + break + except exceptions.EOF: + break + + def split_bash_commands(commands): # States NORMAL = 0 @@ -128,6 +195,7 @@ class DockerSSHBox(Sandbox): _ssh_password: str _ssh_port: int + ssh: pxssh.pxssh cur_background_id = 0 background_commands: dict[int, Process] = {} @@ -344,9 +412,10 @@ class DockerSSHBox(Sandbox): f'Command: "{cmd}" timed out. Sending SIGINT to the process: {command_output}', ) - def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]: + def execute( + self, cmd: str, stream: bool = False, timeout: int | None = None + ) -> tuple[int, str | CancellableStream]: timeout = timeout if timeout is not None else self.timeout - commands = split_bash_commands(cmd) if len(commands) > 1: all_output = '' @@ -354,11 +423,14 @@ class DockerSSHBox(Sandbox): exit_code, output = self.execute(command) if all_output: all_output += '\r\n' - all_output += output + all_output += str(output) if exit_code != 0: return exit_code, all_output return 0, all_output + self.ssh.sendline(cmd) + if stream: + return 0, SSHExecCancellableStream(self.ssh, cmd, self.timeout) success = self.ssh.prompt(timeout=timeout) if not success: logger.exception('Command timed out, killing process...', exc_info=False) @@ -499,7 +571,7 @@ class DockerSSHBox(Sandbox): exit_code, result = self.execute('pwd') if exit_code != 0: raise Exception('Failed to get working directory') - return result.strip() + return str(result).strip() @property def user_id(self): diff --git a/opendevin/runtime/e2b/sandbox.py b/opendevin/runtime/e2b/sandbox.py index b0d4e043b2..cca9171f94 100644 --- a/opendevin/runtime/e2b/sandbox.py +++ b/opendevin/runtime/e2b/sandbox.py @@ -9,6 +9,7 @@ from e2b.sandbox.exception import ( from opendevin.core.config import config from opendevin.core.logger import opendevin_logger as logger +from opendevin.core.schema import CancellableStream from opendevin.runtime.e2b.process import E2BProcess from opendevin.runtime.process import Process from opendevin.runtime.sandbox import Sandbox @@ -72,7 +73,9 @@ class E2BBox(Sandbox): assert isinstance(proc, E2BProcess) return '\n'.join([m.line for m in proc.output_messages]) - def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]: + def execute( + self, cmd: str, stream: bool = False, timeout: int | None = None + ) -> tuple[int, str | CancellableStream]: timeout = timeout if timeout is not None else self.timeout process = self.sandbox.process.start(cmd, env_vars=self._env) try: diff --git a/opendevin/runtime/plugins/mixin.py b/opendevin/runtime/plugins/mixin.py index a2ef0727e5..59a28d378d 100644 --- a/opendevin/runtime/plugins/mixin.py +++ b/opendevin/runtime/plugins/mixin.py @@ -2,13 +2,16 @@ import os from typing import Protocol from opendevin.core.logger import opendevin_logger as logger +from opendevin.core.schema import CancellableStream from opendevin.runtime.plugins.requirement import PluginRequirement class SandboxProtocol(Protocol): # https://stackoverflow.com/questions/51930339/how-do-i-correctly-add-type-hints-to-mixin-classes - def execute(self, cmd: str) -> tuple[int, str]: ... + def execute( + self, cmd: str, stream: bool = False + ) -> tuple[int, str | CancellableStream]: ... def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False): ... @@ -36,12 +39,27 @@ class PluginMixin: logger.info( f'Initializing plugin [{requirement.name}] by executing [{abs_path_to_bash_script}] in the sandbox.' ) - exit_code, output = self.execute(abs_path_to_bash_script) - if exit_code != 0: - raise RuntimeError( - f'Failed to initialize plugin {requirement.name} with exit code {exit_code} and output: {output}' + exit_code, output = self.execute(abs_path_to_bash_script, stream=True) + if isinstance(output, CancellableStream): + for line in output: + if line.endswith('\n'): + line = line[:-1] + logger.info(line) + _exit_code = output.exit_code() + output.close() + if _exit_code != 0: + raise RuntimeError( + f'Failed to initialize plugin {requirement.name} with exit code {_exit_code} and output {output}' + ) + logger.info(f'Plugin {requirement.name} initialized successfully') + else: + if exit_code != 0: + raise RuntimeError( + f'Failed to initialize plugin {requirement.name} with exit code {exit_code} and output: {output}' + ) + logger.info( + f'Plugin {requirement.name} initialized successfully\n:{output}' ) - logger.info(f'Plugin {requirement.name} initialized successfully.') if len(requirements) > 0: exit_code, output = self.execute('source ~/.bashrc') diff --git a/opendevin/runtime/sandbox.py b/opendevin/runtime/sandbox.py index 59c729bfa1..ecce4220ae 100644 --- a/opendevin/runtime/sandbox.py +++ b/opendevin/runtime/sandbox.py @@ -1,6 +1,7 @@ import os from abc import ABC, abstractmethod +from opendevin.core.schema import CancellableStream from opendevin.runtime.docker.process import Process from opendevin.runtime.plugins.mixin import PluginMixin @@ -19,7 +20,9 @@ class Sandbox(ABC, PluginMixin): self._env[key] = value @abstractmethod - def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]: + def execute( + self, cmd: str, stream: bool = False, timeout: int | None = None + ) -> tuple[int, str | CancellableStream]: pass @abstractmethod diff --git a/opendevin/runtime/server/runtime.py b/opendevin/runtime/server/runtime.py index fa0ee323e3..246fafce03 100644 --- a/opendevin/runtime/server/runtime.py +++ b/opendevin/runtime/server/runtime.py @@ -75,7 +75,7 @@ class ServerRuntime(Runtime): try: exit_code, output = self.sandbox.execute(command) return CmdOutputObservation( - command_id=-1, content=output, command=command, exit_code=exit_code + command_id=-1, content=str(output), command=command, exit_code=exit_code ) except UnicodeDecodeError: return ErrorObservation('Command output could not be decoded as utf-8') diff --git a/opendevin/server/session/manager.py b/opendevin/server/session/manager.py index 3a53f2e38c..73bb0dcee9 100644 --- a/opendevin/server/session/manager.py +++ b/opendevin/server/session/manager.py @@ -91,7 +91,10 @@ class SessionManager: session_ids_to_remove = [] for sid, session in list(self._sessions.items()): # if session inactive for a long time, remove it - if not session.is_alive and current_time - session.last_active_ts > self.session_timeout: + if ( + not session.is_alive + and current_time - session.last_active_ts > self.session_timeout + ): session_ids_to_remove.append(sid) for sid in session_ids_to_remove: diff --git a/tests/test_fileops.py b/tests/test_fileops.py index d14a2d42d7..61518646c7 100644 --- a/tests/test_fileops.py +++ b/tests/test_fileops.py @@ -3,7 +3,7 @@ from pathlib import Path import pytest from opendevin.core.config import config -from opendevin.events.action import files +from opendevin.runtime.server import files SANDBOX_PATH_PREFIX = '/workspace' diff --git a/tests/unit/test_sandbox.py b/tests/unit/test_sandbox.py index d3e70dc739..15120f4ee6 100644 --- a/tests/unit/test_sandbox.py +++ b/tests/unit/test_sandbox.py @@ -145,7 +145,7 @@ def test_ssh_box_multi_line_cmd_run_as_devin(temp_dir): config, 'sandbox_type', new='ssh' ): for box in [DockerSSHBox(), DockerExecBox()]: - exit_code, output = box.execute('pwd\nls -l') + exit_code, output = box.execute('pwd && ls -l') assert exit_code == 0, ( 'The exit code should be 0 for ' + box.__class__.__name__ )