fix(mcp): fix SSE MCP server connection & add tests (#8353)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Xingyao Wang 2025-05-09 01:26:49 +08:00 committed by GitHub
parent c6c94d979b
commit 11f32d2465
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 215 additions and 19 deletions

View File

@ -359,26 +359,34 @@ class ActionExecutionClient(Runtime):
server.model_dump(mode='json')
for server in updated_mcp_config.stdio_servers
]
self.log('debug', f'Updating MCP server to: {stdio_tools}')
response = self._send_action_server_request(
'POST',
f'{self.action_execution_server_url}/update_mcp_server',
json=stdio_tools,
timeout=10,
)
if response.status_code != 200:
raise RuntimeError(f'Failed to update MCP server: {response.text}')
# No API key by default. Child runtime can override this when appropriate
updated_mcp_config.sse_servers.append(
MCPSSEServerConfig(
url=self.action_execution_server_url.rstrip('/') + '/sse', api_key=None
if len(stdio_tools) > 0:
self.log('debug', f'Updating MCP server to: {stdio_tools}')
response = self._send_action_server_request(
'POST',
f'{self.action_execution_server_url}/update_mcp_server',
json=stdio_tools,
timeout=10,
)
if response.status_code != 200:
self.log('warning', f'Failed to update MCP server: {response.text}')
# No API key by default. Child runtime can override this when appropriate
updated_mcp_config.sse_servers.append(
MCPSSEServerConfig(
url=self.action_execution_server_url.rstrip('/') + '/sse',
api_key=None,
)
)
self.log(
'info',
f'Updated MCP config: {updated_mcp_config.sse_servers}',
)
else:
self.log(
'debug',
'MCP servers inside runtime is not updated since no stdio servers are provided',
)
)
self.log(
'info',
f'Updated MCP config: {updated_mcp_config.sse_servers}',
)
return updated_mcp_config
async def call_tool_mcp(self, action: MCPAction) -> Observation:

View File

@ -2,7 +2,10 @@
import json
import os
import socket
import time
import docker
import pytest
from conftest import (
_load_runtime,
@ -10,7 +13,7 @@ from conftest import (
import openhands
from openhands.core.config import MCPConfig
from openhands.core.config.mcp_config import MCPStdioServerConfig
from openhands.core.config.mcp_config import MCPSSEServerConfig, MCPStdioServerConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events.action import CmdRunAction, MCPAction
from openhands.events.observation import CmdOutputObservation, MCPObservation
@ -20,6 +23,84 @@ from openhands.events.observation import CmdOutputObservation, MCPObservation
# ============================================================================================================================
@pytest.fixture
def sse_mcp_docker_server():
"""Manages the lifecycle of the SSE MCP Docker container for tests, using a random available port."""
image_name = 'supercorp/supergateway'
# Find a free port
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
host_port = s.getsockname()[1]
container_internal_port = (
8000 # The port the MCP server listens on *inside* the container
)
container_command_args = [
'--stdio',
'npx -y @modelcontextprotocol/server-filesystem /',
'--port',
str(container_internal_port), # MCP server inside container listens on this
'--baseUrl',
f'http://localhost:{host_port}', # The URL used to access the server from the host
]
client = docker.from_env()
container = None
log_streamer = None
# Import LogStreamer here as it's specific to this fixture's needs
from openhands.runtime.utils.log_streamer import LogStreamer
try:
logger.info(
f'Starting Docker container {image_name} with command: {" ".join(container_command_args)} '
f'and mapping internal port {container_internal_port} to host port {host_port}',
extra={'msg_type': 'ACTION'},
)
container = client.containers.run(
image_name,
command=container_command_args,
ports={
f'{container_internal_port}/tcp': host_port
}, # Map container's internal port to the random host port
detach=True,
auto_remove=True,
stdin_open=True,
)
logger.info(
f'Container {container.short_id} started, listening on host port {host_port}.'
)
log_streamer = LogStreamer(
container,
lambda level, msg: getattr(logger, level.lower())(
f'[MCP server {container.short_id}] {msg}'
),
)
# Wait for the server to initialize, as in the original tests
time.sleep(10)
yield {'url': f'http://localhost:{host_port}/sse'}
finally:
if container:
logger.info(f'Stopping container {container.short_id}...')
try:
container.stop(timeout=5)
logger.info(
f'Container {container.short_id} stopped (and should be auto-removed).'
)
except docker.errors.NotFound:
logger.info(
f'Container {container.short_id} not found, likely already stopped and removed.'
)
except Exception as e:
logger.error(f'Error stopping container {container.short_id}: {e}')
if log_streamer:
log_streamer.close()
def test_default_activated_tools():
project_root = os.path.dirname(openhands.__file__)
mcp_config_path = os.path.join(project_root, 'runtime', 'mcp', 'config.json')
@ -76,3 +157,110 @@ async def test_fetch_mcp_via_stdio(temp_dir, runtime_cls, run_as_openhands):
)
runtime.close()
@pytest.mark.asyncio
async def test_filesystem_mcp_via_sse(
temp_dir, runtime_cls, run_as_openhands, sse_mcp_docker_server
):
sse_server_info = sse_mcp_docker_server
sse_url = sse_server_info['url']
runtime = None
try:
mcp_sse_server_config = MCPSSEServerConfig(url=sse_url)
override_mcp_config = MCPConfig(sse_servers=[mcp_sse_server_config])
runtime, config = _load_runtime(
temp_dir,
runtime_cls,
run_as_openhands,
override_mcp_config=override_mcp_config,
)
mcp_action = MCPAction(name='list_directory', arguments={'path': '.'})
obs = await runtime.call_tool_mcp(mcp_action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs, MCPObservation), (
'The observation should be a MCPObservation.'
)
assert '[FILE] .dockerenv' in obs.content
finally:
if runtime:
runtime.close()
# Container and log_streamer cleanup is handled by the sse_mcp_docker_server fixture
@pytest.mark.asyncio
async def test_both_stdio_and_sse_mcp(
temp_dir, runtime_cls, run_as_openhands, sse_mcp_docker_server
):
sse_server_info = sse_mcp_docker_server
sse_url = sse_server_info['url']
runtime = None
try:
mcp_sse_server_config = MCPSSEServerConfig(url=sse_url)
# Also add stdio server
mcp_stdio_server_config = MCPStdioServerConfig(
name='fetch', command='uvx', args=['mcp-server-fetch']
)
override_mcp_config = MCPConfig(
sse_servers=[mcp_sse_server_config], stdio_servers=[mcp_stdio_server_config]
)
runtime, config = _load_runtime(
temp_dir,
runtime_cls,
run_as_openhands,
override_mcp_config=override_mcp_config,
)
# ======= Test SSE server =======
mcp_action_sse = MCPAction(name='list_directory', arguments={'path': '.'})
obs_sse = await runtime.call_tool_mcp(mcp_action_sse)
logger.info(obs_sse, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs_sse, MCPObservation), (
'The observation should be a MCPObservation.'
)
assert '[FILE] .dockerenv' in obs_sse.content
# ======= Test stdio server =======
# Test browser server
action_cmd_http = CmdRunAction(
command='python3 -m http.server 8000 > server.log 2>&1 &'
)
logger.info(action_cmd_http, extra={'msg_type': 'ACTION'})
obs_http = runtime.run_action(action_cmd_http)
logger.info(obs_http, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs_http, CmdOutputObservation)
assert obs_http.exit_code == 0
assert '[1]' in obs_http.content
action_cmd_cat = CmdRunAction(command='sleep 3 && cat server.log')
logger.info(action_cmd_cat, extra={'msg_type': 'ACTION'})
obs_cat = runtime.run_action(action_cmd_cat)
logger.info(obs_cat, extra={'msg_type': 'OBSERVATION'})
assert obs_cat.exit_code == 0
mcp_action_fetch = MCPAction(
name='fetch', arguments={'url': 'http://localhost:8000'}
)
obs_fetch = await runtime.call_tool_mcp(mcp_action_fetch)
logger.info(obs_fetch, extra={'msg_type': 'OBSERVATION'})
assert isinstance(obs_fetch, MCPObservation), (
'The observation should be a MCPObservation.'
)
result_json = json.loads(obs_fetch.content)
assert not result_json['isError']
assert len(result_json['content']) == 1
assert result_json['content'][0]['type'] == 'text'
assert (
result_json['content'][0]['text']
== 'Contents of http://localhost:8000/:\n---\n\n* <server.log>\n\n---'
)
finally:
if runtime:
runtime.close()
# SSE Docker container cleanup is handled by the sse_mcp_docker_server fixture