OpenHands/openhands/cli/commands.py

858 lines
27 KiB
Python

import asyncio
import os
import sys
from pathlib import Path
from typing import Any
import toml
from prompt_toolkit import HTML, print_formatted_text
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.shortcuts import clear, print_container
from prompt_toolkit.widgets import Frame, TextArea
from pydantic import ValidationError
from openhands.cli.settings import (
display_settings,
modify_llm_settings_advanced,
modify_llm_settings_basic,
modify_search_api_settings,
)
from openhands.cli.tui import (
COLOR_GREY,
UsageMetrics,
cli_confirm,
create_prompt_session,
display_help,
display_mcp_errors,
display_shutdown_message,
display_status,
read_prompt_input,
)
from openhands.cli.utils import (
add_local_config_trusted_dir,
get_local_config_trusted_dirs,
read_file,
write_to_file,
)
from openhands.core.config import (
OpenHandsConfig,
)
from openhands.core.config.mcp_config import (
MCPSHTTPServerConfig,
MCPSSEServerConfig,
MCPStdioServerConfig,
)
from openhands.core.schema import AgentState
from openhands.core.schema.exit_reason import ExitReason
from openhands.events import EventSource
from openhands.events.action import (
ChangeAgentStateAction,
MessageAction,
)
from openhands.events.stream import EventStream
from openhands.storage.settings.file_settings_store import FileSettingsStore
async def collect_input(config: OpenHandsConfig, prompt_text: str) -> str | None:
    """Ask the user for a single line of input, honouring cancel keywords.

    Args:
        config: OpenHands configuration, forwarded to the prompt reader.
        prompt_text: Label printed (followed by a space) before the input.

    Returns:
        The whitespace-stripped answer, or None when the answer is one of
        '/exit', '/cancel' or 'cancel' (compared case-insensitively).
    """
    print_formatted_text(prompt_text, end=' ')
    raw_answer = await read_prompt_input(config, '', multiline=False)
    answer = raw_answer.strip()

    # Any of the cancel keywords aborts the form the caller is filling in.
    if answer.lower() in ('/exit', '/cancel', 'cancel'):
        return None
    return answer
def restart_cli() -> None:
    """Re-exec the current interpreter with the original command line.

    On success this call does not return because the process image is
    replaced. If the exec fails, the error is printed and the user is asked
    to restart manually.
    """
    print_formatted_text('🔄 Restarting OpenHands CLI...')

    interpreter = sys.executable
    try:
        # argv[0] of the new process is the interpreter itself, followed by
        # the arguments this process was started with.
        os.execv(interpreter, [interpreter, *sys.argv])
    except Exception as e:
        print_formatted_text(f'❌ Failed to restart CLI: {e}')
        print_formatted_text(
            'Please restart OpenHands manually for changes to take effect.'
        )
async def prompt_for_restart(config: OpenHandsConfig) -> bool:
    """Tell the user the MCP config changed and ask whether to restart now.

    Args:
        config: OpenHands configuration used to build the prompt session.

    Returns:
        True for 'y'/'yes', False for 'n'/'no' or when the prompt is
        interrupted (Ctrl-C / EOF). Any other answer re-prompts.
    """
    print_formatted_text('📝 MCP server configuration updated successfully!')
    print_formatted_text('The changes will take effect after restarting OpenHands.')

    session = create_prompt_session(config)
    question = HTML('<gold>Would you like to restart OpenHands now? (y/n): </gold>')

    while True:
        try:
            with patch_stdout():
                raw = await session.prompt_async(question)
                answer = raw.strip().lower() if raw else ''

                if answer in ('y', 'yes'):
                    return True
                if answer in ('n', 'no'):
                    return False
                print_formatted_text('Please enter "y" for yes or "n" for no.')
        except (KeyboardInterrupt, EOFError):
            # Treat an interrupted prompt as "do not restart".
            return False
async def handle_commands(
command: str,
event_stream: EventStream,
usage_metrics: UsageMetrics,
sid: str,
config: OpenHandsConfig,
current_dir: str,
settings_store: FileSettingsStore,
agent_state: str,
) -> tuple[bool, bool, bool, ExitReason]:
close_repl = False
reload_microagents = False
new_session_requested = False
exit_reason = ExitReason.ERROR
if command == '/exit':
close_repl = handle_exit_command(
config,
event_stream,
usage_metrics,
sid,
)
if close_repl:
exit_reason = ExitReason.INTENTIONAL
elif command == '/help':
handle_help_command()
elif command == '/init':
close_repl, reload_microagents = await handle_init_command(
config, event_stream, current_dir
)
elif command == '/status':
handle_status_command(usage_metrics, sid)
elif command == '/new':
close_repl, new_session_requested = handle_new_command(
config, event_stream, usage_metrics, sid
)
if close_repl:
exit_reason = ExitReason.INTENTIONAL
elif command == '/settings':
await handle_settings_command(config, settings_store)
elif command == '/resume':
close_repl, new_session_requested = await handle_resume_command(
event_stream, agent_state
)
elif command == '/mcp':
await handle_mcp_command(config)
else:
close_repl = True
action = MessageAction(content=command)
event_stream.add_event(action, EventSource.USER)
return close_repl, reload_microagents, new_session_requested, exit_reason
def handle_exit_command(
    config: OpenHandsConfig,
    event_stream: EventStream,
    usage_metrics: UsageMetrics,
    sid: str,
) -> bool:
    """Confirm and perform session termination.

    Returns:
        True when the user confirmed and the stop event was emitted,
        False when the user dismissed the confirmation.
    """
    confirmed = (
        cli_confirm(config, '\nTerminate session?', ['Yes, proceed', 'No, dismiss'])
        == 0
    )
    if not confirmed:
        return False

    # Ask the agent to stop, then print the shutdown summary.
    stop_action = ChangeAgentStateAction(AgentState.STOPPED)
    event_stream.add_event(stop_action, EventSource.ENVIRONMENT)
    display_shutdown_message(usage_metrics, sid)
    return True
def handle_help_command() -> None:
    """Show the CLI help screen by delegating to ``display_help``."""
    display_help()
async def handle_init_command(
    config: OpenHandsConfig, event_stream: EventStream, current_dir: str
) -> tuple[bool, bool]:
    """Handle '/init': optionally ask the agent to write repo.md.

    Only the 'local' and 'cli' runtimes are supported; for any other runtime
    a notice is printed and nothing else happens.

    Returns:
        ``(close_repl, reload_microagents)``: both True when the user agreed
        and the repo.md creation prompt was sent, otherwise both False.
    """
    REPO_MD_CREATE_PROMPT = """
Please explore this repository. Create the file .openhands/microagents/repo.md with:
- A description of the project
- An overview of the file structure
- Any information on how to run tests or other relevant commands
- Any other information that would be helpful to a brand new developer
Keep it short--just a few paragraphs will do.
"""

    if config.runtime not in ('local', 'cli'):
        print_formatted_text(
            '\nRepository initialization through the CLI is only supported for CLI and local runtimes.\n'
        )
        return False, False

    user_agreed = await init_repository(config, current_dir)
    if not user_agreed:
        return False, False

    event_stream.add_event(
        MessageAction(content=REPO_MD_CREATE_PROMPT),
        EventSource.USER,
    )
    return True, True
def handle_status_command(usage_metrics: UsageMetrics, sid: str) -> None:
    """Show session status by delegating to ``display_status``.

    Args:
        usage_metrics: Usage metrics passed through to the display helper.
        sid: Session id passed through to the display helper.
    """
    display_status(usage_metrics, sid)
def handle_new_command(
    config: OpenHandsConfig,
    event_stream: EventStream,
    usage_metrics: UsageMetrics,
    sid: str,
) -> tuple[bool, bool]:
    """Handle '/new': confirm, then stop the current session.

    Returns:
        ``(close_repl, new_session_requested)``: ``(True, True)`` when the
        user confirmed, ``(False, False)`` otherwise.
    """
    user_confirmed = (
        cli_confirm(
            config,
            '\nCurrent session will be terminated and you will lose the conversation history.\n\nContinue?',
            ['Yes, proceed', 'No, dismiss'],
        )
        == 0
    )
    if not user_confirmed:
        return False, False

    # Stop the running agent and print the shutdown summary for this session.
    event_stream.add_event(
        ChangeAgentStateAction(AgentState.STOPPED),
        EventSource.ENVIRONMENT,
    )
    display_shutdown_message(usage_metrics, sid)
    return True, True
async def handle_settings_command(
    config: OpenHandsConfig,
    settings_store: FileSettingsStore,
) -> None:
    """Show current settings and run the editor the user picks.

    Args:
        config: OpenHands configuration to display and modify.
        settings_store: Store handed to the selected settings editor.
    """
    display_settings(config)

    selection = cli_confirm(
        config,
        '\nWhich settings would you like to modify?',
        [
            'LLM (Basic)',
            'LLM (Advanced)',
            'Search API (Optional)',
            'Go back',
        ],
    )

    # Menu index -> editor coroutine. 'Go back' (3) has no entry.
    editors = {
        0: modify_llm_settings_basic,
        1: modify_llm_settings_advanced,
        2: modify_search_api_settings,
    }
    editor = editors.get(selection)
    if editor is not None:
        await editor(config, settings_store)
# FIXME: Currently there's an issue with the actual 'resume' behavior.
# Setting the agent state to RUNNING will currently freeze the agent without continuing with the rest of the task.
# This is a workaround to handle the resume command for the time being. Replace user message with the state change event once the issue is fixed.
async def handle_resume_command(
    event_stream: EventStream,
    agent_state: str,
) -> tuple[bool, bool]:
    """Handle '/resume' by nudging a paused agent with a 'continue' message.

    Args:
        event_stream: Stream that receives the user message.
        agent_state: Current agent state; must equal ``AgentState.PAUSED``.

    Returns:
        ``(close_repl, new_session_requested)``: ``(True, False)`` when the
        message was sent, ``(False, False)`` when the agent is not paused.
    """
    if agent_state != AgentState.PAUSED:
        print_formatted_text(
            HTML(
                '<ansired>Error: Agent is not paused. /resume command is only available when agent is paused.</ansired>'
            )
        )
        return False, False

    # Workaround: a plain user message is sent instead of a
    # ChangeAgentStateAction(AgentState.RUNNING) environment event.
    resume_message = MessageAction(content='continue')
    event_stream.add_event(resume_message, EventSource.USER)
    return True, False
async def init_repository(config: OpenHandsConfig, current_dir: str) -> bool:
    """Ask the user whether repo.md should be (re-)initialized.

    If ``<current_dir>/.openhands/microagents/repo.md`` exists, its content is
    shown and the user is asked whether to re-initialize; on confirmation the
    file is emptied. If it does not exist, the user is asked whether to
    create it.

    Args:
        config: OpenHands configuration, forwarded to the confirmation prompt.
        current_dir: Directory that contains (or will contain) ``.openhands``.

    Returns:
        True when the user chose to (re-)initialize, False otherwise or when
        handling the existing file raised an exception.
    """
    repo_file_path = Path(current_dir) / '.openhands' / 'microagents' / 'repo.md'

    if not repo_file_path.exists():
        print_formatted_text(
            '\nRepository instructions file will be created by exploring the repository.\n'
        )
        return (
            cli_confirm(
                config,
                'Do you want to proceed?',
                ['Yes, create', 'No, dismiss'],
            )
            == 0
        )

    try:
        # Read the file in the default executor. get_running_loop() is the
        # documented way to obtain the loop from inside a coroutine.
        content = await asyncio.get_running_loop().run_in_executor(
            None, read_file, repo_file_path
        )

        print_formatted_text(
            'Repository instructions file (repo.md) already exists.\n'
        )

        container = Frame(
            TextArea(
                text=content,
                read_only=True,
                style=COLOR_GREY,
                wrap_lines=True,
            ),
            title='Repository Instructions (repo.md)',
            style=f'fg:{COLOR_GREY}',
        )
        print_container(container)
        print_formatted_text('')  # Add a newline after the frame

        init_repo = (
            cli_confirm(
                config,
                'Do you want to re-initialize?',
                ['Yes, re-initialize', 'No, dismiss'],
            )
            == 0
        )

        if init_repo:
            # Empty the file; the caller asks the agent to regenerate it.
            write_to_file(repo_file_path, '')
    except Exception:
        # NOTE(review): this also covers failures while displaying, prompting
        # or truncating, although the message only mentions reading.
        print_formatted_text('Error reading repository instructions file (repo.md)')
        init_repo = False

    return init_repo
def check_folder_security_agreement(config: OpenHandsConfig, current_dir: str) -> bool:
    """Make sure the user trusts ``current_dir`` before it is used as workspace.

    Trusted directories come from the local config when it lists any,
    otherwise from ``config.sandbox.trusted_dirs``. For an untrusted
    directory the screen is cleared and the user is asked to confirm; a
    confirmed directory is added to the local trusted list.

    Returns:
        True when the directory is already trusted or the user confirmed,
        False when the user declined.
    """
    # Config from ~/.openhands/config.toml overrides the app config
    local_trusted = get_local_config_trusted_dirs()
    trusted_dirs = local_trusted or config.sandbox.trusted_dirs

    if current_dir in trusted_dirs:
        return True

    warning_text = (
        f' Do you trust the files in this folder?\n\n'
        f' {current_dir}\n\n'
        ' OpenHands may read and execute files in this folder with your permission.'
    )
    security_frame = Frame(
        TextArea(
            text=warning_text,
            style=COLOR_GREY,
            read_only=True,
            wrap_lines=True,
        ),
        style=f'fg:{COLOR_GREY}',
    )

    clear()
    print_container(security_frame)
    print_formatted_text('')

    user_accepted = (
        cli_confirm(config, 'Do you wish to continue?', ['Yes, proceed', 'No, exit'])
        == 0
    )
    if user_accepted:
        # Remember the decision so the prompt is not shown again.
        add_local_config_trusted_dir(current_dir)
    return user_accepted
async def handle_mcp_command(config: OpenHandsConfig) -> None:
"""Handle MCP command with interactive menu."""
action = cli_confirm(
config,
'MCP Server Configuration',
[
'List configured servers',
'Add new server',
'Remove server',
'View errors',
'Go back',
],
)
if action == 0: # List
display_mcp_servers(config)
elif action == 1: # Add
await add_mcp_server(config)
elif action == 2: # Remove
await remove_mcp_server(config)
elif action == 3: # View errors
handle_mcp_errors_command()
# action == 4 is "Go back", do nothing
def display_mcp_servers(config: OpenHandsConfig) -> None:
    """Print a summary and per-type listing of the configured MCP servers.

    Args:
        config: OpenHands configuration whose ``mcp`` section is displayed.
    """
    mcp_config = config.mcp

    sse_count = len(mcp_config.sse_servers)
    stdio_count = len(mcp_config.stdio_servers)
    shttp_count = len(mcp_config.shttp_servers)
    total_count = sse_count + stdio_count + shttp_count

    if total_count == 0:
        print_formatted_text(
            'No custom MCP servers configured. See the documentation to learn more:\n'
            ' https://docs.all-hands.dev/usage/how-to/cli-mode#using-mcp-servers'
        )
        return

    print_formatted_text(
        f'Configured MCP servers:\n'
        f' • SSE servers: {sse_count}\n'
        f' • Stdio servers: {stdio_count}\n'
        f' • SHTTP servers: {shttp_count}\n'
        f' • Total: {total_count}'
    )

    # (heading, number of servers, servers, how to render one server)
    sections = (
        ('SSE Servers:', sse_count, mcp_config.sse_servers, lambda s: s.url),
        (
            'Stdio Servers:',
            stdio_count,
            mcp_config.stdio_servers,
            lambda s: f'{s.name} ({s.command})',
        ),
        ('SHTTP Servers:', shttp_count, mcp_config.shttp_servers, lambda s: s.url),
    )
    for heading, count, group, render in sections:
        if count <= 0:
            continue
        print_formatted_text(heading)
        for position, server in enumerate(group, 1):
            print_formatted_text(f' {position}. {render(server)}')
        print_formatted_text('')
def handle_mcp_errors_command() -> None:
    """Show MCP connection errors by delegating to ``display_mcp_errors``."""
    display_mcp_errors()
def get_config_file_path() -> Path:
    """Return the config file location to read from and write to.

    Returns:
        ``<cwd>/config.toml`` when that file exists, otherwise
        ``~/.openhands/config.toml`` (which may not exist yet).
    """
    local_config = Path.cwd() / 'config.toml'
    return (
        local_config
        if local_config.exists()
        else Path.home() / '.openhands' / 'config.toml'
    )
def load_config_file(file_path: Path) -> dict:
    """Load a TOML config file, falling back to an empty mapping.

    The file itself is never created here; only its parent directory is
    created when the file is missing or cannot be loaded, so that a later
    save to ``file_path`` can succeed.

    Args:
        file_path: Location of the TOML config file.

    Returns:
        The parsed config data, or ``{}`` when the file does not exist or
        could not be read or parsed.
    """
    if file_path.exists():
        try:
            # TOML documents are UTF-8 by specification; do not depend on the
            # platform's locale encoding.
            with open(file_path, encoding='utf-8') as f:
                return toml.load(f)
        except Exception as e:
            # Best-effort by design: callers expect a dict. The failure is
            # reported instead of being dropped silently, because callers in
            # this module write the returned data back to the same path.
            print_formatted_text(
                f'Warning: could not load {file_path}: {e}. '
                'Continuing with an empty configuration.'
            )

    # Create directory if it doesn't exist
    file_path.parent.mkdir(parents=True, exist_ok=True)
    return {}
def save_config_file(config_data: dict, file_path: Path) -> None:
    """Serialize ``config_data`` as TOML and write it to ``file_path``.

    Any existing file is overwritten. The parent directory must already
    exist.

    Args:
        config_data: Mapping to serialize.
        file_path: Destination file.
    """
    # TOML documents are UTF-8 by specification. Without an explicit encoding
    # the locale default is used, which can fail or mis-encode non-ASCII
    # values on some platforms.
    with open(file_path, 'w', encoding='utf-8') as f:
        toml.dump(config_data, f)
def _ensure_mcp_config_structure(config_data: dict) -> None:
    """Guarantee that ``config_data`` has an 'mcp' entry.

    An existing 'mcp' value is left untouched; a missing one is initialised
    to an empty dict. The mapping is modified in place.
    """
    config_data.setdefault('mcp', {})
def _add_server_to_config(server_type: str, server_config: dict) -> Path:
    """Append one server entry to the config file and save it.

    Args:
        server_type: Key under the 'mcp' section, e.g. 'sse_servers'.
        server_config: Entry to append to that key's list.

    Returns:
        Path of the config file that was written.
    """
    target_path = get_config_file_path()
    data = load_config_file(target_path)
    _ensure_mcp_config_structure(data)

    # Create the list for this server type on first use, then append.
    data['mcp'].setdefault(server_type, []).append(server_config)

    save_config_file(data, target_path)
    return target_path
async def add_mcp_server(config: OpenHandsConfig) -> None:
    """Ask for a transport type and run the matching "add server" form.

    Any exception raised by the form is caught and printed rather than
    propagated.

    Args:
        config: OpenHands configuration passed to the prompts and forms.
    """
    selected = cli_confirm(
        config,
        'Select MCP server transport type:',
        [
            'SSE (Server-Sent Events)',
            'Stdio (Standard Input/Output)',
            'SHTTP (Streamable HTTP)',
            'Cancel',
        ],
    )

    # Menu index -> form coroutine. 'Cancel' (3) has no entry.
    forms = {
        0: add_sse_server,
        1: add_stdio_server,
        2: add_shttp_server,
    }
    form = forms.get(selected)
    if form is None:
        return

    try:
        await form(config)
    except Exception as e:
        print_formatted_text(f'Error adding MCP server: {e}')
async def add_sse_server(config: OpenHandsConfig) -> None:
    """Interactively collect, validate and save an SSE MCP server entry.

    The form is repeated until the input validates or the user cancels.
    After saving, the user is offered an immediate restart.

    Args:
        config: OpenHands configuration passed to the prompts.
    """
    print_formatted_text('Adding SSE MCP Server')

    server = None
    while server is None:  # Repeat the whole form until it validates
        url = await collect_input(config, '\nEnter server URL:')
        if url is None:
            print_formatted_text('Operation cancelled.')
            return

        key_input = await collect_input(
            config, '\nEnter API key (optional, press Enter to skip):'
        )
        if key_input is None:
            print_formatted_text('Operation cancelled.')
            return

        try:
            # An empty answer means "no API key".
            server = MCPSSEServerConfig(url=url, api_key=key_input or None)
        except ValidationError as exc:
            print_formatted_text('❌ Please fix the following errors:')
            for issue in exc.errors():
                location = issue['loc']
                field = location[0] if location else 'unknown'
                print_formatted_text(f'{field}: {issue["msg"]}')

            if cli_confirm(config, '\nTry again?') != 0:
                print_formatted_text('Operation cancelled.')
                return

    # Persist only the fields that are set.
    entry = {'url': server.url}
    if server.api_key:
        entry['api_key'] = server.api_key

    saved_to = _add_server_to_config('sse_servers', entry)
    print_formatted_text(f'✓ SSE MCP server added to {saved_to}: {server.url}')

    if await prompt_for_restart(config):
        restart_cli()
async def add_stdio_server(config: OpenHandsConfig) -> None:
    """Interactively collect, validate and save a Stdio MCP server entry.

    All four fields are collected first; then the name is checked against
    the names already in ``config.mcp.stdio_servers`` and the input is
    validated. The form is repeated until it succeeds or the user cancels.
    After saving, the user is offered an immediate restart.

    Args:
        config: OpenHands configuration passed to the prompts.
    """
    print_formatted_text('Adding Stdio MCP Server')

    taken_names = {existing.name for existing in config.mcp.stdio_servers}

    # Prompts in the order the answers are unpacked below.
    prompts = (
        '\nEnter server name:',
        "\nEnter command (e.g., 'uvx', 'npx'):",
        '\nEnter arguments (optional, e.g., "-y server-package arg1"):',
        '\nEnter environment variables (KEY=VALUE format, comma-separated, optional):',
    )

    while True:  # Repeat the whole form until it validates
        answers = []
        for prompt in prompts:
            answer = await collect_input(config, prompt)
            if answer is None:
                print_formatted_text('Operation cancelled.')
                return
            answers.append(answer)
        name, command, args_input, env_input = answers

        if name in taken_names:
            print_formatted_text(f"❌ Server name '{name}' already exists.")
            if cli_confirm(config, '\nTry again?') != 0:
                print_formatted_text('Operation cancelled.')
                return
            continue

        try:
            server = MCPStdioServerConfig(
                name=name,
                command=command,
                args=args_input,  # type: ignore  # Will be parsed by Pydantic validator
                env=env_input,  # type: ignore  # Will be parsed by Pydantic validator
            )
        except ValidationError as exc:
            print_formatted_text('❌ Please fix the following errors:')
            for issue in exc.errors():
                location = issue['loc']
                field = location[0] if location else 'unknown'
                print_formatted_text(f'{field}: {issue["msg"]}')

            if cli_confirm(config, '\nTry again?') != 0:
                print_formatted_text('Operation cancelled.')
                return
        else:
            break  # Validation succeeded

    # Persist only the optional fields that are set.
    entry: dict[str, Any] = {'name': server.name, 'command': server.command}
    if server.args:
        entry['args'] = server.args
    if server.env:
        entry['env'] = server.env

    saved_to = _add_server_to_config('stdio_servers', entry)
    print_formatted_text(f'✓ Stdio MCP server added to {saved_to}: {server.name}')

    if await prompt_for_restart(config):
        restart_cli()
async def add_shttp_server(config: OpenHandsConfig) -> None:
    """Interactively collect, validate and save an SHTTP MCP server entry.

    The form is repeated until the input validates or the user cancels.
    After saving, the user is offered an immediate restart.

    Args:
        config: OpenHands configuration passed to the prompts.
    """
    print_formatted_text('Adding SHTTP MCP Server')

    server = None
    while server is None:  # Repeat the whole form until it validates
        url = await collect_input(config, '\nEnter server URL:')
        if url is None:
            print_formatted_text('Operation cancelled.')
            return

        key_input = await collect_input(
            config, '\nEnter API key (optional, press Enter to skip):'
        )
        if key_input is None:
            print_formatted_text('Operation cancelled.')
            return

        try:
            # An empty answer means "no API key".
            server = MCPSHTTPServerConfig(url=url, api_key=key_input or None)
        except ValidationError as exc:
            print_formatted_text('❌ Please fix the following errors:')
            for issue in exc.errors():
                location = issue['loc']
                field = location[0] if location else 'unknown'
                print_formatted_text(f'{field}: {issue["msg"]}')

            if cli_confirm(config, '\nTry again?') != 0:
                print_formatted_text('Operation cancelled.')
                return

    # Persist only the fields that are set.
    entry = {'url': server.url}
    if server.api_key:
        entry['api_key'] = server.api_key

    saved_to = _add_server_to_config('shttp_servers', entry)
    print_formatted_text(f'✓ SHTTP MCP server added to {saved_to}: {server.url}')

    if await prompt_for_restart(config):
        restart_cli()
async def remove_mcp_server(config: OpenHandsConfig) -> None:
    """Interactively remove one MCP server entry from the config file.

    The servers offered are those in ``config.mcp``. SSE and SHTTP servers
    are identified by URL, Stdio servers by name. After a confirmed
    selection, every matching entry is dropped from the config file
    returned by ``get_config_file_path()``.

    The file is rewritten, and a restart offered, only when at least one
    entry was actually dropped. If the selected server is not present in
    that file, a failure message is printed and the file is left untouched.

    Args:
        config: OpenHands configuration providing the server list and
            passed to the prompts.
    """
    mcp_config = config.mcp

    # (display label, identifier) for every configured server, in menu order.
    servers: list[tuple[str, str]] = (
        [('SSE', s.url) for s in mcp_config.sse_servers]
        + [('Stdio', s.name) for s in mcp_config.stdio_servers]
        + [('SHTTP', s.url) for s in mcp_config.shttp_servers]
    )

    if not servers:
        print_formatted_text('No MCP servers configured to remove.')
        return

    choices = [f'{label}: {ident}' for label, ident in servers]
    choices.append('Cancel')

    choice = cli_confirm(config, 'Select MCP server to remove:', choices)
    if choice == len(choices) - 1:  # Cancel
        return

    server_type, identifier = servers[choice]

    confirm = cli_confirm(
        config,
        f'Are you sure you want to remove {server_type} server "{identifier}"?',
        ['Yes, remove', 'Cancel'],
    )
    if confirm == 1:  # Cancel
        return

    # Display label -> (key under [mcp], field that identifies an entry).
    section_key, id_field = {
        'SSE': ('sse_servers', 'url'),
        'Stdio': ('stdio_servers', 'name'),
        'SHTTP': ('shttp_servers', 'url'),
    }[server_type]

    def entry_identifier(entry: Any) -> Any:
        # Table entries are matched on their identifying field. Any other
        # entry is compared as-is instead of raising AttributeError.
        # NOTE(review): assumes a bare string entry is the URL itself; confirm
        # against the config schema.
        return entry.get(id_field) if isinstance(entry, dict) else entry

    config_file_path = get_config_file_path()
    config_data = load_config_file(config_file_path)
    _ensure_mcp_config_structure(config_data)

    existing_entries = config_data['mcp'].get(section_key, [])
    remaining_entries = [
        entry for entry in existing_entries if entry_identifier(entry) != identifier
    ]
    # Only a shorter list means something was really removed from the file.
    removed = len(remaining_entries) != len(existing_entries)

    if removed:
        config_data['mcp'][section_key] = remaining_entries
        save_config_file(config_data, config_file_path)
        print_formatted_text(
            f'{server_type} MCP server "{identifier}" removed from {config_file_path}.'
        )

        # Prompt for restart
        if await prompt_for_restart(config):
            restart_cli()
    else:
        print_formatted_text(f'Failed to remove {server_type} server "{identifier}".')