Add CLIRuntime implementation for local command execution (#8264)

Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>
Co-authored-by: Engel Nyst <engel.nyst@gmail.com>
This commit is contained in:
Robert Brennan
2025-05-22 23:03:22 -04:00
committed by GitHub
parent f7cdf4720f
commit 50d4c79094
39 changed files with 1564 additions and 138 deletions

View File

@@ -47,8 +47,10 @@ jobs:
run: poetry install --without evaluation
- name: Build Environment
run: make build
- name: Run Tests
- name: Run Unit Tests
run: poetry run pytest --forked -n auto -svv ./tests/unit
- name: Run Runtime Tests with CLIRuntime
run: TEST_RUNTIME=cli poetry run pytest -svv tests/runtime/test_bash.py
# Run specific Windows python tests
test-on-windows:
@@ -72,5 +74,5 @@ jobs:
run: poetry install --without evaluation
- name: Run Windows unit tests
run: poetry run pytest -svv tests/unit/test_windows_bash.py
- name: Run Windows runtime tests
- name: Run Windows runtime tests with LocalRuntime
run: $env:TEST_RUNTIME="local"; poetry run pytest -svv tests/runtime/test_bash.py

View File

@@ -1,4 +1,3 @@
import copy
import os
import sys
from collections import deque
@@ -10,7 +9,6 @@ if TYPE_CHECKING:
from openhands.events.action import Action
from openhands.llm.llm import ModelResponse
from openhands.llm.llm_utils import check_tools
import openhands.agenthub.codeact_agent.function_calling as codeact_function_calling
from openhands.agenthub.codeact_agent.tools.bash import create_cmd_run_tool
from openhands.agenthub.codeact_agent.tools.browser import BrowserTool
@@ -29,6 +27,7 @@ from openhands.core.message import Message
from openhands.events.action import AgentFinishAction, MessageAction
from openhands.events.event import Event
from openhands.llm.llm import LLM
from openhands.llm.llm_utils import check_tools
from openhands.memory.condenser import Condenser
from openhands.memory.condenser.condenser import Condensation, View
from openhands.memory.conversation_memory import ConversationMemory

View File

@@ -29,7 +29,6 @@ from openhands.events.action import (
AgentFinishAction,
AgentThinkAction,
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileEditAction,
FileReadAction,

View File

@@ -32,7 +32,6 @@ from openhands.events.action import (
Action,
AgentFinishAction,
AgentThinkAction,
BrowseURLAction,
CmdRunAction,
FileReadAction,
MCPAction,
@@ -45,6 +44,9 @@ from openhands.events.tool import ToolCallMetadata
def grep_to_cmdrun(
pattern: str, path: str | None = None, include: str | None = None
) -> str:
# NOTE: This function currently relies on `rg` (ripgrep).
# `rg` may not be installed when using CLIRuntime or LocalRuntime.
# TODO: Implement a fallback to `grep` if `rg` is not available.
"""Convert grep tool arguments to a shell command string.
Args:
@@ -75,6 +77,9 @@ def grep_to_cmdrun(
def glob_to_cmdrun(pattern: str, path: str = '.') -> str:
# NOTE: This function currently relies on `rg` (ripgrep).
# `rg` may not be installed when using CLIRuntime or LocalRuntime
# TODO: Implement a fallback to `find` if `rg` is not available.
"""Convert glob tool arguments to a shell command string.
Args:

View File

@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import sys
from prompt_toolkit.shortcuts import clear
@@ -21,6 +22,7 @@ from openhands.cli.tui import (
process_agent_pause,
read_confirmation_input,
read_prompt_input,
update_streaming_output,
)
from openhands.cli.utils import (
update_usage_metrics,
@@ -70,31 +72,30 @@ async def cleanup_session(
controller: AgentController,
) -> None:
"""Clean up all resources from the current session."""
event_stream = runtime.event_stream
end_state = controller.get_state()
end_state.save_to_session(
event_stream.sid,
event_stream.file_store,
event_stream.user_id,
)
try:
# Cancel all running tasks except the current one
current_task = asyncio.current_task(loop)
pending = [task for task in asyncio.all_tasks(loop) if task is not current_task]
if pending:
done, pending_set = await asyncio.wait(set(pending), timeout=2.0)
pending = list(pending_set)
for task in pending:
task.cancel()
# Wait for all tasks to complete with a timeout
if pending:
await asyncio.wait(pending, timeout=5.0)
event_stream = runtime.event_stream
# Save the final state
end_state = controller.get_state()
end_state.save_to_session(
event_stream.sid,
event_stream.file_store,
event_stream.user_id,
)
# Reset agent, close runtime and controller
agent.reset()
runtime.close()
await controller.close()
except Exception as e:
logger.error(f'Error during session cleanup: {e}')
@@ -132,6 +133,12 @@ async def run_session(
agent=agent,
)
def stream_to_console(output: str) -> None:
# Instead of printing to stdout, pass the string to the TUI module
update_streaming_output(output)
runtime.subscribe_to_shell_stream(stream_to_console)
controller, initial_state = create_controller(agent, runtime, config)
event_stream = runtime.event_stream
@@ -356,6 +363,20 @@ async def main(loop: asyncio.AbstractEventLoop) -> None:
config.set_agent_config(agent_config)
config.enable_default_condenser = False
# Determine if CLI defaults should be overridden
val_override = args.override_cli_mode
should_override_cli_defaults = (
val_override is True
or (isinstance(val_override, str) and val_override.lower() in ('true', '1'))
or (isinstance(val_override, int) and val_override == 1)
)
if not should_override_cli_defaults:
config.runtime = 'cli'
if not config.workspace_base:
config.workspace_base = os.getcwd()
config.security.confirmation_mode = True
# TODO: Set working directory from config or use current working directory?
current_dir = config.workspace_base

View File

@@ -220,7 +220,6 @@ async def modify_llm_settings_basic(
config.set_llm_config(llm_config)
config.default_agent = OH_DEFAULT_AGENT
config.security.confirmation_mode = False
config.enable_default_condenser = True
agent_config = config.get_agent_config(config.default_agent)
@@ -238,7 +237,6 @@ async def modify_llm_settings_basic(
settings.llm_api_key = SecretStr(api_key)
settings.llm_base_url = None
settings.agent = OH_DEFAULT_AGENT
settings.confirmation_mode = False
settings.enable_default_condenser = True
await settings_store.store(settings)

View File

@@ -41,11 +41,17 @@ from openhands.events.event import Event
from openhands.events.observation import (
AgentStateChangedObservation,
CmdOutputObservation,
ErrorObservation,
FileEditObservation,
FileReadObservation,
)
from openhands.llm.metrics import Metrics
ENABLE_STREAMING = False # FIXME: this doesn't work
# Global TextArea for streaming output
streaming_output_text_area: TextArea | None = None
# Color and styling constants
COLOR_GOLD = '#FFD700'
COLOR_GREY = '#808080'
@@ -175,6 +181,7 @@ def display_initial_user_prompt(prompt: str) -> None:
# Prompt output display functions
def display_event(event: Event, config: AppConfig) -> None:
global streaming_output_text_area
with print_lock:
if isinstance(event, Action):
if hasattr(event, 'thought'):
@@ -186,6 +193,8 @@ def display_event(event: Event, config: AppConfig) -> None:
display_message(event.content)
if isinstance(event, CmdRunAction):
display_command(event)
if event.confirmation_state == ActionConfirmationStatus.CONFIRMED:
initialize_streaming_output()
if isinstance(event, CmdOutputObservation):
display_command_output(event.content)
if isinstance(event, FileEditObservation):
@@ -194,6 +203,8 @@ def display_event(event: Event, config: AppConfig) -> None:
display_file_read(event)
if isinstance(event, AgentStateChangedObservation):
display_agent_state_change_message(event.agent_state)
if isinstance(event, ErrorObservation):
display_error(event.content)
def display_message(message: str) -> None:
@@ -203,22 +214,39 @@ def display_message(message: str) -> None:
print_formatted_text(f'\n{message}')
def display_command(event: CmdRunAction) -> None:
if event.confirmation_state == ActionConfirmationStatus.AWAITING_CONFIRMATION:
def display_error(error: str) -> None:
error = error.strip()
if error:
container = Frame(
TextArea(
text=f'$ {event.command}',
text=error,
read_only=True,
style=COLOR_GREY,
style='ansired',
wrap_lines=True,
),
title='Action',
title='Error',
style='ansired',
)
print_formatted_text('')
print_container(container)
def display_command(event: CmdRunAction) -> None:
container = Frame(
TextArea(
text=f'$ {event.command}',
read_only=True,
style=COLOR_GREY,
wrap_lines=True,
),
title='Command',
style='ansiblue',
)
print_formatted_text('')
print_container(container)
def display_command_output(output: str) -> None:
lines = output.split('\n')
formatted_lines = []
@@ -240,7 +268,7 @@ def display_command_output(output: str) -> None:
style=COLOR_GREY,
wrap_lines=True,
),
title='Action Output',
title='Command Output',
style=f'fg:{COLOR_GREY}',
)
print_formatted_text('')
@@ -278,6 +306,36 @@ def display_file_read(event: FileReadObservation) -> None:
print_container(container)
def initialize_streaming_output():
"""Initialize the streaming output TextArea."""
if not ENABLE_STREAMING:
return
global streaming_output_text_area
streaming_output_text_area = TextArea(
text='',
read_only=True,
style=COLOR_GREY,
wrap_lines=True,
)
container = Frame(
streaming_output_text_area,
title='Streaming Output',
style=f'fg:{COLOR_GREY}',
)
print_formatted_text('')
print_container(container)
def update_streaming_output(text: str):
"""Update the streaming output TextArea with new text."""
global streaming_output_text_area
# Append the new text to the existing content
if streaming_output_text_area is not None:
current_text = streaming_output_text_area.text
streaming_output_text_area.text = current_text + text
# Interactive command output display functions
def display_help() -> None:
# Version header and introduction

View File

@@ -401,9 +401,6 @@ class AgentController:
if hasattr(event, 'hidden') and event.hidden:
return
# Give others a little chance
await asyncio.sleep(0.01)
# if the event is not filtered out, add it to the history
if not any(isinstance(event, filter_type) for filter_type in self.filter_out):
self.state.history.append(event)

View File

@@ -13,13 +13,15 @@ class AgentConfig(BaseModel):
classpath: str | None = Field(default=None)
"""The classpath of the agent to use. To be used for custom agents that are not defined in the openhands.agenthub package."""
enable_browsing: bool = Field(default=True)
"""Whether to enable browsing tool"""
"""Whether to enable browsing tool.
Note: If using CLIRuntime, browsing is not implemented and should be disabled."""
enable_llm_editor: bool = Field(default=False)
"""Whether to enable LLM editor tool"""
enable_editor: bool = Field(default=True)
"""Whether to enable the standard editor tool (str_replace_editor), only has an effect if enable_llm_editor is False."""
enable_jupyter: bool = Field(default=True)
"""Whether to enable Jupyter tool"""
"""Whether to enable Jupyter tool.
Note: If using CLIRuntime, Jupyter use is not implemented and should be disabled."""
enable_cmd: bool = Field(default=True)
"""Whether to enable bash tool"""
enable_think: bool = Field(default=True)

View File

@@ -48,7 +48,7 @@ class AppConfig(BaseModel):
file_uploads_allowed_extensions: Allowed file extensions. `['.*']` allows all.
cli_multiline_input: Whether to enable multiline input in CLI. When disabled,
input is read line by line. When enabled, input continues until /exit command.
mcp_host: Host for OpenHands' default MCP server
mcp_host: Host for OpenHands' default MCP server
mcp: MCP configuration settings.
"""

View File

@@ -384,6 +384,19 @@ def finalize_config(cfg: AppConfig) -> None:
)
)
# If CLIRuntime is selected, disable Jupyter for all agents
# Assuming 'cli' is the identifier for CLIRuntime
if cfg.runtime and cfg.runtime.lower() == 'cli':
for age_nt_name, agent_config in cfg.agents.items():
if agent_config.enable_jupyter:
agent_config.enable_jupyter = False
if agent_config.enable_browsing:
agent_config.enable_browsing = False
logger.openhands_logger.debug(
'Automatically disabled Jupyter plugin and browsing for all agents '
'because CLIRuntime is selected and does not support IPython execution.'
)
def get_agent_config_arg(
agent_config_arg: str, toml_file: str = 'config.toml'
@@ -725,6 +738,12 @@ def get_parser() -> argparse.ArgumentParser:
type=str,
default=None,
)
parser.add_argument(
'--override-cli-mode',
help='Override the default settings for CLI mode',
type=bool,
default=False,
)
return parser

View File

@@ -118,7 +118,7 @@ class RecallObservation(Observation):
f'additional_agent_instructions={self.additional_agent_instructions[:20]}...',
f'date={self.date}'
f'custom_secrets_descriptions={self.custom_secrets_descriptions}',
f'conversation_instructions={self.conversation_instructions[0:20]}...'
f'conversation_instructions={self.conversation_instructions[0:20]}...',
]
)
else:

View File

@@ -103,6 +103,12 @@ class EventStream(EventStore):
and callback_id in self._thread_loops[subscriber_id]
):
loop = self._thread_loops[subscriber_id][callback_id]
current_task = asyncio.current_task(loop)
pending = [
task for task in asyncio.all_tasks(loop) if task is not current_task
]
for task in pending:
task.cancel()
try:
loop.stop()
loop.close()

View File

@@ -196,7 +196,7 @@ class GitLabService(BaseGitService, GitService):
full_name=repo.get('path_with_namespace'),
stargazers_count=repo.get('star_count'),
git_provider=ProviderType.GITLAB,
is_public=True
is_public=True,
)
for repo in response
]
@@ -468,7 +468,9 @@ class GitLabService(BaseGitService, GitService):
# Set default description if none provided
if not description:
description = f'Merging changes from {source_branch} into {target_branch}'
description = (
f'Merging changes from {source_branch} into {target_branch}'
)
# Prepare the request payload
payload = {
@@ -477,7 +479,6 @@ class GitLabService(BaseGitService, GitService):
'title': title,
'description': description,
}
# Make the POST request to create the MR
response, _ = await self._make_request(

View File

@@ -162,13 +162,6 @@ async def add_mcp_tools_to_agent(
Add MCP tools to an agent.
"""
from openhands.runtime.impl.action_execution.action_execution_client import (
ActionExecutionClient, # inline import to avoid circular import
)
assert isinstance(runtime, ActionExecutionClient), (
'Runtime must be an instance of ActionExecutionClient'
)
assert runtime.runtime_initialized, (
'Runtime must be initialized before adding MCP tools'
)
@@ -190,7 +183,7 @@ async def add_mcp_tools_to_agent(
logger.info(f'Added microagent stdio server: {stdio_server.name}')
# Add the runtime as another MCP server
updated_mcp_config = runtime.get_updated_mcp_config(extra_stdio_servers)
updated_mcp_config = runtime.get_mcp_config(extra_stdio_servers)
# Fetch the MCP tools
mcp_tools = await fetch_mcp_tools_from_config(updated_mcp_config)

View File

@@ -121,7 +121,9 @@ class ServiceContextPR(ServiceContext):
) -> tuple[str, str, list[str]]:
"""Generate instruction for the agent."""
user_instruction_template = jinja2.Template(user_instructions_prompt_template)
conversation_instructions_template = jinja2.Template(conversation_instructions_prompt_template)
conversation_instructions_template = jinja2.Template(
conversation_instructions_prompt_template
)
images = []
issues_str = None
@@ -159,12 +161,11 @@ class ServiceContextPR(ServiceContext):
review_comments=review_comments_str,
review_threads=review_thread_str,
files=review_thread_file_str,
thread_context=thread_context
thread_context=thread_context,
)
conversation_instructions = conversation_instructions_template.render(
issues=issues_str,
repo_instruction=repo_instruction
issues=issues_str, repo_instruction=repo_instruction
)
return user_instruction, conversation_instructions, images
@@ -354,10 +355,12 @@ class ServiceContextIssue(ServiceContext):
user_instructions_template = jinja2.Template(user_instructions_prompt_template)
user_instructions = user_instructions_template.render(
body=issue.title + '\n\n' + issue.body + thread_context
) # Issue body and comments
body=issue.title + '\n\n' + issue.body + thread_context
) # Issue body and comments
conversation_instructions_template = jinja2.Template(conversation_instructions_prompt_template)
conversation_instructions_template = jinja2.Template(
conversation_instructions_prompt_template
)
conversation_instructions = conversation_instructions_template.render(
repo_instruction=repo_instruction,
)

View File

@@ -1,4 +1,5 @@
from openhands.runtime.base import Runtime
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
from openhands.runtime.impl.daytona.daytona_runtime import DaytonaRuntime
from openhands.runtime.impl.docker.docker_runtime import (
DockerRuntime,
@@ -20,6 +21,7 @@ _DEFAULT_RUNTIME_CLASSES: dict[str, type[Runtime]] = {
'runloop': RunloopRuntime,
'local': LocalRuntime,
'daytona': DaytonaRuntime,
'cli': CLIRuntime,
}
@@ -48,5 +50,6 @@ __all__ = [
'RunloopRuntime',
'DockerRuntime',
'DaytonaRuntime',
'CLIRuntime',
'get_runtime_cls',
]

View File

@@ -16,6 +16,7 @@ from zipfile import ZipFile
import httpx
from openhands.core.config import AppConfig, SandboxConfig
from openhands.core.config.mcp_config import MCPConfig, MCPStdioServerConfig
from openhands.core.exceptions import AgentRuntimeDisconnectedError
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventSource, EventStream, EventStreamSubscriber
@@ -26,6 +27,7 @@ from openhands.events.action import (
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileEditAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
@@ -99,6 +101,7 @@ class Runtime(FileEditRuntimeMixin):
initial_env_vars: dict[str, str]
attach_to_existing: bool
status_callback: Callable[[str, str, str], None] | None
_runtime_initialized: bool = False
def __init__(
self,
@@ -162,6 +165,10 @@ class Runtime(FileEditRuntimeMixin):
self.user_id = user_id
self.git_provider_tokens = git_provider_tokens
@property
def runtime_initialized(self) -> bool:
return self._runtime_initialized
def setup_initial_env(self) -> None:
if self.attach_to_existing:
return
@@ -427,6 +434,11 @@ class Runtime(FileEditRuntimeMixin):
if isinstance(obs, CmdOutputObservation) and obs.exit_code != 0:
self.log('error', f'Setup script failed: {obs.content}')
@property
def workspace_root(self) -> Path:
"""Return the workspace root path."""
return Path(self.config.workspace_mount_path_in_sandbox)
def maybe_setup_git_hooks(self):
"""Set up git hooks if .openhands/pre-commit.sh exists in the workspace or repository."""
pre_commit_script = '.openhands/pre-commit.sh'
@@ -611,7 +623,6 @@ fi
A list of loaded microagents from the org/user level repository
"""
loaded_microagents: list[BaseMicroagent] = []
workspace_root = Path(self.config.workspace_mount_path_in_sandbox)
repo_parts = selected_repository.split('/')
if len(repo_parts) < 2:
@@ -634,7 +645,7 @@ fi
# Try to clone the org-level .openhands repo
try:
# Create a temporary directory for the org-level repo
org_repo_dir = workspace_root / f'org_openhands_{org_name}'
org_repo_dir = self.workspace_root / f'org_openhands_{org_name}'
# Get authenticated URL and do a shallow clone (--depth 1) for efficiency
remote_url = self._get_authenticated_git_url(org_openhands_repo)
@@ -683,10 +694,8 @@ fi
For example, if the repository is github.com/acme-co/api, it will also check for
github.com/acme-co/.openhands and load microagents from there if it exists.
"""
loaded_microagents: list[BaseMicroagent] = []
workspace_root = Path(self.config.workspace_mount_path_in_sandbox)
microagents_dir = workspace_root / '.openhands' / 'microagents'
microagents_dir = self.workspace_root / '.openhands' / 'microagents'
repo_root = None
# Check for user/org level microagents if a repository is selected
@@ -696,7 +705,7 @@ fi
loaded_microagents.extend(org_microagents)
# Continue with repository-specific microagents
repo_root = workspace_root / selected_repository.split('/')[-1]
repo_root = self.workspace_root / selected_repository.split('/')[-1]
microagents_dir = repo_root / '.openhands' / 'microagents'
self.log(
@@ -707,7 +716,7 @@ fi
# Legacy Repo Instructions
# Check for legacy .openhands_instructions file
obs = self.read(
FileReadAction(path=str(workspace_root / '.openhands_instructions'))
FileReadAction(path=str(self.workspace_root / '.openhands_instructions'))
)
if isinstance(obs, ErrorObservation) and repo_root is not None:
# If the instructions file is not found in the workspace root, try to load it from the repo root
@@ -783,6 +792,12 @@ fi
async def connect(self) -> None:
pass
@abstractmethod
def get_mcp_config(
self, extra_stdio_servers: list[MCPStdioServerConfig] | None = None
) -> MCPConfig:
pass
# ====================================================================
# Action execution
# ====================================================================
@@ -803,6 +818,10 @@ fi
def write(self, action: FileWriteAction) -> Observation:
pass
@abstractmethod
def edit(self, action: FileEditAction) -> Observation:
pass
@abstractmethod
def browse(self, action: BrowseURLAction) -> Observation:
pass
@@ -884,3 +903,19 @@ fi
@property
def additional_agent_instructions(self) -> str:
return ''
def subscribe_to_shell_stream(
self, callback: Callable[[str], None] | None = None
) -> bool:
"""
Subscribe to shell command output stream.
This method is meant to be overridden by runtime implementations
that want to stream shell command output to external consumers.
Args:
callback: A function that will be called with each line of output from shell commands.
If None, any existing subscription will be removed.
Returns False by default.
"""
return False

View File

@@ -0,0 +1,27 @@
"""
Runtime implementations for OpenHands.
"""
from openhands.runtime.impl.action_execution.action_execution_client import (
ActionExecutionClient,
)
from openhands.runtime.impl.cli import CLIRuntime
from openhands.runtime.impl.daytona.daytona_runtime import DaytonaRuntime
from openhands.runtime.impl.docker.docker_runtime import DockerRuntime
from openhands.runtime.impl.e2b.e2b_runtime import E2BRuntime
from openhands.runtime.impl.local.local_runtime import LocalRuntime
from openhands.runtime.impl.modal.modal_runtime import ModalRuntime
from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime
from openhands.runtime.impl.runloop.runloop_runtime import RunloopRuntime
__all__ = [
'ActionExecutionClient',
'CLIRuntime',
'DaytonaRuntime',
'DockerRuntime',
'E2BRuntime',
'LocalRuntime',
'ModalRuntime',
'RemoteRuntime',
'RunloopRuntime',
]

View File

@@ -78,7 +78,6 @@ class ActionExecutionClient(Runtime):
):
self.session = HttpSession()
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
self._runtime_initialized: bool = False
self._runtime_closed: bool = False
self._vscode_token: str | None = None # initial dummy value
self._last_updated_mcp_stdio_servers: list[MCPStdioServerConfig] = []
@@ -99,10 +98,6 @@ class ActionExecutionClient(Runtime):
def action_execution_server_url(self) -> str:
raise NotImplementedError('Action execution server URL is not implemented')
@property
def runtime_initialized(self) -> bool:
return self._runtime_initialized
@retry(
retry=retry_if_exception(_is_retryable_error),
stop=stop_after_attempt(5) | stop_if_should_exit(),
@@ -356,7 +351,7 @@ class ActionExecutionClient(Runtime):
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
return self.send_action_for_execution(action)
def get_updated_mcp_config(
def get_mcp_config(
self, extra_stdio_servers: list[MCPStdioServerConfig] | None = None
) -> MCPConfig:
# Add the runtime as another MCP server
@@ -422,7 +417,6 @@ class ActionExecutionClient(Runtime):
else:
self.log('debug', 'No new stdio servers to update')
if len(self._last_updated_mcp_stdio_servers) > 0:
# We should always include the runtime as an MCP server whenever there's > 0 stdio servers
updated_mcp_config.sse_servers.append(
@@ -441,7 +435,7 @@ class ActionExecutionClient(Runtime):
from openhands.mcp.utils import create_mcp_clients
# Get the updated MCP config
updated_mcp_config = self.get_updated_mcp_config()
updated_mcp_config = self.get_mcp_config()
self.log(
'debug',
f'Creating MCP clients with servers: {updated_mcp_config.sse_servers}',

View File

@@ -0,0 +1,7 @@
"""
CLI Runtime implementation for OpenHands.
"""
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
__all__ = ['CLIRuntime']

View File

@@ -0,0 +1,782 @@
"""
This runtime runs commands locally using subprocess and performs file operations using Python's standard library.
It does not implement browser functionality.
"""
import asyncio
import os
import select
import shutil
import signal
import subprocess
import tempfile
import time
import zipfile
from pathlib import Path
from typing import Any, Callable
from binaryornot.check import is_binary
from openhands_aci.editor.editor import OHEditor
from openhands_aci.editor.exceptions import ToolError
from openhands_aci.editor.results import ToolResult
from openhands_aci.utils.diff import get_diff
from pydantic import SecretStr
from openhands.core.config import AppConfig
from openhands.core.config.mcp_config import MCPConfig, MCPStdioServerConfig
from openhands.core.exceptions import LLMMalformedActionError
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import (
BrowseInteractiveAction,
BrowseURLAction,
CmdRunAction,
FileEditAction,
FileReadAction,
FileWriteAction,
IPythonRunCellAction,
)
from openhands.events.action.mcp import MCPAction
from openhands.events.event import FileEditSource, FileReadSource
from openhands.events.observation import (
CmdOutputObservation,
ErrorObservation,
FileEditObservation,
FileReadObservation,
FileWriteObservation,
Observation,
)
from openhands.integrations.provider import PROVIDER_TOKEN_TYPE
from openhands.runtime.base import Runtime
from openhands.runtime.plugins import PluginRequirement
class CLIRuntime(Runtime):
"""
A runtime implementation that runs commands locally using subprocess and performs
file operations using Python's standard library. It does not implement browser functionality.
Args:
config (AppConfig): The application configuration.
event_stream (EventStream): The event stream to subscribe to.
sid (str, optional): The session ID. Defaults to 'default'.
plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
status_callback (Callable | None, optional): Callback for status updates. Defaults to None.
attach_to_existing (bool, optional): Whether to attach to an existing session. Defaults to False.
headless_mode (bool, optional): Whether to run in headless mode. Defaults to False.
user_id (str | None, optional): User ID for authentication. Defaults to None.
git_provider_tokens (PROVIDER_TOKEN_TYPE | None, optional): Git provider tokens. Defaults to None.
"""
def __init__(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_callback: Callable[[str, str, str], None] | None = None,
attach_to_existing: bool = False,
headless_mode: bool = False,
user_id: str | None = None,
git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
):
super().__init__(
config,
event_stream,
sid,
plugins,
env_vars,
status_callback,
attach_to_existing,
headless_mode,
user_id,
git_provider_tokens,
)
# Set up workspace
if self.config.workspace_base is not None:
logger.warning(
f'Workspace base path is set to {self.config.workspace_base}. '
'It will be used as the path for the agent to run in. '
'Be careful, the agent can EDIT files in this directory!'
)
self._workspace_path = self.config.workspace_base
else:
# Create a temporary directory for the workspace
self._workspace_path = tempfile.mkdtemp(
prefix=f'openhands_workspace_{sid}_'
)
logger.info(f'Created temporary workspace at {self._workspace_path}')
# Initialize runtime state
self._runtime_initialized = False
self.file_editor = OHEditor(workspace_root=self._workspace_path)
self._shell_stream_callback: Callable[[str], None] | None = None
logger.warning(
'Initializing CLIRuntime. WARNING: NO SANDBOX IS USED. '
'This runtime executes commands directly on the local system. '
'Use with caution in untrusted environments.'
)
async def connect(self) -> None:
"""Initialize the runtime connection."""
self.send_status_message('STATUS$STARTING_RUNTIME')
# Ensure workspace directory exists
os.makedirs(self._workspace_path, exist_ok=True)
# Change to the workspace directory
os.chdir(self._workspace_path)
if not self.attach_to_existing:
await asyncio.to_thread(self.setup_initial_env)
self._runtime_initialized = True
self.send_status_message('STATUS$CONTAINER_STARTED')
logger.info(f'CLIRuntime initialized with workspace at {self._workspace_path}')
def add_env_vars(self, env_vars: dict[str, Any]) -> None:
"""
Adds environment variables to the current runtime environment.
For CLIRuntime, this means updating os.environ for the current process,
so that subsequent commands inherit these variables.
This overrides the BaseRuntime behavior which tries to run shell commands
before it's initialized and modify .bashrc, which is not ideal for local CLI.
"""
if not env_vars:
return
# We log only keys to avoid leaking sensitive values like tokens into logs.
logger.info(
f'[CLIRuntime] Setting environment variables for this session: {list(env_vars.keys())}'
)
for key, value in env_vars.items():
if isinstance(value, SecretStr):
os.environ[key] = value.get_secret_value()
logger.warning(f'[CLIRuntime] Set os.environ["{key}"] (from SecretStr)')
else:
os.environ[key] = value
logger.debug(f'[CLIRuntime] Set os.environ["{key}"]')
# We don't use self.run() here because this method is called
# during initialization before self._runtime_initialized is True.
def _safe_terminate_process(self, process_obj, signal_to_send=signal.SIGTERM):
"""
Safely attempts to terminate/kill a process group or a single process.
Args:
process_obj: the subprocess.Popen object started with start_new_session=True
signal_to_send: the signal to send to the process group or process.
"""
pid = getattr(process_obj, 'pid', None)
if pid is None:
return
group_desc = (
'kill process group'
if signal_to_send == signal.SIGKILL
else 'terminate process group'
)
process_desc = (
'kill process' if signal_to_send == signal.SIGKILL else 'terminate process'
)
try:
# Try to terminate/kill the entire process group
logger.debug(f'[_safe_terminate_process] Original PID to act on: {pid}')
pgid_to_kill = os.getpgid(
pid
) # This might raise ProcessLookupError if pid is already gone
logger.debug(
f'[_safe_terminate_process] Attempting to {group_desc} for PID {pid} (PGID: {pgid_to_kill}) with {signal_to_send}.'
)
os.killpg(pgid_to_kill, signal_to_send)
logger.debug(
f'[_safe_terminate_process] Successfully sent signal {signal_to_send} to PGID {pgid_to_kill} (original PID: {pid}).'
)
except ProcessLookupError as e_pgid:
logger.warning(
f'[_safe_terminate_process] ProcessLookupError getting PGID for PID {pid} (it might have already exited): {e_pgid}. Falling back to direct kill/terminate.'
)
try:
if signal_to_send == signal.SIGKILL:
process_obj.kill()
else:
process_obj.terminate()
logger.debug(
f'[_safe_terminate_process] Fallback: Terminated {process_desc} (PID: {pid}).'
)
except Exception as e_fallback:
logger.error(
f'[_safe_terminate_process] Fallback: Error during {process_desc} (PID: {pid}): {e_fallback}'
)
except (AttributeError, OSError) as e_os:
logger.error(
f'[_safe_terminate_process] OSError/AttributeError during {group_desc} for PID {pid}: {e_os}. Falling back.'
)
# Fallback: try to terminate/kill the main process directly.
try:
if signal_to_send == signal.SIGKILL:
process_obj.kill()
else:
process_obj.terminate()
logger.debug(
f'[_safe_terminate_process] Fallback: Terminated {process_desc} (PID: {pid}).'
)
except Exception as e_fallback:
logger.error(
f'[_safe_terminate_process] Fallback: Error during {process_desc} (PID: {pid}): {e_fallback}'
)
except (KeyboardInterrupt, SystemExit):
raise
except Exception as e:
logger.error(f'Error: {e}')
def _execute_shell_command(
self, command: str, timeout: float
) -> CmdOutputObservation:
"""
Execute a shell command and stream its output to a callback function.
Args:
command: The shell command to execute
timeout: Timeout in seconds for the command
Returns:
CmdOutputObservation containing the complete output and exit code
"""
output_lines = []
timed_out = False
start_time = time.monotonic()
# Use shell=True to run complex bash commands
process = subprocess.Popen(
['bash', '-c', command],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1, # Explicitly line-buffered for text mode
universal_newlines=True,
start_new_session=True,
)
logger.debug(
f'[_execute_shell_command] PID of bash -c: {process.pid} for command: "{command}"'
)
exit_code = None
try:
if process.stdout:
while process.poll() is None:
if (
timeout is not None
and (time.monotonic() - start_time) > timeout
):
logger.debug(
f'Command "{command}" timed out after {timeout:.1f} seconds. Terminating.'
)
# Attempt to terminate the process group (SIGTERM)
self._safe_terminate_process(
process, signal_to_send=signal.SIGTERM
)
timed_out = True
break
ready_to_read, _, _ = select.select([process.stdout], [], [], 0.1)
if ready_to_read:
line = process.stdout.readline()
if line:
logger.debug(f'LINE: {line}')
output_lines.append(line)
if self._shell_stream_callback:
self._shell_stream_callback(line)
# Attempt to read any remaining data from stdout
if process.stdout and not process.stdout.closed:
try:
while line:
line = process.stdout.readline()
if line:
logger.debug(f'LINE: {line}')
output_lines.append(line)
if self._shell_stream_callback:
self._shell_stream_callback(line)
except Exception as e:
logger.warning(
f'Error reading directly from stdout after loop for "{command}": {e}'
)
exit_code = process.returncode
# If timeout occurred, ensure exit_code reflects this for the observation.
if timed_out:
exit_code = -1
except Exception as e:
logger.error(
f'Outer exception in _execute_shell_command for "{command}": {e}'
)
if process and process.poll() is None:
self._safe_terminate_process(process, signal_to_send=signal.SIGKILL)
return CmdOutputObservation(
command=command,
content=''.join(output_lines) + f'\nError during execution: {e}',
exit_code=-1,
)
complete_output = ''.join(output_lines)
logger.debug(
f'[_execute_shell_command] Complete output for "{command}" (len: {len(complete_output)}): {complete_output!r}'
)
obs_metadata = {'working_dir': self._workspace_path}
if timed_out:
obs_metadata['suffix'] = (
f'[The command timed out after {timeout:.1f} seconds.]'
)
# exit_code = -1 # This is already set if timed_out is True
return CmdOutputObservation(
command=command,
content=complete_output,
exit_code=exit_code,
metadata=obs_metadata,
)
def run(self, action: CmdRunAction) -> Observation:
"""Run a command using subprocess."""
if not self._runtime_initialized:
return ErrorObservation(
f'Runtime not initialized for command: {action.command}'
)
if action.is_input:
logger.warning(
f"CLIRuntime received an action with `is_input=True` (command: '{action.command}'). "
'CLIRuntime currently does not support sending input or signals to active processes. '
'This action will be ignored and an error observation will be returned.'
)
return ErrorObservation(
content=f"CLIRuntime does not support interactive input from the agent (e.g., 'C-c'). The command '{action.command}' was not sent to any process.",
error_id='AGENT_ERROR$BAD_ACTION',
)
try:
effective_timeout = (
action.timeout
if action.timeout is not None
else self.config.sandbox.timeout
)
logger.debug(
f'Running command in CLIRuntime: "{action.command}" with effective timeout: {effective_timeout}s'
)
return self._execute_shell_command(
action.command, timeout=effective_timeout
)
except Exception as e:
logger.error(
f'Error in CLIRuntime.run for command "{action.command}": {str(e)}'
)
return ErrorObservation(
f'Error running command "{action.command}": {str(e)}'
)
def run_ipython(self, action: IPythonRunCellAction) -> Observation:
"""Run a Python code cell.
This functionality is not implemented in CLIRuntime.
Users should also disable the Jupyter plugin in AgentConfig.
"""
# This functionality is not implemented in CLIRuntime.
# If you need to run IPython/Jupyter cells, please consider using a different runtime
# or ensure the Jupyter plugin is disabled in your AgentConfig to avoid
# attempting to use this disabled feature.
logger.warning(
"run_ipython is called on CLIRuntime, but it's not implemented. "
'Please disable the Jupyter plugin in AgentConfig.'
)
return ErrorObservation(
'Executing IPython cells is not implemented in CLIRuntime. '
)
def _sanitize_filename(self, filename: str) -> str:
# if path is absolute, ensure it starts with _workspace_path
if filename == '/workspace':
actual_filename = self._workspace_path
elif filename.startswith('/workspace/'):
# Map /workspace/ to the actual workspace path
# Note: /workspace is widely used, so we map it to allow using it with CLIRuntime
actual_filename = os.path.join(
self._workspace_path, filename[len('/workspace/') :]
)
elif filename.startswith('/'):
if not filename.startswith(self._workspace_path):
raise LLMMalformedActionError(
f'Invalid path: {filename}. You can only work with files in {self._workspace_path}.'
)
actual_filename = filename
else:
actual_filename = os.path.join(self._workspace_path, filename.lstrip('/'))
# Resolve the path to handle any '..' or '.' components
resolved_path = os.path.realpath(actual_filename)
# Check if the resolved path is still within the workspace
if not resolved_path.startswith(self._workspace_path):
raise LLMMalformedActionError(
f'Invalid path traversal: {filename}. Path resolves outside the workspace. Resolved: {resolved_path}, Workspace: {self._workspace_path}'
)
return resolved_path
def read(self, action: FileReadAction) -> Observation:
"""Read a file using Python's standard library or OHEditor."""
if not self._runtime_initialized:
return ErrorObservation('Runtime not initialized')
file_path = self._sanitize_filename(action.path)
# Cannot read binary files
if os.path.exists(file_path) and is_binary(file_path):
return ErrorObservation('ERROR_BINARY_FILE')
# Use OHEditor for OH_ACI implementation source
if action.impl_source == FileReadSource.OH_ACI:
result_str, _ = self._execute_file_editor(
command='view',
path=file_path,
view_range=action.view_range,
)
return FileReadObservation(
content=result_str,
path=action.path,
impl_source=FileReadSource.OH_ACI,
)
try:
# Check if the file exists
if not os.path.exists(file_path):
return ErrorObservation(f'File not found: {action.path}')
# Check if it's a directory
if os.path.isdir(file_path):
return ErrorObservation(f'Cannot read directory: {action.path}')
# Read the file
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
return FileReadObservation(content=content, path=action.path)
except Exception as e:
logger.error(f'Error reading file: {str(e)}')
return ErrorObservation(f'Error reading file {action.path}: {str(e)}')
def write(self, action: FileWriteAction) -> Observation:
"""Write to a file using Python's standard library."""
if not self._runtime_initialized:
return ErrorObservation('Runtime not initialized')
file_path = self._sanitize_filename(action.path)
try:
# Create parent directories if they don't exist
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Write to the file
with open(file_path, 'w', encoding='utf-8') as f:
f.write(action.content)
return FileWriteObservation(content='', path=action.path)
except Exception as e:
logger.error(f'Error writing to file: {str(e)}')
return ErrorObservation(f'Error writing to file {action.path}: {str(e)}')
def browse(self, action: BrowseURLAction) -> Observation:
"""Not implemented for CLI runtime."""
return ErrorObservation(
'Browser functionality is not implemented in CLIRuntime'
)
def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
"""Not implemented for CLI runtime."""
return ErrorObservation(
'Browser functionality is not implemented in CLIRuntime'
)
def _execute_file_editor(
self,
command: str,
path: str,
file_text: str | None = None,
view_range: list[int] | None = None,
old_str: str | None = None,
new_str: str | None = None,
insert_line: int | None = None,
enable_linting: bool = False,
) -> tuple[str, tuple[str | None, str | None]]:
"""Execute file editor command and handle exceptions.
Args:
command: Editor command to execute
path: File path
file_text: Optional file text content
view_range: Optional view range tuple (start, end)
old_str: Optional string to replace
new_str: Optional replacement string
insert_line: Optional line number for insertion
enable_linting: Whether to enable linting
Returns:
tuple: A tuple containing the output string and a tuple of old and new file content
"""
result: ToolResult | None = None
try:
result = self.file_editor(
command=command,
path=path,
file_text=file_text,
view_range=view_range,
old_str=old_str,
new_str=new_str,
insert_line=insert_line,
enable_linting=enable_linting,
)
except ToolError as e:
result = ToolResult(error=e.message)
if result.error:
return f'ERROR:\n{result.error}', (None, None)
if not result.output:
logger.warning(f'No output from file_editor for {path}')
return '', (None, None)
return result.output, (result.old_content, result.new_content)
def edit(self, action: FileEditAction) -> Observation:
"""Edit a file using the OHEditor."""
if not self._runtime_initialized:
return ErrorObservation('Runtime not initialized')
# Ensure the path is within the workspace
file_path = self._sanitize_filename(action.path)
# Check if it's a binary file
if os.path.exists(file_path) and is_binary(file_path):
return ErrorObservation('ERROR_BINARY_FILE')
assert action.impl_source == FileEditSource.OH_ACI
result_str, (old_content, new_content) = self._execute_file_editor(
command=action.command,
path=file_path,
file_text=action.file_text,
old_str=action.old_str,
new_str=action.new_str,
insert_line=action.insert_line,
enable_linting=False,
)
return FileEditObservation(
content=result_str,
path=action.path,
old_content=action.old_str,
new_content=action.new_str,
impl_source=FileEditSource.OH_ACI,
diff=get_diff(
old_contents=old_content or '',
new_contents=new_content or '',
filepath=action.path,
),
)
async def call_tool_mcp(self, action: MCPAction) -> Observation:
"""Not implemented for CLI runtime."""
return ErrorObservation('MCP functionality is not implemented in CLIRuntime')
@property
def workspace_root(self) -> Path:
"""Return the workspace root path."""
return Path(os.path.abspath(self._workspace_path))
def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
"""Copy a file or directory from the host to the sandbox."""
if not self._runtime_initialized:
raise RuntimeError('Runtime not initialized')
if not os.path.exists(host_src): # Source must exist on host
raise FileNotFoundError(f"Source path '{host_src}' does not exist.")
dest = self._sanitize_filename(sandbox_dest)
try:
# Case 1: Source is a directory and recursive copy.
if os.path.isdir(host_src) and recursive:
# Target is dest / basename(host_src)
final_target_dir = os.path.join(dest, os.path.basename(host_src))
# If source and final target are the same, skip.
if os.path.realpath(host_src) == os.path.realpath(final_target_dir):
logger.debug(
'Skipping recursive copy: source and target are identical.'
)
pass
else:
# Ensure parent of final_target_dir exists.
os.makedirs(dest, exist_ok=True)
shutil.copytree(host_src, final_target_dir, dirs_exist_ok=True)
# Why: Copies dir host_src into dest. Merges if target exists.
# Case 2: Source is a file.
elif os.path.isfile(host_src):
final_target_file_path: str
# Scenario A: sandbox_dest is clearly a directory.
if os.path.isdir(dest) or (sandbox_dest.endswith(('/', os.sep))):
target_dir = dest
os.makedirs(target_dir, exist_ok=True)
final_target_file_path = os.path.join(
target_dir, os.path.basename(host_src)
)
# Why: Copies file into specified directory.
# Scenario B: sandbox_dest is likely a new directory (e.g., 'new_dir').
elif not os.path.exists(dest) and '.' not in os.path.basename(dest):
target_dir = dest
os.makedirs(target_dir, exist_ok=True)
final_target_file_path = os.path.join(
target_dir, os.path.basename(host_src)
)
# Why: Creates 'new_dir' and copies file into it.
# Scenario C: sandbox_dest is a full file path.
else:
final_target_file_path = dest
os.makedirs(os.path.dirname(final_target_file_path), exist_ok=True)
# Why: Copies file to a specific path, possibly renaming.
shutil.copy2(host_src, final_target_file_path)
else: # Source is not a valid file or directory.
raise FileNotFoundError(
f"Source path '{host_src}' is not a valid file or directory."
)
except FileNotFoundError as e:
logger.error(f'File not found during copy: {str(e)}')
raise
except shutil.SameFileError as e:
# We can be lenient here, just ignore this error.
logger.debug(
f'Skipping copy as source and destination are the same: {str(e)}'
)
pass
except Exception as e:
logger.error(f'Unexpected error copying file: {str(e)}')
raise RuntimeError(f'Unexpected error copying file: {str(e)}')
def list_files(self, path: str | None = None) -> list[str]:
"""List files in the sandbox."""
if not self._runtime_initialized:
raise RuntimeError('Runtime not initialized')
if path is None:
dir_path = self._workspace_path
else:
dir_path = self._sanitize_filename(path)
try:
if not os.path.exists(dir_path):
return []
if not os.path.isdir(dir_path):
return [dir_path]
# List files in the directory
return [os.path.join(dir_path, f) for f in os.listdir(dir_path)]
except Exception as e:
logger.error(f'Error listing files: {str(e)}')
return []
def copy_from(self, path: str) -> Path:
"""Zip all files in the sandbox and return a path in the local filesystem."""
if not self._runtime_initialized:
raise RuntimeError('Runtime not initialized')
source_path = self._sanitize_filename(path)
if not os.path.exists(source_path):
raise FileNotFoundError(f'Path not found: {path}')
# Create a temporary zip file
temp_zip = tempfile.NamedTemporaryFile(suffix='.zip', delete=False)
temp_zip.close()
try:
with zipfile.ZipFile(temp_zip.name, 'w', zipfile.ZIP_DEFLATED) as zipf:
if os.path.isdir(source_path):
# Add all files in the directory
for root, _, files in os.walk(source_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, source_path)
zipf.write(file_path, arcname)
else:
# Add a single file
zipf.write(source_path, os.path.basename(source_path))
return Path(temp_zip.name)
except Exception as e:
logger.error(f'Error creating zip file: {str(e)}')
raise RuntimeError(f'Error creating zip file: {str(e)}')
def close(self) -> None:
self._runtime_initialized = False
super().close()
@classmethod
async def delete(cls, conversation_id: str) -> None:
"""Delete any resources associated with a conversation."""
# Look for temporary directories that might be associated with this conversation
temp_dir = tempfile.gettempdir()
prefix = f'openhands_workspace_{conversation_id}_'
for item in os.listdir(temp_dir):
if item.startswith(prefix):
try:
path = os.path.join(temp_dir, item)
if os.path.isdir(path):
shutil.rmtree(path)
logger.info(f'Deleted workspace directory: {path}')
except Exception as e:
logger.error(f'Error deleting workspace directory: {str(e)}')
@property
def additional_agent_instructions(self) -> str:
return '\n\n'.join(
[
f'Your working directory is {self._workspace_path}. You can only read and write files in this directory.',
"You are working directly on the user's machine. In most cases, the working environment is already set up.",
]
)
def get_mcp_config(
self, extra_stdio_servers: list[MCPStdioServerConfig] | None = None
) -> MCPConfig:
# TODO: Load MCP config from a local file
return MCPConfig()
def subscribe_to_shell_stream(
self, callback: Callable[[str], None] | None = None
) -> bool:
"""
Subscribe to shell command output stream.
Args:
callback: A function that will be called with each line of output from shell commands.
If None, any existing subscription will be removed.
"""
self._shell_stream_callback = callback
return True

View File

@@ -200,8 +200,8 @@ class LocalRuntime(ActionExecutionClient):
headless_mode,
)
#If there is an API key in the environment we use this in requests to the runtime
session_api_key = os.getenv("SESSION_API_KEY")
# If there is an API key in the environment we use this in requests to the runtime
session_api_key = os.getenv('SESSION_API_KEY')
if session_api_key:
self.session.headers['X-Session-API-Key'] = session_api_key

View File

@@ -11,6 +11,7 @@ from openhands.core.config import AppConfig, MCPConfig, load_app_config
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.base import Runtime
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
from openhands.runtime.impl.daytona.daytona_runtime import DaytonaRuntime
from openhands.runtime.impl.docker.docker_runtime import DockerRuntime
from openhands.runtime.impl.local.local_runtime import LocalRuntime
@@ -133,6 +134,8 @@ def get_runtime_classes() -> list[type[Runtime]]:
return [RunloopRuntime]
elif runtime.lower() == 'daytona':
return [DaytonaRuntime]
elif runtime.lower() == 'cli':
return [CLIRuntime]
else:
raise ValueError(f'Invalid runtime: {runtime}')
@@ -269,6 +272,16 @@ def _load_runtime(
sid=sid,
plugins=plugins,
)
# For CLIRuntime, the tests' assertions should be based on the physical workspace path,
# not the logical "/workspace". So, we adjust config.workspace_mount_path_in_sandbox
# to reflect the actual physical path used by CLIRuntime's OHEditor.
if isinstance(runtime, CLIRuntime):
config.workspace_mount_path_in_sandbox = str(runtime.workspace_root)
logger.info(
f'Adjusted workspace_mount_path_in_sandbox for CLIRuntime to: {config.workspace_mount_path_in_sandbox}'
)
call_async_from_sync(runtime.connect)
time.sleep(2)
return runtime, config

View File

@@ -8,6 +8,7 @@ from conftest import _close_test_runtime, _load_runtime
from openhands.core.logger import openhands_logger as logger
from openhands.events.action import FileEditAction, FileWriteAction
from openhands.runtime.action_execution_server import _execute_file_editor
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
def test_view_file(temp_dir, runtime_cls, run_as_openhands):
@@ -353,10 +354,19 @@ def test_str_replace_with_empty_old_str(temp_dir, runtime_cls, run_as_openhands)
)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert (
'No replacement was performed. Multiple occurrences of old_str `` in lines [1, 2, 3, 4]. Please ensure it is unique.'
in obs.content
)
if isinstance(runtime, CLIRuntime):
# CLIRuntime with a 3-line file without a trailing newline reports 3 occurrences for an empty old_str
assert (
'No replacement was performed. Multiple occurrences of old_str `` in lines [1, 2, 3]. Please ensure it is unique.'
in obs.content
)
else:
# Other runtimes might behave differently (e.g., implicitly add a newline, leading to 4 matches)
# TODO: Why do they have 4 lines?
assert (
'No replacement was performed. Multiple occurrences of old_str `` in lines [1, 2, 3, 4]. Please ensure it is unique.'
in obs.content
)
finally:
_close_test_runtime(runtime)

View File

@@ -14,6 +14,7 @@ from conftest import (
from openhands.core.logger import openhands_logger as logger
from openhands.events.action import CmdRunAction
from openhands.events.observation import CmdOutputObservation, ErrorObservation
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
from openhands.runtime.impl.local.local_runtime import LocalRuntime
# ============================================================================================================================
@@ -51,25 +52,37 @@ def test_bash_server(temp_dir, runtime_cls, run_as_openhands):
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == -1
assert 'Serving HTTP on' in obs.content
assert (
"[The command timed out after 1.0 seconds. You may wait longer to see additional output by sending empty command '', send other commands to interact with the current process, or send keys to interrupt/kill the command.]"
in obs.metadata.suffix
)
if runtime_cls == CLIRuntime:
assert '[The command timed out after 1.0 seconds.]' in obs.metadata.suffix
else:
assert (
"[The command timed out after 1.0 seconds. You may wait longer to see additional output by sending empty command '', send other commands to interact with the current process, or send keys to interrupt/kill the command.]"
in obs.metadata.suffix
)
action = CmdRunAction(command='C-c', is_input=True)
action.set_hard_timeout(30)
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == 0
if not is_windows():
# Linux/macOS behavior
assert 'Keyboard interrupt received, exiting.' in obs.content
assert config.workspace_mount_path_in_sandbox in obs.metadata.working_dir
obs_interrupt = runtime.run_action(action)
logger.info(obs_interrupt, extra={'msg_type': 'OBSERVATION'})
if runtime_cls == CLIRuntime:
assert isinstance(obs_interrupt, ErrorObservation)
assert (
"CLIRuntime does not support interactive input from the agent (e.g., 'C-c'). The command 'C-c' was not sent to any process."
in obs_interrupt.content
)
assert obs_interrupt.error_id == 'AGENT_ERROR$BAD_ACTION'
else:
# Windows behavior: Stop-Job might not produce output, but exit code should be 0
# The working directory check might also be less relevant/predictable here
pass
assert isinstance(obs_interrupt, CmdOutputObservation)
assert obs_interrupt.exit_code == 0
if not is_windows():
# Linux/macOS behavior
assert 'Keyboard interrupt received, exiting.' in obs_interrupt.content
assert (
config.workspace_mount_path_in_sandbox
in obs_interrupt.metadata.working_dir
)
# Verify the server is actually stopped by trying to start another one
# on the same port (regardless of OS)
@@ -82,7 +95,12 @@ def test_bash_server(temp_dir, runtime_cls, run_as_openhands):
# Check that the interrupt message is NOT present in subsequent output
assert 'Keyboard interrupt received, exiting.' not in obs.content
# Check working directory remains correct after interrupt handling
assert config.workspace_mount_path_in_sandbox in obs.metadata.working_dir
if runtime_cls == CLIRuntime:
# For CLIRuntime, working_dir is the absolute host path
assert obs.metadata.working_dir == config.workspace_base
else:
# For other runtimes (e.g., Docker), it's relative to or contains the sandbox path
assert config.workspace_mount_path_in_sandbox in obs.metadata.working_dir
# run it again!
action = CmdRunAction(command='python -u -m http.server 8081')
@@ -106,35 +124,66 @@ def test_bash_background_server(temp_dir, runtime_cls, run_as_openhands):
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == 0 # Should not timeout since this runs in background
# Give the server a moment to be ready
time.sleep(1)
if runtime_cls == CLIRuntime:
# The '&' does not detach cleanly; the PTY session remains active.
# the main cmd ends, then the server may receive SIGHUP.
assert obs.exit_code == 0
# Verify the server is running by curling it
if is_windows():
# Give the server a moment to be ready
time.sleep(1)
# `curl --fail` exits non-zero if connection fails or server returns an error.
# Use a short connect timeout as the server is expected to be down.
curl_action = CmdRunAction(
f'Invoke-WebRequest -Uri http://localhost:{server_port} -UseBasicParsing | Select-Object -ExpandProperty Content'
f'curl --fail --connect-timeout 1 http://localhost:{server_port}'
)
else:
curl_action = CmdRunAction(f'curl http://localhost:{server_port}')
curl_obs = runtime.run_action(curl_action)
logger.info(curl_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(curl_obs, CmdOutputObservation)
assert curl_obs.exit_code == 0
# Check for content typical of python http.server directory listing
assert 'Directory listing for' in curl_obs.content
curl_obs = runtime.run_action(curl_action)
logger.info(curl_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(curl_obs, CmdOutputObservation)
assert curl_obs.exit_code != 0
# Kill the server
if is_windows():
# Use PowerShell job management commands instead of trying to kill process directly
kill_action = CmdRunAction('Get-Job | Stop-Job')
else:
# Confirm with pkill (CLIRuntime is assumed non-Windows here).
# pkill returns 1 if no processes were matched.
kill_action = CmdRunAction('pkill -f "http.server"')
kill_obs = runtime.run_action(kill_action)
logger.info(kill_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(kill_obs, CmdOutputObservation)
assert kill_obs.exit_code == 0
kill_obs = runtime.run_action(kill_action)
logger.info(kill_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(kill_obs, CmdOutputObservation)
# For CLIRuntime, bash -c "cmd &" exits quickly, orphaning "cmd".
# CLIRuntime's timeout tries to kill the already-exited bash -c.
# The orphaned http.server continues running.
# So, pkill should find and kill the server.
assert kill_obs.exit_code == 0
else:
assert obs.exit_code == 0
# Give the server a moment to be ready
time.sleep(1)
# Verify the server is running by curling it
if is_windows():
curl_action = CmdRunAction(
f'Invoke-WebRequest -Uri http://localhost:{server_port} -UseBasicParsing | Select-Object -ExpandProperty Content'
)
else:
curl_action = CmdRunAction(f'curl http://localhost:{server_port}')
curl_obs = runtime.run_action(curl_action)
logger.info(curl_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(curl_obs, CmdOutputObservation)
assert curl_obs.exit_code == 0
# Check for content typical of python http.server directory listing
assert 'Directory listing for' in curl_obs.content
# Kill the server
if is_windows():
# This assumes PowerShell context if LocalRuntime is used on Windows.
kill_action = CmdRunAction('Get-Job | Stop-Job')
else:
kill_action = CmdRunAction('pkill -f "http.server"')
kill_obs = runtime.run_action(kill_action)
logger.info(kill_obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(kill_obs, CmdOutputObservation)
assert kill_obs.exit_code == 0
finally:
_close_test_runtime(runtime)
@@ -241,6 +290,10 @@ done && echo "success"
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='CLIRuntime uses bash -c which handles newline-separated commands. This test expects rejection. See test_cliruntime_multiple_newline_commands.',
)
def test_multiple_multiline_commands(temp_dir, runtime_cls, run_as_openhands):
if is_windows():
cmds = [
@@ -302,6 +355,45 @@ def test_multiple_multiline_commands(temp_dir, runtime_cls, run_as_openhands):
_close_test_runtime(runtime)
def test_cliruntime_multiple_newline_commands(temp_dir, run_as_openhands):
# This test is specific to CLIRuntime
runtime_cls = CLIRuntime
if is_windows():
# Minimal check for Windows if CLIRuntime were to support it robustly with PowerShell for this.
# For now, this test primarily targets the bash -c behavior on non-Windows.
pytest.skip(
'CLIRuntime newline command test primarily for non-Windows bash behavior'
)
# cmds = [
# 'Get-ChildItem -Name .git_config', # Simpler command
# 'Write-Output "hello`nworld"'
# ]
# expected_outputs = ['.git_config', 'hello\nworld']
else:
cmds = [
'echo "hello"', # A command that will always work
'echo -e "hello\nworld"',
"""echo -e "hello it's me\"""",
]
expected_outputs = [
'hello', # Simple string output
'hello\nworld',
"hello it's me",
] # Simplified expectations
joined_cmds = '\n'.join(cmds)
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
obs = _run_cmd_action(runtime, joined_cmds)
assert isinstance(obs, CmdOutputObservation)
assert obs.exit_code == 0
# Check that parts of each command's expected output are present
for expected_part in expected_outputs:
assert expected_part in obs.content
finally:
_close_test_runtime(runtime)
def test_cmd_run(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
@@ -348,9 +440,13 @@ def test_cmd_run(temp_dir, runtime_cls, run_as_openhands):
obs = _run_cmd_action(runtime, 'ls -l')
assert obs.exit_code == 0
if run_as_openhands:
if (
run_as_openhands
and runtime_cls != CLIRuntime
and runtime_cls != LocalRuntime
):
assert 'openhands' in obs.content
elif runtime_cls == LocalRuntime:
elif runtime_cls == LocalRuntime or runtime_cls == CLIRuntime:
assert 'root' not in obs.content and 'openhands' not in obs.content
else:
assert 'root' in obs.content
@@ -372,6 +468,10 @@ def test_cmd_run(temp_dir, runtime_cls, run_as_openhands):
_close_test_runtime(runtime)
@pytest.mark.skipif(
sys.platform != 'win32' and os.getenv('TEST_RUNTIME') == 'cli',
reason='CLIRuntime runs as the host user, so ~ is the host home. This test assumes a sandboxed user.',
)
def test_run_as_user_correct_home_dir(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
@@ -445,12 +545,22 @@ def test_stateful_cmd(temp_dir, runtime_cls):
obs = _run_cmd_action(runtime, 'mkdir -p test')
assert obs.exit_code == 0, 'The exit code should be 0.'
obs = _run_cmd_action(runtime, 'cd test')
assert obs.exit_code == 0, 'The exit code should be 0.'
if runtime_cls == CLIRuntime:
# For CLIRuntime, test CWD change and command execution within a single action
# as CWD is enforced in the workspace.
obs = _run_cmd_action(runtime, 'cd test && pwd')
else:
# For other runtimes, test stateful CWD change across actions
obs = _run_cmd_action(runtime, 'cd test')
assert obs.exit_code == 0, 'The exit code should be 0 for cd test.'
obs = _run_cmd_action(runtime, 'pwd')
obs = _run_cmd_action(runtime, 'pwd')
assert obs.exit_code == 0, 'The exit code should be 0.'
assert f'{config.workspace_mount_path_in_sandbox}/test' in obs.content
assert obs.exit_code == 0, (
'The exit code for the pwd command (or combined command) should be 0.'
)
assert (
f'{config.workspace_mount_path_in_sandbox}/test' in obs.content.strip()
)
finally:
_close_test_runtime(runtime)
@@ -694,7 +804,8 @@ def test_git_operation(temp_dir, runtime_cls):
# this will happen if permission of runtime is not properly configured
# fatal: detected dubious ownership in repository at config.workspace_mount_path_in_sandbox
try:
if runtime_cls != LocalRuntime:
if runtime_cls != LocalRuntime and runtime_cls != CLIRuntime:
# on local machine, permissionless sudo will probably not be available
obs = _run_cmd_action(runtime, 'sudo chown -R openhands:root .')
assert obs.exit_code == 0
@@ -704,7 +815,7 @@ def test_git_operation(temp_dir, runtime_cls):
# drwx--S--- 2 openhands root 64 Aug 7 23:32 .
# drwxr-xr-x 1 root root 4.0K Aug 7 23:33 ..
for line in obs.content.split('\n'):
if runtime_cls == LocalRuntime:
if runtime_cls == LocalRuntime or runtime_cls == CLIRuntime:
continue # skip these checks
if ' ..' in line:
@@ -725,17 +836,17 @@ def test_git_operation(temp_dir, runtime_cls):
obs = _run_cmd_action(runtime, 'echo "hello" > test_file.txt')
assert obs.exit_code == 0
if runtime_cls == LocalRuntime:
if runtime_cls == LocalRuntime or runtime_cls == CLIRuntime:
# set git config author in CI only, not on local machine
logger.info('Setting git config author')
obs = _run_cmd_action(
runtime,
'git config --file ./.git_config user.name "openhands" && git config --file ./.git_config user.email "openhands@all-hands.dev"',
'git config user.name "openhands" && git config user.email "openhands@all-hands.dev"',
)
assert obs.exit_code == 0
# Set up git config
obs = _run_cmd_action(runtime, 'git config --file ./.git_config')
# Set up git config - list current settings (should be empty or just what was set)
obs = _run_cmd_action(runtime, 'git config --list')
assert obs.exit_code == 0
# git add
@@ -847,6 +958,10 @@ def test_basic_command(temp_dir, runtime_cls, run_as_openhands):
@pytest.mark.skipif(
is_windows(), reason='Powershell does not support interactive commands'
)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support interactive commands from the agent.',
)
def test_interactive_command(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(
temp_dir,
@@ -905,6 +1020,10 @@ def test_long_output(temp_dir, runtime_cls, run_as_openhands):
is_windows(),
reason='Test relies on Linux-specific commands like seq and bash for loops',
)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not truncate command output.',
)
def test_long_output_exceed_history_limit(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
@@ -983,6 +1102,10 @@ def test_command_backslash(temp_dir, runtime_cls, run_as_openhands):
@pytest.mark.skipif(
is_windows(), reason='Test uses Linux-specific ps aux, awk, and grep commands'
)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support interactive commands from the agent.',
)
def test_stress_long_output_with_soft_and_hard_timeout(
temp_dir, runtime_cls, run_as_openhands
):
@@ -1073,6 +1196,10 @@ def test_stress_long_output_with_soft_and_hard_timeout(
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='FIXME: CLIRuntime does not watch previously timed-out commands except for getting full output a short time after timeout.',
)
def test_command_output_continuation(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
@@ -1157,6 +1284,10 @@ def test_command_output_continuation(temp_dir, runtime_cls, run_as_openhands):
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='FIXME: CLIRuntime does not implement empty command behavior.',
)
def test_long_running_command_follow_by_execute(
temp_dir, runtime_cls, run_as_openhands
):
@@ -1204,6 +1335,10 @@ def test_long_running_command_follow_by_execute(
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='FIXME: CLIRuntime does not implement empty command behavior.',
)
def test_empty_command_errors(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
@@ -1220,6 +1355,10 @@ def test_empty_command_errors(temp_dir, runtime_cls, run_as_openhands):
@pytest.mark.skipif(
is_windows(), reason='Powershell does not support interactive commands'
)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support interactive commands from the agent.',
)
def test_python_interactive_input(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
@@ -1252,6 +1391,10 @@ def test_python_interactive_input(temp_dir, runtime_cls, run_as_openhands):
@pytest.mark.skipif(
is_windows(), reason='Powershell does not support interactive commands'
)
@pytest.mark.skipif(
os.getenv('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support interactive commands from the agent.',
)
def test_python_interactive_input_without_set_input(
temp_dir, runtime_cls, run_as_openhands
):

View File

@@ -2,6 +2,7 @@
import os
import pytest
from conftest import _close_test_runtime, _load_runtime
from openhands.core.logger import openhands_logger as logger
@@ -21,6 +22,10 @@ from openhands.events.observation import (
# ============================================================================================================================
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support browsing actions',
)
def test_simple_browse(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
@@ -65,6 +70,10 @@ def test_simple_browse(temp_dir, runtime_cls, run_as_openhands):
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support browsing actions',
)
def test_read_pdf_browse(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:
@@ -135,6 +144,10 @@ def test_read_pdf_browse(temp_dir, runtime_cls, run_as_openhands):
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support browsing actions',
)
def test_read_png_browse(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
try:

View File

@@ -1,5 +1,7 @@
"""Image-related tests for the DockerRuntime, which connects to the ActionExecutor running in the sandbox."""
import os
import pytest
from conftest import _close_test_runtime, _load_runtime
@@ -10,6 +12,13 @@ from openhands.events.action import CmdRunAction
# Image-specific tests
# ============================================================================================================================
# Skip all tests in this file if running with CLIRuntime or LocalRuntime,
# as these tests are specific to Docker images.
pytestmark = pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') in ['cli', 'local'],
reason='Image tests are specific to DockerRuntime and not applicable to CLIRuntime or LocalRuntime.',
)
def test_bash_python_version(temp_dir, runtime_cls, base_container_image):
"""Make sure Python is available in bash."""

View File

@@ -3,6 +3,7 @@
import os
from unittest.mock import patch
import pytest
from conftest import _close_test_runtime, _load_runtime
from openhands.events.action import CmdRunAction
@@ -83,6 +84,10 @@ def test_env_vars_added_by_config(temp_dir, runtime_cls):
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') in ['cli', 'local'],
reason='This test is specific to DockerRuntime and its pause/resume persistence',
)
def test_docker_runtime_env_vars_persist_after_restart(temp_dir):
from openhands.runtime.impl.docker.docker_runtime import DockerRuntime

View File

@@ -1,5 +1,8 @@
"""Tests for the command helper functions in function_calling.py."""
import os
import pytest
from conftest import (
_close_test_runtime,
_load_runtime,
@@ -13,6 +16,15 @@ from openhands.core.logger import openhands_logger as logger
from openhands.events.action import CmdRunAction
from openhands.events.observation import CmdOutputObservation, ErrorObservation
# Skip all tests in this file if running with CLIRuntime,
# as they depend on `rg` (ripgrep) which is not guaranteed to be available.
# The underlying ReadOnlyAgent tools (GrepTool, GlobTool) also currently depend on `rg`.
# TODO: implement a fallback version of these tools that uses `find` and `grep`.
pytestmark = pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason="CLIRuntime: ReadOnlyAgent's GrepTool/GlobTool tests require `rg` (ripgrep), which may not be installed.",
)
def _run_cmd_action(runtime, custom_command: str):
action = CmdRunAction(command=custom_command)

View File

@@ -1,5 +1,7 @@
"""Test the DockerRuntime, which connects to the ActionExecutor running in the sandbox."""
import os
import pytest
from conftest import (
TEST_IN_CI,
@@ -27,6 +29,10 @@ from openhands.events.observation import (
# ============================================================================================================================
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support full IPython/Jupyter kernel features or return IPythonRunCellObservation',
)
def test_simple_cmd_ipython_and_fileop(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
@@ -99,6 +105,10 @@ def test_simple_cmd_ipython_and_fileop(temp_dir, runtime_cls, run_as_openhands):
TEST_IN_CI != 'True',
reason='This test is not working in WSL (file ownership)',
)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support full IPython/Jupyter kernel features or return IPythonRunCellObservation',
)
def test_ipython_multi_user(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
@@ -171,6 +181,10 @@ def test_ipython_multi_user(temp_dir, runtime_cls, run_as_openhands):
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support full IPython/Jupyter kernel features or return IPythonRunCellObservation',
)
def test_ipython_simple(temp_dir, runtime_cls):
runtime, config = _load_runtime(temp_dir, runtime_cls)
@@ -194,6 +208,10 @@ def test_ipython_simple(temp_dir, runtime_cls):
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support full IPython/Jupyter kernel features or return IPythonRunCellObservation',
)
def test_ipython_chdir(temp_dir, runtime_cls):
"""Test that os.chdir correctly handles paths with slashes."""
runtime, config = _load_runtime(temp_dir, runtime_cls)
@@ -240,6 +258,10 @@ shutil.rmtree('test_dir', ignore_errors=True)
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support IPython magics like %pip or return IPythonRunCellObservation',
)
def test_ipython_package_install(temp_dir, runtime_cls, run_as_openhands):
"""Make sure that cd in bash also update the current working directory in ipython."""
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
@@ -275,6 +297,10 @@ def test_ipython_package_install(temp_dir, runtime_cls, run_as_openhands):
_close_test_runtime(runtime)
@pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support sudo with password prompts if the user has not enabled passwordless sudo',
)
def test_ipython_file_editor_permissions_as_openhands(temp_dir, runtime_cls):
"""Test file editor permission behavior when running as different users."""
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands=True)

View File

@@ -22,6 +22,11 @@ from openhands.events.observation import CmdOutputObservation, MCPObservation
# Bash-specific tests
# ============================================================================================================================
pytestmark = pytest.mark.skipif(
os.environ.get('TEST_RUNTIME') == 'cli',
reason='CLIRuntime does not support MCP actions',
)
@pytest.fixture
def sse_mcp_docker_server():
@@ -291,11 +296,11 @@ async def test_microagent_and_one_stdio_mcp_in_config(
# NOTE: this simulate the case where the microagent adds a new stdio server to the runtime
# but that stdio server is not in the initial config
# Actual invocation of the microagent involves `add_mcp_tools_to_agent`
# which will call `get_updated_mcp_config` with the stdio server from microagent's config
# which will call `get_mcp_config` with the stdio server from microagent's config
fetch_config = MCPStdioServerConfig(
name='fetch', command='uvx', args=['mcp-server-fetch']
)
updated_config = runtime.get_updated_mcp_config([fetch_config])
updated_config = runtime.get_mcp_config([fetch_config])
logger.info(f'updated_config: {updated_config}')
# ======= Test the stdio server in the config =======

View File

@@ -226,7 +226,7 @@ async def test_add_mcp_tools_from_microagents():
# Configure the mock runtime
mock_runtime.runtime_initialized = True
mock_runtime.get_updated_mcp_config.return_value = mock_microagent_mcp_config
mock_runtime.get_mcp_config.return_value = mock_microagent_mcp_config
# Mock the fetch_mcp_tools_from_config function to return a mock tool
mock_tool = {
@@ -250,9 +250,9 @@ async def test_add_mcp_tools_from_microagents():
# Verify that the memory's get_microagent_mcp_tools was called
mock_memory.get_microagent_mcp_tools.assert_called_once()
# Verify that the runtime's get_updated_mcp_config was called with the extra stdio servers
mock_runtime.get_updated_mcp_config.assert_called_once()
args, kwargs = mock_runtime.get_updated_mcp_config.call_args
# Verify that the runtime's get_mcp_config was called with the extra stdio servers
mock_runtime.get_mcp_config.assert_called_once()
args, kwargs = mock_runtime.get_mcp_config.call_args
assert len(args) == 1
assert len(args[0]) == 1
assert args[0][0].name == 'test-tool'

View File

@@ -1,6 +1,7 @@
"""Replay tests"""
import asyncio
from pathlib import Path
from conftest import _close_test_runtime, _load_runtime
@@ -22,7 +23,9 @@ def _get_config(trajectory_name: str, agent: str = OH_DEFAULT_AGENT):
# do not mount workspace
workspace_base=None,
workspace_mount_path=None,
replay_trajectory_path=f'./tests/runtime/trajs/{trajectory_name}.json',
replay_trajectory_path=str(
(Path(__file__).parent / 'trajs' / f'{trajectory_name}.json').resolve()
),
)
@@ -32,7 +35,10 @@ def test_simple_replay(temp_dir, runtime_cls, run_as_openhands):
(creating a simple 2048 game), using the default agent
"""
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
config.replay_trajectory_path = './tests/runtime/trajs/basic.json'
config.replay_trajectory_path = str(
(Path(__file__).parent / 'trajs' / 'basic.json').resolve()
)
config.security.confirmation_mode = False
state: State | None = asyncio.run(
run_controller(
@@ -61,6 +67,7 @@ def test_simple_gui_replay(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
config = _get_config('basic_gui_mode')
config.security.confirmation_mode = False
state: State | None = asyncio.run(
run_controller(
@@ -87,7 +94,10 @@ def test_replay_wrong_initial_state(temp_dir, runtime_cls, run_as_openhands):
meaningless.
"""
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
config.replay_trajectory_path = './tests/runtime/trajs/wrong_initial_state.json'
config.replay_trajectory_path = str(
(Path(__file__).parent / 'trajs' / 'wrong_initial_state.json').resolve()
)
config.security.confirmation_mode = False
state: State | None = asyncio.run(
run_controller(
@@ -121,6 +131,7 @@ def test_replay_basic_interactions(temp_dir, runtime_cls, run_as_openhands):
runtime, config = _load_runtime(temp_dir, runtime_cls, run_as_openhands)
config = _get_config('basic_interactions')
config.security.confirmation_mode = False
state: State | None = asyncio.run(
run_controller(

View File

@@ -0,0 +1,135 @@
import pytest
from openhands.core.config.agent_config import AgentConfig
from openhands.core.config.app_config import AppConfig
from openhands.core.config.utils import finalize_config, load_from_env, load_from_toml
# Define a dummy agent name often used in tests or as a default
DEFAULT_AGENT_NAME = 'CodeActAgent'
def test_finalize_config_cli_disables_jupyter_and_browsing_when_true():
"""
Test that finalize_config sets enable_jupyter and enable_browsing to False
when runtime is 'cli' and they were initially True.
"""
app_config = AppConfig()
app_config.runtime = 'cli'
agent_config = AgentConfig(enable_jupyter=True, enable_browsing=True)
app_config.agents[DEFAULT_AGENT_NAME] = agent_config
finalize_config(app_config)
assert not app_config.agents[DEFAULT_AGENT_NAME].enable_jupyter, \
"enable_jupyter should be False when runtime is 'cli'"
assert not app_config.agents[DEFAULT_AGENT_NAME].enable_browsing, \
"enable_browsing should be False when runtime is 'cli'"
def test_finalize_config_cli_keeps_jupyter_and_browsing_false_when_false():
"""
Test that finalize_config keeps enable_jupyter and enable_browsing as False
when runtime is 'cli' and they were initially False.
"""
app_config = AppConfig()
app_config.runtime = 'cli'
agent_config = AgentConfig(enable_jupyter=False, enable_browsing=False)
app_config.agents[DEFAULT_AGENT_NAME] = agent_config
finalize_config(app_config)
assert not app_config.agents[DEFAULT_AGENT_NAME].enable_jupyter, \
"enable_jupyter should remain False when runtime is 'cli' and initially False"
assert not app_config.agents[DEFAULT_AGENT_NAME].enable_browsing, \
"enable_browsing should remain False when runtime is 'cli' and initially False"
def test_finalize_config_other_runtime_keeps_jupyter_and_browsing_true_by_default():
"""
Test that finalize_config keeps enable_jupyter and enable_browsing as True (default)
when runtime is not 'cli'.
"""
app_config = AppConfig()
app_config.runtime = 'docker' # A non-cli runtime
# AgentConfig defaults enable_jupyter and enable_browsing to True
agent_config = AgentConfig()
app_config.agents[DEFAULT_AGENT_NAME] = agent_config
finalize_config(app_config)
assert app_config.agents[DEFAULT_AGENT_NAME].enable_jupyter, \
"enable_jupyter should remain True by default for non-cli runtimes"
assert app_config.agents[DEFAULT_AGENT_NAME].enable_browsing, \
"enable_browsing should remain True by default for non-cli runtimes"
def test_finalize_config_other_runtime_keeps_jupyter_and_browsing_false_if_set():
"""
Test that finalize_config keeps enable_jupyter and enable_browsing as False
when runtime is not 'cli' but they were explicitly set to False.
"""
app_config = AppConfig()
app_config.runtime = 'docker' # A non-cli runtime
agent_config = AgentConfig(enable_jupyter=False, enable_browsing=False)
app_config.agents[DEFAULT_AGENT_NAME] = agent_config
finalize_config(app_config)
assert not app_config.agents[DEFAULT_AGENT_NAME].enable_jupyter, \
"enable_jupyter should remain False for non-cli runtimes if explicitly set to False"
assert not app_config.agents[DEFAULT_AGENT_NAME].enable_browsing, \
"enable_browsing should remain False for non-cli runtimes if explicitly set to False"
def test_finalize_config_no_agents_defined():
"""
Test that finalize_config runs without error if no agents are defined in the config,
even when runtime is 'cli'.
"""
app_config = AppConfig()
app_config.runtime = 'cli'
# No agents are added to app_config.agents
try:
finalize_config(app_config)
except Exception as e:
pytest.fail(f"finalize_config raised an exception with no agents defined: {e}")
def test_finalize_config_multiple_agents_cli_runtime():
"""
Test that finalize_config correctly disables jupyter and browsing for multiple agents
when runtime is 'cli'.
"""
app_config = AppConfig()
app_config.runtime = 'cli'
agent_config1 = AgentConfig(enable_jupyter=True, enable_browsing=True)
agent_config2 = AgentConfig(enable_jupyter=True, enable_browsing=True)
app_config.agents['Agent1'] = agent_config1
app_config.agents['Agent2'] = agent_config2
finalize_config(app_config)
assert not app_config.agents['Agent1'].enable_jupyter, "Jupyter should be disabled for Agent1"
assert not app_config.agents['Agent1'].enable_browsing, "Browsing should be disabled for Agent1"
assert not app_config.agents['Agent2'].enable_jupyter, "Jupyter should be disabled for Agent2"
assert not app_config.agents['Agent2'].enable_browsing, "Browsing should be disabled for Agent2"
def test_finalize_config_multiple_agents_other_runtime():
"""
Test that finalize_config correctly keeps jupyter and browsing enabled (or as set)
for multiple agents when runtime is not 'cli'.
"""
app_config = AppConfig()
app_config.runtime = 'docker'
agent_config1 = AgentConfig(enable_jupyter=True, enable_browsing=True) # Defaults
agent_config2 = AgentConfig(enable_jupyter=False, enable_browsing=False) # Explicitly false
app_config.agents['Agent1'] = agent_config1
app_config.agents['Agent2'] = agent_config2
finalize_config(app_config)
assert app_config.agents['Agent1'].enable_jupyter, "Jupyter should be True for Agent1"
assert app_config.agents['Agent1'].enable_browsing, "Browsing should be True for Agent1"
assert not app_config.agents['Agent2'].enable_jupyter, "Jupyter should be False for Agent2"
assert not app_config.agents['Agent2'].enable_browsing, "Browsing should be False for Agent2"

View File

@@ -137,13 +137,14 @@ def test_help_message(capsys):
'--config-file CONFIG_FILE',
'--no-auto-continue',
'--selected-repo SELECTED_REPO',
'--override-cli-mode OVERRIDE_CLI_MODE',
]
for element in expected_elements:
assert element in help_output, f"Expected '{element}' to be in the help message"
option_count = help_output.count(' -')
assert option_count == 19, f'Expected 19 options, found {option_count}'
assert option_count == 20, f'Expected 20 options, found {option_count}'
def test_selected_repo_format():

View File

@@ -76,6 +76,7 @@ async def test_cleanup_session_cancels_pending_tasks(
# Run cleanup session directly from the test task
await cli.cleanup_session(loop, mock_agent, mock_runtime, mock_controller)
await asyncio.sleep(0)
# Check that the other task was indeed cancelled
assert other_task.cancelled() or other_task_cancelled is True

View File

@@ -0,0 +1,82 @@
"""Test CLIRuntime class."""
import os
import tempfile
import pytest
from openhands.core.config import AppConfig
from openhands.core.exceptions import LLMMalformedActionError
from openhands.events import EventStream
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
from openhands.storage import get_file_store
@pytest.fixture
def temp_dir():
"""Create a temporary directory for testing."""
with tempfile.TemporaryDirectory() as temp_dir:
yield temp_dir
@pytest.fixture
def cli_runtime(temp_dir):
"""Create a CLIRuntime instance for testing."""
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('test', file_store)
config = AppConfig()
config.workspace_base = temp_dir
runtime = CLIRuntime(config, event_stream)
runtime._runtime_initialized = True # Skip initialization
return runtime
def test_sanitize_filename_valid_path(cli_runtime):
"""Test _sanitize_filename with a valid path."""
test_path = os.path.join(cli_runtime._workspace_path, 'test.txt')
sanitized_path = cli_runtime._sanitize_filename(test_path)
assert sanitized_path == os.path.realpath(test_path)
def test_sanitize_filename_relative_path(cli_runtime):
"""Test _sanitize_filename with a relative path."""
test_path = 'test.txt'
expected_path = os.path.join(cli_runtime._workspace_path, test_path)
sanitized_path = cli_runtime._sanitize_filename(test_path)
assert sanitized_path == os.path.realpath(expected_path)
def test_sanitize_filename_outside_workspace(cli_runtime):
"""Test _sanitize_filename with a path outside the workspace."""
test_path = '/tmp/test.txt' # Path outside workspace
with pytest.raises(LLMMalformedActionError) as exc_info:
cli_runtime._sanitize_filename(test_path)
assert 'Invalid path:' in str(exc_info.value)
assert 'You can only work with files in' in str(exc_info.value)
def test_sanitize_filename_path_traversal(cli_runtime):
"""Test _sanitize_filename with path traversal attempt."""
test_path = os.path.join(cli_runtime._workspace_path, '..', 'test.txt')
with pytest.raises(LLMMalformedActionError) as exc_info:
cli_runtime._sanitize_filename(test_path)
assert 'Invalid path traversal:' in str(exc_info.value)
assert 'Path resolves outside the workspace' in str(exc_info.value)
def test_sanitize_filename_absolute_path_with_dots(cli_runtime):
"""Test _sanitize_filename with absolute path containing dots."""
test_path = os.path.join(cli_runtime._workspace_path, 'subdir', '..', 'test.txt')
# Create the parent directory
os.makedirs(os.path.join(cli_runtime._workspace_path, 'subdir'), exist_ok=True)
sanitized_path = cli_runtime._sanitize_filename(test_path)
assert sanitized_path == os.path.join(cli_runtime._workspace_path, 'test.txt')
def test_sanitize_filename_nested_path(cli_runtime):
"""Test _sanitize_filename with a nested path."""
nested_dir = os.path.join(cli_runtime._workspace_path, 'dir1', 'dir2')
os.makedirs(nested_dir, exist_ok=True)
test_path = os.path.join(nested_dir, 'test.txt')
sanitized_path = cli_runtime._sanitize_filename(test_path)
assert sanitized_path == os.path.realpath(test_path)

View File

@@ -5,6 +5,7 @@ import pytest
from pydantic import SecretStr
from openhands.core.config import AppConfig
from openhands.core.config.mcp_config import MCPConfig, MCPStdioServerConfig
from openhands.events.action import Action
from openhands.events.action.commands import CmdRunAction
from openhands.events.observation import NullObservation, Observation
@@ -65,6 +66,14 @@ class TestRuntime(Runtime):
def call_tool_mcp(self, action):
return NullObservation(content='')
def edit(self, action):
return NullObservation(content='')
def get_mcp_config(
self, extra_stdio_servers: list[MCPStdioServerConfig] | None = None
):
return MCPConfig()
@pytest.fixture
def temp_dir(tmp_path_factory: pytest.TempPathFactory) -> str: