[Feat]: Custom secrets plumbing for BE (#7891)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Rohit Malhotra 2025-04-23 12:44:25 -04:00 committed by GitHub
parent 00c449d447
commit 964478c22f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 949 additions and 7 deletions

View File

@ -66,6 +66,10 @@ class SecretStore(BaseModel):
default_factory=lambda: MappingProxyType({})
)
custom_secrets: CUSTOM_SECRETS_TYPE = Field(
default_factory=lambda: MappingProxyType({})
)
model_config = {
'frozen': True,
'validate_assignment': True,
@ -97,16 +101,32 @@ class SecretStore(BaseModel):
return tokens
@field_serializer('custom_secrets')
def custom_secrets_serializer(
self, custom_secrets: CUSTOM_SECRETS_TYPE, info: SerializationInfo
):
secrets = {}
expose_secrets = info.context and info.context.get('expose_secrets', False)
if custom_secrets:
for secret_name, secret_key in custom_secrets.items():
secrets[secret_name] = (
secret_key.get_secret_value()
if expose_secrets
else pydantic_encoder(secret_key)
)
return secrets
@model_validator(mode='before')
@classmethod
def convert_dict_to_mappingproxy(
cls, data: dict[str, dict[str, dict[str, str]]] | PROVIDER_TOKEN_TYPE
) -> dict[str, MappingProxyType[Any, Any]]:
cls, data: dict[str, dict[str, Any] | MappingProxyType] | PROVIDER_TOKEN_TYPE
) -> dict[str, MappingProxyType | None]:
"""Custom deserializer to convert dictionary into MappingProxyType"""
if not isinstance(data, dict):
raise ValueError('SecretStore must be initialized with a dictionary')
new_data: dict[str, MappingProxyType[Any, Any]] = {}
new_data: dict[str, MappingProxyType | None] = {}
if 'provider_tokens' in data:
tokens = data['provider_tokens']
@ -128,6 +148,22 @@ class SecretStore(BaseModel):
# Convert to MappingProxyType
new_data['provider_tokens'] = MappingProxyType(converted_tokens)
elif isinstance(tokens, MappingProxyType):
new_data['provider_tokens'] = tokens
if 'custom_secrets' in data:
secrets = data['custom_secrets']
if isinstance(secrets, dict):
converted_secrets = {}
for key, value in secrets.items():
if isinstance(value, str):
converted_secrets[key] = SecretStr(value)
elif isinstance(value, SecretStr):
converted_secrets[key] = value
new_data['custom_secrets'] = MappingProxyType(converted_secrets)
elif isinstance(secrets, MappingProxyType):
new_data['custom_secrets'] = secrets
return new_data

View File

@ -6,7 +6,13 @@ from openhands.core.logger import openhands_logger as logger
from openhands.integrations.provider import ProviderToken, ProviderType, SecretStore
from openhands.integrations.utils import validate_provider_token
from openhands.server.auth import get_provider_tokens, get_user_id
from openhands.server.settings import GETSettingsModel, POSTSettingsModel, Settings
from openhands.server.settings import (
GETSettingsCustomSecrets,
GETSettingsModel,
POSTSettingsCustomSecrets,
POSTSettingsModel,
Settings,
)
from openhands.server.shared import SettingsStoreImpl, config, server_config
from openhands.server.types import AppMode
@ -55,6 +61,122 @@ async def load_settings(request: Request) -> GETSettingsModel | JSONResponse:
)
@app.get('/secrets', response_model=GETSettingsCustomSecrets)
async def load_custom_secrets_names(
request: Request,
) -> GETSettingsCustomSecrets | JSONResponse:
try:
user_id = get_user_id(request)
settings_store = await SettingsStoreImpl.get_instance(config, user_id)
settings = await settings_store.load()
if not settings:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Settings not found'},
)
custom_secrets = []
if settings.secrets_store.custom_secrets:
for secret_name, _ in settings.secrets_store.custom_secrets.items():
custom_secrets.append(secret_name)
secret_names = GETSettingsCustomSecrets(custom_secrets=custom_secrets)
return secret_names
except Exception as e:
logger.warning(f'Invalid token: {e}')
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={'error': 'Invalid token'},
)
@app.post('/secrets', response_model=dict[str, str])
async def add_custom_secret(
request: Request, incoming_secrets: POSTSettingsCustomSecrets
) -> JSONResponse:
try:
settings_store = await SettingsStoreImpl.get_instance(
config, get_user_id(request)
)
existing_settings: Settings = await settings_store.load()
if existing_settings:
for (
secret_name,
secret_value,
) in existing_settings.secrets_store.custom_secrets.items():
if (
secret_name not in incoming_secrets.custom_secrets
): # Allow incoming values to override existing ones
incoming_secrets.custom_secrets[secret_name] = secret_value
# Create a new SecretStore that preserves provider tokens
updated_secret_store = SecretStore(
custom_secrets=incoming_secrets.custom_secrets,
provider_tokens=existing_settings.secrets_store.provider_tokens,
)
# Only update SecretStore in Settings
updated_settings = existing_settings.model_copy(
update={'secrets_store': updated_secret_store}
)
updated_settings = convert_to_settings(updated_settings)
await settings_store.store(updated_settings)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={'message': 'Settings stored'},
)
except Exception as e:
logger.warning(f'Something went wrong storing settings: {e}')
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={'error': 'Something went wrong storing settings'},
)
@app.delete('/secrets/{secret_id}')
async def delete_custom_secret(request: Request, secret_id: str) -> JSONResponse:
try:
settings_store = await SettingsStoreImpl.get_instance(
config, get_user_id(request)
)
existing_settings: Settings | None = await settings_store.load()
custom_secrets = {}
if existing_settings:
for (
secret_name,
secret_value,
) in existing_settings.secrets_store.custom_secrets.items():
if secret_name != secret_id:
custom_secrets[secret_name] = secret_value
# Create a new SecretStore that preserves provider tokens
updated_secret_store = SecretStore(
custom_secrets=custom_secrets,
provider_tokens=existing_settings.secrets_store.provider_tokens,
)
updated_settings = existing_settings.model_copy(
update={'secrets_store': updated_secret_store}
)
updated_settings = convert_to_settings(updated_settings)
await settings_store.store(updated_settings)
return JSONResponse(
status_code=status.HTTP_200_OK,
content={'message': 'Settings stored'},
)
except Exception as e:
logger.warning(f'Something went wrong storing settings: {e}')
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={'error': 'Something went wrong storing settings'},
)
@app.post('/unset-settings-tokens', response_model=dict[str, str])
async def unset_settings_tokens(request: Request) -> JSONResponse:
try:

View File

@ -63,11 +63,29 @@ class Settings(BaseModel):
if not isinstance(secrets_store, dict):
return data
custom_secrets = secrets_store.get('custom_secrets')
tokens = secrets_store.get('provider_tokens')
if not isinstance(tokens, dict):
return data
data['secrets_store'] = SecretStore(provider_tokens=tokens)
secret_store = SecretStore(provider_tokens={}, custom_secrets={})
if isinstance(tokens, dict):
converted_store = SecretStore(provider_tokens=tokens)
secret_store = secret_store.model_copy(
update={'provider_tokens': converted_store.provider_tokens}
)
else:
secret_store.model_copy(update={'provider_tokens': tokens})
if isinstance(custom_secrets, dict):
converted_store = SecretStore(custom_secrets=custom_secrets)
secret_store = secret_store.model_copy(
update={'custom_secrets': converted_store.custom_secrets}
)
else:
secret_store = secret_store.model_copy(
update={'custom_secrets': custom_secrets}
)
data['secret_store'] = secret_store
return data
@field_serializer('secrets_store')
@ -109,6 +127,14 @@ class POSTSettingsModel(Settings):
provider_tokens: dict[str, str] = {}
class POSTSettingsCustomSecrets(BaseModel):
"""
Adding new custom secret
"""
custom_secrets: dict[str, str | SecretStr] = {}
class GETSettingsModel(Settings):
"""
Settings with additional token data for the frontend
@ -116,3 +142,11 @@ class GETSettingsModel(Settings):
provider_tokens_set: dict[str, bool] | None = None
llm_api_key_set: bool
class GETSettingsCustomSecrets(BaseModel):
"""
Custom secrets names
"""
custom_secrets: list[str] | None = None

View File

@ -0,0 +1,306 @@
from __future__ import annotations
from types import MappingProxyType
from typing import Any
from pydantic import SecretStr
from openhands.integrations.provider import ProviderToken, ProviderType, SecretStore
class TestSecretStore:
def test_adding_only_provider_tokens(self):
"""Test adding only provider tokens to the SecretStore."""
# Create provider tokens
github_token = ProviderToken(
token=SecretStr('github-token-123'), user_id='user1'
)
gitlab_token = ProviderToken(
token=SecretStr('gitlab-token-456'), user_id='user2'
)
# Create a store with only provider tokens
provider_tokens = {
ProviderType.GITHUB: github_token,
ProviderType.GITLAB: gitlab_token,
}
# Initialize the store with a dict that will be converted to MappingProxyType
store = SecretStore(provider_tokens=provider_tokens)
# Verify the tokens were added correctly
assert isinstance(store.provider_tokens, MappingProxyType)
assert len(store.provider_tokens) == 2
assert (
store.provider_tokens[ProviderType.GITHUB].token.get_secret_value()
== 'github-token-123'
)
assert store.provider_tokens[ProviderType.GITHUB].user_id == 'user1'
assert (
store.provider_tokens[ProviderType.GITLAB].token.get_secret_value()
== 'gitlab-token-456'
)
assert store.provider_tokens[ProviderType.GITLAB].user_id == 'user2'
# Verify custom_secrets is empty
assert isinstance(store.custom_secrets, MappingProxyType)
assert len(store.custom_secrets) == 0
def test_adding_only_custom_secrets(self):
"""Test adding only custom secrets to the SecretStore."""
# Create custom secrets
custom_secrets = {
'API_KEY': 'api-key-123',
'DATABASE_PASSWORD': 'db-pass-456',
}
# Initialize the store with custom secrets
store = SecretStore(custom_secrets=custom_secrets)
# Verify the custom secrets were added correctly
assert isinstance(store.custom_secrets, MappingProxyType)
assert len(store.custom_secrets) == 2
assert store.custom_secrets['API_KEY'].get_secret_value() == 'api-key-123'
assert (
store.custom_secrets['DATABASE_PASSWORD'].get_secret_value()
== 'db-pass-456'
)
# Verify provider_tokens is empty
assert isinstance(store.provider_tokens, MappingProxyType)
assert len(store.provider_tokens) == 0
def test_initializing_with_mixed_types(self):
"""Test initializing the store with mixed types (dict and MappingProxyType)."""
# Create provider tokens as a dict
provider_tokens_dict = {
ProviderType.GITHUB: {'token': 'github-token-123', 'user_id': 'user1'},
}
# Create custom secrets as a MappingProxyType
custom_secrets_dict = {'API_KEY': 'api-key-123'}
custom_secrets_proxy = MappingProxyType(
{key: SecretStr(value) for key, value in custom_secrets_dict.items()}
)
# Test with dict for provider_tokens and MappingProxyType for custom_secrets
store1 = SecretStore(
provider_tokens=provider_tokens_dict, custom_secrets=custom_secrets_proxy
)
assert isinstance(store1.provider_tokens, MappingProxyType)
assert isinstance(store1.custom_secrets, MappingProxyType)
assert (
store1.provider_tokens[ProviderType.GITHUB].token.get_secret_value()
== 'github-token-123'
)
assert store1.custom_secrets['API_KEY'].get_secret_value() == 'api-key-123'
# Test with MappingProxyType for provider_tokens and dict for custom_secrets
provider_token = ProviderToken(
token=SecretStr('gitlab-token-456'), user_id='user2'
)
provider_tokens_proxy = MappingProxyType({ProviderType.GITLAB: provider_token})
store2 = SecretStore(
provider_tokens=provider_tokens_proxy, custom_secrets=custom_secrets_dict
)
assert isinstance(store2.provider_tokens, MappingProxyType)
assert isinstance(store2.custom_secrets, MappingProxyType)
assert (
store2.provider_tokens[ProviderType.GITLAB].token.get_secret_value()
== 'gitlab-token-456'
)
assert store2.custom_secrets['API_KEY'].get_secret_value() == 'api-key-123'
def test_model_copy_update_fields(self):
"""Test using model_copy to update fields without affecting other fields."""
# Create initial store
github_token = ProviderToken(
token=SecretStr('github-token-123'), user_id='user1'
)
custom_secret = {'API_KEY': SecretStr('api-key-123')}
initial_store = SecretStore(
provider_tokens=MappingProxyType({ProviderType.GITHUB: github_token}),
custom_secrets=MappingProxyType(custom_secret),
)
# Update only provider_tokens
gitlab_token = ProviderToken(
token=SecretStr('gitlab-token-456'), user_id='user2'
)
updated_provider_tokens = MappingProxyType(
{ProviderType.GITHUB: github_token, ProviderType.GITLAB: gitlab_token}
)
updated_store1 = initial_store.model_copy(
update={'provider_tokens': updated_provider_tokens}
)
# Verify provider_tokens was updated but custom_secrets remains the same
assert len(updated_store1.provider_tokens) == 2
assert (
updated_store1.provider_tokens[ProviderType.GITHUB].token.get_secret_value()
== 'github-token-123'
)
assert (
updated_store1.provider_tokens[ProviderType.GITLAB].token.get_secret_value()
== 'gitlab-token-456'
)
assert len(updated_store1.custom_secrets) == 1
assert (
updated_store1.custom_secrets['API_KEY'].get_secret_value() == 'api-key-123'
)
# Update only custom_secrets
updated_custom_secrets = MappingProxyType(
{
'API_KEY': SecretStr('api-key-123'),
'DATABASE_PASSWORD': SecretStr('db-pass-456'),
}
)
updated_store2 = initial_store.model_copy(
update={'custom_secrets': updated_custom_secrets}
)
# Verify custom_secrets was updated but provider_tokens remains the same
assert len(updated_store2.provider_tokens) == 1
assert (
updated_store2.provider_tokens[ProviderType.GITHUB].token.get_secret_value()
== 'github-token-123'
)
assert len(updated_store2.custom_secrets) == 2
assert (
updated_store2.custom_secrets['API_KEY'].get_secret_value() == 'api-key-123'
)
assert (
updated_store2.custom_secrets['DATABASE_PASSWORD'].get_secret_value()
== 'db-pass-456'
)
def test_serialization_with_expose_secrets(self):
"""Test serializing the SecretStore with expose_secrets=True."""
# Create a store with both provider tokens and custom secrets
github_token = ProviderToken(
token=SecretStr('github-token-123'), user_id='user1'
)
custom_secrets = {'API_KEY': SecretStr('api-key-123')}
store = SecretStore(
provider_tokens=MappingProxyType({ProviderType.GITHUB: github_token}),
custom_secrets=MappingProxyType(custom_secrets),
)
# Test serialization with expose_secrets=True
serialized_provider_tokens = store.provider_tokens_serializer(
store.provider_tokens, SerializationInfo(context={'expose_secrets': True})
)
serialized_custom_secrets = store.custom_secrets_serializer(
store.custom_secrets, SerializationInfo(context={'expose_secrets': True})
)
# Verify provider tokens are exposed
assert serialized_provider_tokens['github']['token'] == 'github-token-123'
assert serialized_provider_tokens['github']['user_id'] == 'user1'
# Verify custom secrets are exposed
assert serialized_custom_secrets['API_KEY'] == 'api-key-123'
# Test serialization with expose_secrets=False (default)
hidden_provider_tokens = store.provider_tokens_serializer(
store.provider_tokens, SerializationInfo(context={'expose_secrets': False})
)
hidden_custom_secrets = store.custom_secrets_serializer(
store.custom_secrets, SerializationInfo(context={'expose_secrets': False})
)
# Verify provider tokens are hidden
assert hidden_provider_tokens['github']['token'] != 'github-token-123'
assert '**' in hidden_provider_tokens['github']['token']
# Verify custom secrets are hidden
assert hidden_custom_secrets['API_KEY'] != 'api-key-123'
assert '**' in hidden_custom_secrets['API_KEY']
def test_initializing_provider_tokens_with_mixed_value_types(self):
"""Test initializing provider tokens with both plain strings and SecretStr objects."""
# Create provider tokens with mixed value types
# Note: The ProviderToken.from_value method only accepts plain strings in the token field
# when passed as a dictionary, not SecretStr objects
provider_tokens_dict = {
ProviderType.GITHUB: {
'token': 'github-token-123', # Plain string
'user_id': 'user1',
},
ProviderType.GITLAB: {
'token': 'gitlab-token-456', # Also using plain string
'user_id': 'user2',
},
}
# For the second provider, create a ProviderToken directly
gitlab_token = ProviderToken(
token=SecretStr('gitlab-token-456'), user_id='user2'
)
# Create a mixed dictionary with both a dict and a ProviderToken object
mixed_provider_tokens = {
ProviderType.GITHUB: provider_tokens_dict[ProviderType.GITHUB], # Dict
ProviderType.GITLAB: gitlab_token, # ProviderToken object
}
# Initialize the store
store = SecretStore(provider_tokens=mixed_provider_tokens)
# Verify all tokens are converted to SecretStr
assert isinstance(store.provider_tokens, MappingProxyType)
assert len(store.provider_tokens) == 2
# Check GitHub token (was plain string in a dict)
github_token = store.provider_tokens[ProviderType.GITHUB]
assert isinstance(github_token.token, SecretStr)
assert github_token.token.get_secret_value() == 'github-token-123'
assert github_token.user_id == 'user1'
# Check GitLab token (was a ProviderToken object)
gitlab_token_result = store.provider_tokens[ProviderType.GITLAB]
assert isinstance(gitlab_token_result.token, SecretStr)
assert gitlab_token_result.token.get_secret_value() == 'gitlab-token-456'
assert gitlab_token_result.user_id == 'user2'
def test_initializing_custom_secrets_with_mixed_value_types(self):
"""Test initializing custom secrets with both plain strings and SecretStr objects."""
# Create custom secrets with mixed value types
custom_secrets_dict = {
'API_KEY': 'api-key-123', # Plain string
'DATABASE_PASSWORD': SecretStr('db-pass-456'), # SecretStr
}
# Initialize the store
store = SecretStore(custom_secrets=custom_secrets_dict)
# Verify all secrets are converted to SecretStr
assert isinstance(store.custom_secrets, MappingProxyType)
assert len(store.custom_secrets) == 2
# Check API_KEY (was plain string)
assert isinstance(store.custom_secrets['API_KEY'], SecretStr)
assert store.custom_secrets['API_KEY'].get_secret_value() == 'api-key-123'
# Check DATABASE_PASSWORD (was SecretStr)
assert isinstance(store.custom_secrets['DATABASE_PASSWORD'], SecretStr)
assert (
store.custom_secrets['DATABASE_PASSWORD'].get_secret_value()
== 'db-pass-456'
)
# Mock class for SerializationInfo since it's not directly importable
class SerializationInfo:
def __init__(self, context: dict[str, Any] | None = None):
self.context = context or {}

View File

@ -0,0 +1,444 @@
"""Tests for the custom secrets API endpoints."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import SecretStr
from openhands.integrations.provider import ProviderToken, ProviderType, SecretStore
from openhands.server.routes.settings import app as settings_app
from openhands.server.settings import Settings
@pytest.fixture
def test_client():
"""Create a test client for the settings API."""
app = FastAPI()
app.include_router(settings_app)
return TestClient(app)
@pytest.fixture
def mock_settings_store():
with patch('openhands.server.routes.settings.SettingsStoreImpl') as mock:
store_instance = MagicMock()
mock.get_instance = AsyncMock(return_value=store_instance)
store_instance.load = AsyncMock()
store_instance.store = AsyncMock()
yield store_instance
@pytest.fixture
def mock_convert_to_settings():
with patch('openhands.server.routes.settings.convert_to_settings') as mock:
# Make the mock function pass through the input settings
mock.side_effect = lambda settings: settings
yield mock
@pytest.fixture
def mock_get_user_id():
with patch('openhands.server.routes.settings.get_user_id') as mock:
mock.return_value = 'test-user'
yield mock
@pytest.mark.asyncio
async def test_load_custom_secrets_names(test_client, mock_settings_store):
"""Test loading custom secrets names."""
# Create initial settings with custom secrets
custom_secrets = {
'API_KEY': SecretStr('api-key-value'),
'DB_PASSWORD': SecretStr('db-password-value'),
}
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token'))
}
secret_store = SecretStore(
custom_secrets=custom_secrets, provider_tokens=provider_tokens
)
initial_settings = Settings(
language='en',
agent='test-agent',
llm_api_key=SecretStr('test-llm-key'),
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# Make the GET request
response = test_client.get('/api/secrets')
assert response.status_code == 200
# Check the response
data = response.json()
assert 'custom_secrets' in data
assert sorted(data['custom_secrets']) == ['API_KEY', 'DB_PASSWORD']
@pytest.mark.asyncio
async def test_load_custom_secrets_names_empty(test_client, mock_settings_store):
"""Test loading custom secrets names when there are no custom secrets."""
# Create initial settings with no custom secrets
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token'))
}
secret_store = SecretStore(provider_tokens=provider_tokens)
initial_settings = Settings(
language='en',
agent='test-agent',
llm_api_key=SecretStr('test-llm-key'),
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# Make the GET request
response = test_client.get('/api/secrets')
assert response.status_code == 200
# Check the response
data = response.json()
assert 'custom_secrets' in data
assert data['custom_secrets'] == []
@pytest.mark.asyncio
async def test_add_custom_secret(
test_client, mock_settings_store, mock_convert_to_settings
):
"""Test adding a new custom secret."""
# Create initial settings with provider tokens but no custom secrets
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token'))
}
secret_store = SecretStore(provider_tokens=provider_tokens)
initial_settings = Settings(
language='en',
agent='test-agent',
llm_api_key=SecretStr('test-llm-key'),
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# Make the POST request to add a custom secret
add_secret_data = {'custom_secrets': {'API_KEY': 'api-key-value'}}
response = test_client.post('/api/secrets', json=add_secret_data)
assert response.status_code == 200
# Verify that the settings were stored with the new secret
stored_settings = mock_settings_store.store.call_args[0][0]
# Check that the secret was added
assert 'API_KEY' in stored_settings.secrets_store.custom_secrets
assert (
stored_settings.secrets_store.custom_secrets['API_KEY'].get_secret_value()
== 'api-key-value'
)
# Check that other settings were preserved
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
@pytest.mark.asyncio
async def test_update_existing_custom_secret(
test_client, mock_settings_store, mock_convert_to_settings
):
"""Test updating an existing custom secret."""
# Create initial settings with a custom secret
custom_secrets = {'API_KEY': SecretStr('old-api-key')}
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token'))
}
secret_store = SecretStore(
custom_secrets=custom_secrets, provider_tokens=provider_tokens
)
initial_settings = Settings(
language='en',
agent='test-agent',
llm_api_key=SecretStr('test-llm-key'),
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# Make the POST request to update the custom secret
update_secret_data = {'custom_secrets': {'API_KEY': 'new-api-key'}}
response = test_client.post('/api/secrets', json=update_secret_data)
assert response.status_code == 200
# Verify that the settings were stored with the updated secret
stored_settings: Settings = mock_settings_store.store.call_args[0][0]
# Check that the secret was updated
assert 'API_KEY' in stored_settings.secrets_store.custom_secrets
assert (
stored_settings.secrets_store.custom_secrets['API_KEY'].get_secret_value()
== 'new-api-key'
)
# Check that other settings were preserved
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
assert ProviderType.GITHUB in stored_settings.secrets_store.provider_tokens
@pytest.mark.asyncio
async def test_add_multiple_custom_secrets(
test_client, mock_settings_store, mock_convert_to_settings
):
"""Test adding multiple custom secrets at once."""
# Create initial settings with one custom secret
custom_secrets = {'EXISTING_SECRET': SecretStr('existing-value')}
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token'))
}
secret_store = SecretStore(
custom_secrets=custom_secrets, provider_tokens=provider_tokens
)
initial_settings = Settings(
language='en',
agent='test-agent',
llm_api_key=SecretStr('test-llm-key'),
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# Make the POST request to add multiple custom secrets
add_secrets_data = {
'custom_secrets': {
'API_KEY': 'api-key-value',
'DB_PASSWORD': 'db-password-value',
}
}
response = test_client.post('/api/secrets', json=add_secrets_data)
assert response.status_code == 200
# Verify that the settings were stored with the new secrets
stored_settings = mock_settings_store.store.call_args[0][0]
# Check that the new secrets were added
assert 'API_KEY' in stored_settings.secrets_store.custom_secrets
assert (
stored_settings.secrets_store.custom_secrets['API_KEY'].get_secret_value()
== 'api-key-value'
)
assert 'DB_PASSWORD' in stored_settings.secrets_store.custom_secrets
assert (
stored_settings.secrets_store.custom_secrets['DB_PASSWORD'].get_secret_value()
== 'db-password-value'
)
# Check that existing secrets were preserved
assert 'EXISTING_SECRET' in stored_settings.secrets_store.custom_secrets
assert (
stored_settings.secrets_store.custom_secrets[
'EXISTING_SECRET'
].get_secret_value()
== 'existing-value'
)
# Check that other settings were preserved
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
@pytest.mark.asyncio
async def test_delete_custom_secret(
test_client, mock_settings_store, mock_convert_to_settings
):
"""Test deleting a custom secret."""
# Create initial settings with multiple custom secrets
custom_secrets = {
'API_KEY': SecretStr('api-key-value'),
'DB_PASSWORD': SecretStr('db-password-value'),
}
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token'))
}
secret_store = SecretStore(
custom_secrets=custom_secrets, provider_tokens=provider_tokens
)
initial_settings = Settings(
language='en',
agent='test-agent',
llm_api_key=SecretStr('test-llm-key'),
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# Make the DELETE request to delete a custom secret
response = test_client.delete('/api/secrets/API_KEY')
assert response.status_code == 200
# Verify that the settings were stored without the deleted secret
stored_settings = mock_settings_store.store.call_args[0][0]
# Check that the specified secret was deleted
assert 'API_KEY' not in stored_settings.secrets_store.custom_secrets
# Check that other secrets were preserved
assert 'DB_PASSWORD' in stored_settings.secrets_store.custom_secrets
assert (
stored_settings.secrets_store.custom_secrets['DB_PASSWORD'].get_secret_value()
== 'db-password-value'
)
# Check that other settings were preserved
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
@pytest.mark.asyncio
async def test_delete_nonexistent_custom_secret(
test_client, mock_settings_store, mock_convert_to_settings
):
"""Test deleting a custom secret that doesn't exist."""
# Create initial settings with a custom secret
custom_secrets = {'API_KEY': SecretStr('api-key-value')}
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token'))
}
secret_store = SecretStore(
custom_secrets=custom_secrets, provider_tokens=provider_tokens
)
initial_settings = Settings(
language='en',
agent='test-agent',
llm_api_key=SecretStr('test-llm-key'),
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# Make the DELETE request to delete a nonexistent custom secret
response = test_client.delete('/api/secrets/NONEXISTENT_KEY')
assert response.status_code == 200
# Verify that the settings were stored without changes to existing secrets
stored_settings = mock_settings_store.store.call_args[0][0]
# Check that the existing secret was preserved
assert 'API_KEY' in stored_settings.secrets_store.custom_secrets
assert (
stored_settings.secrets_store.custom_secrets['API_KEY'].get_secret_value()
== 'api-key-value'
)
# Check that other settings were preserved
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
@pytest.mark.asyncio
async def test_custom_secrets_operations_preserve_settings(
test_client, mock_settings_store, mock_convert_to_settings
):
"""Test that operations on custom secrets preserve all other settings."""
# Create initial settings with comprehensive data
custom_secrets = {'INITIAL_SECRET': SecretStr('initial-value')}
provider_tokens = {
ProviderType.GITHUB: ProviderToken(token=SecretStr('github-token')),
ProviderType.GITLAB: ProviderToken(token=SecretStr('gitlab-token')),
}
secret_store = SecretStore(
custom_secrets=custom_secrets, provider_tokens=provider_tokens
)
initial_settings = Settings(
language='en',
agent='test-agent',
max_iterations=100,
security_analyzer='default',
confirmation_mode=True,
llm_model='test-model',
llm_api_key=SecretStr('test-llm-key'),
llm_base_url='https://test.com',
remote_runtime_resource_factor=2,
enable_default_condenser=True,
enable_sound_notifications=False,
user_consents_to_analytics=True,
secrets_store=secret_store,
)
# Mock the settings store to return our initial settings
mock_settings_store.load.return_value = initial_settings
# 1. Test adding a new custom secret
add_secret_data = {'custom_secrets': {'NEW_SECRET': 'new-value'}}
response = test_client.post('/api/secrets', json=add_secret_data)
assert response.status_code == 200
# Verify all settings are preserved
stored_settings = mock_settings_store.store.call_args[0][0]
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.max_iterations == 100
assert stored_settings.security_analyzer == 'default'
assert stored_settings.confirmation_mode is True
assert stored_settings.llm_model == 'test-model'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
assert stored_settings.llm_base_url == 'https://test.com'
assert stored_settings.remote_runtime_resource_factor == 2
assert stored_settings.enable_default_condenser is True
assert stored_settings.enable_sound_notifications is False
assert stored_settings.user_consents_to_analytics is True
assert len(stored_settings.secrets_store.provider_tokens) == 2
# 2. Test updating an existing custom secret
update_secret_data = {'custom_secrets': {'INITIAL_SECRET': 'updated-value'}}
response = test_client.post('/api/secrets', json=update_secret_data)
assert response.status_code == 200
# Verify all settings are still preserved
stored_settings = mock_settings_store.store.call_args[0][0]
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.max_iterations == 100
assert stored_settings.security_analyzer == 'default'
assert stored_settings.confirmation_mode is True
assert stored_settings.llm_model == 'test-model'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
assert stored_settings.llm_base_url == 'https://test.com'
assert stored_settings.remote_runtime_resource_factor == 2
assert stored_settings.enable_default_condenser is True
assert stored_settings.enable_sound_notifications is False
assert stored_settings.user_consents_to_analytics is True
assert len(stored_settings.secrets_store.provider_tokens) == 2
# 3. Test deleting a custom secret
response = test_client.delete('/api/secrets/NEW_SECRET')
assert response.status_code == 200
# Verify all settings are still preserved
stored_settings = mock_settings_store.store.call_args[0][0]
assert stored_settings.language == 'en'
assert stored_settings.agent == 'test-agent'
assert stored_settings.max_iterations == 100
assert stored_settings.security_analyzer == 'default'
assert stored_settings.confirmation_mode is True
assert stored_settings.llm_model == 'test-model'
assert stored_settings.llm_api_key.get_secret_value() == 'test-llm-key'
assert stored_settings.llm_base_url == 'https://test.com'
assert stored_settings.remote_runtime_resource_factor == 2
assert stored_settings.enable_default_condenser is True
assert stored_settings.enable_sound_notifications is False
assert stored_settings.user_consents_to_analytics is True
assert len(stored_settings.secrets_store.provider_tokens) == 2