Xingyao Wang b19b724eae
feat: show exact python interpreter to the agent in IPython and Bash (#3448)
* try to fix pip unavailable

* update test case for pip

* force rebuild in CI

* remove extra symlink

* fix newline

* added semi-colon to line 31

* Dockerfile.j2: activate env at the end

* Revert "Dockerfile.j2: activate env at the end"

This reverts commit cf2f5651021fe80d4ab69a35a85f0a35b29dc3d7.

* cleanup Dockerfile

* switch default python image

* remove image agnostic (no longer used)

* fix tests

* simplify integration tests default image

* add nodejs specific runtime tests

* update tests and workflows

* switch to nikolaik/python-nodejs:python3.11-nodejs22

* update build sh to output image name correctly

* increase custom images to test

* fix test

* fix test

* fix double quote

* try fixing ci

* update ghcr workflow

* fix artifact name

* try to fix ghcr again

* fix workflow

* save built image to correct dir

* remove extra -docker-image

* make last tag to be human readable image tag

* fix hyphen to underscore

* run test runtime on all tags

* revert app build

* separate ghcr workflow

* update dockerfile for eval

* fix tag for test run

* try fix tag

* try fix tag via matrix output

* try workflow again

* update comments

* try fixing test matrix

* fix artifact name

* try fix tag again

* Revert "try fix tag again"

This reverts commit b369badd8cccf4a526e36d27eafb77ea2d32f6be.

* tweak filename

* try different path

* fix filepath

* try fix tag artifact path again

* save json instead of line

* update matrix

* print all tags in workflow

* support only streaming diff logs from the runtime client

* remove strip from log line to fix indentation

* get py interpreter for jupyter

* rstrip to remove newline on the right side for logging

* fix blocking issue for stream logs

* set python interpreter path in bash ps1

* update testcase for jupyter py interpreter path

* remove accidentally added changes

* remove accidentally added changes

* only print dockerfile when debug

* add docs

* remove extra tests that weren't supposed to be in this pr

* add back missing test

* revert

* make LogBuffer synchronous to fix hang in integration tests

* fix integration tests

* Update opendevin/runtime/client/client.py

Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>

* fix test case

* fix integration tests

* change deque to list

* update integration tests

* rename test runtime

* fix docs

* rename opendevin to openhands in tests

---------

Co-authored-by: tobitege <tobitege@gmx.de>
Co-authored-by: Graham Neubig <neubig@gmail.com>
Co-authored-by: tobitege <10787084+tobitege@users.noreply.github.com>
Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>
2024-08-21 20:08:50 +00:00

455 lines
17 KiB
Python

import asyncio
import os
import tempfile
import threading
import uuid
from zipfile import ZipFile
import aiohttp
import docker
import tenacity
from openhands.core.config import AppConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import (
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.action.action import Action
from openhands.events.observation import (
ErrorObservation,
NullObservation,
Observation,
)
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.runtime import Runtime
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.runtime_build import build_runtime_image
class LogBuffer:
"""
Synchronous buffer for Docker container logs.
This class provides a thread-safe way to collect, store, and retrieve logs
from a Docker container. It uses a list to store log lines and provides methods
for appending, retrieving, and clearing logs.
"""
    def __init__(self, container: docker.models.containers.Container):
        self.buffer: list[str] = []
        self.lock = threading.Lock()
        # Create the stop event before starting the reader thread so that
        # stream_logs never observes a partially initialized object.
        self._stop_event = threading.Event()
        self.log_generator = container.logs(stream=True, follow=True)
        self.log_stream_thread = threading.Thread(target=self.stream_logs)
        self.log_stream_thread.daemon = True
        self.log_stream_thread.start()
def append(self, log_line: str):
with self.lock:
self.buffer.append(log_line)
def get_and_clear(self) -> list[str]:
with self.lock:
logs = list(self.buffer)
self.buffer.clear()
return logs
def stream_logs(self):
"""
Stream logs from the Docker container in a separate thread.
This method runs in its own thread to handle the blocking
operation of reading log lines from the Docker SDK's synchronous generator.
"""
try:
for log_line in self.log_generator:
if self._stop_event.is_set():
break
if log_line:
                    # errors='replace' keeps the reader thread alive on non-UTF-8 bytes
                    self.append(log_line.decode('utf-8', errors='replace').rstrip())
except Exception as e:
logger.error(f'Error in stream_logs: {e}')
def __del__(self):
if self.log_stream_thread.is_alive():
            logger.warning(
                "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown."
            )
self.close(timeout=5)
    def close(self, timeout: float = 10.0):
        self._stop_event.set()
        # Close the underlying stream so the reader thread is not left blocked
        # on a read after the stop event has been set.
        self.log_generator.close()
        self.log_stream_thread.join(timeout)
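# Example usage (an illustrative sketch; the image and command are arbitrary,
# and `client` is a throwaway docker.DockerClient):
#
#     client = docker.from_env()
#     container = client.containers.run('ubuntu', 'echo hello', detach=True)
#     log_buffer = LogBuffer(container)
#     ...
#     for line in log_buffer.get_and_clear():
#         print(line)
#     log_buffer.close()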
class EventStreamRuntime(Runtime):
    """This runtime subscribes to the event stream.
    When it receives an event, it forwards it to the runtime client running
    inside the Docker container.
    """
container_name_prefix = 'openhands-sandbox-'
def __init__(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
container_image: str | None = None,
):
super().__init__(
config, event_stream, sid, plugins
) # will initialize the event stream
self._port = find_available_tcp_port()
self.api_url = f'http://{self.config.sandbox.api_hostname}:{self._port}'
self.session: aiohttp.ClientSession | None = None
self.instance_id = (
sid + '_' + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
)
# TODO: We can switch to aiodocker when `get_od_sandbox_image` is updated to use aiodocker
self.docker_client: docker.DockerClient = self._init_docker_client()
self.container_image = (
self.config.sandbox.container_image
if container_image is None
else container_image
)
self.container_name = self.container_name_prefix + self.instance_id
self.container = None
self.action_semaphore = asyncio.Semaphore(1) # Ensure one action at a time
self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
logger.debug(f'EventStreamRuntime `{sid}` config:\n{self.config}')
# Buffer for container logs
self.log_buffer: LogBuffer | None = None
async def ainit(self, env_vars: dict[str, str] | None = None):
if self.config.sandbox.runtime_extra_deps:
logger.info(
f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}'
)
self.container_image = build_runtime_image(
self.container_image,
self.runtime_builder,
extra_deps=self.config.sandbox.runtime_extra_deps,
)
self.container = await self._init_container(
self.sandbox_workspace_dir,
mount_dir=self.config.workspace_mount_path,
plugins=self.plugins,
)
        # MUST call super().ainit() to initialize both the default env vars
        # AND the ones passed in via `env_vars`!
await super().ainit(env_vars)
logger.info(
f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
)
logger.info(f'Container initialized with env vars: {env_vars}')
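    # Typical lifecycle (an illustrative sketch; `config` and `event_stream`
    # are assumed to be constructed elsewhere):
    #
    #     runtime = EventStreamRuntime(config, event_stream, sid='my-session')
    #     await runtime.ainit(env_vars={'MY_ENV_VAR': 'value'})
    #     ...  # dispatch actions via run_action() or the typed wrappers
    #     await runtime.close()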
@staticmethod
def _init_docker_client() -> docker.DockerClient:
try:
return docker.from_env()
except Exception as ex:
            logger.error(
                'Failed to create the Docker client. Please make sure Docker is installed and the Docker daemon is running.'
            )
raise ex
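    # Container startup below is retried up to 5 times with exponential
    # backoff (4s to 60s between attempts) via the tenacity decorator.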
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
)
async def _init_container(
self,
sandbox_workspace_dir: str,
mount_dir: str | None = None,
plugins: list[PluginRequirement] | None = None,
):
try:
logger.info(
f'Starting container with image: {self.container_image} and name: {self.container_name}'
)
plugin_arg = ''
if plugins is not None and len(plugins) > 0:
plugin_arg = (
f'--plugins {" ".join([plugin.name for plugin in plugins])} '
)
network_mode: str | None = None
port_mapping: dict[str, int] | None = None
if self.config.sandbox.use_host_network:
network_mode = 'host'
                logger.warning(
                    'Using host network mode. If you are using macOS, please make sure you have the latest version of Docker Desktop and have enabled the host networking feature: https://docs.docker.com/network/drivers/host/#docker-desktop'
                )
else:
port_mapping = {f'{self._port}/tcp': self._port}
if mount_dir is not None:
volumes = {mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}}
                logger.info(f'Mounting {mount_dir} to {sandbox_workspace_dir} in the container')
else:
                logger.warning(
                    'Mount dir is not set; the workspace directory will not be mounted into the container.'
                )
volumes = None
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
f'--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}'
)
else:
browsergym_arg = ''
container = self.docker_client.containers.run(
self.container_image,
command=(
f'/openhands/miniforge3/bin/mamba run --no-capture-output -n base '
'PYTHONUNBUFFERED=1 poetry run '
f'python -u -m openhands.runtime.client.client {self._port} '
f'--working-dir {sandbox_workspace_dir} '
f'{plugin_arg}'
f'--username {"openhands" if self.config.run_as_openhands else "root"} '
f'--user-id {self.config.sandbox.user_id} '
f'{browsergym_arg}'
),
network_mode=network_mode,
ports=port_mapping,
working_dir='/openhands/code/',
name=self.container_name,
detach=True,
environment={'DEBUG': 'true'} if self.config.debug else None,
volumes=volumes,
)
self.log_buffer = LogBuffer(container)
logger.info(f'Container started. Server url: {self.api_url}')
return container
except Exception as e:
logger.error('Failed to start container')
logger.exception(e)
await self.close(close_client=False)
raise e
async def _ensure_session(self):
if self.session is None or self.session.closed:
self.session = aiohttp.ClientSession()
return self.session
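    # Health check: polls the runtime client's /alive endpoint; tenacity
    # retries up to 10 times with exponential backoff (10s to 60s waits).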
@tenacity.retry(
stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_exponential(multiplier=2, min=10, max=60),
)
async def _wait_until_alive(self):
logger.debug('Getting container logs...')
# Print and clear the log buffer
assert (
self.log_buffer is not None
), 'Log buffer is expected to be initialized when container is started'
logs = self.log_buffer.get_and_clear()
if logs:
formatted_logs = '\n'.join([f' |{log}' for log in logs])
logger.info(
'\n'
+ '-' * 30
+ 'Container logs:'
+ '-' * 30
+ f'\n{formatted_logs}'
+ '\n'
+ '-' * 90
)
async with aiohttp.ClientSession() as session:
async with session.get(f'{self.api_url}/alive') as response:
if response.status == 200:
return
else:
msg = f'Action execution API is not alive. Response: {response}'
logger.error(msg)
raise RuntimeError(msg)
@property
def sandbox_workspace_dir(self):
return self.config.workspace_mount_path_in_sandbox
async def close(self, close_client: bool = True):
if self.log_buffer:
self.log_buffer.close()
if self.session is not None and not self.session.closed:
await self.session.close()
containers = self.docker_client.containers.list(all=True)
for container in containers:
try:
if container.name.startswith(self.container_name_prefix):
logs = container.logs(tail=1000).decode('utf-8')
logger.debug(
f'==== Container logs ====\n{logs}\n==== End of container logs ===='
)
container.remove(force=True)
except docker.errors.NotFound:
pass
if close_client:
self.docker_client.close()
async def run_action(self, action: Action) -> Observation:
# set timeout to default if not set
if action.timeout is None:
action.timeout = self.config.sandbox.timeout
async with self.action_semaphore:
if not action.runnable:
return NullObservation('')
action_type = action.action # type: ignore[attr-defined]
if action_type not in ACTION_TYPE_TO_CLASS:
return ErrorObservation(f'Action {action_type} does not exist.')
if not hasattr(self, action_type):
return ErrorObservation(
f'Action {action_type} is not supported in the current runtime.'
)
logger.info('Awaiting session')
session = await self._ensure_session()
await self._wait_until_alive()
assert action.timeout is not None
try:
logger.info('Executing command')
async with session.post(
f'{self.api_url}/execute_action',
json={'action': event_to_dict(action)},
timeout=action.timeout,
) as response:
if response.status == 200:
output = await response.json()
obs = observation_from_dict(output)
obs._cause = action.id # type: ignore[attr-defined]
return obs
else:
error_message = await response.text()
logger.error(f'Error from server: {error_message}')
obs = ErrorObservation(
f'Command execution failed: {error_message}'
)
except asyncio.TimeoutError:
logger.error('No response received within the timeout period.')
obs = ErrorObservation('Command execution timed out')
except Exception as e:
logger.error(f'Error during command execution: {e}')
obs = ErrorObservation(f'Command execution failed: {str(e)}')
return obs
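    # Wire format: each action is posted to /execute_action as JSON. A
    # serialized CmdRunAction looks roughly like the sketch below (the field
    # shape is inferred from event_to_dict, not a pinned schema):
    #
    #     {"action": "run", "args": {"command": "ls -la"}}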
async def run(self, action: CmdRunAction) -> Observation:
return await self.run_action(action)
async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
return await self.run_action(action)
async def read(self, action: FileReadAction) -> Observation:
return await self.run_action(action)
async def write(self, action: FileWriteAction) -> Observation:
return await self.run_action(action)
async def browse(self, action: BrowseURLAction) -> Observation:
return await self.run_action(action)
async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return await self.run_action(action)
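    # Example: running a shell command through a typed wrapper (an
    # illustrative sketch; assumes an initialized runtime and a running
    # event loop):
    #
    #     action = CmdRunAction(command='python3 --version')
    #     obs = await runtime.run(action)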
# ====================================================================
# Implement these methods (for file operations) in the subclass
# ====================================================================
async def copy_to(
self, host_src: str, sandbox_dest: str, recursive: bool = False
) -> None:
if not os.path.exists(host_src):
raise FileNotFoundError(f'Source file {host_src} does not exist')
session = await self._ensure_session()
await self._wait_until_alive()
        temp_zip_path: str | None = None
        try:
if recursive:
# For recursive copy, create a zip file
with tempfile.NamedTemporaryFile(
suffix='.zip', delete=False
) as temp_zip:
temp_zip_path = temp_zip.name
with ZipFile(temp_zip_path, 'w') as zipf:
for root, _, files in os.walk(host_src):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(
file_path, os.path.dirname(host_src)
)
zipf.write(file_path, arcname)
upload_data = {'file': open(temp_zip_path, 'rb')}
else:
# For single file copy
upload_data = {'file': open(host_src, 'rb')}
params = {'destination': sandbox_dest, 'recursive': str(recursive).lower()}
async with session.post(
f'{self.api_url}/upload_file', data=upload_data, params=params
) as response:
if response.status == 200:
return
else:
error_message = await response.text()
raise Exception(f'Copy operation failed: {error_message}')
except asyncio.TimeoutError:
raise TimeoutError('Copy operation timed out')
except Exception as e:
raise RuntimeError(f'Copy operation failed: {str(e)}')
        finally:
            # Only unlink if the temp zip was actually created; otherwise an
            # early failure would leave `temp_zip_path` unbound here.
            if temp_zip_path is not None:
                os.unlink(temp_zip_path)
logger.info(f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}')
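    # Example (illustrative; the paths are hypothetical):
    #
    #     await runtime.copy_to('/tmp/my_project', '/workspace', recursive=True)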
async def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox.
If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
"""
session = await self._ensure_session()
await self._wait_until_alive()
try:
data = {}
if path is not None:
data['path'] = path
async with session.post(
f'{self.api_url}/list_files', json=data
) as response:
if response.status == 200:
response_json = await response.json()
assert isinstance(response_json, list)
return response_json
else:
error_message = await response.text()
raise Exception(f'List files operation failed: {error_message}')
except asyncio.TimeoutError:
raise TimeoutError('List files operation timed out')
except Exception as e:
raise RuntimeError(f'List files operation failed: {str(e)}')
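# Example (illustrative; the path and returned names are hypothetical):
#
#     files = await runtime.list_files('/workspace')
#     # e.g. ['main.py', 'data/']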