From 8b1f207d39c899499375b8d9c33fad6595cd796d Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Thu, 29 Aug 2024 10:53:37 -0500 Subject: [PATCH] feat: support remote runtime (#3406) * feat: refactor building logic into runtime builder * return image name * fix testcases * use runtime builder for eventstream runtime * have runtime builder return str * add api_key to sandbox config * draft remote runtime * remove extra if clause * initialize runtime based on box class * add build logic * use base64 for file upload * get runtime image prefix from API * replace ___ with _s_ to make it a valid image name * use /build to start build and /build_status to check the build progress * update logging * fix exit code * always use port * add remote runtime * rename runtime * fix tests import * make dir first if work_dir does not exists; * update debug print to remote runtime * fix exit close_sync * update logging * add retry for stop * use all box class for test keep prompt * fix test browsing * add retry stop * merge init commands to save startup time * fix await * remove sandbox url * support execute through specific runtime url * fix file ops * simplify close * factor out runtime retry code * fix exception handling * fix content type error (e.g., bad gateway when runtime is not ready) * add retry for wait until alive; add retry for check image exists * Revert "add retry for wait until alive;" This reverts commit dd013cd2681a159cd07747497d8c95e145d01c32. 
* retry when wait until alive * clean up msg * directly save sdist to temp dir for _put_source_code_to_dir * support running testcases in parallel * tweak logging; try to close session * try to close session even on exception * update poetry lock * support remote to run integration tests * add warning for workspace base on remote runtime * set default runtime api * remove server runtime * update poetry lock * support running swe-bench (n=1) eval on remoteruntime * add a timeout of 30 min * add todo for docker namespace * update poetry loc --- evaluation/swe_bench/README.md | 6 + evaluation/swe_bench/run_infer.py | 27 +- openhands/core/config.py | 5 +- openhands/runtime/__init__.py | 4 + openhands/runtime/builder/base.py | 4 +- openhands/runtime/builder/remote.py | 117 +++++++ openhands/runtime/client/client.py | 8 +- openhands/runtime/remote/runtime.py | 424 +++++++++++++++++++++++ openhands/runtime/runtime.py | 12 +- openhands/runtime/utils/runtime_build.py | 80 ++--- poetry.lock | 38 +- pyproject.toml | 3 + tests/integration/test_agent.py | 2 +- tests/runtime/conftest.py | 3 + tests/runtime/test_bash.py | 8 +- tests/runtime/test_browsing.py | 6 +- tests/unit/test_runtime_build.py | 22 +- 17 files changed, 683 insertions(+), 86 deletions(-) create mode 100644 openhands/runtime/builder/remote.py create mode 100644 openhands/runtime/remote/runtime.py diff --git a/evaluation/swe_bench/README.md b/evaluation/swe_bench/README.md index ef2d3492fa..90697359cd 100644 --- a/evaluation/swe_bench/README.md +++ b/evaluation/swe_bench/README.md @@ -72,6 +72,12 @@ then your command would be: ./evaluation/swe_bench/scripts/run_infer.sh llm.eval_gpt4_1106_preview HEAD CodeActAgent 10 ``` +**Evaluate on `RemoteRuntime` (alpha)** (contact Xingyao over slack if you want to try this out!) 
+```bash +SANDBOX_API_KEY="CONTACT-XINGYAO-TO-GET-A-TESTING-API-KEY" RUNTIME=remote EVAL_DOCKER_IMAGE_PREFIX="us-docker.pkg.dev/evaluation-428620/swe-bench-images" ./evaluation/swe_bench/scripts/run_infer.sh llm.eval HEAD CodeActAgent 300 +``` +Multi-processing is still WIP. + ### Specify a subset of tasks to run infer If you would like to specify a list of tasks you'd like to benchmark on, you could diff --git a/evaluation/swe_bench/run_infer.py b/evaluation/swe_bench/run_infer.py index 2f308ba61e..2a17980335 100644 --- a/evaluation/swe_bench/run_infer.py +++ b/evaluation/swe_bench/run_infer.py @@ -24,6 +24,7 @@ from openhands.core.config import ( AppConfig, SandboxConfig, get_llm_config_arg, + load_from_env, parse_arguments, ) from openhands.core.logger import openhands_logger as logger @@ -86,6 +87,19 @@ def get_instruction(instance: pd.Series, metadata: EvalMetadata): return instruction +# TODO: migrate all swe-bench docker to ghcr.io/openhands +DOCKER_IMAGE_PREFIX = os.environ.get('EVAL_DOCKER_IMAGE_PREFIX', 'docker.io/xingyaoww/') +logger.info(f'Using docker image prefix: {DOCKER_IMAGE_PREFIX}') + + +def get_instance_docker_image(instance_id: str) -> str: + image_name = 'sweb.eval.x86_64.' + instance_id + image_name = image_name.replace( + '__', '_s_' + ) # to comply with docker image naming convention + return DOCKER_IMAGE_PREFIX.rstrip('/') + '/' + image_name + + def get_config( instance: pd.Series, metadata: EvalMetadata, @@ -93,14 +107,14 @@ def get_config( SWE_BENCH_CONTAINER_IMAGE = 'ghcr.io/opendevin/eval-swe-bench:full-v1.2.1' if USE_INSTANCE_IMAGE: # We use a different instance image for the each instance of swe-bench eval - base_container_image = 'sweb.eval.x86_64.' 
+ instance['instance_id'] + base_container_image = get_instance_docker_image(instance['instance_id']) else: base_container_image = SWE_BENCH_CONTAINER_IMAGE + logger.info(f'Using swe-bench container image: {base_container_image}') config = AppConfig( default_agent=metadata.agent_class, run_as_openhands=False, - runtime='eventstream', max_budget_per_task=4, max_iterations=metadata.max_iterations, sandbox=SandboxConfig( @@ -114,6 +128,15 @@ def get_config( workspace_base=None, workspace_mount_path=None, ) + selected_env_vars = {'runtime', 'sandbox_api_key'} + selected_env_vars = { + k: v for k, v in os.environ.items() if k.lower() in selected_env_vars + } + if selected_env_vars: + logger.info( + f'Loading config keys from env vars: {list(selected_env_vars.keys())}' + ) + load_from_env(config, selected_env_vars) config.set_llm_config(metadata.llm_config) return config diff --git a/openhands/core/config.py b/openhands/core/config.py index 3537522ad9..da49d4f33a 100644 --- a/openhands/core/config.py +++ b/openhands/core/config.py @@ -201,9 +201,8 @@ class SandboxConfig: """ api_hostname: str = 'localhost' - base_container_image: str | None = ( - 'nikolaik/python-nodejs:python3.11-nodejs22' # default to nikolaik/python-nodejs:python3.11-nodejs22 for eventstream runtime - ) + api_key: str | None = None + base_container_image: str = 'nikolaik/python-nodejs:python3.11-nodejs22' # default to nikolaik/python-nodejs:python3.11-nodejs22 for eventstream runtime runtime_container_image: str | None = None user_id: int = os.getuid() if hasattr(os, 'getuid') else 1000 timeout: int = 120 diff --git a/openhands/runtime/__init__.py b/openhands/runtime/__init__.py index 97c212cc42..1452ef0f40 100644 --- a/openhands/runtime/__init__.py +++ b/openhands/runtime/__init__.py @@ -11,6 +11,10 @@ def get_runtime_cls(name: str): from openhands.runtime.e2b.runtime import E2BRuntime return E2BRuntime + elif name == 'remote': + from openhands.runtime.remote.runtime import RemoteRuntime + + return 
RemoteRuntime else: raise ValueError(f'Runtime {name} not supported') diff --git a/openhands/runtime/builder/base.py b/openhands/runtime/builder/base.py index cd78cc4c0b..9101d65000 100644 --- a/openhands/runtime/builder/base.py +++ b/openhands/runtime/builder/base.py @@ -16,7 +16,9 @@ class RuntimeBuilder(abc.ABC): tags (list[str]): The tags to apply to the runtime image (e.g., ["repo:my-repo", "sha:my-sha"]). Returns: - str: The name of the runtime image (e.g., "repo:sha"). + str: The name:tag of the runtime image after build (e.g., "repo:sha"). + This can be different from the tags input if the builder chooses to mutate the tags (e.g., adding a + registry prefix). This should be used for subsequent use (e.g., `docker run`). Raises: RuntimeError: If the build failed. diff --git a/openhands/runtime/builder/remote.py b/openhands/runtime/builder/remote.py new file mode 100644 index 0000000000..f3270b6b8d --- /dev/null +++ b/openhands/runtime/builder/remote.py @@ -0,0 +1,117 @@ +import base64 +import io +import tarfile +import time + +import requests + +from openhands.core.logger import openhands_logger as logger +from openhands.runtime.builder import RuntimeBuilder + + +class RemoteRuntimeBuilder(RuntimeBuilder): + """This class interacts with the remote Runtime API for building and managing container images.""" + + def __init__(self, api_url: str, api_key: str): + self.api_url = api_url + self.api_key = api_key + + def build(self, path: str, tags: list[str]) -> str: + """Builds a Docker image using the Runtime API's /build endpoint.""" + # Create a tar archive of the build context + tar_buffer = io.BytesIO() + with tarfile.open(fileobj=tar_buffer, mode='w:gz') as tar: + tar.add(path, arcname='.') + tar_buffer.seek(0) + + # Encode the tar file as base64 + base64_encoded_tar = base64.b64encode(tar_buffer.getvalue()).decode('utf-8') + + # Prepare the multipart form data + files = [ + ('context', ('context.tar.gz', base64_encoded_tar)), + ('target_image', (None, 
tags[0])),
+        ]
+
+        # Add additional tags if present
+        for tag in tags[1:]:
+            files.append(('tags', (None, tag)))
+
+        # Send the POST request to /build
+        headers = {'X-API-Key': self.api_key}
+        response = requests.post(f'{self.api_url}/build', files=files, headers=headers)
+
+        if response.status_code != 202:
+            logger.error(f'Build initiation failed: {response.text}')
+            raise RuntimeError(f'Build initiation failed: {response.text}')
+
+        build_data = response.json()
+        build_id = build_data['build_id']
+        logger.info(f'Build initiated with ID: {build_id}')
+
+        # Poll /build_status until the build is complete
+        start_time = time.time()
+        timeout = 30 * 60  # 30 minutes in seconds
+        while True:
+            if time.time() - start_time > timeout:
+                logger.error('Build timed out after 30 minutes')
+                raise RuntimeError('Build timed out after 30 minutes')
+
+            status_response = requests.get(
+                f'{self.api_url}/build_status',
+                params={'build_id': build_id},
+                headers=headers,
+            )
+
+            if status_response.status_code != 200:
+                logger.error(f'Failed to get build status: {status_response.text}')
+                raise RuntimeError(
+                    f'Failed to get build status: {status_response.text}'
+                )
+
+            status_data = status_response.json()
+            status = status_data['status']
+            logger.info(f'Build status: {status}')
+
+            if status == 'SUCCESS':
+                logger.info(f"Successfully built {status_data['image']}")
+                return status_data['image']
+            elif status in [
+                'FAILURE',
+                'INTERNAL_ERROR',
+                'TIMEOUT',
+                'CANCELLED',
+                'EXPIRED',
+            ]:
+                error_message = status_data.get(
+                    'error', f'Build failed with status: {status}'
+                )
+                logger.error(error_message)
+                raise RuntimeError(error_message)
+
+            # Wait before polling again
+            time.sleep(5)
+
+    def image_exists(self, image_name: str) -> bool:
+        """Checks if an image exists in the remote registry using the /image_exists endpoint."""
+        params = {'image': image_name}
+        session = requests.Session()
+        session.headers.update({'X-API-Key': self.api_key})
+        response = 
session.get(f'{self.api_url}/image_exists', params=params) + + if response.status_code != 200: + logger.error(f'Failed to check image existence: {response.text}') + raise RuntimeError(f'Failed to check image existence: {response.text}') + + result = response.json() + + if result['exists']: + logger.info( + f"Image {image_name} exists. " + f"Uploaded at: {result['image']['upload_time']}, " + f"Size: {result['image']['image_size_bytes'] / 1024 / 1024:.2f} MB" + ) + else: + logger.info(f'Image {image_name} does not exist.') + + return result['exists'] diff --git a/openhands/runtime/client/client.py b/openhands/runtime/client/client.py index 8816186c43..8ca4df3323 100644 --- a/openhands/runtime/client/client.py +++ b/openhands/runtime/client/client.py @@ -58,9 +58,7 @@ class ActionRequest(BaseModel): ROOT_GID = 0 INIT_COMMANDS = [ - 'git config --global user.name "openhands"', - 'git config --global user.email "openhands@all-hands.dev"', - "alias git='git --no-pager'", + 'git config --global user.name "openhands" && git config --global user.email "openhands@all-hands.dev" && alias git="git --no-pager"', ] @@ -187,7 +185,9 @@ class RuntimeClient: self.shell.sendline(f'export PS1="{self.__bash_PS1}"; export PS2=""') self.shell.expect(self.__bash_expect_regex) - self.shell.sendline(f'cd {work_dir}') + self.shell.sendline( + f'if [ ! -d "{work_dir}" ]; then mkdir -p "{work_dir}"; fi && cd "{work_dir}"' + ) self.shell.expect(self.__bash_expect_regex) logger.debug( f'Bash initialized. Working directory: {work_dir}. 
Output: {self.shell.before}' diff --git a/openhands/runtime/remote/runtime.py b/openhands/runtime/remote/runtime.py new file mode 100644 index 0000000000..1ff01edb28 --- /dev/null +++ b/openhands/runtime/remote/runtime.py @@ -0,0 +1,424 @@ +import asyncio +import os +import ssl +import tempfile +import uuid +from typing import Any, Optional, Type +from zipfile import ZipFile + +import aiohttp +import aiohttp.client_exceptions +import tenacity + +from openhands.core.config import AppConfig +from openhands.core.logger import openhands_logger as logger +from openhands.events import EventStream +from openhands.events.action import ( + BrowseInteractiveAction, + BrowseURLAction, + CmdRunAction, + FileReadAction, + FileWriteAction, + IPythonRunCellAction, +) +from openhands.events.action.action import Action +from openhands.events.observation import ( + ErrorObservation, + NullObservation, + Observation, +) +from openhands.events.serialization import event_to_dict, observation_from_dict +from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS +from openhands.runtime.builder.remote import RemoteRuntimeBuilder +from openhands.runtime.plugins import PluginRequirement +from openhands.runtime.runtime import Runtime +from openhands.runtime.utils.runtime_build import build_runtime_image + +DEFAULT_RETRY_EXCEPTIONS = [ + ssl.SSLCertVerificationError, + aiohttp.ClientError, + aiohttp.client_exceptions.ContentTypeError, + aiohttp.client_exceptions.ClientConnectorCertificateError, + ssl.SSLCertVerificationError, + asyncio.TimeoutError, +] + + +class RemoteRuntime(Runtime): + """This runtime will connect to a remote od-runtime-client.""" + + port: int = 60000 # default port for the remote runtime client + + def __init__( + self, + config: AppConfig, + event_stream: EventStream, + sid: str = 'default', + plugins: list[PluginRequirement] | None = None, + ): + super().__init__(config, event_stream, sid, plugins) + if self.config.sandbox.api_hostname == 'localhost': + 
self.config.sandbox.api_hostname = 'api.all-hands.dev/v0/runtime' + logger.warning( + 'Using localhost as the API hostname is not supported in the RemoteRuntime. Please set a proper hostname.\n' + 'Setting it to default value: api.all-hands.dev/v0/runtime' + ) + self.api_url = f'https://{self.config.sandbox.api_hostname.rstrip("/")}' + + self.session: Optional[aiohttp.ClientSession] = None + + self.action_semaphore = asyncio.Semaphore(1) # Ensure one action at a time + + if self.config.workspace_base is not None: + logger.warning( + 'Setting workspace_base is not supported in the remote runtime.' + ) + + if self.config.sandbox.api_key is None: + raise ValueError( + 'API key is required to use the remote runtime. ' + 'Please set the API key in the config (config.toml) or as an environment variable (SANDBOX_API_KEY).' + ) + self.runtime_builder = RemoteRuntimeBuilder( + self.api_url, self.config.sandbox.api_key + ) + self.runtime_id: str | None = None + self.runtime_url: str | None = None + + self.instance_id = ( + sid + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4()) + ) + if self.config.sandbox.runtime_container_image is not None: + raise ValueError( + 'Setting runtime_container_image is not supported in the remote runtime.' 
+ ) + self.container_image: str = self.config.sandbox.base_container_image + self.container_name = 'od-remote-runtime-' + self.instance_id + logger.debug(f'RemoteRuntime `{sid}` config:\n{self.config}') + + async def _send_request( + self, + method: str, + url: str, + retry_exceptions: list[Type[Exception]] | None = None, + **kwargs: Any, + ) -> aiohttp.ClientResponse: + if retry_exceptions is None: + retry_exceptions = DEFAULT_RETRY_EXCEPTIONS + + session = await self._ensure_session() + + def log_retry(retry_state): + exception = retry_state.outcome.exception() + logger.warning( + f'Retry attempt {retry_state.attempt_number} failed with exception: {exception}' + ) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(10), + wait=tenacity.wait_exponential(multiplier=1, min=4, max=60), + retry=tenacity.retry_if_exception_type(tuple(retry_exceptions)), + reraise=True, + after=log_retry, + ) + async def _send_request_with_retry(): + async with session.request(method, url, **kwargs) as response: + await response.read() + return response + + return await _send_request_with_retry() + + async def ainit(self, env_vars: dict[str, str] | None = None): + # Check if the container image exists + # Use the /registry_prefix endpoint to get the registry prefix + response = await self._send_request('GET', f'{self.api_url}/registry_prefix') + if response.status != 200: + raise RuntimeError( + f'Failed to get registry prefix: {await response.text()}' + ) + response_json = await response.json() + registry_prefix = response_json['registry_prefix'] + os.environ['OD_RUNTIME_RUNTIME_IMAGE_REPO'] = ( + registry_prefix.rstrip('/') + '/runtime' + ) + logger.info( + f'Runtime image repo: {os.environ["OD_RUNTIME_RUNTIME_IMAGE_REPO"]}' + ) + + if self.config.sandbox.runtime_extra_deps: + logger.info( + f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}' + ) + + # Build the container image + self.container_image = build_runtime_image( 
+ self.container_image, + self.runtime_builder, + extra_deps=self.config.sandbox.runtime_extra_deps, + ) + + # Use the /image_exists endpoint to check if the image exists + response = await self._send_request( + 'GET', + f'{self.api_url}/image_exists', + params={'image': self.container_image}, + ) + if response.status != 200 or not (await response.json())['exists']: + raise RuntimeError(f'Container image {self.container_image} does not exist') + + # Prepare the request body for the /start endpoint + plugin_arg = '' + if self.plugins is not None and len(self.plugins) > 0: + plugin_arg = ( + f'--plugins {" ".join([plugin.name for plugin in self.plugins])} ' + ) + if self.config.sandbox.browsergym_eval_env is not None: + browsergym_arg = ( + f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}' + ) + else: + browsergym_arg = '' + start_request = { + 'image': self.container_image, + 'command': ( + f'/openhands/miniforge3/bin/mamba run --no-capture-output -n base ' + 'PYTHONUNBUFFERED=1 poetry run ' + f'python -u -m openhands.runtime.client.client {self.port} ' + f'--working-dir {self.sandbox_workspace_dir} ' + f'{plugin_arg}' + f'--username {"openhands" if self.config.run_as_openhands else "root"} ' + f'--user-id {self.config.sandbox.user_id} ' + f'{browsergym_arg}' + ), + 'working_dir': '/openhands/code/', + 'name': self.container_name, + 'environment': {'DEBUG': 'true'} if self.config.debug else {}, + } + + # Start the sandbox using the /start endpoint + response = await self._send_request( + 'POST', f'{self.api_url}/start', json=start_request + ) + if response.status != 201: + raise RuntimeError(f'Failed to start sandbox: {await response.text()}') + start_response = await response.json() + self.runtime_id = start_response['runtime_id'] + self.runtime_url = start_response['url'] + + logger.info( + f'Sandbox started. 
Runtime ID: {self.runtime_id}, URL: {self.runtime_url}' + ) + + # Initialize environment variables + await super().ainit(env_vars) + + logger.info( + f'Runtime initialized with plugins: {[plugin.name for plugin in self.plugins]}' + ) + logger.info(f'Runtime initialized with env vars: {env_vars}') + assert ( + self.runtime_id is not None + ), 'Runtime ID is not set. This should never happen.' + assert ( + self.runtime_url is not None + ), 'Runtime URL is not set. This should never happen.' + + async def _ensure_session(self): + if self.session is None or self.session.closed: + self.session = aiohttp.ClientSession( + headers={'X-API-Key': self.config.sandbox.api_key} + ) + return self.session + + @tenacity.retry( + stop=tenacity.stop_after_attempt(10), + wait=tenacity.wait_exponential(multiplier=1, min=4, max=60), + retry=tenacity.retry_if_exception_type(RuntimeError), + reraise=True, + ) + async def _wait_until_alive(self): + logger.info('Waiting for sandbox to be alive...') + response = await self._send_request('GET', f'{self.runtime_url}/alive') + if response.status == 200: + return + else: + msg = f'Runtime is not alive (id={self.runtime_id}). Status: {response.status}.' + logger.warning(msg) + raise RuntimeError(msg) + + @property + def sandbox_workspace_dir(self): + return self.config.workspace_mount_path_in_sandbox + + async def close(self): + if self.runtime_id: + try: + response = await self._send_request( + 'POST', f'{self.api_url}/stop', json={'runtime_id': self.runtime_id} + ) + if response.status != 200: + logger.error(f'Failed to stop sandbox: {await response.text()}') + else: + logger.info(f'Sandbox stopped. 
Runtime ID: {self.runtime_id}') + except Exception as e: + raise e + finally: + if self.session is not None: + await self.session.close() + self.session = None + + async def run_action(self, action: Action) -> Observation: + if action.timeout is None: + action.timeout = self.config.sandbox.timeout + + async with self.action_semaphore: + if not action.runnable: + return NullObservation('') + action_type = action.action # type: ignore[attr-defined] + if action_type not in ACTION_TYPE_TO_CLASS: + return ErrorObservation(f'Action {action_type} does not exist.') + if not hasattr(self, action_type): + return ErrorObservation( + f'Action {action_type} is not supported in the current runtime.' + ) + + await self._wait_until_alive() + + assert action.timeout is not None + + try: + logger.info('Executing action') + request_body = {'action': event_to_dict(action)} + logger.debug(f'Request body: {request_body}') + response = await self._send_request( + 'POST', + f'{self.runtime_url}/execute_action', + json=request_body, + timeout=action.timeout, + retry_exceptions=list( + filter( + lambda e: e != asyncio.TimeoutError, + DEFAULT_RETRY_EXCEPTIONS, + ) + ), + ) + if response.status == 200: + output = await response.json() + obs = observation_from_dict(output) + obs._cause = action.id # type: ignore[attr-defined] + return obs + else: + error_message = await response.text() + logger.error(f'Error from server: {error_message}') + obs = ErrorObservation(f'Action execution failed: {error_message}') + except asyncio.TimeoutError: + logger.error('No response received within the timeout period.') + obs = ErrorObservation('Action execution timed out') + except Exception as e: + logger.error(f'Error during action execution: {e}') + obs = ErrorObservation(f'Action execution failed: {str(e)}') + return obs + + async def run(self, action: CmdRunAction) -> Observation: + return await self.run_action(action) + + async def run_ipython(self, action: IPythonRunCellAction) -> Observation: + return 
await self.run_action(action) + + async def read(self, action: FileReadAction) -> Observation: + return await self.run_action(action) + + async def write(self, action: FileWriteAction) -> Observation: + return await self.run_action(action) + + async def browse(self, action: BrowseURLAction) -> Observation: + return await self.run_action(action) + + async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation: + return await self.run_action(action) + + async def copy_to( + self, host_src: str, sandbox_dest: str, recursive: bool = False + ) -> None: + if not os.path.exists(host_src): + raise FileNotFoundError(f'Source file {host_src} does not exist') + + await self._wait_until_alive() + try: + if recursive: + with tempfile.NamedTemporaryFile( + suffix='.zip', delete=False + ) as temp_zip: + temp_zip_path = temp_zip.name + + with ZipFile(temp_zip_path, 'w') as zipf: + for root, _, files in os.walk(host_src): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath( + file_path, os.path.dirname(host_src) + ) + zipf.write(file_path, arcname) + + upload_data = {'file': open(temp_zip_path, 'rb')} + else: + upload_data = {'file': open(host_src, 'rb')} + + params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()} + + response = await self._send_request( + 'POST', + f'{self.runtime_url}/upload_file', + data=upload_data, + params=params, + retry_exceptions=list( + filter( + lambda e: e != asyncio.TimeoutError, DEFAULT_RETRY_EXCEPTIONS + ) + ), + ) + if response.status == 200: + logger.info( + f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}. 
Response: {await response.text()}' + ) + return + else: + error_message = await response.text() + raise Exception(f'Copy operation failed: {error_message}') + except asyncio.TimeoutError: + raise TimeoutError('Copy operation timed out') + except Exception as e: + raise RuntimeError(f'Copy operation failed: {str(e)}') + finally: + if recursive: + os.unlink(temp_zip_path) + logger.info(f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}') + + async def list_files(self, path: str | None = None) -> list[str]: + await self._wait_until_alive() + try: + data = {} + if path is not None: + data['path'] = path + + response = await self._send_request( + 'POST', + f'{self.runtime_url}/list_files', + json=data, + retry_exceptions=list( + filter( + lambda e: e != asyncio.TimeoutError, DEFAULT_RETRY_EXCEPTIONS + ) + ), + ) + if response.status == 200: + response_json = await response.json() + assert isinstance(response_json, list) + return response_json + else: + error_message = await response.text() + raise Exception(f'List files operation failed: {error_message}') + except asyncio.TimeoutError: + raise TimeoutError('List files operation timed out') + except Exception as e: + raise RuntimeError(f'List files operation failed: {str(e)}') diff --git a/openhands/runtime/runtime.py b/openhands/runtime/runtime.py index aca73d22f0..d59c10c0fe 100644 --- a/openhands/runtime/runtime.py +++ b/openhands/runtime/runtime.py @@ -87,16 +87,16 @@ class Runtime: def close_sync(self) -> None: try: - loop = asyncio.get_running_loop() - except RuntimeError: - # No running event loop, use asyncio.run() - asyncio.run(self.close()) - else: - # There is a running event loop, create a task + loop = asyncio.get_event_loop() + if loop.is_closed(): + return if loop.is_running(): loop.create_task(self.close()) else: loop.run_until_complete(self.close()) + except RuntimeError: + # Event loop is already closed, nothing to do + pass # ==================================================================== 
diff --git a/openhands/runtime/utils/runtime_build.py b/openhands/runtime/utils/runtime_build.py index 6f6fa5ba3e..cf1a2cbdd3 100644 --- a/openhands/runtime/utils/runtime_build.py +++ b/openhands/runtime/utils/runtime_build.py @@ -13,9 +13,9 @@ import openhands from openhands.core.logger import openhands_logger as logger from openhands.runtime.builder import DockerRuntimeBuilder, RuntimeBuilder -RUNTIME_IMAGE_REPO = os.getenv( - 'OD_RUNTIME_RUNTIME_IMAGE_REPO', 'ghcr.io/all-hands-ai/runtime' -) + +def get_runtime_image_repo(): + return os.getenv('OD_RUNTIME_RUNTIME_IMAGE_REPO', 'ghcr.io/all-hands-ai/runtime') def _get_package_version(): @@ -31,18 +31,27 @@ def _get_package_version(): return pyproject_data['tool']['poetry']['version'] -def _create_project_source_dist(): - """Create a source distribution of the project. +def _put_source_code_to_dir(temp_dir: str): + """Builds the project source tarball directly in temp_dir and unpacks it. + The OpenHands source code ends up in the temp_dir/code directory. 
- Returns: - - str: The path to the project tarball + Parameters: + - temp_dir (str): The directory to put the source code in """ project_root = os.path.dirname(os.path.dirname(os.path.abspath(openhands.__file__))) logger.info(f'Using project root: {project_root}') - # run "python -m build -s" on project_root to create project tarball + # Fetch the correct version from pyproject.toml + package_version = _get_package_version() + tarball_filename = f'openhands_ai-{package_version}.tar.gz' + tarball_path = os.path.join(temp_dir, tarball_filename) + + # Run "python -m build -s" on project_root to create project tarball directly in temp_dir + _cleaned_project_root = project_root.replace( + ' ', r'\ ' + ) # escape spaces in the project root result = subprocess.run( - 'python -m build -s ' + project_root.replace(' ', r'\ '), + f'python -m build -s -o {temp_dir} {_cleaned_project_root}', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -56,47 +65,20 @@ def _create_project_source_dist(): logger.error(f'Build failed: {result}') raise Exception(f'Build failed: {result}') - # Fetch the correct version from pyproject.toml - package_version = _get_package_version() - tarball_path = os.path.join( - project_root, 'dist', f'openhands_ai-{package_version}.tar.gz' - ) if not os.path.exists(tarball_path): logger.error(f'Source distribution not found at {tarball_path}') raise Exception(f'Source distribution not found at {tarball_path}') logger.info(f'Source distribution created at {tarball_path}') - return tarball_path - - -def _put_source_code_to_dir(temp_dir: str): - """Builds the project source tarball. Copies it to temp_dir and unpacks it. 
- The OpenHands source code ends up in the temp_dir/code directory - - Parameters: - - temp_dir (str): The directory to put the source code in - """ - project_tar = 'project.tar.gz' - project_path = os.path.join(temp_dir, project_tar) - logger.info('Building source distribution...') - - # Build the project source tarball - tarball_path = _create_project_source_dist() - filename = os.path.basename(tarball_path) - filename = filename.removesuffix('.tar.gz') - - # Move the project tarball to temp_dir - _res = shutil.copy(tarball_path, project_path) - if _res: - os.remove(tarball_path) - logger.info('Source distribution moved to ' + project_path) - # Unzip the tarball - shutil.unpack_archive(project_path, temp_dir) + shutil.unpack_archive(tarball_path, temp_dir) # Remove the tarball - os.remove(project_path) + os.remove(tarball_path) # Rename the directory containing the code to 'code' - os.rename(os.path.join(temp_dir, filename), os.path.join(temp_dir, 'code')) + os.rename( + os.path.join(temp_dir, f'openhands_ai-{package_version}'), + os.path.join(temp_dir, 'code'), + ) logger.info(f'Unpacked source code directory: {os.path.join(temp_dir, "code")}') @@ -187,7 +169,7 @@ def get_runtime_image_repo_and_tag(base_image: str) -> tuple[str, str]: - tuple[str, str]: The Docker repo and tag of the Docker image """ - if RUNTIME_IMAGE_REPO in base_image: + if get_runtime_image_repo() in base_image: logger.info( f'The provided image [{base_image}] is already a valid runtime image.\n' f'Will try to reuse it as is.' 
@@ -201,9 +183,11 @@ def get_runtime_image_repo_and_tag(base_image: str) -> tuple[str, str]: if ':' not in base_image: base_image = base_image + ':latest' [repo, tag] = base_image.split(':') - repo = repo.replace('/', '___') + # replace '/' with '_s_' to avoid '/' in the image name + # while make it a valid docker image name + repo = repo.replace('/', '_s_') od_version = _get_package_version() - return RUNTIME_IMAGE_REPO, f'od_v{od_version}_image_{repo}_tag_{tag}' + return get_runtime_image_repo(), f'od_v{od_version}_image_{repo}_tag_{tag}' def build_runtime_image( @@ -368,16 +352,16 @@ def _build_sandbox_image( target_image_generic_name = f'{target_image_repo}:{target_image_tag}' try: - success = runtime_builder.build( + image_name = runtime_builder.build( path=docker_folder, tags=[target_image_hash_name, target_image_generic_name] ) - if not success: + if not image_name: raise RuntimeError(f'Build failed for image {target_image_hash_name}') except Exception as e: logger.error(f'Sandbox image build failed: {e}') raise - return target_image_hash_name + return image_name if __name__ == '__main__': diff --git a/poetry.lock b/poetry.lock index 28992b1f4d..b5769e6429 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. 
[[package]] name = "aenum" @@ -1607,6 +1607,20 @@ tensorflow-gpu = ["tensorflow-gpu (>=2.2.0,!=2.6.0,!=2.6.1)"] tests = ["Werkzeug (>=1.0.1)", "absl-py", "accelerate", "bert-score (>=0.3.6)", "cer (>=1.2.0)", "charcut (>=1.1.1)", "jiwer", "mauve-text", "nltk", "pytest", "pytest-datadir", "pytest-xdist", "requests-file (>=1.5.1)", "rouge-score (>=0.1.2)", "sacrebleu", "sacremoses", "scikit-learn", "scipy (>=1.10.0)", "sentencepiece", "seqeval", "six (>=1.15.0,<1.16.0)", "tensorflow (>=2.3,!=2.6.0,!=2.6.1,<=2.10)", "texttable (>=1.6.3)", "tldextract (>=3.1.0)", "toml (>=0.10.1)", "torch", "transformers", "trectools", "unidecode (>=1.3.4)"] torch = ["torch"] +[[package]] +name = "execnet" +version = "2.1.1" +description = "execnet: rapid multi-Python deployment" +optional = false +python-versions = ">=3.8" +files = [ + {file = "execnet-2.1.1-py3-none-any.whl", hash = "sha256:26dee51f1b80cebd6d0ca8e74dd8745419761d3bef34163928cbebbdc4749fdc"}, + {file = "execnet-2.1.1.tar.gz", hash = "sha256:5189b52c6121c24feae288166ab41b32549c7e2348652736540b9e6e7d4e72e3"}, +] + +[package.extras] +testing = ["hatch", "pre-commit", "pytest", "tox"] + [[package]] name = "executing" version = "2.0.1" @@ -6542,6 +6556,26 @@ files = [ py = "*" pytest = ">=3.10" +[[package]] +name = "pytest-xdist" +version = "3.6.1" +description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest_xdist-3.6.1-py3-none-any.whl", hash = "sha256:9ed4adfb68a016610848639bb7e02c9352d5d9f03d04809919e2dafc3be4cca7"}, + {file = "pytest_xdist-3.6.1.tar.gz", hash = "sha256:ead156a4db231eec769737f57668ef58a2084a34b2e55c4a8fa20d861107300d"}, +] + +[package.dependencies] +execnet = ">=2.1" +pytest = ">=7.0.0" + +[package.extras] +psutil = ["psutil (>=3.0)"] +setproctitle = ["setproctitle"] +testing = ["filelock"] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -9477,4 +9511,4 @@ testing = 
["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "b7a2c28cf99b0e85de3148ab3edbeaf1e721ad8430f8c57cb0cc7f6ccafc5666" +content-hash = "d69e66db7f0ba4063db8c7d5f98313f536c514e843637ebdccc2b5ac02f0d54c" diff --git a/pyproject.toml b/pyproject.toml index 7fe5df8890..7692d2bc1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,7 @@ pytest = "*" pytest-cov = "*" pytest-asyncio = "*" pytest-forked = "*" +pytest-xdist = "*" flake8 = "*" openai = "*" opencv-python = "*" @@ -84,6 +85,7 @@ reportlab = "*" [tool.coverage.run] concurrency = ["gevent"] + [tool.poetry.group.runtime.dependencies] jupyterlab = "*" notebook = "*" @@ -114,6 +116,7 @@ ignore = ["D1"] [tool.ruff.lint.pydocstyle] convention = "google" + [tool.poetry.group.evaluation.dependencies] streamlit = "*" whatthepatch = "*" diff --git a/tests/integration/test_agent.py b/tests/integration/test_agent.py index 54d59b834b..32954d4366 100644 --- a/tests/integration/test_agent.py +++ b/tests/integration/test_agent.py @@ -18,7 +18,7 @@ from openhands.events.observation.delegate import AgentDelegateObservation from openhands.runtime import get_runtime_cls TEST_RUNTIME = os.getenv('TEST_RUNTIME') -assert TEST_RUNTIME in ['eventstream', 'server'] +assert TEST_RUNTIME in ['eventstream', 'remote'] _ = get_runtime_cls(TEST_RUNTIME) # make sure it does not raise an error CONFIG = AppConfig( diff --git a/tests/runtime/conftest.py b/tests/runtime/conftest.py index b640f207cd..99fda5dd59 100644 --- a/tests/runtime/conftest.py +++ b/tests/runtime/conftest.py @@ -9,6 +9,7 @@ from openhands.core.config import AppConfig, SandboxConfig, load_from_env from openhands.events import EventStream from openhands.runtime.client.runtime import EventStreamRuntime from openhands.runtime.plugins import AgentSkillsRequirement, JupyterRequirement +from openhands.runtime.remote.runtime import RemoteRuntime from openhands.runtime.runtime import Runtime from 
openhands.storage import get_file_store @@ -34,6 +35,8 @@ def get_box_classes(): runtime = TEST_RUNTIME if runtime.lower() == 'eventstream': return [EventStreamRuntime] + elif runtime.lower() == 'remote': + return [RemoteRuntime] else: raise ValueError(f'Invalid runtime: {runtime}') diff --git a/tests/runtime/test_bash.py b/tests/runtime/test_bash.py index a4108c0ae5..f8197609ff 100644 --- a/tests/runtime/test_bash.py +++ b/tests/runtime/test_bash.py @@ -10,7 +10,6 @@ from conftest import _load_runtime from openhands.core.logger import openhands_logger as logger from openhands.events.action import CmdRunAction from openhands.events.observation import CmdOutputObservation -from openhands.runtime.client.runtime import EventStreamRuntime # ============================================================================================================================ # Bash-specific tests @@ -517,10 +516,11 @@ async def test_copy_non_existent_file(temp_dir, box_class): @pytest.mark.asyncio -async def test_keep_prompt(temp_dir): - # only EventStreamRuntime supports keep_prompt +async def test_keep_prompt(box_class, temp_dir): runtime = await _load_runtime( - temp_dir, box_class=EventStreamRuntime, run_as_openhands=False + temp_dir, + box_class=box_class, + run_as_openhands=False, ) action = CmdRunAction(command='touch /workspace/test_file.txt') diff --git a/tests/runtime/test_browsing.py b/tests/runtime/test_browsing.py index 6c2bdd408c..b8b3a3d05f 100644 --- a/tests/runtime/test_browsing.py +++ b/tests/runtime/test_browsing.py @@ -16,7 +16,6 @@ from openhands.events.observation import ( BrowserOutputObservation, CmdOutputObservation, ) -from openhands.runtime.client.runtime import EventStreamRuntime # ============================================================================================================================ # Browsing tests @@ -74,11 +73,10 @@ async def test_simple_browse(temp_dir, box_class, run_as_openhands): @pytest.mark.asyncio -async def 
test_browsergym_eval_env(temp_dir): +async def test_browsergym_eval_env(box_class, temp_dir): runtime = await _load_runtime( temp_dir, - # only supported in event stream runtime - box_class=EventStreamRuntime, + box_class=box_class, run_as_openhands=False, # need root permission to access file base_container_image='xingyaoww/od-eval-miniwob:v1.0', browsergym_eval_env='browsergym/miniwob.choose-list', diff --git a/tests/unit/test_runtime_build.py b/tests/unit/test_runtime_build.py index c608d6f6b8..3f8632ddc7 100644 --- a/tests/unit/test_runtime_build.py +++ b/tests/unit/test_runtime_build.py @@ -8,11 +8,11 @@ import toml from pytest import TempPathFactory from openhands.runtime.utils.runtime_build import ( - RUNTIME_IMAGE_REPO, _generate_dockerfile, _get_package_version, _put_source_code_to_dir, build_runtime_image, + get_runtime_image_repo, get_runtime_image_repo_and_tag, prep_docker_build_folder, ) @@ -175,22 +175,22 @@ def test_get_runtime_image_repo_and_tag_eventstream(): base_image = 'debian:11' img_repo, img_tag = get_runtime_image_repo_and_tag(base_image) assert ( - img_repo == f'{RUNTIME_IMAGE_REPO}' + img_repo == f'{get_runtime_image_repo()}' and img_tag == f'{OD_VERSION}_image_debian_tag_11' ) base_image = 'nikolaik/python-nodejs:python3.11-nodejs22' img_repo, img_tag = get_runtime_image_repo_and_tag(base_image) assert ( - img_repo == f'{RUNTIME_IMAGE_REPO}' + img_repo == f'{get_runtime_image_repo()}' and img_tag - == f'{OD_VERSION}_image_nikolaik___python-nodejs_tag_python3.11-nodejs22' + == f'{OD_VERSION}_image_nikolaik_s_python-nodejs_tag_python3.11-nodejs22' ) base_image = 'ubuntu' img_repo, img_tag = get_runtime_image_repo_and_tag(base_image) assert ( - img_repo == f'{RUNTIME_IMAGE_REPO}' + img_repo == f'{get_runtime_image_repo()}' and img_tag == f'{OD_VERSION}_image_ubuntu_tag_latest' ) @@ -207,18 +207,18 @@ def test_build_runtime_image_from_scratch(temp_dir): mock_runtime_builder = MagicMock() mock_runtime_builder.image_exists.return_value = False 
mock_runtime_builder.build.return_value = ( - f'{RUNTIME_IMAGE_REPO}:{from_scratch_hash}' + f'{get_runtime_image_repo()}:{from_scratch_hash}' ) image_name = build_runtime_image(base_image, mock_runtime_builder) mock_runtime_builder.build.assert_called_once_with( path=ANY, tags=[ - f'{RUNTIME_IMAGE_REPO}:{from_scratch_hash}', - f'{RUNTIME_IMAGE_REPO}:{OD_VERSION}_image_debian_tag_11', + f'{get_runtime_image_repo()}:{from_scratch_hash}', + f'{get_runtime_image_repo()}:{OD_VERSION}_image_debian_tag_11', ], ) - assert image_name == f'{RUNTIME_IMAGE_REPO}:{from_scratch_hash}' + assert image_name == f'{get_runtime_image_repo()}:{from_scratch_hash}' def test_build_runtime_image_exact_hash_exist(temp_dir): @@ -233,11 +233,11 @@ def test_build_runtime_image_exact_hash_exist(temp_dir): mock_runtime_builder = MagicMock() mock_runtime_builder.image_exists.return_value = True mock_runtime_builder.build.return_value = ( - f'{RUNTIME_IMAGE_REPO}:{from_scratch_hash}' + f'{get_runtime_image_repo()}:{from_scratch_hash}' ) image_name = build_runtime_image(base_image, mock_runtime_builder) - assert image_name == f'{RUNTIME_IMAGE_REPO}:{from_scratch_hash}' + assert image_name == f'{get_runtime_image_repo()}:{from_scratch_hash}' mock_runtime_builder.build.assert_not_called()