OpenHands/tests/runtime/test_bash.py
Graham Neubig 689d3c9046
Update pre-commit hook versions to most recent versions (#8343)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-05-08 03:59:13 +00:00

1313 lines
52 KiB
Python

"""Bash-related tests for the DockerRuntime, which connects to the ActionExecutor running in the sandbox."""
import os
import sys
import time
from pathlib import Path
import pytest
from conftest import (
_close_test_runtime,
_load_runtime,
)
from openhands.core.logger import openhands_logger as logger
from openhands.events.action import CmdRunAction
from openhands.events.observation import CmdOutputObservation, ErrorObservation
from openhands.runtime.impl.local.local_runtime import LocalRuntime
# ============================================================================================================================
# Bash-specific tests
# ============================================================================================================================
# Helper function to determine if running on Windows
def is_windows():
return sys.platform == 'win32'
def _run_cmd_action(runtime, custom_command: str):
action = CmdRunAction(command=custom_command)
logger.info(action, extra={'msg_type': 'ACTION'})
obs = runtime.run_action(action)
assert isinstance(obs, (CmdOutputObservation, ErrorObservation))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
return obs
# Get platform-appropriate command
def get_platform_command(linux_cmd, windows_cmd):
return windows_cmd if is_windows() else linux_cmd
def test_bash_server(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Use python -u for unbuffered output, potentially helping capture initial output on Windows
action = CmdRunAction(command='python -u -m http.server 8081')
action.set_hard_timeout(1)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == -1
assert 'Serving HTTP on' in obs.content
assert (
"[The command timed out after 1.0 seconds. You may wait longer to see additional output by sending empty command '', send other commands to interact with the current process, or send keys to interrupt/kill the command.]"
in obs.metadata.suffix
)
action = CmdRunAction(command='C-c', is_input=True)
action.set_hard_timeout(30)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == 0
if not is_windows():
# Linux/macOS behavior
assert 'Keyboard interrupt received, exiting.' in obs.content
assert config.workspace_mount_path_in_sandbox in obs.metadata.working_dir
else:
# Windows behavior: Stop-Job might not produce output, but exit code should be 0
# The working directory check might also be less relevant/predictable here
pass
# Verify the server is actually stopped by trying to start another one
# on the same port (regardless of OS)
action = CmdRunAction(command='ls')
action.set_hard_timeout(1)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == 0
# Check that the interrupt message is NOT present in subsequent output
assert 'Keyboard interrupt received, exiting.' not in obs.content
# Check working directory remains correct after interrupt handling
assert config.workspace_mount_path_in_sandbox in obs.metadata.working_dir
# run it again!
action = CmdRunAction(command='python -u -m http.server 8081')
action.set_hard_timeout(1)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == -1
assert 'Serving HTTP on' in obs.content
finally:
_close_test_runtime(runtime)
def test_bash_background_server(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
server_port = 8081
try:
# Start the server, expect it to timeout (run in background manner)
action = CmdRunAction(f'python3 -m http.server {server_port} &')
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == 0 # Should not timeout since this runs in background
# Give the server a moment to be ready
time.sleep(1)
# Verify the server is running by curling it
if is_windows():
curl_action = CmdRunAction(
f'Invoke-WebRequest -Uri http://localhost:{server_port} -UseBasicParsing | Select-Object -ExpandProperty Content'
)
else:
curl_action = CmdRunAction(f'curl http://localhost:{server_port}')
curl_obs = runtime.run_action(curl_action)
logger.info(curl_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(curl_obs, CmdOutputObservation)
assert curl_obs.exit_code == 0
# Check for content typical of python http.server directory listing
assert 'Directory listing for' in curl_obs.content
# Kill the server
if is_windows():
# Use PowerShell job management commands instead of trying to kill process directly
kill_action = CmdRunAction('Get-Job | Stop-Job')
else:
kill_action = CmdRunAction('pkill -f "http.server"')
kill_obs = runtime.run_action(kill_action)
logger.info(kill_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(kill_obs, CmdOutputObservation)
assert kill_obs.exit_code == 0
finally:
_close_test_runtime(runtime)
def test_multiline_commands(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
if is_windows():
# Windows PowerShell version using backticks for line continuation
obs = _run_cmd_action(runtime, 'Write-Output `\n "foo"')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'foo' in obs.content
# test multiline output
obs = _run_cmd_action(runtime, 'Write-Output "hello`nworld"')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'hello\nworld' in obs.content
# test whitespace
obs = _run_cmd_action(runtime, 'Write-Output "a`n`n`nz"')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert '\n\n\n' in obs.content
else:
# Original Linux bash version
# single multiline command
obs = _run_cmd_action(runtime, 'echo \\\n -e "foo"')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'foo' in obs.content
# test multiline echo
obs = _run_cmd_action(runtime, 'echo -e "hello\nworld"')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'hello\nworld' in obs.content
# test whitespace
obs = _run_cmd_action(runtime, 'echo -e "a\\n\\n\\nz"')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert '\n\n\n' in obs.content
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Test relies on Linux bash-specific complex commands'
)
def test_complex_commands(temp_dir, runtime_cls, run_as_openhands):
cmd = """count=0; tries=0; while [ $count -lt 3 ]; do result=$(echo "Heads"); tries=$((tries+1)); echo "Flip $tries: $result"; if [ "$result" = "Heads" ]; then count=$((count+1)); else count=0; fi; done; echo "Got 3 heads in a row after $tries flips!";"""
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
obs = _run_cmd_action(runtime, cmd)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'Got 3 heads in a row after 3 flips!' in obs.content
finally:
_close_test_runtime(runtime)
def test_no_ps2_in_output(temp_dir, runtime_cls, run_as_openhands):
"""Test that the PS2 sign is not added to the output of a multiline command."""
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
if is_windows():
obs = _run_cmd_action(runtime, 'Write-Output "hello`nworld"')
else:
obs = _run_cmd_action(runtime, 'echo -e "hello\nworld"')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'hello\nworld' in obs.content
assert '>' not in obs.content
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Test uses Linux-specific bash loops and sed commands'
)
def test_multiline_command_loop(temp_dir, runtime_cls):
# https://github.com/All-Hands-AI/OpenHands/issues/3143
init_cmd = """mkdir -p _modules && \
for month in {01..04}; do
for day in {01..05}; do
touch "_modules/2024-${month}-${day}-sample.md"
done
done && echo "created files"
"""
follow_up_cmd = """for file in _modules/*.md; do
new_date=$(echo $file | sed -E 's/2024-(01|02|03|04)-/2024-/;s/2024-01/2024-08/;s/2024-02/2024-09/;s/2024-03/2024-10/;s/2024-04/2024-11/')
mv "$file" "$new_date"
done && echo "success"
"""
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
obs = _run_cmd_action(runtime, init_cmd)
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'created files' in obs.content
obs = _run_cmd_action(runtime, follow_up_cmd)
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'success' in obs.content
finally:
_close_test_runtime(runtime)
def test_multiple_multiline_commands(temp_dir, runtime_cls, run_as_openhands):
if is_windows():
cmds = [
'Get-ChildItem',
'Write-Output "hello`nworld"',
"""Write-Output "hello it's me\"""",
"""Write-Output `
('hello ' + `
'world')""",
"""Write-Output 'hello\nworld\nare\nyou\nthere?'""",
"""Write-Output 'hello\nworld\nare\nyou\n\nthere?'""",
"""Write-Output 'hello\nworld "'""", # Escape the trailing double quote
]
else:
cmds = [
'ls -l',
'echo -e "hello\nworld"',
"""echo -e "hello it's me\"""",
"""echo \\
-e 'hello' \\
world""",
"""echo -e 'hello\\nworld\\nare\\nyou\\nthere?'""",
"""echo -e 'hello\nworld\nare\nyou\n\nthere?'""",
"""echo -e 'hello\nworld "'""",
]
joined_cmds = '\n'.join(cmds)
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# First test that running multiple commands at once fails
obs = _run_cmd_action(runtime, joined_cmds)
assert isinstance(obs, ErrorObservation)
assert 'Cannot execute multiple commands at once' in obs.content
# Now run each command individually and verify they work
results = []
for cmd in cmds:
obs = _run_cmd_action(runtime, cmd)
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == 0
results.append(obs.content)
# Verify all expected outputs are present
if is_windows():
assert '.git_config' in results[0] # Get-ChildItem
else:
assert 'total 0' in results[0] # ls -l
assert 'hello\nworld' in results[1] # echo -e "hello\nworld"
assert "hello it's me" in results[2] # echo -e "hello it\'s me"
assert 'hello world' in results[3] # echo -e 'hello' world
assert (
'hello\nworld\nare\nyou\nthere?' in results[4]
) # echo -e 'hello\nworld\nare\nyou\nthere?'
assert (
'hello\nworld\nare\nyou\n\nthere?' in results[5]
) # echo -e with literal newlines
assert 'hello\nworld "' in results[6] # echo -e with quote
finally:
_close_test_runtime(runtime)
def test_cmd_run(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
if is_windows():
# Windows PowerShell version
obs = _run_cmd_action(
runtime, f'Get-ChildItem -Path {config.workspace_mount_path_in_sandbox}'
)
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'Get-ChildItem')
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'New-Item -ItemType Directory -Path test')
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'Get-ChildItem')
assert obs.exit_code == 0
assert 'test' in obs.content
obs = _run_cmd_action(runtime, 'New-Item -ItemType File -Path test/foo.txt')
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'Get-ChildItem test')
assert obs.exit_code == 0
assert 'foo.txt' in obs.content
# clean up
_run_cmd_action(runtime, 'Remove-Item -Recurse -Force test')
assert obs.exit_code == 0
else:
# Unix version
obs = _run_cmd_action(
runtime, f'ls -l {config.workspace_mount_path_in_sandbox}'
)
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'ls -l')
assert obs.exit_code == 0
assert 'total 0' in obs.content
obs = _run_cmd_action(runtime, 'mkdir test')
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'ls -l')
assert obs.exit_code == 0
if run_as_openhands:
assert 'openhands' in obs.content
elif runtime_cls == LocalRuntime:
assert 'root' not in obs.content and 'openhands' not in obs.content
else:
assert 'root' in obs.content
assert 'test' in obs.content
obs = _run_cmd_action(runtime, 'touch test/foo.txt')
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'ls -l test')
assert obs.exit_code == 0
assert 'foo.txt' in obs.content
# clean up: this is needed, since CI will not be
# run as root, and this test may leave a file
# owned by root
_run_cmd_action(runtime, 'rm -rf test')
assert obs.exit_code == 0
finally:
_close_test_runtime(runtime)
def test_run_as_user_correct_home_dir(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
if is_windows():
# Windows PowerShell version
obs = _run_cmd_action(runtime, 'cd $HOME && Get-Location')
assert obs.exit_code == 0
# Check for Windows-style home paths
if runtime_cls == LocalRuntime:
assert (
os.getenv('USERPROFILE') in obs.content
or os.getenv('HOME') in obs.content
)
# For non-local runtime, we are less concerned with precise paths
else:
# Original Linux version
obs = _run_cmd_action(runtime, 'cd ~ && pwd')
assert obs.exit_code == 0
if runtime_cls == LocalRuntime:
assert os.getenv('HOME') in obs.content
elif run_as_openhands:
assert '/home/openhands' in obs.content
else:
assert '/root' in obs.content
finally:
_close_test_runtime(runtime)
def test_multi_cmd_run_in_single_line(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
if is_windows():
# Windows PowerShell version using semicolon
obs = _run_cmd_action(runtime, 'Get-Location && Get-ChildItem')
assert obs.exit_code == 0
assert config.workspace_mount_path_in_sandbox in obs.content
assert '.git_config' in obs.content
else:
# Original Linux version using &&
obs = _run_cmd_action(runtime, 'pwd && ls -l')
assert obs.exit_code == 0
assert config.workspace_mount_path_in_sandbox in obs.content
assert 'total 0' in obs.content
finally:
_close_test_runtime(runtime)
def test_stateful_cmd(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
if is_windows():
# Windows PowerShell version
obs = _run_cmd_action(
runtime, 'New-Item -ItemType Directory -Path test -Force'
)
assert obs.exit_code == 0, 'The exit code should be 0.'
obs = _run_cmd_action(runtime, 'Set-Location test')
assert obs.exit_code == 0, 'The exit code should be 0.'
obs = _run_cmd_action(runtime, 'Get-Location')
assert obs.exit_code == 0, 'The exit code should be 0.'
# Account for both forward and backward slashes in path
norm_path = config.workspace_mount_path_in_sandbox.replace(
'\\', '/'
).replace('//', '/')
test_path = f'{norm_path}/test'.replace('//', '/')
assert test_path in obs.content.replace('\\', '/')
else:
# Original Linux version
obs = _run_cmd_action(runtime, 'mkdir -p test')
assert obs.exit_code == 0, 'The exit code should be 0.'
obs = _run_cmd_action(runtime, 'cd test')
assert obs.exit_code == 0, 'The exit code should be 0.'
obs = _run_cmd_action(runtime, 'pwd')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert f'{config.workspace_mount_path_in_sandbox}/test' in obs.content
finally:
_close_test_runtime(runtime)
def test_failed_cmd(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
obs = _run_cmd_action(runtime, 'non_existing_command')
assert obs.exit_code != 0, 'The exit code should not be 0 for a failed command.'
finally:
_close_test_runtime(runtime)
def _create_test_file(host_temp_dir):
# Single file
with open(os.path.join(host_temp_dir, 'test_file.txt'), 'w') as f:
f.write('Hello, World!')
def test_copy_single_file(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
sandbox_dir = config.workspace_mount_path_in_sandbox
sandbox_file = os.path.join(sandbox_dir, 'test_file.txt')
_create_test_file(temp_dir)
runtime.copy_to(os.path.join(temp_dir, 'test_file.txt'), sandbox_dir)
if is_windows():
obs = _run_cmd_action(runtime, f'Get-ChildItem -Path {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_file.txt' in obs.content
obs = _run_cmd_action(runtime, f'Get-Content {sandbox_file}')
assert obs.exit_code == 0
assert 'Hello, World!' in obs.content
else:
obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_file.txt' in obs.content
obs = _run_cmd_action(runtime, f'cat {sandbox_file}')
assert obs.exit_code == 0
assert 'Hello, World!' in obs.content
finally:
_close_test_runtime(runtime)
def _create_host_test_dir_with_files(test_dir):
logger.debug(f'creating `{test_dir}`')
if not os.path.isdir(test_dir):
os.makedirs(test_dir, exist_ok=True)
logger.debug('creating test files in `test_dir`')
with open(os.path.join(test_dir, 'file1.txt'), 'w') as f:
f.write('File 1 content')
with open(os.path.join(test_dir, 'file2.txt'), 'w') as f:
f.write('File 2 content')
def test_copy_directory_recursively(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
sandbox_dir = config.workspace_mount_path_in_sandbox
try:
temp_dir_copy = os.path.join(temp_dir, 'test_dir')
# We need a separate directory, since temp_dir is mounted to /workspace
_create_host_test_dir_with_files(temp_dir_copy)
runtime.copy_to(temp_dir_copy, sandbox_dir, recursive=True)
if is_windows():
obs = _run_cmd_action(runtime, f'Get-ChildItem -Path {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_dir' in obs.content
assert 'file1.txt' not in obs.content
assert 'file2.txt' not in obs.content
obs = _run_cmd_action(
runtime, f'Get-ChildItem -Path {sandbox_dir}/test_dir'
)
assert obs.exit_code == 0
assert 'file1.txt' in obs.content
assert 'file2.txt' in obs.content
obs = _run_cmd_action(
runtime, f'Get-Content {sandbox_dir}/test_dir/file1.txt'
)
assert obs.exit_code == 0
assert 'File 1 content' in obs.content
else:
obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_dir' in obs.content
assert 'file1.txt' not in obs.content
assert 'file2.txt' not in obs.content
obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}/test_dir')
assert obs.exit_code == 0
assert 'file1.txt' in obs.content
assert 'file2.txt' in obs.content
obs = _run_cmd_action(runtime, f'cat {sandbox_dir}/test_dir/file1.txt')
assert obs.exit_code == 0
assert 'File 1 content' in obs.content
finally:
_close_test_runtime(runtime)
def test_copy_to_non_existent_directory(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
sandbox_dir = config.workspace_mount_path_in_sandbox
_create_test_file(temp_dir)
runtime.copy_to(
os.path.join(temp_dir, 'test_file.txt'), f'{sandbox_dir}/new_dir'
)
obs = _run_cmd_action(runtime, f'cat {sandbox_dir}/new_dir/test_file.txt')
assert obs.exit_code == 0
assert 'Hello, World!' in obs.content
finally:
_close_test_runtime(runtime)
def test_overwrite_existing_file(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
sandbox_dir = config.workspace_mount_path_in_sandbox
sandbox_file = os.path.join(sandbox_dir, 'test_file.txt')
if is_windows():
# Check initial state
obs = _run_cmd_action(runtime, f'Get-ChildItem -Path {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_file.txt' not in obs.content
# Create an empty file
obs = _run_cmd_action(
runtime, f'New-Item -ItemType File -Path {sandbox_file} -Force'
)
assert obs.exit_code == 0
# Verify file exists and is empty
obs = _run_cmd_action(runtime, f'Get-ChildItem -Path {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_file.txt' in obs.content
obs = _run_cmd_action(runtime, f'Get-Content {sandbox_file}')
assert obs.exit_code == 0
assert obs.content.strip() == '' # Empty file
assert 'Hello, World!' not in obs.content
# Create host file and copy to overwrite
_create_test_file(temp_dir)
runtime.copy_to(os.path.join(temp_dir, 'test_file.txt'), sandbox_dir)
# Verify file content is overwritten
obs = _run_cmd_action(runtime, f'Get-Content {sandbox_file}')
assert obs.exit_code == 0
assert 'Hello, World!' in obs.content
else:
# Original Linux version
obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_file.txt' not in obs.content # Check initial state
obs = _run_cmd_action(runtime, f'touch {sandbox_file}')
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, f'ls -alh {sandbox_dir}')
assert obs.exit_code == 0
assert 'test_file.txt' in obs.content
obs = _run_cmd_action(runtime, f'cat {sandbox_file}')
assert obs.exit_code == 0
assert obs.content.strip() == '' # Empty file
assert 'Hello, World!' not in obs.content
_create_test_file(temp_dir)
runtime.copy_to(os.path.join(temp_dir, 'test_file.txt'), sandbox_dir)
obs = _run_cmd_action(runtime, f'cat {sandbox_file}')
assert obs.exit_code == 0
assert 'Hello, World!' in obs.content
finally:
_close_test_runtime(runtime)
def test_copy_non_existent_file(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
try:
sandbox_dir = config.workspace_mount_path_in_sandbox
with pytest.raises(FileNotFoundError):
runtime.copy_to(
os.path.join(sandbox_dir, 'non_existent_file.txt'),
f'{sandbox_dir}/should_not_exist.txt',
)
obs = _run_cmd_action(runtime, f'ls {sandbox_dir}/should_not_exist.txt')
assert obs.exit_code != 0 # File should not exist
finally:
_close_test_runtime(runtime)
def test_copy_from_directory(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
sandbox_dir = config.workspace_mount_path_in_sandbox
try:
temp_dir_copy = os.path.join(temp_dir, 'test_dir')
# We need a separate directory, since temp_dir is mounted to /workspace
_create_host_test_dir_with_files(temp_dir_copy)
# Initial state
runtime.copy_to(temp_dir_copy, sandbox_dir, recursive=True)
path_to_copy_from = f'{sandbox_dir}/test_dir'
result = runtime.copy_from(path=path_to_copy_from)
# Result is returned as a path
assert isinstance(result, Path)
if result.exists() and not is_windows():
result.unlink()
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Test uses Linux-specific file permissions and sudo commands'
)
def test_git_operation(temp_dir, runtime_cls):
# do not mount workspace, since workspace mount by tests will be owned by root
# while the user_id we get via os.getuid() is different from root
# which causes permission issues
runtime, config = _load_runtime(
temp_dir=temp_dir,
use_workspace=False,
runtime_cls=runtime_cls,
# Need to use non-root user to expose issues
run_as_openhands=True,
)
# this will happen if permission of runtime is not properly configured
# fatal: detected dubious ownership in repository at config.workspace_mount_path_in_sandbox
try:
if runtime_cls != LocalRuntime:
obs = _run_cmd_action(runtime, 'sudo chown -R openhands:root .')
assert obs.exit_code == 0
# check the ownership of the current directory
obs = _run_cmd_action(runtime, 'ls -alh .')
assert obs.exit_code == 0
# drwx--S--- 2 openhands root 64 Aug 7 23:32 .
# drwxr-xr-x 1 root root 4.0K Aug 7 23:33 ..
for line in obs.content.split('\n'):
if runtime_cls == LocalRuntime:
continue # skip these checks
if ' ..' in line:
# parent directory should be owned by root
assert 'root' in line
assert 'openhands' not in line
elif ' .' in line:
# current directory should be owned by openhands
# and its group should be root
assert 'openhands' in line
assert 'root' in line
# make sure all git operations are allowed
obs = _run_cmd_action(runtime, 'git init')
assert obs.exit_code == 0
# create a file
obs = _run_cmd_action(runtime, 'echo "hello" > test_file.txt')
assert obs.exit_code == 0
if runtime_cls == LocalRuntime:
# set git config author in CI only, not on local machine
logger.info('Setting git config author')
obs = _run_cmd_action(
runtime,
'git config --file ./.git_config user.name "openhands" && git config --file ./.git_config user.email "openhands@all-hands.dev"',
)
assert obs.exit_code == 0
# Set up git config
obs = _run_cmd_action(runtime, 'git config --file ./.git_config')
assert obs.exit_code == 0
# git add
obs = _run_cmd_action(runtime, 'git add test_file.txt')
assert obs.exit_code == 0
# git diff
obs = _run_cmd_action(runtime, 'git diff --no-color --cached')
assert obs.exit_code == 0
assert 'b/test_file.txt' in obs.content
assert '+hello' in obs.content
# git commit
obs = _run_cmd_action(runtime, 'git commit -m "test commit"')
assert obs.exit_code == 0
finally:
_close_test_runtime(runtime)
def test_python_version(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
obs = runtime.run_action(CmdRunAction(command='python --version'))
assert isinstance(obs, CmdOutputObservation), (
'The observation should be a CmdOutputObservation.'
)
assert obs.exit_code == 0, 'The exit code should be 0.'
assert 'Python 3' in obs.content, 'The output should contain "Python 3".'
finally:
_close_test_runtime(runtime)
def test_pwd_property(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Create a subdirectory and verify pwd updates
obs = _run_cmd_action(runtime, 'mkdir -p random_dir')
assert obs.exit_code == 0
obs = _run_cmd_action(runtime, 'cd random_dir && pwd')
assert obs.exit_code == 0
assert 'random_dir' in obs.content
finally:
_close_test_runtime(runtime)
def test_basic_command(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
if is_windows():
# Test simple command
obs = _run_cmd_action(runtime, "Write-Output 'hello world'")
assert 'hello world' in obs.content
assert obs.exit_code == 0
# Test command with error
obs = _run_cmd_action(runtime, 'nonexistent_command')
assert obs.exit_code != 0
assert 'not recognized' in obs.content or 'command not found' in obs.content
# Test command with special characters
obs = _run_cmd_action(
runtime, 'Write-Output "hello world with`nspecial chars"'
)
assert 'hello world with\nspecial chars' in obs.content
assert obs.exit_code == 0
# Test multiple commands in sequence
obs = _run_cmd_action(
runtime,
'Write-Output "first" && Write-Output "second" && Write-Output "third"',
)
assert 'first' in obs.content
assert 'second' in obs.content
assert 'third' in obs.content
assert obs.exit_code == 0
else:
# Original Linux version
# Test simple command
obs = _run_cmd_action(runtime, "echo 'hello world'")
assert 'hello world' in obs.content
assert obs.exit_code == 0
# Test command with error
obs = _run_cmd_action(runtime, 'nonexistent_command')
assert obs.exit_code == 127
assert 'nonexistent_command: command not found' in obs.content
# Test command with special characters
obs = _run_cmd_action(
runtime, "echo 'hello world with\nspecial chars'"
)
assert 'hello world with\nspecial chars' in obs.content
assert obs.exit_code == 0
# Test multiple commands in sequence
obs = _run_cmd_action(
runtime, 'echo "first" && echo "second" && echo "third"'
)
assert 'first' in obs.content
assert 'second' in obs.content
assert 'third' in obs.content
assert obs.exit_code == 0
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Powershell does not support interactive commands'
)
def test_interactive_command(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(
temp_dir,
runtime_cls,
run_as_openhands,
runtime_startup_env_vars={'NO_CHANGE_TIMEOUT_SECONDS': '1'},
)
try:
# Test interactive command
action = CmdRunAction('read -p "Enter name: " name && echo "Hello $name"')
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
# This should trigger SOFT timeout, so no need to set hard timeout
assert 'Enter name:' in obs.content
assert '[The command has no new output after 1 seconds.' in obs.metadata.suffix
action = CmdRunAction('John', is_input=True)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Hello John' in obs.content
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
# Test multiline command input with here document
action = CmdRunAction("""cat << EOF
line 1
line 2
EOF""")
obs = runtime.run_action(action)
assert 'line 1\nline 2' in obs.content
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
assert obs.exit_code == 0
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(),
reason='Test relies on Linux-specific commands like seq and bash for loops',
)
def test_long_output(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Generate a long output
action = CmdRunAction('for i in $(seq 1 5000); do echo "Line $i"; done')
action.set_hard_timeout(10)
obs = runtime.run_action(action)
assert obs.exit_code == 0
assert 'Line 1' in obs.content
assert 'Line 5000' in obs.content
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(),
reason='Test relies on Linux-specific commands like seq and bash for loops',
)
def test_long_output_exceed_history_limit(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Generate a long output
action = CmdRunAction('for i in $(seq 1 50000); do echo "Line $i"; done')
action.set_hard_timeout(30)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert obs.exit_code == 0
assert 'Previous command outputs are truncated' in obs.metadata.prefix
assert 'Line 40000' in obs.content
assert 'Line 50000' in obs.content
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Test uses Linux-specific temp directory and bash for loops'
)
def test_long_output_from_nested_directories(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Create nested directories with many files
setup_cmd = 'mkdir -p /tmp/test_dir && cd /tmp/test_dir && for i in $(seq 1 100); do mkdir -p "folder_$i"; for j in $(seq 1 100); do touch "folder_$i/file_$j.txt"; done; done'
setup_action = CmdRunAction(setup_cmd.strip())
setup_action.set_hard_timeout(60)
obs = runtime.run_action(setup_action)
assert obs.exit_code == 0
# List the directory structure recursively
action = CmdRunAction('ls -R /tmp/test_dir')
action.set_hard_timeout(60)
obs = runtime.run_action(action)
assert obs.exit_code == 0
# Verify output contains expected files
assert 'folder_1' in obs.content
assert 'file_1.txt' in obs.content
assert 'folder_100' in obs.content
assert 'file_100.txt' in obs.content
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(),
reason='Test uses Linux-specific commands like find and grep with complex syntax',
)
def test_command_backslash(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Create a file with the content "implemented_function"
action = CmdRunAction(
'mkdir -p /tmp/test_dir && echo "implemented_function" > /tmp/test_dir/file_1.txt'
)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert obs.exit_code == 0
# Reproduce an issue we ran into during evaluation
# find /workspace/sympy__sympy__1.0 -type f -exec grep -l "implemented_function" {} \;
# find: missing argument to `-exec'
# --> This is unexpected output due to incorrect escaping of \;
# This tests for correct escaping of \;
action = CmdRunAction(
'find /tmp/test_dir -type f -exec grep -l "implemented_function" {} \\;'
)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert obs.exit_code == 0
assert '/tmp/test_dir/file_1.txt' in obs.content
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Test uses Linux-specific ps aux, awk, and grep commands'
)
def test_stress_long_output_with_soft_and_hard_timeout(
temp_dir, runtime_cls, run_as_openhands
):
runtime, config = _load_runtime(
temp_dir,
runtime_cls,
run_as_openhands,
runtime_startup_env_vars={'NO_CHANGE_TIMEOUT_SECONDS': '1'},
docker_runtime_kwargs={
'cpu_period': 100000, # 100ms
'cpu_quota': 100000, # Can use 100ms out of each 100ms period (1 CPU)
'mem_limit': '4G', # 4 GB of memory
},
)
try:
# Run a command that generates long output multiple times
for i in range(10):
start_time = time.time()
# Check tmux memory usage (in KB)
mem_action = CmdRunAction(
'ps aux | awk \'{printf "%8.1f KB %s\\n", $6, $0}\' | sort -nr | grep "/usr/bin/tmux" | grep -v grep | awk \'{print $1}\''
)
mem_obs = runtime.run_action(mem_action)
assert mem_obs.exit_code == 0
logger.info(
f'Tmux memory usage (iteration {i}): {mem_obs.content.strip()} KB'
)
# Check action_execution_server mem
mem_action = CmdRunAction(
'ps aux | awk \'{printf "%8.1f KB %s\\n", $6, $0}\' | sort -nr | grep "action_execution_server" | grep "/openhands/poetry" | grep -v grep | awk \'{print $1}\''
)
mem_obs = runtime.run_action(mem_action)
assert mem_obs.exit_code == 0
logger.info(
f'Action execution server memory usage (iteration {i}): {mem_obs.content.strip()} KB'
)
# Test soft timeout
action = CmdRunAction(
'read -p "Do you want to continue? [Y/n] " answer; if [[ $answer == "Y" ]]; then echo "Proceeding with operation..."; echo "Operation completed successfully!"; else echo "Operation cancelled."; exit 1; fi'
)
obs = runtime.run_action(action)
assert 'Do you want to continue?' in obs.content
assert obs.exit_code == -1 # Command is still running, waiting for input
# Send the confirmation
action = CmdRunAction('Y', is_input=True)
obs = runtime.run_action(action)
assert 'Proceeding with operation...' in obs.content
assert 'Operation completed successfully!' in obs.content
assert obs.exit_code == 0
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
# Test hard timeout w/ long output
# Generate long output with 1000 asterisks per line
action = CmdRunAction(
f'export i={i}; for j in $(seq 1 100); do echo "Line $j - Iteration $i - $(printf \'%1000s\' | tr " " "*")"; sleep 1; done'
)
action.set_hard_timeout(2)
obs = runtime.run_action(action)
# Verify the output
assert obs.exit_code == -1
assert f'Line 1 - Iteration {i}' in obs.content
# assert f'Line 1000 - Iteration {i}' in obs.content
# assert '[The command completed with exit code 0.]' in obs.metadata.suffix
# Because hard-timeout is triggered, the terminal will in a weird state
# where it will not accept any new commands.
obs = runtime.run_action(CmdRunAction('ls'))
assert obs.exit_code == -1
assert 'The previous command is still running' in obs.metadata.suffix
# We need to send a Ctrl+C to reset the terminal.
obs = runtime.run_action(CmdRunAction('C-c', is_input=True))
assert obs.exit_code == 130
# Now make sure the terminal is in a good state
obs = runtime.run_action(CmdRunAction('ls'))
assert obs.exit_code == 0
duration = time.time() - start_time
logger.info(f'Completed iteration {i} in {duration:.2f} seconds')
finally:
_close_test_runtime(runtime)
def test_command_output_continuation(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
if is_windows():
# Windows PowerShell version
action = CmdRunAction(
'1..5 | ForEach-Object { Write-Output $_; Start-Sleep 3 }'
)
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert obs.content.strip() == '1'
assert obs.metadata.prefix == ''
assert '[The command timed out after 2.5 seconds.' in obs.metadata.suffix
# Continue watching output
action = CmdRunAction('')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert (
'[Below is the output of the previous command.]' in obs.metadata.prefix
)
assert obs.content.strip() == '2'
assert '[The command timed out after 2.5 seconds.' in obs.metadata.suffix
# Continue until completion
for expected in ['3', '4', '5']:
action = CmdRunAction('')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert (
'[Below is the output of the previous command.]'
in obs.metadata.prefix
)
assert obs.content.strip() == expected
assert (
'[The command timed out after 2.5 seconds.' in obs.metadata.suffix
)
# Final empty command to complete
action = CmdRunAction('')
obs = runtime.run_action(action)
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
else:
# Original Linux version
# Start a command that produces output slowly
action = CmdRunAction('for i in {1..5}; do echo $i; sleep 3; done')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert obs.content.strip() == '1'
assert obs.metadata.prefix == ''
assert '[The command timed out after 2.5 seconds.' in obs.metadata.suffix
# Continue watching output
action = CmdRunAction('')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert (
'[Below is the output of the previous command.]' in obs.metadata.prefix
)
assert obs.content.strip() == '2'
assert '[The command timed out after 2.5 seconds.' in obs.metadata.suffix
# Continue until completion
for expected in ['3', '4', '5']:
action = CmdRunAction('')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert (
'[Below is the output of the previous command.]'
in obs.metadata.prefix
)
assert obs.content.strip() == expected
assert (
'[The command timed out after 2.5 seconds.' in obs.metadata.suffix
)
# Final empty command to complete
action = CmdRunAction('')
obs = runtime.run_action(action)
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
finally:
_close_test_runtime(runtime)
def test_long_running_command_follow_by_execute(
temp_dir, runtime_cls, run_as_openhands
):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
if is_windows():
action = CmdRunAction('1..3 | ForEach-Object { Write-Output $_; sleep 3 }')
else:
# Test command that produces output slowly
action = CmdRunAction('for i in {1..3}; do echo $i; sleep 3; done')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert '1' in obs.content # First number should appear before timeout
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
assert '[The command timed out after 2.5 seconds.' in obs.metadata.suffix
assert obs.metadata.prefix == ''
# Continue watching output
action = CmdRunAction('')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
assert '2' in obs.content
assert obs.metadata.prefix == '[Below is the output of the previous command.]\n'
assert '[The command timed out after 2.5 seconds.' in obs.metadata.suffix
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
# Test command that produces no output
action = CmdRunAction('sleep 15')
action.set_hard_timeout(2.5)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert '3' not in obs.content
assert obs.metadata.prefix == '[Below is the output of the previous command.]\n'
assert 'The previous command is still running' in obs.metadata.suffix
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
# Finally continue again
action = CmdRunAction('')
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert '3' in obs.content
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
finally:
_close_test_runtime(runtime)
def test_empty_command_errors(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Test empty command without previous command - behavior should be the same on all platforms
obs = runtime.run_action(CmdRunAction(''))
assert isinstance(obs, CmdOutputObservation)
assert (
'ERROR: No previous running command to retrieve logs from.' in obs.content
)
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Powershell does not support interactive commands'
)
def test_python_interactive_input(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Test Python program that asks for input - same for both platforms
python_script = """name = input('Enter your name: '); age = input('Enter your age: '); print(f'Hello {name}, you are {age} years old')"""
# Start Python with the interactive script
# For both platforms we can use the same command
obs = runtime.run_action(CmdRunAction(f'python -c "{python_script}"'))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Enter your name:' in obs.content
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
# Send first input (name)
obs = runtime.run_action(CmdRunAction('Alice', is_input=True))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Enter your age:' in obs.content
assert obs.metadata.exit_code == -1
# Send second input (age)
obs = runtime.run_action(CmdRunAction('25', is_input=True))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Hello Alice, you are 25 years old' in obs.content
assert obs.metadata.exit_code == 0
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
finally:
_close_test_runtime(runtime)
@pytest.mark.skipif(
is_windows(), reason='Powershell does not support interactive commands'
)
def test_python_interactive_input_without_set_input(
temp_dir, runtime_cls, run_as_openhands
):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# Test Python program that asks for input
python_script = """name = input('Enter your name: '); age = input('Enter your age: '); print(f'Hello {name}, you are {age} years old')"""
# Start Python with the interactive script
obs = runtime.run_action(CmdRunAction(f'python -c "{python_script}"'))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Enter your name:' in obs.content
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
# Send first input (name)
obs = runtime.run_action(CmdRunAction('Alice', is_input=False))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Enter your age:' not in obs.content
assert (
'Your command "Alice" is NOT executed. The previous command is still running'
in obs.metadata.suffix
)
assert obs.metadata.exit_code == -1
# Try again now with input
obs = runtime.run_action(CmdRunAction('Alice', is_input=True))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Enter your age:' in obs.content
assert obs.metadata.exit_code == -1
obs = runtime.run_action(CmdRunAction('25', is_input=True))
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert 'Hello Alice, you are 25 years old' in obs.content
assert obs.metadata.exit_code == 0
assert '[The command completed with exit code 0.]' in obs.metadata.suffix
finally:
_close_test_runtime(runtime)
def test_bash_remove_prefix(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
# create a git repo - same for both platforms
action = CmdRunAction(
'git init && git remote add origin https://github.com/All-Hands-AI/OpenHands'
)
obs = runtime.run_action(action)
# logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert obs.metadata.exit_code == 0
# Check git remote - same for both platforms
obs = runtime.run_action(CmdRunAction('git remote -v'))
# logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert obs.metadata.exit_code == 0
assert 'https://github.com/All-Hands-AI/OpenHands' in obs.content
assert 'git remote -v' not in obs.content
finally:
_close_test_runtime(runtime)