fix(backend): Add validation for LLM settings to prevent non-pro user bypass (#11113)

This commit is contained in:
sp.wack 2025-09-26 16:10:09 +04:00 committed by GitHub
parent 8a7a5cce5e
commit ef12adc107
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 474 additions and 1 deletions

View File

@ -11,10 +11,15 @@ from openhands.server.routes.secrets import invalidate_legacy_secrets_store
from openhands.server.settings import (
GETSettingsModel,
)
from openhands.server.settings_validation import (
check_llm_settings_changes,
validate_llm_settings_access,
)
from openhands.server.shared import config
from openhands.server.user_auth import (
get_provider_tokens,
get_secrets_store,
get_user_id,
get_user_settings,
get_user_settings_store,
)
@ -135,17 +140,34 @@ async def store_llm_settings(
response_model=None,
responses={
200: {'description': 'Settings stored successfully', 'model': dict},
403: {'description': 'Subscription required for pro models', 'model': dict},
500: {'description': 'Error storing settings', 'model': dict},
},
)
async def store_settings(
settings: Settings,
settings_store: SettingsStore = Depends(get_user_settings_store),
user_id: str = Depends(get_user_id),
) -> JSONResponse:
# Check provider tokens are valid
try:
existing_settings = await settings_store.load()
# Check if any LLM-related settings are being changed
llm_settings_being_changed = check_llm_settings_changes(
settings, existing_settings
)
if llm_settings_being_changed:
has_access = await validate_llm_settings_access(user_id)
if not has_access:
return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN,
content={
'error': 'Modifying LLM settings requires an active OpenHands Pro subscription. Please upgrade your account to access LLM configuration.',
'detail': 'Subscription required for LLM settings modifications',
},
)
# Convert to Settings model and merge with existing settings
if existing_settings:
settings = await store_llm_settings(settings, settings_store)

View File

@ -0,0 +1,122 @@
"""Settings validation utilities for LLM settings access control."""
from openhands.core.logger import openhands_logger as logger
from openhands.server.shared import server_config
from openhands.server.types import AppMode
from openhands.storage.data_models.settings import Settings
def _is_llm_setting_changing(setting_name: str, new_value, existing_settings) -> bool:
"""Check if a specific LLM setting is being changed from its existing value.
Args:
setting_name: Name of the setting to check
new_value: New value being set
existing_settings: Existing settings object (can be None)
Returns:
bool: True if the setting is being changed, False otherwise
"""
if new_value is None:
return False
# Handle special case for enable_default_condenser with default value
if setting_name == 'enable_default_condenser':
if not existing_settings:
# First time setting - only validate if setting to non-default value
return not new_value
else:
# Changing existing value
return new_value != existing_settings.enable_default_condenser
# For other settings, validate if explicitly provided and different from existing
if not existing_settings:
return True
existing_value = getattr(existing_settings, setting_name, None)
return new_value != existing_value
def check_llm_settings_changes(settings: Settings, existing_settings) -> bool:
"""Check if any LLM-related settings are being changed.
Validates both core LLM settings (model, API key, base URL) and advanced settings
shown to SaaS users (confirmation mode, security analyzer, memory condenser settings).
Args:
settings: New settings being applied
existing_settings: Current settings (can be None)
Returns:
bool: True if any LLM settings are being changed, False otherwise
"""
# Core LLM settings - always validate if provided
core_llm_changes = any(
[
settings.llm_model is not None,
settings.llm_api_key is not None,
settings.llm_base_url is not None,
]
)
if core_llm_changes:
return True
# Additional LLM settings shown to SaaS users - validate if actually changing
advanced_llm_changes = any(
[
_is_llm_setting_changing(
'confirmation_mode', settings.confirmation_mode, existing_settings
),
_is_llm_setting_changing(
'security_analyzer', settings.security_analyzer, existing_settings
),
_is_llm_setting_changing(
'enable_default_condenser',
settings.enable_default_condenser,
existing_settings,
),
_is_llm_setting_changing(
'condenser_max_size', settings.condenser_max_size, existing_settings
),
]
)
return advanced_llm_changes
async def validate_llm_settings_access(user_id: str) -> bool:
"""Validate if user has access to modify LLM settings in SaaS mode.
In SaaS mode, only pro users with active subscriptions can modify LLM settings.
Args:
user_id: The user ID to check subscription for
Returns:
bool: True if user can modify LLM settings, False otherwise
"""
# Skip validation in non-SaaS mode
if server_config.app_mode != AppMode.SAAS:
return True
# In SaaS mode, check for active subscription for ANY LLM settings changes
try:
# Import here to avoid circular imports and handle enterprise mode gracefully
from enterprise.server.routes.billing import get_subscription_access
subscription = await get_subscription_access(user_id)
# The get_subscription_access function already filters for ACTIVE status,
# so if we get a subscription back, it means it's active
return subscription is not None
except ImportError:
# Enterprise billing module not available - in SaaS mode, this means
# we can't validate subscriptions, so deny access to be safe
logger.warning(
'Enterprise billing module not available in SaaS mode, denying LLM settings access'
)
return False
except Exception as e:
# On error, deny access to be safe
logger.warning(f'Error checking subscription access for user {user_id}: {e}')
return False

View File

@ -0,0 +1,329 @@
"""Security tests for settings API to ensure pro-only features are properly validated on backend."""
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import Request
from fastapi.testclient import TestClient
from pydantic import SecretStr
from openhands.integrations.provider import ProviderToken, ProviderType
from openhands.server.app import app
from openhands.server.types import AppMode
from openhands.server.user_auth.user_auth import UserAuth
from openhands.storage.data_models.user_secrets import UserSecrets
from openhands.storage.memory import InMemoryFileStore
from openhands.storage.secrets.secrets_store import SecretsStore
from openhands.storage.settings.file_settings_store import FileSettingsStore
from openhands.storage.settings.settings_store import SettingsStore
class MockUserAuthNonPro(UserAuth):
"""Mock implementation of UserAuth for non-pro user testing"""
def __init__(self):
self._settings = None
self._settings_store = MagicMock()
self._settings_store.load = AsyncMock(return_value=None)
self._settings_store.store = AsyncMock()
async def get_user_id(self) -> str | None:
return 'test-user-nonpro'
async def get_user_email(self) -> str | None:
return 'nonpro@test.com'
async def get_access_token(self) -> SecretStr | None:
return SecretStr('test-token-nonpro')
async def get_provider_tokens(self) -> dict[ProviderType, ProviderToken] | None:
return None
async def get_user_settings_store(self) -> SettingsStore | None:
return self._settings_store
async def get_secrets_store(self) -> SecretsStore | None:
return None
async def get_user_secrets(self) -> UserSecrets | None:
return None
@classmethod
async def get_instance(cls, request: Request) -> UserAuth:
return MockUserAuthNonPro()
class MockUserAuthPro(UserAuth):
"""Mock implementation of UserAuth for pro user testing"""
def __init__(self):
self._settings = None
self._settings_store = MagicMock()
self._settings_store.load = AsyncMock(return_value=None)
self._settings_store.store = AsyncMock()
async def get_user_id(self) -> str | None:
return 'test-user-pro'
async def get_user_email(self) -> str | None:
return 'pro@test.com'
async def get_access_token(self) -> SecretStr | None:
return SecretStr('test-token-pro')
async def get_provider_tokens(self) -> dict[ProviderType, ProviderToken] | None:
return None
async def get_user_settings_store(self) -> SettingsStore | None:
return self._settings_store
async def get_secrets_store(self) -> SecretsStore | None:
return None
async def get_user_secrets(self) -> UserSecrets | None:
return None
@classmethod
async def get_instance(cls, request: Request) -> UserAuth:
return MockUserAuthPro()
@pytest.fixture
def test_client_non_pro():
"""Test client for non-pro user"""
with (
patch.dict(
os.environ, {'SESSION_API_KEY': '', 'APP_MODE': 'saas'}, clear=False
),
patch('openhands.server.dependencies._SESSION_API_KEY', None),
patch('openhands.server.shared.server_config.app_mode', AppMode.SAAS),
patch(
'openhands.server.user_auth.user_auth.UserAuth.get_instance',
return_value=MockUserAuthNonPro(),
),
patch(
'openhands.storage.settings.file_settings_store.FileSettingsStore.get_instance',
AsyncMock(return_value=FileSettingsStore(InMemoryFileStore())),
),
# Mock the validation function at the routes level to return False (no access)
patch(
'openhands.server.routes.settings.validate_llm_settings_access',
AsyncMock(return_value=False),
),
):
client = TestClient(app)
yield client
@pytest.fixture
def test_client_pro():
"""Test client for pro user"""
with (
patch.dict(
os.environ, {'SESSION_API_KEY': '', 'APP_MODE': 'saas'}, clear=False
),
patch('openhands.server.dependencies._SESSION_API_KEY', None),
patch('openhands.server.shared.server_config.app_mode', AppMode.SAAS),
patch(
'openhands.server.user_auth.user_auth.UserAuth.get_instance',
return_value=MockUserAuthPro(),
),
patch(
'openhands.storage.settings.file_settings_store.FileSettingsStore.get_instance',
AsyncMock(return_value=FileSettingsStore(InMemoryFileStore())),
),
# Mock the validation function at the routes level to return True (has access)
patch(
'openhands.server.routes.settings.validate_llm_settings_access',
AsyncMock(return_value=True),
),
):
client = TestClient(app)
yield client
# Test data constants
OPENHANDS_PRO_MODELS = [
'openhands/claude-sonnet-4-20250514',
'openhands/gpt-5-2025-08-07',
'openhands/gpt-5-mini-2025-08-07',
'openhands/claude-opus-4-20250514',
'openhands/claude-opus-4-1-20250805',
'openhands/gemini-2.5-pro',
'openhands/o3',
'openhands/o4-mini',
]
DEFAULT_MODEL = 'claude-sonnet-4-20250514'
USER_PROVIDED_MODELS = [
'anthropic/claude-3-5-sonnet-20241022',
'openai/gpt-4o',
'mistral/mistral-large',
]
# Helper functions
def create_base_settings(**overrides):
"""Create base settings data with optional overrides"""
base_settings = {
'language': 'en',
'agent': 'test-agent',
'max_iterations': 100,
}
base_settings.update(overrides)
return base_settings
def assert_forbidden_response(response, model_or_setting_name=''):
"""Assert that response is 403 with subscription-related error"""
assert response.status_code == 403, (
f'{model_or_setting_name} should be forbidden for non-pro users'
)
response_data = response.json()
assert any(
keyword in response_data.get('detail', '').lower()
or keyword in response_data.get('error', '').lower()
for keyword in ['subscription', 'pro', 'upgrade']
)
@pytest.mark.parametrize(
'model',
[
'openhands/claude-sonnet-4-20250514',
'openhands/gpt-5-2025-08-07',
'openhands/claude-opus-4-20250514',
DEFAULT_MODEL,
]
+ USER_PROVIDED_MODELS,
)
@pytest.mark.asyncio
async def test_non_pro_user_cannot_set_any_llm_model(test_client_non_pro, model):
"""SECURITY TEST: Non-pro user should not be able to set any LLM model"""
settings_data = create_base_settings(llm_model=model, llm_api_key='test-key')
response = test_client_non_pro.post('/api/settings', json=settings_data)
assert_forbidden_response(response, f'Model {model}')
@pytest.mark.parametrize(
'llm_setting,value',
[
('llm_api_key', 'new-api-key'),
('llm_base_url', 'https://custom-api.example.com'),
('llm_model', DEFAULT_MODEL),
('confirmation_mode', True),
('security_analyzer', 'llm'),
('enable_default_condenser', False),
('condenser_max_size', 50),
],
)
@pytest.mark.asyncio
async def test_non_pro_user_cannot_set_individual_llm_settings(
test_client_non_pro, llm_setting, value
):
"""SECURITY TEST: Non-pro user should not be able to set individual LLM settings"""
settings_data = create_base_settings(**{llm_setting: value})
response = test_client_non_pro.post('/api/settings', json=settings_data)
assert_forbidden_response(response, f'LLM setting {llm_setting}')
@pytest.mark.asyncio
async def test_non_pro_user_can_set_non_llm_settings(test_client_non_pro):
"""Non-pro users should still be able to modify non-LLM settings"""
# Only use settings that definitely don't trigger LLM validation
settings_data = {
'language': 'fr',
'max_iterations': 50,
'user_consents_to_analytics': True,
'git_user_name': 'test-user',
'git_user_email': 'test@example.com',
}
response = test_client_non_pro.post('/api/settings', json=settings_data)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_pro_user_can_set_llm_models(test_client_pro):
"""Pro user should be able to set any LLM models"""
settings_data = create_base_settings(
llm_model='openhands/claude-sonnet-4-20250514', llm_api_key='test-key'
)
response = test_client_pro.post('/api/settings', json=settings_data)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_expired_subscription_cannot_access_llm_settings():
"""SECURITY TEST: User with expired subscription should not access LLM settings"""
with (
patch.dict(
os.environ, {'SESSION_API_KEY': '', 'APP_MODE': 'saas'}, clear=False
),
patch('openhands.server.dependencies._SESSION_API_KEY', None),
patch('openhands.server.shared.server_config.app_mode', AppMode.SAAS),
patch(
'openhands.server.user_auth.user_auth.UserAuth.get_instance',
return_value=MockUserAuthPro(),
),
patch(
'openhands.storage.settings.file_settings_store.FileSettingsStore.get_instance',
AsyncMock(return_value=FileSettingsStore(InMemoryFileStore())),
),
# Mock validation to return False (expired subscription, no access)
patch(
'openhands.server.routes.settings.validate_llm_settings_access',
AsyncMock(return_value=False),
),
):
client = TestClient(app)
settings_data = create_base_settings(
llm_model='openhands/claude-sonnet-4-20250514', llm_api_key='test-key'
)
response = client.post('/api/settings', json=settings_data)
assert_forbidden_response(response, 'Expired subscription')
@pytest.mark.asyncio
async def test_direct_api_bypass_prevention(test_client_non_pro):
"""SECURITY TEST: Direct API calls should still validate subscription status"""
settings_data = create_base_settings(
llm_model='openhands/claude-sonnet-4-20250514',
llm_api_key='fake-api-key',
llm_base_url='https://api.anthropic.com',
remote_runtime_resource_factor=4,
)
response = test_client_non_pro.post(
'/api/settings',
json=settings_data,
headers={
'Content-Type': 'application/json',
'User-Agent': 'DirectAPIClient/1.0',
},
)
assert_forbidden_response(response, 'Direct API bypass attempt')
@pytest.mark.parametrize(
'malicious_model',
[
'openhands/claude-sonnet-4-20250514', # Direct
'OPENHANDS/claude-sonnet-4-20250514', # Case manipulation
' openhands/claude-sonnet-4-20250514', # Leading space
'openhands//claude-sonnet-4-20250514', # Double slash
],
)
@pytest.mark.asyncio
async def test_model_prefix_bypass_attempts_blocked(
test_client_non_pro, malicious_model
):
"""SECURITY TEST: Various prefix bypass attempts should be blocked"""
settings_data = create_base_settings(
llm_model=malicious_model, llm_api_key='test-key'
)
response = test_client_non_pro.post('/api/settings', json=settings_data)
assert_forbidden_response(
response, f"Bypass attempt with model '{malicious_model}'"
)