From ef12adc10720e1aa9a0d88ceae4a6241b34ef4ab Mon Sep 17 00:00:00 2001 From: "sp.wack" <83104063+amanape@users.noreply.github.com> Date: Fri, 26 Sep 2025 16:10:09 +0400 Subject: [PATCH] fix(backend): Add validation for LLM settings to prevent non-pro user bypass (#11113) --- openhands/server/routes/settings.py | 24 +- openhands/server/settings_validation.py | 122 +++++++ .../server/routes/test_settings_security.py | 329 ++++++++++++++++++ 3 files changed, 474 insertions(+), 1 deletion(-) create mode 100644 openhands/server/settings_validation.py create mode 100644 tests/unit/server/routes/test_settings_security.py diff --git a/openhands/server/routes/settings.py b/openhands/server/routes/settings.py index 7a78ca7720..12c5700508 100644 --- a/openhands/server/routes/settings.py +++ b/openhands/server/routes/settings.py @@ -11,10 +11,15 @@ from openhands.server.routes.secrets import invalidate_legacy_secrets_store from openhands.server.settings import ( GETSettingsModel, ) +from openhands.server.settings_validation import ( + check_llm_settings_changes, + validate_llm_settings_access, +) from openhands.server.shared import config from openhands.server.user_auth import ( get_provider_tokens, get_secrets_store, + get_user_id, get_user_settings, get_user_settings_store, ) @@ -135,17 +140,34 @@ async def store_llm_settings( response_model=None, responses={ 200: {'description': 'Settings stored successfully', 'model': dict}, + 403: {'description': 'Subscription required for pro models', 'model': dict}, 500: {'description': 'Error storing settings', 'model': dict}, }, ) async def store_settings( settings: Settings, settings_store: SettingsStore = Depends(get_user_settings_store), + user_id: str = Depends(get_user_id), ) -> JSONResponse: - # Check provider tokens are valid try: existing_settings = await settings_store.load() + # Check if any LLM-related settings are being changed + llm_settings_being_changed = check_llm_settings_changes( + settings, existing_settings + ) + + if llm_settings_being_changed: + has_access = await validate_llm_settings_access(user_id) + if not has_access: + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + 'error': 'Modifying LLM settings requires an active OpenHands Pro subscription. Please upgrade your account to access LLM configuration.', + 'detail': 'Subscription required for LLM settings modifications', + }, + ) + # Convert to Settings model and merge with existing settings if existing_settings: settings = await store_llm_settings(settings, settings_store) diff --git a/openhands/server/settings_validation.py b/openhands/server/settings_validation.py new file mode 100644 index 0000000000..3d311e3cd1 --- /dev/null +++ b/openhands/server/settings_validation.py @@ -0,0 +1,122 @@ +"""Settings validation utilities for LLM settings access control.""" + +from openhands.core.logger import openhands_logger as logger +from openhands.server.shared import server_config +from openhands.server.types import AppMode +from openhands.storage.data_models.settings import Settings + + +def _is_llm_setting_changing(setting_name: str, new_value, existing_settings) -> bool: + """Check if a specific LLM setting is being changed from its existing value. + + Args: + setting_name: Name of the setting to check + new_value: New value being set + existing_settings: Existing settings object (can be None) + + Returns: + bool: True if the setting is being changed, False otherwise + """ + if new_value is None: + return False + + # Handle special case for enable_default_condenser with default value + if setting_name == 'enable_default_condenser': + if not existing_settings: + # First time setting - only validate if setting to non-default value + return not new_value + else: + # Changing existing value + return new_value != existing_settings.enable_default_condenser + + # For other settings, validate if explicitly provided and different from existing + if not existing_settings: + return True + + existing_value = getattr(existing_settings, setting_name, None) + return new_value != existing_value + + +def check_llm_settings_changes(settings: Settings, existing_settings) -> bool: + """Check if any LLM-related settings are being changed. + + Validates both core LLM settings (model, API key, base URL) and advanced settings + shown to SaaS users (confirmation mode, security analyzer, memory condenser settings). + + Args: + settings: New settings being applied + existing_settings: Current settings (can be None) + + Returns: + bool: True if any LLM settings are being changed, False otherwise + """ + # Core LLM settings - always validate if provided + core_llm_changes = any( + [ + settings.llm_model is not None, + settings.llm_api_key is not None, + settings.llm_base_url is not None, + ] + ) + + if core_llm_changes: + return True + + # Additional LLM settings shown to SaaS users - validate if actually changing + advanced_llm_changes = any( + [ + _is_llm_setting_changing( + 'confirmation_mode', settings.confirmation_mode, existing_settings + ), + _is_llm_setting_changing( + 'security_analyzer', settings.security_analyzer, existing_settings + ), + _is_llm_setting_changing( + 'enable_default_condenser', + settings.enable_default_condenser, + existing_settings, + ), + _is_llm_setting_changing( + 'condenser_max_size', settings.condenser_max_size, existing_settings + ), + ] + ) + + return advanced_llm_changes + + +async def validate_llm_settings_access(user_id: str) -> bool: + """Validate if user has access to modify LLM settings in SaaS mode. + + In SaaS mode, only pro users with active subscriptions can modify LLM settings. + + Args: + user_id: The user ID to check subscription for + + Returns: + bool: True if user can modify LLM settings, False otherwise + """ + # Skip validation in non-SaaS mode + if server_config.app_mode != AppMode.SAAS: + return True + + # In SaaS mode, check for active subscription for ANY LLM settings changes + try: + # Import here to avoid circular imports and handle enterprise mode gracefully + from enterprise.server.routes.billing import get_subscription_access + + subscription = await get_subscription_access(user_id) + # The get_subscription_access function already filters for ACTIVE status, + # so if we get a subscription back, it means it's active + return subscription is not None + except ImportError: + # Enterprise billing module not available - in SaaS mode, this means + # we can't validate subscriptions, so deny access to be safe + logger.warning( + 'Enterprise billing module not available in SaaS mode, denying LLM settings access' + ) + return False + except Exception as e: + # On error, deny access to be safe + logger.warning(f'Error checking subscription access for user {user_id}: {e}') + return False diff --git a/tests/unit/server/routes/test_settings_security.py b/tests/unit/server/routes/test_settings_security.py new file mode 100644 index 0000000000..3694e26873 --- /dev/null +++ b/tests/unit/server/routes/test_settings_security.py @@ -0,0 +1,329 @@ +"""Security tests for settings API to ensure pro-only features are properly validated on backend.""" + +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import Request +from fastapi.testclient import TestClient +from pydantic import SecretStr + +from openhands.integrations.provider import ProviderToken, ProviderType +from openhands.server.app import app +from openhands.server.types import AppMode +from openhands.server.user_auth.user_auth import UserAuth +from openhands.storage.data_models.user_secrets import UserSecrets +from openhands.storage.memory import InMemoryFileStore +from openhands.storage.secrets.secrets_store import SecretsStore +from openhands.storage.settings.file_settings_store import FileSettingsStore +from openhands.storage.settings.settings_store import SettingsStore + + +class MockUserAuthNonPro(UserAuth): + """Mock implementation of UserAuth for non-pro user testing""" + + def __init__(self): + self._settings = None + self._settings_store = MagicMock() + self._settings_store.load = AsyncMock(return_value=None) + self._settings_store.store = AsyncMock() + + async def get_user_id(self) -> str | None: + return 'test-user-nonpro' + + async def get_user_email(self) -> str | None: + return 'nonpro@test.com' + + async def get_access_token(self) -> SecretStr | None: + return SecretStr('test-token-nonpro') + + async def get_provider_tokens(self) -> dict[ProviderType, ProviderToken] | None: + return None + + async def get_user_settings_store(self) -> SettingsStore | None: + return self._settings_store + + async def get_secrets_store(self) -> SecretsStore | None: + return None + + async def get_user_secrets(self) -> UserSecrets | None: + return None + + @classmethod + async def get_instance(cls, request: Request) -> UserAuth: + return MockUserAuthNonPro() + + +class MockUserAuthPro(UserAuth): + """Mock implementation of UserAuth for pro user testing""" + + def __init__(self): + self._settings = None + self._settings_store = MagicMock() + self._settings_store.load = AsyncMock(return_value=None) + self._settings_store.store = AsyncMock() + + async def get_user_id(self) -> str | None: + return 'test-user-pro' + + async def get_user_email(self) -> str | None: + return 'pro@test.com' + + async def get_access_token(self) -> SecretStr | None: + return SecretStr('test-token-pro') + + async def get_provider_tokens(self) -> dict[ProviderType, ProviderToken] | None: + return None + + async def get_user_settings_store(self) -> SettingsStore | None: + return self._settings_store + + async def get_secrets_store(self) -> SecretsStore | None: + return None + + async def get_user_secrets(self) -> UserSecrets | None: + return None + + @classmethod + async def get_instance(cls, request: Request) -> UserAuth: + return MockUserAuthPro() + + +@pytest.fixture +def test_client_non_pro(): + """Test client for non-pro user""" + with ( + patch.dict( + os.environ, {'SESSION_API_KEY': '', 'APP_MODE': 'saas'}, clear=False + ), + patch('openhands.server.dependencies._SESSION_API_KEY', None), + patch('openhands.server.shared.server_config.app_mode', AppMode.SAAS), + patch( + 'openhands.server.user_auth.user_auth.UserAuth.get_instance', + return_value=MockUserAuthNonPro(), + ), + patch( + 'openhands.storage.settings.file_settings_store.FileSettingsStore.get_instance', + AsyncMock(return_value=FileSettingsStore(InMemoryFileStore())), + ), + # Mock the validation function at the routes level to return False (no access) + patch( + 'openhands.server.routes.settings.validate_llm_settings_access', + AsyncMock(return_value=False), + ), + ): + client = TestClient(app) + yield client + + +@pytest.fixture +def test_client_pro(): + """Test client for pro user""" + with ( + patch.dict( + os.environ, {'SESSION_API_KEY': '', 'APP_MODE': 'saas'}, clear=False + ), + patch('openhands.server.dependencies._SESSION_API_KEY', None), + patch('openhands.server.shared.server_config.app_mode', AppMode.SAAS), + patch( + 'openhands.server.user_auth.user_auth.UserAuth.get_instance', + return_value=MockUserAuthPro(), + ), + patch( + 'openhands.storage.settings.file_settings_store.FileSettingsStore.get_instance', + AsyncMock(return_value=FileSettingsStore(InMemoryFileStore())), + ), + # Mock the validation function at the routes level to return True (has access) + patch( + 'openhands.server.routes.settings.validate_llm_settings_access', + AsyncMock(return_value=True), + ), + ): + client = TestClient(app) + yield client + + +# Test data constants +OPENHANDS_PRO_MODELS = [ + 'openhands/claude-sonnet-4-20250514', + 'openhands/gpt-5-2025-08-07', + 'openhands/gpt-5-mini-2025-08-07', + 'openhands/claude-opus-4-20250514', + 'openhands/claude-opus-4-1-20250805', + 'openhands/gemini-2.5-pro', + 'openhands/o3', + 'openhands/o4-mini', +] + +DEFAULT_MODEL = 'claude-sonnet-4-20250514' + +USER_PROVIDED_MODELS = [ + 'anthropic/claude-3-5-sonnet-20241022', + 'openai/gpt-4o', + 'mistral/mistral-large', +] + + +# Helper functions +def create_base_settings(**overrides): + """Create base settings data with optional overrides""" + base_settings = { + 'language': 'en', + 'agent': 'test-agent', + 'max_iterations': 100, + } + base_settings.update(overrides) + return base_settings + + +def assert_forbidden_response(response, model_or_setting_name=''): + """Assert that response is 403 with subscription-related error""" + assert response.status_code == 403, ( + f'{model_or_setting_name} should be forbidden for non-pro users' + ) + response_data = response.json() + assert any( + keyword in response_data.get('detail', '').lower() + or keyword in response_data.get('error', '').lower() + for keyword in ['subscription', 'pro', 'upgrade'] + ) + + +@pytest.mark.parametrize( + 'model', + [ + 'openhands/claude-sonnet-4-20250514', + 'openhands/gpt-5-2025-08-07', + 'openhands/claude-opus-4-20250514', + DEFAULT_MODEL, + ] + + USER_PROVIDED_MODELS, +) +@pytest.mark.asyncio +async def test_non_pro_user_cannot_set_any_llm_model(test_client_non_pro, model): + """SECURITY TEST: Non-pro user should not be able to set any LLM model""" + settings_data = create_base_settings(llm_model=model, llm_api_key='test-key') + response = test_client_non_pro.post('/api/settings', json=settings_data) + assert_forbidden_response(response, f'Model {model}') + + +@pytest.mark.parametrize( + 'llm_setting,value', + [ + ('llm_api_key', 'new-api-key'), + ('llm_base_url', 'https://custom-api.example.com'), + ('llm_model', DEFAULT_MODEL), + ('confirmation_mode', True), + ('security_analyzer', 'llm'), + ('enable_default_condenser', False), + ('condenser_max_size', 50), + ], +) +@pytest.mark.asyncio +async def test_non_pro_user_cannot_set_individual_llm_settings( + test_client_non_pro, llm_setting, value +): + """SECURITY TEST: Non-pro user should not be able to set individual LLM settings""" + settings_data = create_base_settings(**{llm_setting: value}) + response = test_client_non_pro.post('/api/settings', json=settings_data) + assert_forbidden_response(response, f'LLM setting {llm_setting}') + + +@pytest.mark.asyncio +async def test_non_pro_user_can_set_non_llm_settings(test_client_non_pro): + """Non-pro users should still be able to modify non-LLM settings""" + # Only use settings that definitely don't trigger LLM validation + settings_data = { + 'language': 'fr', + 'max_iterations': 50, + 'user_consents_to_analytics': True, + 'git_user_name': 'test-user', + 'git_user_email': 'test@example.com', + } + response = test_client_non_pro.post('/api/settings', json=settings_data) + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_pro_user_can_set_llm_models(test_client_pro): + """Pro user should be able to set any LLM models""" + settings_data = create_base_settings( + llm_model='openhands/claude-sonnet-4-20250514', llm_api_key='test-key' + ) + response = test_client_pro.post('/api/settings', json=settings_data) + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_expired_subscription_cannot_access_llm_settings(): + """SECURITY TEST: User with expired subscription should not access LLM settings""" + with ( + patch.dict( + os.environ, {'SESSION_API_KEY': '', 'APP_MODE': 'saas'}, clear=False + ), + patch('openhands.server.dependencies._SESSION_API_KEY', None), + patch('openhands.server.shared.server_config.app_mode', AppMode.SAAS), + patch( + 'openhands.server.user_auth.user_auth.UserAuth.get_instance', + return_value=MockUserAuthPro(), + ), + patch( + 'openhands.storage.settings.file_settings_store.FileSettingsStore.get_instance', + AsyncMock(return_value=FileSettingsStore(InMemoryFileStore())), + ), + # Mock validation to return False (expired subscription, no access) + patch( + 'openhands.server.routes.settings.validate_llm_settings_access', + AsyncMock(return_value=False), + ), + ): + client = TestClient(app) + settings_data = create_base_settings( + llm_model='openhands/claude-sonnet-4-20250514', llm_api_key='test-key' + ) + response = client.post('/api/settings', json=settings_data) + assert_forbidden_response(response, 'Expired subscription') + + +@pytest.mark.asyncio +async def test_direct_api_bypass_prevention(test_client_non_pro): + """SECURITY TEST: Direct API calls should still validate subscription status""" + settings_data = create_base_settings( + llm_model='openhands/claude-sonnet-4-20250514', + llm_api_key='fake-api-key', + llm_base_url='https://api.anthropic.com', + remote_runtime_resource_factor=4, + ) + + response = test_client_non_pro.post( + '/api/settings', + json=settings_data, + headers={ + 'Content-Type': 'application/json', + 'User-Agent': 'DirectAPIClient/1.0', + }, + ) + assert_forbidden_response(response, 'Direct API bypass attempt') + + +@pytest.mark.parametrize( + 'malicious_model', + [ + 'openhands/claude-sonnet-4-20250514', # Direct + 'OPENHANDS/claude-sonnet-4-20250514', # Case manipulation + ' openhands/claude-sonnet-4-20250514', # Leading space + 'openhands//claude-sonnet-4-20250514', # Double slash + ], +) +@pytest.mark.asyncio +async def test_model_prefix_bypass_attempts_blocked( + test_client_non_pro, malicious_model +): + """SECURITY TEST: Various prefix bypass attempts should be blocked""" + settings_data = create_base_settings( + llm_model=malicious_model, llm_api_key='test-key' + ) + response = test_client_non_pro.post('/api/settings', json=settings_data) + assert_forbidden_response( + response, f"Bypass attempt with model '{malicious_model}'" + )