Xingyao Wang 31b244f95e
[Refactor, Evaluation] Refactor and clean up evaluation harness to remove global config and use EventStreamRuntime (#3230)
* move multi-line bash tests to test_runtime;
support multi-line bash for esruntime;

* add testcase to handle PS2 prompt

* use bashlex for bash parsing to handle multi-line commands;
add testcases for multi-line commands

* revert ghcr runtime change

* Apply stash

* fix run as other user;
make test async;

* fix test runtime for run as od

* add run-as-devin to all the runtime tests

* handle the case when username is root

* move all run-as-devin tests from sandbox;
only tests a few cases on different user to save time;

* move over multi-line echo related tests to test_runtime

* fix user-specific jupyter by fixing the pypoetry virtualenv folder

* make plugin's init async;
chdir at initialization of jupyter plugin;
move ipy simple testcase to test runtime;

* support agentskills import in
move tests for jupyter pwd tests;
overload `add_env_vars` for EventStreamRuntime to update env var also in Jupyter;
make agentskills read env var lazily, in case env var is updated;

* fix ServerRuntime agentskills issue

* move agnostic image test to test_runtime

* merge runtime tests in CI

* fix enable auto lint as env var

* update warning message

* update warning message

* test for different container images

* change parsing output as debug

* add exception handling for update_pwd_decorator

* fix unit test indentation

* add plugins as default input to Runtime class;
remove init_sandbox_plugins;
implement add_env_var (include jupyter) in the base class;

* fix server runtime auto lint

* Revert "add exception handling for update_pwd_decorator"

This reverts commit 2b668b1506e02145cb8f87e321aad62febca3d50.

* tries to print debugging info for agentskills

* explictly setting uid (try fix permission issue)

* Revert "tries to print debugging info for agentskills"

This reverts commit 8be4c86756f0e3fc62957b327ba2ac4999c419de.

* set sandbox user id during testing to hopefully fix the permission issue

* add browser tools for server runtime

* try to debug for old pwd

* update debug cmd

* only test agnostic runtime when TEST_RUNTIME is Server

* fix temp dir mkdir

* load TEST_RUNTIME at the beginning

* remove ipython tests

* only log to file when DEBUG

* default logging to project root

* temporarily remove log to file

* fix LLM logger dir

* fix logger

* make set pwd an optional aux action

* fix prev pwd

* fix infinity recursion

* simplify

* do not import the whole od library to avoid logger folder by jupyter

* fix browsing

* increase timeout

* attempt to fix agentskills yet again

* clean up in testcases, since CI maybe run as non-root

* add _cause attribute for event.id

* remove parent

* add a bunch of debugging statement again for CI :(

* fix temp_dir fixture

* change all temp dir to follow pytest's tmp_path_factory

* remove extra bracket

* clean up error printing a bit

* jupyter chdir to self.config.workspace_mount_path_in_sandbox on initialization

* jupyter chdir to self.config.workspace_mount_path_in_sandbox on initialization

* add typing for tmp dir fixture

* clear the directory before running the test to avoid weird CI temp dir

* remove agnostic test case for server runtime

* Revert "remove agnostic test case for server runtime"

This reverts commit 30e2181c3fc1410e69596c2dcd06be01f1d016b3.

* disable agnostic tests in CI

* fix test

* make sure plugin arg is not passed when no plugin is specified;
remove redundant on_event function;

* move mock prompt

* rename runtime

* remove extra logging

* refactor run_controller's interface;
support multiple runtime for integration test;
filter out hostname for prompt

* uncomment other tests

* pass the right runtime to controller

* log runtime when start

* uncomment tests

* improve symbol filters

* add intergration test prompts that seemd ok

* add integration test workflow

* add python3 to default ubuntu image

* symlink python and fix permission to jupyter pip

* add retry for jupyter execute server

* fix jupyter pip install;
add post-process for jupyter pip install;
simplify init by add agent_skills path to PYTHONPATH;
add testcase to tests jupyter pip install;

* fix bug

* use ubuntu:22.04 for eventstream integration tests

* add todo

* update testcase

* remove redundant code

* fix unit test

* reduce dependency for runtime

* try making llama-index an optional dependency that's not installed by default

* remove pip install since it seemd not needed

* log ipython execution;
await write message since it returns a future

* update ipy testcase

* do not install llama-index in CI

* do not install llama-index in the app docker as well

* set sandbox container image in the integration test script

* log plugins & env var for runtime

* update conftest for sha256

* add git

* remove all non-alphanumeric chalracters

* add working ipy module tests!

* default to use host network

* remove is_async from browser to make thing a little more reliable;
retry loading browser when error;

* add sleep to wait a bit for http server

* kill http server before regenerate browsing tests

* fix browsing

* only set sandbox container image if undefined

* skip empty config value

* update evaluation to use the latest run_controller

* revert logger in execute_server to be compatible with server runtime

* revert logging level to fix jupyter

* set logger level

* revert the logging

* chmod for workspace to fix permission

* support getting timeout from action

* update test for server runtime

* try to fix file permission

* fix test_cmd_run_action_serialization_deserialization test (added timeout)

* poetry: pip 24.2, torch 2.2.2

* revert adding pip to pyproject.toml

* add build to dependencies in pyproject.toml

* forgot poetry lock --no-update

* fix a DelegatorAgent prompt_002.log (timeout)

* fix a DelegatorAgent prompt_003.log (timeout)

* couple more timeout attribs in prompt files

* some more prompt files

* prompts galore

* add clarification comment for timeout

* default timeout to config

* add assert

* update integraton tests for eventstream

* update integration tests

* fix timeout for action<->dict

* remove redundant on_event

* default to use instance image

* update run_controller interface

* add logging for copy

* refactor swe_bench for the new design

* fix action execution timeout

* updatelock

* remove build sandbox locally

* fix runtime

* use plain for-loop for single process

* remove extra print

* get swebench inference working

* print whole `test_result` dict

* got swebench patch post-process working

* update swe-bench evaluation readme

* refactor using shared reset_logger function

* move messy swebench prompt to a different file

* support the ability to specify whether to keep prompt

* support the ability to specify whether to keep prompt

* fix dockerfile

* fix import and remove unnecessary strip logic

* fix action serialization

* get agentbench running

* remove extra ls for agent bench

* fix agentbench metric

* factor out common documentation for eval

* update biocoder doc

* remove swe_env_box since it is no longer needed

* get biocoder working

* add func timeout for bird

* fix jupyter pwd with ~ as user name

* fix jupyter pwd with ~ as user name

* get bird working

* get browsing evaluation working

* make eda runnable

* fix id column

* fix eda run_infer

* unify eval output using a structured format;
make swebench coompatible with that format;
update client source code for every swebench run;
do not inject testcmd for swebench

* standardize existing benchs for the new eval output

* set update source code = true

* get gaia standardized

* fix gaia

* gorilla refactored but stuck at language.so to test

* refactor and make gpqa work

* refactor humanevalfix and get it working

* refactor logic reasoning and get it working

* refactor browser env so it works with eventstream runtime for eval

* add initial version of miniwob refactor

* fix browsergym environment

* get miniwob working!!

* allowing injecting additional dependency to OD runtime docker image

* allowing injecting additional dependency to OD runtime docker image

* support logic reasoning with pre-injected dependency

* get mint working

* update runtime build

* fix mint docker

* add test for keep_prompt;
add missing await close for some tests

* update integration tests for eventstream runtime

* fix integration tests for server runtime

* refactor ml bench and toolqa

* refactor webarena

* fix default factory

* Update run_infer.py

* add APIError to retry

* increase timeout for swebench

* make sure to hide api key when dump eval output

* update the behavior of put source code to put files instead of tarball

* add dishash to dependency

* sendintr when timeout

* fix dockerfile copy

* reduce timeout

* use dirhash to avoid repeat building for update source

* fix runtime_build testcase

* add dir_hash to docker build pipeline

* revert api error

* update poetry lock

* add retries for swebench run infer

* fix git patch

* update poetry lock

* adjust config order

* fix mount volumns

* enforce all eval to use "instance_id"

* remove file store from runtime

* make file_store public inside eventstream

* move the runtime logic inside `main` out

* support using async function for process_instance_fn

* refactor run_infer with the create_time

* fix file store

* Update evaluation/toolqa/utils.py

Co-authored-by: Graham Neubig <neubig@gmail.com>

* fix typo

---------

Co-authored-by: tobitege <tobitege@gmx.de>
Co-authored-by: super-dainiu <78588128+super-dainiu@users.noreply.github.com>
Co-authored-by: Graham Neubig <neubig@gmail.com>
2024-08-06 17:21:45 +00:00

363 lines
14 KiB
Python

import asyncio
import os
import tempfile
import uuid
from typing import Any, Optional
from zipfile import ZipFile
import aiohttp
import docker
import tenacity
from opendevin.core.config import AppConfig
from opendevin.core.logger import opendevin_logger as logger
from opendevin.events import EventStream
from opendevin.events.action import (
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from opendevin.events.action.action import Action
from opendevin.events.observation import (
ErrorObservation,
NullObservation,
Observation,
)
from opendevin.events.serialization import event_to_dict, observation_from_dict
from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
from opendevin.runtime.plugins import PluginRequirement
from opendevin.runtime.runtime import Runtime
from opendevin.runtime.tools import RuntimeTool
from opendevin.runtime.utils import find_available_tcp_port
from opendevin.runtime.utils.runtime_build import build_runtime_image
class EventStreamRuntime(Runtime):
"""This runtime will subscribe the event stream.
When receive an event, it will send the event to od-runtime-client which run inside the docker environment.
"""
container_name_prefix = 'opendevin-sandbox-'
def __init__(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
container_image: str | None = None,
):
super().__init__(
config, event_stream, sid, plugins
) # will initialize the event stream
self._port = find_available_tcp_port()
self.api_url = f'http://localhost:{self._port}'
self.session: Optional[aiohttp.ClientSession] = None
self.instance_id = (
sid + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
)
# TODO: We can switch to aiodocker when `get_od_sandbox_image` is updated to use aiodocker
self.docker_client: docker.DockerClient = self._init_docker_client()
self.container_image = (
self.config.sandbox.container_image
if container_image is None
else container_image
)
self.container_name = self.container_name_prefix + self.instance_id
self.container = None
self.action_semaphore = asyncio.Semaphore(1) # Ensure one action at a time
logger.debug(f'EventStreamRuntime `{sid}` config:\n{self.config}')
async def ainit(self, env_vars: dict[str, str] | None = None):
if self.config.sandbox.od_runtime_extra_deps:
logger.info(
f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.od_runtime_extra_deps}'
)
self.container_image = build_runtime_image(
self.container_image,
self.docker_client,
# NOTE: You can need set DEBUG=true to update the source code
# inside the container. This is useful when you want to test/debug the
# latest code in the runtime docker container.
update_source_code=self.config.sandbox.update_source_code,
extra_deps=self.config.sandbox.od_runtime_extra_deps,
)
self.container = await self._init_container(
self.sandbox_workspace_dir,
mount_dir=self.config.workspace_mount_path,
plugins=self.plugins,
)
# MUST call super().ainit() to initialize both default env vars
# AND the ones in env vars!
await super().ainit(env_vars)
logger.info(
f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
)
logger.info(f'Container initialized with env vars: {env_vars}')
@staticmethod
def _init_docker_client() -> docker.DockerClient:
try:
return docker.from_env()
except Exception as ex:
logger.error(
'Launch docker client failed. Please make sure you have installed docker and started the docker daemon.'
)
raise ex
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
)
async def _init_container(
self,
sandbox_workspace_dir: str,
mount_dir: str | None = None,
plugins: list[PluginRequirement] | None = None,
):
try:
logger.info(
f'Starting container with image: {self.container_image} and name: {self.container_name}'
)
plugin_arg = ''
if plugins is not None and len(plugins) > 0:
plugin_arg = (
f'--plugins {" ".join([plugin.name for plugin in plugins])} '
)
network_mode: str | None = None
port_mapping: dict[str, int] | None = None
if self.config.sandbox.use_host_network:
network_mode = 'host'
logger.warn(
'Using host network mode. If you are using MacOS, please make sure you have the latest version of Docker Desktop and enabled host network feature: https://docs.docker.com/network/drivers/host/#docker-desktop'
)
else:
port_mapping = {f'{self._port}/tcp': self._port}
if mount_dir is not None:
volumes = {mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}}
logger.info(f'Mount dir: {sandbox_workspace_dir}')
else:
logger.warn(
'Mount dir is not set, will not mount the workspace directory to the container.'
)
volumes = None
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
)
else:
browsergym_arg = ''
container = self.docker_client.containers.run(
self.container_image,
command=(
f'/opendevin/miniforge3/bin/mamba run --no-capture-output -n base '
'PYTHONUNBUFFERED=1 poetry run '
f'python -u -m opendevin.runtime.client.client {self._port} '
f'--working-dir {sandbox_workspace_dir} '
f'{plugin_arg}'
f'--username {"opendevin" if self.config.run_as_devin else "root"} '
f'--user-id {self.config.sandbox.user_id} '
f'{browsergym_arg}'
),
network_mode=network_mode,
ports=port_mapping,
working_dir='/opendevin/code/',
name=self.container_name,
detach=True,
environment={'DEBUG': 'true'} if self.config.debug else None,
volumes=volumes,
)
logger.info(f'Container started. Server url: {self.api_url}')
return container
except Exception as e:
logger.error('Failed to start container')
logger.exception(e)
await self.close(close_client=False)
raise e
async def _ensure_session(self):
await asyncio.sleep(1)
if self.session is None or self.session.closed:
self.session = aiohttp.ClientSession()
return self.session
@tenacity.retry(
stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_exponential(multiplier=2, min=4, max=60),
)
async def _wait_until_alive(self):
logger.info('Reconnecting session')
async with aiohttp.ClientSession() as session:
async with session.get(f'{self.api_url}/alive') as response:
if response.status == 200:
return
else:
msg = f'Action execution API is not alive. Response: {response}'
logger.error(msg)
raise RuntimeError(msg)
@property
def sandbox_workspace_dir(self):
return self.config.workspace_mount_path_in_sandbox
async def close(self, close_client: bool = True):
if self.session is not None and not self.session.closed:
await self.session.close()
containers = self.docker_client.containers.list(all=True)
for container in containers:
try:
if container.name.startswith(self.container_name_prefix):
logs = container.logs(tail=1000).decode('utf-8')
logger.debug(
f'==== Container logs ====\n{logs}\n==== End of container logs ===='
)
container.remove(force=True)
except docker.errors.NotFound:
pass
if close_client:
self.docker_client.close()
async def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
session = await self._ensure_session()
await self._wait_until_alive()
try:
if recursive:
# For recursive copy, create a zip file
with tempfile.NamedTemporaryFile(
suffix='.zip', delete=False
) as temp_zip:
temp_zip_path = temp_zip.name
with ZipFile(temp_zip_path, 'w') as zipf:
for root, _, files in os.walk(host_src):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(
file_path, os.path.dirname(host_src)
)
zipf.write(file_path, arcname)
upload_data = {'file': open(temp_zip_path, 'rb')}
else:
# For single file copy
upload_data = {'file': open(host_src, 'rb')}
params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
async with session.post(
f'{self.api_url}/upload_file', data=upload_data, params=params
) as response:
if response.status == 200:
return
else:
error_message = await response.text()
raise Exception(f'Copy operation failed: {error_message}')
except asyncio.TimeoutError:
raise TimeoutError('Copy operation timed out')
except Exception as e:
raise RuntimeError(f'Copy operation failed: {str(e)}')
finally:
if recursive:
os.unlink(temp_zip_path)
logger.info(f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}')
async def run_action(self, action: Action) -> Observation:
# set timeout to default if not set
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
async with self.action_semaphore:
if not action.runnable:
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
return ErrorObservation(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'Action {action_type} is not supported in the current runtime.'
)
logger.info('Awaiting session')
session = await self._ensure_session()
await self._wait_until_alive()
assert action.timeout is not None
try:
logger.info('Executing command')
async with session.post(
f'{self.api_url}/execute_action',
json={'action': event_to_dict(action)},
timeout=action.timeout,
) as response:
if response.status == 200:
output = await response.json()
obs = observation_from_dict(output)
obs._cause = action.id # type: ignore[attr-defined]
return obs
else:
error_message = await response.text()
logger.error(f'Error from server: {error_message}')
obs = ErrorObservation(
f'Command execution failed: {error_message}'
)
except asyncio.TimeoutError:
logger.error('No response received within the timeout period.')
obs = ErrorObservation('Command execution timed out')
except Exception as e:
logger.error(f'Error during command execution: {e}')
obs = ErrorObservation(f'Command execution failed: {str(e)}')
return obs
async def run(self, action: CmdRunAction) -> Observation:
return await self.run_action(action)
async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
return await self.run_action(action)
async def read(self, action: FileReadAction) -> Observation:
return await self.run_action(action)
async def write(self, action: FileWriteAction) -> Observation:
return await self.run_action(action)
async def browse(self, action: BrowseURLAction) -> Observation:
return await self.run_action(action)
async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return await self.run_action(action)
############################################################################
# Keep the same with other runtimes
############################################################################
def get_working_directory(self):
raise NotImplementedError(
'This method is not implemented in the runtime client.'
)
def init_runtime_tools(
self,
runtime_tools: list[RuntimeTool],
runtime_tools_config: Optional[dict[RuntimeTool, Any]] = None,
) -> None:
# TODO: deprecate this method when we move to the new EventStreamRuntime
logger.warning('init_runtime_tools is not implemented in the runtime client.')