diff --git a/openhands/runtime/impl/docker/docker_runtime.py b/openhands/runtime/impl/docker/docker_runtime.py index 03dfb7b949..29230693cc 100644 --- a/openhands/runtime/impl/docker/docker_runtime.py +++ b/openhands/runtime/impl/docker/docker_runtime.py @@ -31,6 +31,7 @@ from openhands.runtime.utils.command import ( get_action_execution_server_startup_command, ) from openhands.runtime.utils.log_streamer import LogStreamer +from openhands.runtime.utils.port_lock import PortLock, find_available_port_with_lock from openhands.runtime.utils.runtime_build import build_runtime_image from openhands.utils.async_utils import call_sync_from_async from openhands.utils.shutdown_listener import add_shutdown_listener @@ -104,6 +105,11 @@ class DockerRuntime(ActionExecutionClient): self._vscode_port = -1 self._app_ports: list[int] = [] + # Port locks to prevent race conditions + self._host_port_lock: PortLock | None = None + self._vscode_port_lock: PortLock | None = None + self._app_port_locks: list[PortLock] = [] + if os.environ.get('DOCKER_HOST_ADDR'): logger.info( f'Using DOCKER_HOST_IP: {os.environ["DOCKER_HOST_ADDR"]} for local_runtime_url' @@ -276,17 +282,31 @@ class DockerRuntime(ActionExecutionClient): def init_container(self) -> None: self.log('debug', 'Preparing to start container...') self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME) - self._host_port = self._find_available_port(EXECUTION_SERVER_PORT_RANGE) - self._container_port = self._host_port - # Use the configured vscode_port if provided, otherwise find an available port - self._vscode_port = ( - self.config.sandbox.vscode_port - or self._find_available_port(VSCODE_PORT_RANGE) + + # Allocate host port with locking to prevent race conditions + self._host_port, self._host_port_lock = self._find_available_port_with_lock( + EXECUTION_SERVER_PORT_RANGE ) - self._app_ports = [ - self._find_available_port(APP_PORT_RANGE_1), - self._find_available_port(APP_PORT_RANGE_2), + self._container_port = self._host_port + + 
# Use the configured vscode_port if provided, otherwise find an available port + if self.config.sandbox.vscode_port: + self._vscode_port = self.config.sandbox.vscode_port + self._vscode_port_lock = None # No lock needed for configured port + else: + self._vscode_port, self._vscode_port_lock = ( + self._find_available_port_with_lock(VSCODE_PORT_RANGE) + ) + + # Allocate app ports with locking + app_port_1, app_lock_1 = self._find_available_port_with_lock(APP_PORT_RANGE_1) + app_port_2, app_lock_2 = self._find_available_port_with_lock(APP_PORT_RANGE_2) + + self._app_ports = [app_port_1, app_port_2] + self._app_port_locks = [ + lock for lock in [app_lock_1, app_lock_2] if lock is not None ] + self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}' use_host_network = self.config.sandbox.use_host_network @@ -476,6 +496,28 @@ class DockerRuntime(ActionExecutionClient): CONTAINER_NAME_PREFIX if rm_all_containers else self.container_name ) stop_all_containers(close_prefix) + self._release_port_locks() + + def _release_port_locks(self) -> None: + """Release all acquired port locks.""" + if self._host_port_lock: + self._host_port_lock.release() + self._host_port_lock = None + logger.debug(f'Released host port lock for port {self._host_port}') + + if self._vscode_port_lock: + self._vscode_port_lock.release() + self._vscode_port_lock = None + logger.debug(f'Released VSCode port lock for port {self._vscode_port}') + + for i, lock in enumerate(self._app_port_locks): + if lock: + lock.release() + logger.debug( + f'Released app port lock for port {self._app_ports[i] if i < len(self._app_ports) else "unknown"}' + ) + + self._app_port_locks.clear() def _is_port_in_use_docker(self, port: int) -> bool: containers = self.docker_client.containers.list() @@ -485,15 +527,58 @@ class DockerRuntime(ActionExecutionClient): return True return False + def _find_available_port_with_lock( + self, port_range: tuple[int, int], max_attempts: int = 5 + ) -> tuple[int, 
PortLock | None]: + """Find an available port with race condition protection. + + This method uses file-based locking to prevent multiple workers + from allocating the same port simultaneously. + + Args: + port_range: Tuple of (min_port, max_port) + max_attempts: Maximum number of attempts to find a port + + Returns: + Tuple of (port_number, port_lock) where port_lock may be None if locking failed + """ + # Try to find and lock an available port + result = find_available_port_with_lock( + min_port=port_range[0], + max_port=port_range[1], + max_attempts=max_attempts, + bind_address='0.0.0.0', + lock_timeout=1.0, + ) + + if result is None: + # Fallback to original method if port locking fails + logger.warning( + f'Port locking failed for range {port_range}, falling back to original method' + ) + port = port_range[1] + for _ in range(max_attempts): + port = find_available_tcp_port(port_range[0], port_range[1]) + if not self._is_port_in_use_docker(port): + return port, None + return port, None + + port, port_lock = result + + # Additional check with Docker to ensure port is not in use + if self._is_port_in_use_docker(port): + port_lock.release() + # Try again with a different port + logger.debug(f'Port {port} is in use by Docker, trying again') + return self._find_available_port_with_lock(port_range, max_attempts - 1) + + return port, port_lock + def _find_available_port( self, port_range: tuple[int, int], max_attempts: int = 5 ) -> int: - port = port_range[1] - for _ in range(max_attempts): - port = find_available_tcp_port(port_range[0], port_range[1]) - if not self._is_port_in_use_docker(port): - return port - # If no port is found after max_attempts, return the last tried port + """Find an available port (legacy method for backward compatibility).""" + port, _ = self._find_available_port_with_lock(port_range, max_attempts) return port @property diff --git a/openhands/runtime/utils/port_lock.py b/openhands/runtime/utils/port_lock.py new file mode 100644 index 
0000000000..ea784451c9 --- /dev/null +++ b/openhands/runtime/utils/port_lock.py @@ -0,0 +1,268 @@ +"""File-based port locking system for preventing race conditions in port allocation.""" + +import os +import random +import socket +import tempfile +import time +from typing import Optional + +from openhands.core.logger import openhands_logger as logger + +# Import fcntl only on Unix systems +try: + import fcntl + + HAS_FCNTL = True +except ImportError: + HAS_FCNTL = False + + +class PortLock: + """File-based lock for a specific port to prevent race conditions.""" + + def __init__(self, port: int, lock_dir: Optional[str] = None): + self.port = port + self.lock_dir = lock_dir or os.path.join( + tempfile.gettempdir(), 'openhands_port_locks' + ) + self.lock_file_path = os.path.join(self.lock_dir, f'port_{port}.lock') + self.lock_fd: Optional[int] = None + self._locked = False + + # Ensure lock directory exists + os.makedirs(self.lock_dir, exist_ok=True) + + def acquire(self, timeout: float = 1.0) -> bool: + """Acquire the lock for this port. 
+ + Args: + timeout: Maximum time to wait for the lock + + Returns: + True if lock was acquired, False otherwise + """ + if self._locked: + return True + + try: + if HAS_FCNTL: + # Unix-style file locking with fcntl + self.lock_fd = os.open( + self.lock_file_path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC + ) + + # Try to acquire exclusive lock with timeout + start_time = time.time() + while time.time() - start_time < timeout: + try: + fcntl.flock(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + self._locked = True + + # Write port number to lock file for debugging + os.write(self.lock_fd, f'{self.port}\n'.encode()) + os.fsync(self.lock_fd) + + logger.debug(f'Acquired lock for port {self.port}') + return True + except (OSError, IOError): + # Lock is held by another process, wait a bit + time.sleep(0.01) + + # Timeout reached + if self.lock_fd: + os.close(self.lock_fd) + self.lock_fd = None + return False + else: + # Windows fallback: use atomic file creation + start_time = time.time() + while time.time() - start_time < timeout: + try: + # Try to create lock file exclusively + self.lock_fd = os.open( + self.lock_file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY + ) + self._locked = True + + # Write port number to lock file for debugging + os.write(self.lock_fd, f'{self.port}\n'.encode()) + os.fsync(self.lock_fd) + + logger.debug(f'Acquired lock for port {self.port}') + return True + except OSError: + # Lock file already exists, wait a bit + time.sleep(0.01) + + # Timeout reached + return False + + except Exception as e: + logger.debug(f'Failed to acquire lock for port {self.port}: {e}') + if self.lock_fd: + try: + os.close(self.lock_fd) + except OSError: + pass + self.lock_fd = None + return False + + def release(self) -> None: + """Release the lock.""" + if self.lock_fd is not None: + try: + if HAS_FCNTL: + # Unix: unlock and close + fcntl.flock(self.lock_fd, fcntl.LOCK_UN) + + os.close(self.lock_fd) + + # Remove lock file (both Unix and Windows) + try: + 
os.unlink(self.lock_file_path) + except FileNotFoundError: + pass + logger.debug(f'Released lock for port {self.port}') + except Exception as e: + logger.warning(f'Error releasing lock for port {self.port}: {e}') + finally: + self.lock_fd = None + self._locked = False + + def __enter__(self) -> 'PortLock': + if not self.acquire(): + raise OSError(f'Could not acquire lock for port {self.port}') + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.release() + + @property + def is_locked(self) -> bool: + return self._locked + + +def find_available_port_with_lock( + min_port: int = 30000, + max_port: int = 39999, + max_attempts: int = 20, + bind_address: str = '0.0.0.0', + lock_timeout: float = 1.0, +) -> Optional[tuple[int, PortLock]]: + """Find an available port and acquire a lock for it. + + This function combines file-based locking with port availability checking + to prevent race conditions in multi-process scenarios. + + Args: + min_port: Minimum port number to try + max_port: Maximum port number to try + max_attempts: Maximum number of ports to try + bind_address: Address to bind to when checking availability + lock_timeout: Timeout for acquiring port lock + + Returns: + Tuple of (port, lock) if successful, None otherwise + """ + rng = random.SystemRandom() + + # Try random ports first for better distribution + random_attempts = min(max_attempts // 2, 10) + for _ in range(random_attempts): + port = rng.randint(min_port, max_port) + + # Try to acquire lock first + lock = PortLock(port) + if lock.acquire(timeout=lock_timeout): + # Check if port is actually available + if _check_port_available(port, bind_address): + logger.debug(f'Found and locked available port {port}') + return port, lock + else: + # Port is locked but not available (maybe in TIME_WAIT state) + lock.release() + + # Small delay to reduce contention + time.sleep(0.001) + + # If random attempts failed, try sequential search + remaining_attempts = max_attempts - 
random_attempts + start_port = rng.randint(min_port, max_port - remaining_attempts) + + for i in range(remaining_attempts): + port = start_port + i + if port > max_port: + port = min_port + (port - max_port - 1) + + # Try to acquire lock first + lock = PortLock(port) + if lock.acquire(timeout=lock_timeout): + # Check if port is actually available + if _check_port_available(port, bind_address): + logger.debug(f'Found and locked available port {port}') + return port, lock + else: + # Port is locked but not available + lock.release() + + # Small delay to reduce contention + time.sleep(0.001) + + logger.error( + f'Could not find and lock available port in range {min_port}-{max_port} after {max_attempts} attempts' + ) + return None + + +def _check_port_available(port: int, bind_address: str = '0.0.0.0') -> bool: + """Check if a port is available by trying to bind to it.""" + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((bind_address, port)) + sock.close() + return True + except OSError: + return False + + +def cleanup_stale_locks(max_age_seconds: int = 300) -> int: + """Clean up stale lock files. 
+ + Args: + max_age_seconds: Maximum age of lock files before they're considered stale + + Returns: + Number of lock files cleaned up + """ + lock_dir = os.path.join(tempfile.gettempdir(), 'openhands_port_locks') + if not os.path.exists(lock_dir): + return 0 + + cleaned = 0 + current_time = time.time() + + try: + for filename in os.listdir(lock_dir): + if filename.startswith('port_') and filename.endswith('.lock'): + lock_path = os.path.join(lock_dir, filename) + try: + # Check if lock file is old + stat = os.stat(lock_path) + if current_time - stat.st_mtime > max_age_seconds: + # Try to remove stale lock + os.unlink(lock_path) + cleaned += 1 + logger.debug(f'Cleaned up stale lock file: {lock_path}') + except (OSError, FileNotFoundError): + # File might have been removed by another process + pass + except OSError: + # Directory might not exist or be accessible + pass + + if cleaned > 0: + logger.info(f'Cleaned up {cleaned} stale port lock files') + + return cleaned diff --git a/tests/runtime/test_port_locking_fix.py b/tests/runtime/test_port_locking_fix.py new file mode 100644 index 0000000000..2564d2f7bd --- /dev/null +++ b/tests/runtime/test_port_locking_fix.py @@ -0,0 +1,219 @@ +"""Test for port allocation race condition fix.""" + +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +from openhands.runtime.utils.port_lock import PortLock, find_available_port_with_lock + + +class TestPortLockingFix: + """Test cases for port allocation race condition fix.""" + + def test_port_lock_prevents_duplicate_allocation(self): + """Test that port locking prevents duplicate port allocation.""" + allocated_ports = [] + port_locks = [] + + def allocate_port(): + """Simulate port allocation by multiple workers.""" + result = find_available_port_with_lock( + min_port=30000, + max_port=30010, # Small range to force conflicts + max_attempts=5, + bind_address='0.0.0.0', + lock_timeout=2.0, + ) + + if result: + port, lock = result + 
allocated_ports.append(port) + port_locks.append(lock) + # Simulate some work time + time.sleep(0.1) + return port + return None + + # Run multiple threads concurrently + num_workers = 8 + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(allocate_port) for _ in range(num_workers)] + results = [future.result() for future in as_completed(futures)] + + # Filter out None results + successful_ports = [port for port in results if port is not None] + + # Verify no duplicate ports were allocated + assert len(successful_ports) == len(set(successful_ports)), ( + f'Duplicate ports allocated: {successful_ports}' + ) + + # Clean up locks + for lock in port_locks: + if lock: + lock.release() + + print( + f'Successfully allocated {len(successful_ports)} unique ports: {successful_ports}' + ) + + def test_port_lock_basic_functionality(self): + """Test basic port lock functionality.""" + port = 30001 + + # Test acquiring and releasing a lock + lock1 = PortLock(port) + assert lock1.acquire(timeout=1.0) + assert lock1.is_locked + + # Test that another lock cannot acquire the same port + lock2 = PortLock(port) + assert not lock2.acquire(timeout=0.1) + assert not lock2.is_locked + + # Release first lock + lock1.release() + assert not lock1.is_locked + + # Now second lock should be able to acquire + assert lock2.acquire(timeout=1.0) + assert lock2.is_locked + + lock2.release() + + def test_port_lock_context_manager(self): + """Test port lock context manager functionality.""" + port = 30002 + + # Test successful context manager usage + with PortLock(port) as lock: + assert lock.is_locked + + # Test that another lock cannot acquire while in context + lock2 = PortLock(port) + assert not lock2.acquire(timeout=0.1) + + # After context, lock should be released + assert not lock.is_locked + + # Now another lock should be able to acquire + lock3 = PortLock(port) + assert lock3.acquire(timeout=1.0) + lock3.release() + + def 
test_concurrent_port_allocation_stress_test(self): + """Stress test concurrent port allocation.""" + allocated_ports = [] + port_locks = [] + errors = [] + + def worker_allocate_port(worker_id): + """Worker function that allocates a port.""" + try: + result = find_available_port_with_lock( + min_port=31000, + max_port=31020, # Small range to force contention + max_attempts=10, + bind_address='0.0.0.0', + lock_timeout=3.0, + ) + + if result: + port, lock = result + allocated_ports.append((worker_id, port)) + port_locks.append(lock) + # Simulate work + time.sleep(0.05) + return port + else: + errors.append(f'Worker {worker_id}: No port available') + return None + + except Exception as e: + errors.append(f'Worker {worker_id}: {str(e)}') + return None + + # Run many workers concurrently + num_workers = 15 + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = { + executor.submit(worker_allocate_port, i): i for i in range(num_workers) + } + results = {} + for future in as_completed(futures): + worker_id = futures[future] + try: + result = future.result() + results[worker_id] = result + except Exception as e: + errors.append(f'Worker {worker_id} exception: {str(e)}') + + # Analyze results + successful_allocations = [ + (wid, port) for wid, port in allocated_ports if port is not None + ] + allocated_port_numbers = [port for _, port in successful_allocations] + + print(f'Successful allocations: {len(successful_allocations)}') + print(f'Allocated ports: {allocated_port_numbers}') + print(f'Errors: {len(errors)}') + if errors: + print(f'Error details: {errors[:5]}') # Show first 5 errors + + # Verify no duplicate ports + unique_ports = set(allocated_port_numbers) + assert len(allocated_port_numbers) == len(unique_ports), ( + f'Duplicate ports found: {allocated_port_numbers}' + ) + + # Clean up locks + for lock in port_locks: + if lock: + lock.release() + + def test_port_allocation_without_locking_shows_race_condition(self): + """Test that demonstrates 
race condition without locking.""" + from openhands.runtime.utils import find_available_tcp_port + + allocated_ports = [] + + def allocate_port_without_lock(): + """Simulate port allocation without locking (old method).""" + # This simulates the old behavior that had race conditions + port = find_available_tcp_port(32000, 32010) + allocated_ports.append(port) + # Small delay to increase chance of race condition + time.sleep(0.01) + return port + + # Run multiple threads concurrently + num_workers = 10 + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [ + executor.submit(allocate_port_without_lock) for _ in range(num_workers) + ] + results = [future.result() for future in as_completed(futures)] + + # Check if we got duplicate ports (race condition) + unique_ports = set(results) + duplicates_found = len(results) != len(unique_ports) + + print( + f'Without locking - Total ports: {len(results)}, Unique: {len(unique_ports)}' + ) + print(f'Ports allocated: {results}') + print(f'Race condition detected: {duplicates_found}') + + # This test demonstrates the problem exists without locking + # In a real race condition scenario, we might get duplicates + # But since the race window is small, we'll just verify the test runs + assert len(results) == num_workers + + +if __name__ == '__main__': + test = TestPortLockingFix() + test.test_port_lock_prevents_duplicate_allocation() + test.test_port_lock_basic_functionality() + test.test_port_lock_context_manager() + test.test_concurrent_port_allocation_stress_test() + test.test_port_allocation_without_locking_shows_race_condition() + print('All tests passed!')