Fix Docker runtime port allocation race condition (#9810)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Graham Neubig
2025-07-22 18:12:31 -04:00
committed by GitHub
parent 556ec9ab1a
commit 7a168b9b5f
3 changed files with 587 additions and 15 deletions

View File

@@ -31,6 +31,7 @@ from openhands.runtime.utils.command import (
get_action_execution_server_startup_command,
)
from openhands.runtime.utils.log_streamer import LogStreamer
from openhands.runtime.utils.port_lock import PortLock, find_available_port_with_lock
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.shutdown_listener import add_shutdown_listener
@@ -104,6 +105,11 @@ class DockerRuntime(ActionExecutionClient):
self._vscode_port = -1
self._app_ports: list[int] = []
# Port locks to prevent race conditions
self._host_port_lock: PortLock | None = None
self._vscode_port_lock: PortLock | None = None
self._app_port_locks: list[PortLock] = []
if os.environ.get('DOCKER_HOST_ADDR'):
logger.info(
f'Using DOCKER_HOST_IP: {os.environ["DOCKER_HOST_ADDR"]} for local_runtime_url'
@@ -276,17 +282,31 @@ class DockerRuntime(ActionExecutionClient):
def init_container(self) -> None:
self.log('debug', 'Preparing to start container...')
self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
self._host_port = self._find_available_port(EXECUTION_SERVER_PORT_RANGE)
self._container_port = self._host_port
# Use the configured vscode_port if provided, otherwise find an available port
self._vscode_port = (
self.config.sandbox.vscode_port
or self._find_available_port(VSCODE_PORT_RANGE)
# Allocate host port with locking to prevent race conditions
self._host_port, self._host_port_lock = self._find_available_port_with_lock(
EXECUTION_SERVER_PORT_RANGE
)
self._app_ports = [
self._find_available_port(APP_PORT_RANGE_1),
self._find_available_port(APP_PORT_RANGE_2),
self._container_port = self._host_port
# Use the configured vscode_port if provided, otherwise find an available port
if self.config.sandbox.vscode_port:
self._vscode_port = self.config.sandbox.vscode_port
self._vscode_port_lock = None # No lock needed for configured port
else:
self._vscode_port, self._vscode_port_lock = (
self._find_available_port_with_lock(VSCODE_PORT_RANGE)
)
# Allocate app ports with locking
app_port_1, app_lock_1 = self._find_available_port_with_lock(APP_PORT_RANGE_1)
app_port_2, app_lock_2 = self._find_available_port_with_lock(APP_PORT_RANGE_2)
self._app_ports = [app_port_1, app_port_2]
self._app_port_locks = [
lock for lock in [app_lock_1, app_lock_2] if lock is not None
]
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
use_host_network = self.config.sandbox.use_host_network
@@ -476,6 +496,28 @@ class DockerRuntime(ActionExecutionClient):
CONTAINER_NAME_PREFIX if rm_all_containers else self.container_name
)
stop_all_containers(close_prefix)
self._release_port_locks()
def _release_port_locks(self) -> None:
"""Release all acquired port locks."""
if self._host_port_lock:
self._host_port_lock.release()
self._host_port_lock = None
logger.debug(f'Released host port lock for port {self._host_port}')
if self._vscode_port_lock:
self._vscode_port_lock.release()
self._vscode_port_lock = None
logger.debug(f'Released VSCode port lock for port {self._vscode_port}')
for i, lock in enumerate(self._app_port_locks):
if lock:
lock.release()
logger.debug(
f'Released app port lock for port {self._app_ports[i] if i < len(self._app_ports) else "unknown"}'
)
self._app_port_locks.clear()
def _is_port_in_use_docker(self, port: int) -> bool:
containers = self.docker_client.containers.list()
@@ -485,15 +527,58 @@ class DockerRuntime(ActionExecutionClient):
return True
return False
def _find_available_port_with_lock(
self, port_range: tuple[int, int], max_attempts: int = 5
) -> tuple[int, PortLock | None]:
"""Find an available port with race condition protection.
This method uses file-based locking to prevent multiple workers
from allocating the same port simultaneously.
Args:
port_range: Tuple of (min_port, max_port)
max_attempts: Maximum number of attempts to find a port
Returns:
Tuple of (port_number, port_lock) where port_lock may be None if locking failed
"""
# Try to find and lock an available port
result = find_available_port_with_lock(
min_port=port_range[0],
max_port=port_range[1],
max_attempts=max_attempts,
bind_address='0.0.0.0',
lock_timeout=1.0,
)
if result is None:
# Fallback to original method if port locking fails
logger.warning(
f'Port locking failed for range {port_range}, falling back to original method'
)
port = port_range[1]
for _ in range(max_attempts):
port = find_available_tcp_port(port_range[0], port_range[1])
if not self._is_port_in_use_docker(port):
return port, None
return port, None
port, port_lock = result
# Additional check with Docker to ensure port is not in use
if self._is_port_in_use_docker(port):
port_lock.release()
# Try again with a different port
logger.debug(f'Port {port} is in use by Docker, trying again')
return self._find_available_port_with_lock(port_range, max_attempts - 1)
return port, port_lock
def _find_available_port(
self, port_range: tuple[int, int], max_attempts: int = 5
) -> int:
port = port_range[1]
for _ in range(max_attempts):
port = find_available_tcp_port(port_range[0], port_range[1])
if not self._is_port_in_use_docker(port):
return port
# If no port is found after max_attempts, return the last tried port
"""Find an available port (legacy method for backward compatibility)."""
port, _ = self._find_available_port_with_lock(port_range, max_attempts)
return port
@property

View File

@@ -0,0 +1,268 @@
"""File-based port locking system for preventing race conditions in port allocation."""
import os
import random
import socket
import tempfile
import time
from typing import Optional
from openhands.core.logger import openhands_logger as logger
# Import fcntl only on Unix systems
try:
import fcntl
HAS_FCNTL = True
except ImportError:
HAS_FCNTL = False
class PortLock:
"""File-based lock for a specific port to prevent race conditions."""
def __init__(self, port: int, lock_dir: Optional[str] = None):
self.port = port
self.lock_dir = lock_dir or os.path.join(
tempfile.gettempdir(), 'openhands_port_locks'
)
self.lock_file_path = os.path.join(self.lock_dir, f'port_{port}.lock')
self.lock_fd: Optional[int] = None
self._locked = False
# Ensure lock directory exists
os.makedirs(self.lock_dir, exist_ok=True)
def acquire(self, timeout: float = 1.0) -> bool:
"""Acquire the lock for this port.
Args:
timeout: Maximum time to wait for the lock
Returns:
True if lock was acquired, False otherwise
"""
if self._locked:
return True
try:
if HAS_FCNTL:
# Unix-style file locking with fcntl
self.lock_fd = os.open(
self.lock_file_path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC
)
# Try to acquire exclusive lock with timeout
start_time = time.time()
while time.time() - start_time < timeout:
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
self._locked = True
# Write port number to lock file for debugging
os.write(self.lock_fd, f'{self.port}\n'.encode())
os.fsync(self.lock_fd)
logger.debug(f'Acquired lock for port {self.port}')
return True
except (OSError, IOError):
# Lock is held by another process, wait a bit
time.sleep(0.01)
# Timeout reached
if self.lock_fd:
os.close(self.lock_fd)
self.lock_fd = None
return False
else:
# Windows fallback: use atomic file creation
start_time = time.time()
while time.time() - start_time < timeout:
try:
# Try to create lock file exclusively
self.lock_fd = os.open(
self.lock_file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY
)
self._locked = True
# Write port number to lock file for debugging
os.write(self.lock_fd, f'{self.port}\n'.encode())
os.fsync(self.lock_fd)
logger.debug(f'Acquired lock for port {self.port}')
return True
except OSError:
# Lock file already exists, wait a bit
time.sleep(0.01)
# Timeout reached
return False
except Exception as e:
logger.debug(f'Failed to acquire lock for port {self.port}: {e}')
if self.lock_fd:
try:
os.close(self.lock_fd)
except OSError:
pass
self.lock_fd = None
return False
def release(self) -> None:
"""Release the lock."""
if self.lock_fd is not None:
try:
if HAS_FCNTL:
# Unix: unlock and close
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
os.close(self.lock_fd)
# Remove lock file (both Unix and Windows)
try:
os.unlink(self.lock_file_path)
except FileNotFoundError:
pass
logger.debug(f'Released lock for port {self.port}')
except Exception as e:
logger.warning(f'Error releasing lock for port {self.port}: {e}')
finally:
self.lock_fd = None
self._locked = False
def __enter__(self) -> 'PortLock':
if not self.acquire():
raise OSError(f'Could not acquire lock for port {self.port}')
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.release()
@property
def is_locked(self) -> bool:
return self._locked
def find_available_port_with_lock(
min_port: int = 30000,
max_port: int = 39999,
max_attempts: int = 20,
bind_address: str = '0.0.0.0',
lock_timeout: float = 1.0,
) -> Optional[tuple[int, PortLock]]:
"""Find an available port and acquire a lock for it.
This function combines file-based locking with port availability checking
to prevent race conditions in multi-process scenarios.
Args:
min_port: Minimum port number to try
max_port: Maximum port number to try
max_attempts: Maximum number of ports to try
bind_address: Address to bind to when checking availability
lock_timeout: Timeout for acquiring port lock
Returns:
Tuple of (port, lock) if successful, None otherwise
"""
rng = random.SystemRandom()
# Try random ports first for better distribution
random_attempts = min(max_attempts // 2, 10)
for _ in range(random_attempts):
port = rng.randint(min_port, max_port)
# Try to acquire lock first
lock = PortLock(port)
if lock.acquire(timeout=lock_timeout):
# Check if port is actually available
if _check_port_available(port, bind_address):
logger.debug(f'Found and locked available port {port}')
return port, lock
else:
# Port is locked but not available (maybe in TIME_WAIT state)
lock.release()
# Small delay to reduce contention
time.sleep(0.001)
# If random attempts failed, try sequential search
remaining_attempts = max_attempts - random_attempts
start_port = rng.randint(min_port, max_port - remaining_attempts)
for i in range(remaining_attempts):
port = start_port + i
if port > max_port:
port = min_port + (port - max_port - 1)
# Try to acquire lock first
lock = PortLock(port)
if lock.acquire(timeout=lock_timeout):
# Check if port is actually available
if _check_port_available(port, bind_address):
logger.debug(f'Found and locked available port {port}')
return port, lock
else:
# Port is locked but not available
lock.release()
# Small delay to reduce contention
time.sleep(0.001)
logger.error(
f'Could not find and lock available port in range {min_port}-{max_port} after {max_attempts} attempts'
)
return None
def _check_port_available(port: int, bind_address: str = '0.0.0.0') -> bool:
"""Check if a port is available by trying to bind to it."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((bind_address, port))
sock.close()
return True
except OSError:
return False
def cleanup_stale_locks(max_age_seconds: int = 300) -> int:
"""Clean up stale lock files.
Args:
max_age_seconds: Maximum age of lock files before they're considered stale
Returns:
Number of lock files cleaned up
"""
lock_dir = os.path.join(tempfile.gettempdir(), 'openhands_port_locks')
if not os.path.exists(lock_dir):
return 0
cleaned = 0
current_time = time.time()
try:
for filename in os.listdir(lock_dir):
if filename.startswith('port_') and filename.endswith('.lock'):
lock_path = os.path.join(lock_dir, filename)
try:
# Check if lock file is old
stat = os.stat(lock_path)
if current_time - stat.st_mtime > max_age_seconds:
# Try to remove stale lock
os.unlink(lock_path)
cleaned += 1
logger.debug(f'Cleaned up stale lock file: {filename}')
except (OSError, FileNotFoundError):
# File might have been removed by another process
pass
except OSError:
# Directory might not exist or be accessible
pass
if cleaned > 0:
logger.info(f'Cleaned up {cleaned} stale port lock files')
return cleaned

View File

@@ -0,0 +1,219 @@
"""Test for port allocation race condition fix."""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from openhands.runtime.utils.port_lock import PortLock, find_available_port_with_lock
class TestPortLockingFix:
"""Test cases for port allocation race condition fix."""
def test_port_lock_prevents_duplicate_allocation(self):
"""Test that port locking prevents duplicate port allocation."""
allocated_ports = []
port_locks = []
def allocate_port():
"""Simulate port allocation by multiple workers."""
result = find_available_port_with_lock(
min_port=30000,
max_port=30010, # Small range to force conflicts
max_attempts=5,
bind_address='0.0.0.0',
lock_timeout=2.0,
)
if result:
port, lock = result
allocated_ports.append(port)
port_locks.append(lock)
# Simulate some work time
time.sleep(0.1)
return port
return None
# Run multiple threads concurrently
num_workers = 8
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(allocate_port) for _ in range(num_workers)]
results = [future.result() for future in as_completed(futures)]
# Filter out None results
successful_ports = [port for port in results if port is not None]
# Verify no duplicate ports were allocated
assert len(successful_ports) == len(set(successful_ports)), (
f'Duplicate ports allocated: {successful_ports}'
)
# Clean up locks
for lock in port_locks:
if lock:
lock.release()
print(
f'Successfully allocated {len(successful_ports)} unique ports: {successful_ports}'
)
def test_port_lock_basic_functionality(self):
"""Test basic port lock functionality."""
port = 30001
# Test acquiring and releasing a lock
lock1 = PortLock(port)
assert lock1.acquire(timeout=1.0)
assert lock1.is_locked
# Test that another lock cannot acquire the same port
lock2 = PortLock(port)
assert not lock2.acquire(timeout=0.1)
assert not lock2.is_locked
# Release first lock
lock1.release()
assert not lock1.is_locked
# Now second lock should be able to acquire
assert lock2.acquire(timeout=1.0)
assert lock2.is_locked
lock2.release()
def test_port_lock_context_manager(self):
"""Test port lock context manager functionality."""
port = 30002
# Test successful context manager usage
with PortLock(port) as lock:
assert lock.is_locked
# Test that another lock cannot acquire while in context
lock2 = PortLock(port)
assert not lock2.acquire(timeout=0.1)
# After context, lock should be released
assert not lock.is_locked
# Now another lock should be able to acquire
lock3 = PortLock(port)
assert lock3.acquire(timeout=1.0)
lock3.release()
def test_concurrent_port_allocation_stress_test(self):
"""Stress test concurrent port allocation."""
allocated_ports = []
port_locks = []
errors = []
def worker_allocate_port(worker_id):
"""Worker function that allocates a port."""
try:
result = find_available_port_with_lock(
min_port=31000,
max_port=31020, # Small range to force contention
max_attempts=10,
bind_address='0.0.0.0',
lock_timeout=3.0,
)
if result:
port, lock = result
allocated_ports.append((worker_id, port))
port_locks.append(lock)
# Simulate work
time.sleep(0.05)
return port
else:
errors.append(f'Worker {worker_id}: No port available')
return None
except Exception as e:
errors.append(f'Worker {worker_id}: {str(e)}')
return None
# Run many workers concurrently
num_workers = 15
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = {
executor.submit(worker_allocate_port, i): i for i in range(num_workers)
}
results = {}
for future in as_completed(futures):
worker_id = futures[future]
try:
result = future.result()
results[worker_id] = result
except Exception as e:
errors.append(f'Worker {worker_id} exception: {str(e)}')
# Analyze results
successful_allocations = [
(wid, port) for wid, port in allocated_ports if port is not None
]
allocated_port_numbers = [port for _, port in successful_allocations]
print(f'Successful allocations: {len(successful_allocations)}')
print(f'Allocated ports: {allocated_port_numbers}')
print(f'Errors: {len(errors)}')
if errors:
print(f'Error details: {errors[:5]}') # Show first 5 errors
# Verify no duplicate ports
unique_ports = set(allocated_port_numbers)
assert len(allocated_port_numbers) == len(unique_ports), (
f'Duplicate ports found: {allocated_port_numbers}'
)
# Clean up locks
for lock in port_locks:
if lock:
lock.release()
def test_port_allocation_without_locking_shows_race_condition(self):
"""Test that demonstrates race condition without locking."""
from openhands.runtime.utils import find_available_tcp_port
allocated_ports = []
def allocate_port_without_lock():
"""Simulate port allocation without locking (old method)."""
# This simulates the old behavior that had race conditions
port = find_available_tcp_port(32000, 32010)
allocated_ports.append(port)
# Small delay to increase chance of race condition
time.sleep(0.01)
return port
# Run multiple threads concurrently
num_workers = 10
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [
executor.submit(allocate_port_without_lock) for _ in range(num_workers)
]
results = [future.result() for future in as_completed(futures)]
# Check if we got duplicate ports (race condition)
unique_ports = set(results)
duplicates_found = len(results) != len(unique_ports)
print(
f'Without locking - Total ports: {len(results)}, Unique: {len(unique_ports)}'
)
print(f'Ports allocated: {results}')
print(f'Race condition detected: {duplicates_found}')
# This test demonstrates the problem exists without locking
# In a real race condition scenario, we might get duplicates
# But since the race window is small, we'll just verify the test runs
assert len(results) == num_workers
if __name__ == '__main__':
test = TestPortLockingFix()
test.test_port_lock_prevents_duplicate_allocation()
test.test_port_lock_basic_functionality()
test.test_port_lock_context_manager()
test.test_concurrent_port_allocation_stress_test()
test.test_port_allocation_without_locking_shows_race_condition()
print('All tests passed!')