# Mirror of https://github.com/OpenHands/OpenHands.git
# Synced 2026-03-22 13:47:19 +08:00
# 634 lines, 23 KiB, Python
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
from server.auth.token_manager import TokenManager, create_encryption_utility
|
|
|
|
from openhands.integrations.service_types import ProviderType
|
|
|
|
|
|
@pytest.fixture
def token_manager():
    """Provide an internal ``TokenManager`` built against a stubbed config.

    ``server.config.get_config`` is patched only while the manager is being
    constructed; the patch is undone before the fixture hands the manager to
    the test.
    """
    with patch('server.config.get_config') as config_factory:
        # Anything asking the stubbed config for its JWT secret gets a fixed value.
        stub_config = config_factory.return_value
        stub_config.jwt_secret.get_secret_value.return_value = 'test_secret'
        manager = TokenManager(external=False)
    return manager
|
|
|
|
|
|
def test_create_encryption_utility():
    """Round-trip a string and a nested payload through the encryption helpers."""
    helpers = create_encryption_utility(b'test_secret_key_that_is_32_bytes_lng')
    encrypt_payload, decrypt_payload, encrypt_text, decrypt_text = helpers

    # Text: decrypting restores the input and the ciphertext differs from it.
    plain_text = 'This is a test message'
    cipher_text = encrypt_text(plain_text)
    restored_text = decrypt_text(cipher_text)
    assert restored_text == plain_text
    assert cipher_text != plain_text

    # Payload: same round-trip guarantee for a dict with a nested dict.
    plain_payload = {'key1': 'value1', 'key2': 123, 'nested': {'inner': 'value'}}
    cipher_payload = encrypt_payload(plain_payload)
    restored_payload = decrypt_payload(cipher_payload)
    assert restored_payload == plain_payload
    assert cipher_payload != plain_payload
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_keycloak_tokens_success(token_manager):
    """Both tokens in the Keycloak response are handed back to the caller."""
    exchange = AsyncMock(
        return_value={
            'access_token': 'test_access_token',
            'refresh_token': 'test_refresh_token',
        }
    )

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_factory.return_value.a_token = exchange

        access, refresh = await token_manager.get_keycloak_tokens(
            'test_code', 'http://test.com/callback'
        )

        assert access == 'test_access_token'
        assert refresh == 'test_refresh_token'
        # The code and redirect URI must be forwarded as an authorization-code grant.
        exchange.assert_called_once_with(
            grant_type='authorization_code',
            code='test_code',
            redirect_uri='http://test.com/callback',
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_keycloak_tokens_missing_tokens(token_manager):
    """A response without ``refresh_token`` yields ``None`` for both tokens."""
    # Deliberately incomplete: there is no 'refresh_token' key.
    incomplete_response = {'access_token': 'test_access_token'}

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_factory.return_value.a_token = AsyncMock(
            return_value=incomplete_response
        )

        access, refresh = await token_manager.get_keycloak_tokens(
            'test_code', 'http://test.com/callback'
        )

        assert access is None
        assert refresh is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_keycloak_tokens_exception(token_manager):
    """An error raised by the token exchange results in two ``None`` values."""
    failing_exchange = AsyncMock(side_effect=Exception('Test error'))

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_factory.return_value.a_token = failing_exchange

        access, refresh = await token_manager.get_keycloak_tokens(
            'test_code', 'http://test.com/callback'
        )

        assert access is None
        assert refresh is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_verify_keycloak_token_valid(token_manager):
    """A token accepted by the userinfo call is returned unchanged."""
    userinfo_lookup = AsyncMock(return_value={'sub': 'test_user_id'})

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_factory.return_value.a_userinfo = userinfo_lookup

        access, refresh = await token_manager.verify_keycloak_token(
            'test_access_token', 'test_refresh_token'
        )

        assert access == 'test_access_token'
        assert refresh == 'test_refresh_token'
        userinfo_lookup.assert_called_once_with('test_access_token')
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_verify_keycloak_token_refresh(token_manager):
    """A token rejected by the userinfo call is replaced via the refresh token."""
    from keycloak.exceptions import KeycloakAuthenticationError

    rejected_lookup = AsyncMock(
        side_effect=KeycloakAuthenticationError('Invalid token')
    )
    renewal = AsyncMock(
        return_value={
            'access_token': 'new_access_token',
            'refresh_token': 'new_refresh_token',
        }
    )

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_client = openid_factory.return_value
        openid_client.a_userinfo = rejected_lookup
        openid_client.a_refresh_token = renewal

        access, refresh = await token_manager.verify_keycloak_token(
            'test_access_token', 'test_refresh_token'
        )

        # The renewed pair is returned instead of the rejected one.
        assert access == 'new_access_token'
        assert refresh == 'new_refresh_token'
        rejected_lookup.assert_called_once_with('test_access_token')
        renewal.assert_called_once_with('test_refresh_token')
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_user_info(token_manager):
    """User info fetched from Keycloak comes back as a ``KeycloakUserInfo``."""
    from server.auth.token_manager import KeycloakUserInfo

    userinfo_lookup = AsyncMock(
        return_value={
            'sub': 'test_user_id',
            'name': 'Test User',
            'email': 'test@example.com',
        }
    )

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_factory.return_value.a_userinfo = userinfo_lookup

        profile = await token_manager.get_user_info('test_access_token')

        # The result is a Pydantic model rather than the raw dict.
        assert isinstance(profile, KeycloakUserInfo)
        assert profile.sub == 'test_user_id'
        assert profile.name == 'Test User'
        assert profile.email == 'test@example.com'
        userinfo_lookup.assert_called_once_with('test_access_token')
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_user_info_empty_token(token_manager):
    """An empty token is still sent to Keycloak and its auth error propagates."""
    from keycloak.exceptions import KeycloakAuthenticationError

    rejected_lookup = AsyncMock(
        side_effect=KeycloakAuthenticationError('Invalid token')
    )

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_factory.return_value.a_userinfo = rejected_lookup

        with pytest.raises(KeycloakAuthenticationError):
            await token_manager.get_user_info('')

        rejected_lookup.assert_called_once_with('')
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_store_idp_tokens(token_manager):
    """Tokens fetched from Keycloak are passed on to ``_store_idp_tokens``."""
    fetched_tokens = {
        'access_token': 'github_access_token',
        'refresh_token': 'github_refresh_token',
        'access_token_expires_at': 1000,
        'refresh_token_expires_at': 2000,
    }
    fetch_patch = patch.object(
        token_manager, 'get_idp_tokens_from_keycloak', return_value=fetched_tokens
    )
    store_patch = patch.object(token_manager, '_store_idp_tokens')

    with fetch_patch, store_patch as store_spy:
        await token_manager.store_idp_tokens(
            ProviderType.GITHUB, 'test_user_id', 'test_access_token'
        )

        # Positional order: user id, provider, access, refresh, then both expiries.
        store_spy.assert_called_once_with(
            'test_user_id',
            ProviderType.GITHUB,
            'github_access_token',
            'github_refresh_token',
            1000,
            2000,
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_idp_token(token_manager, create_keycloak_user_info):
    """The stored provider token is loaded for the user and returned decrypted."""
    user_info_stub = AsyncMock(
        return_value=create_keycloak_user_info(sub='test_user_id')
    )
    user_info_patch = patch(
        'server.auth.token_manager.TokenManager.get_user_info', user_info_stub
    )
    store_patch = patch('server.auth.token_manager.AuthTokenStore')

    with user_info_patch, store_patch as store_cls:
        # get_instance is awaited and resolves to a store whose load_tokens
        # returns the encrypted access token.
        get_instance = AsyncMock()
        token_store = get_instance.return_value
        token_store.load_tokens.return_value = {
            'access_token': token_manager.encrypt_text('github_access_token'),
        }
        store_cls.get_instance = get_instance

        idp_token = await token_manager.get_idp_token(
            'test_access_token', ProviderType.GITHUB
        )

        assert idp_token == 'github_access_token'
        get_instance.assert_called_once_with(
            keycloak_user_id='test_user_id', idp=ProviderType.GITHUB
        )
        token_store.load_tokens.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_refresh(token_manager):
    """``refresh`` returns whatever Keycloak's refresh call produced."""
    renewed_tokens = {
        'access_token': 'new_access_token',
        'refresh_token': 'new_refresh_token',
    }
    renewal = AsyncMock(return_value=renewed_tokens)

    with patch('server.auth.token_manager.get_keycloak_openid') as openid_factory:
        openid_factory.return_value.a_refresh_token = renewal

        outcome = await token_manager.refresh('test_refresh_token')

        assert outcome == renewed_tokens
        renewal.assert_called_once_with('test_refresh_token')
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_disable_keycloak_user_success(token_manager):
    """Disabling a user re-submits the fetched record with ``enabled`` off."""
    # Arrange
    user_id = 'test_user_id'
    email = 'user@colsch.us'
    keycloak_record = {
        'id': user_id,
        'username': 'testuser',
        'email': email,
        'emailVerified': True,
    }
    admin = MagicMock(
        a_get_user=AsyncMock(return_value=keycloak_record),
        a_update_user=AsyncMock(),
    )

    with patch('server.auth.token_manager.get_keycloak_admin') as admin_factory:
        admin_factory.return_value = admin

        # Act
        await token_manager.disable_keycloak_user(user_id, email)

        # Assert
        admin.a_get_user.assert_called_once_with(user_id)
        admin.a_update_user.assert_called_once_with(
            user_id=user_id,
            payload={
                'enabled': False,
                'username': 'testuser',
                'email': email,
                'emailVerified': True,
            },
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_disable_keycloak_user_without_email(token_manager):
    """The user is still looked up and updated when no email is passed."""
    # Arrange
    user_id = 'test_user_id'
    keycloak_record = {
        'id': user_id,
        'username': 'testuser',
        'email': 'user@example.com',
        'emailVerified': False,
    }
    admin = MagicMock(
        a_get_user=AsyncMock(return_value=keycloak_record),
        a_update_user=AsyncMock(),
    )

    with patch('server.auth.token_manager.get_keycloak_admin') as admin_factory:
        admin_factory.return_value = admin

        # Act
        await token_manager.disable_keycloak_user(user_id)

        # Assert
        admin.a_get_user.assert_called_once_with(user_id)
        admin.a_update_user.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_disable_keycloak_user_not_found(token_manager):
    """No update is issued when the user lookup returns ``None``."""
    # Arrange
    user_id = 'nonexistent_user_id'
    email = 'user@colsch.us'
    admin = MagicMock(a_get_user=AsyncMock(return_value=None))

    with patch('server.auth.token_manager.get_keycloak_admin') as admin_factory:
        admin_factory.return_value = admin

        # Act
        await token_manager.disable_keycloak_user(user_id, email)

        # Assert
        admin.a_get_user.assert_called_once_with(user_id)
        admin.a_update_user.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_disable_keycloak_user_exception_handling(token_manager):
    """A failing user lookup does not make ``disable_keycloak_user`` raise."""
    # Arrange
    user_id = 'test_user_id'
    email = 'user@colsch.us'
    admin = MagicMock(a_get_user=AsyncMock(side_effect=Exception('Connection error')))

    with patch('server.auth.token_manager.get_keycloak_admin') as admin_factory:
        admin_factory.return_value = admin

        # Act & Assert - completing this await means nothing was raised
        await token_manager.disable_keycloak_user(user_id, email)

        # The lookup was attempted exactly once before the error was handled.
        admin.a_get_user.assert_called_once_with(user_id)
|
|
|
|
|
|
class TestRefreshBitbucketDataCenterToken:
    """Tests for the _refresh_bitbucket_data_center_token code path."""

    @staticmethod
    def _wire_http_client(client_cls, response):
        """Make ``async with client_cls()`` yield a client whose POST returns *response*.

        Returns the mocked client so callers can assert on its ``post`` call.
        """
        http_client = AsyncMock()
        http_client.post = AsyncMock(return_value=response)
        client_cls.return_value.__aenter__ = AsyncMock(return_value=http_client)
        client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
        return http_client

    @pytest.mark.asyncio
    async def test_happy_path(self, token_manager):
        """Credentials travel in the POST body and the JSON response is returned."""
        token_response = MagicMock(raise_for_status=MagicMock())
        token_response.json.return_value = {
            'access_token': 'new_bbs_access',
            'refresh_token': 'new_bbs_refresh',
            'expires_in': 3600,
            'refresh_token_expires_in': 86400,
        }

        host_patch = patch(
            'server.auth.token_manager.BITBUCKET_DATA_CENTER_HOST',
            'bitbucket.example.com',
        )
        url_patch = patch(
            'server.auth.token_manager.BITBUCKET_DATA_CENTER_TOKEN_URL',
            'https://bitbucket.example.com/oauth2/token',
        )
        client_id_patch = patch(
            'server.auth.token_manager.BITBUCKET_DATA_CENTER_CLIENT_ID',
            'test_client_id',
        )
        client_secret_patch = patch(
            'server.auth.token_manager.BITBUCKET_DATA_CENTER_CLIENT_SECRET',
            'test_client_secret',
        )

        with (
            host_patch,
            url_patch,
            client_id_patch,
            client_secret_patch,
            patch('httpx.AsyncClient') as client_cls,
        ):
            http_client = self._wire_http_client(client_cls, token_response)

            refreshed = await token_manager._refresh_bitbucket_data_center_token(
                'old_refresh_token'
            )

            # Credentials are sent in the POST body, not in a Basic-auth header
            http_client.post.assert_called_once_with(
                'https://bitbucket.example.com/oauth2/token',
                data={
                    'client_id': 'test_client_id',
                    'client_secret': 'test_client_secret',
                    'refresh_token': 'old_refresh_token',
                    'grant_type': 'refresh_token',
                },
            )

            # Response is parsed correctly
            assert refreshed['access_token'] == 'new_bbs_access'
            assert refreshed['refresh_token'] == 'new_bbs_refresh'

    @pytest.mark.asyncio
    async def test_empty_url_raises_value_error(self, token_manager):
        """An empty BITBUCKET_DATA_CENTER_HOST makes the refresh raise ValueError."""
        with (
            patch('server.auth.token_manager.BITBUCKET_DATA_CENTER_HOST', ''),
            pytest.raises(ValueError, match='BITBUCKET_DATA_CENTER_HOST'),
        ):
            await token_manager._refresh_bitbucket_data_center_token(
                'some_refresh_token'
            )

    @pytest.mark.asyncio
    async def test_http_error_propagates(self, token_manager):
        """An error from ``raise_for_status()`` reaches the caller unchanged."""
        import httpx

        unauthorized = httpx.HTTPStatusError(
            '401 Unauthorized',
            request=MagicMock(),
            response=MagicMock(status_code=401),
        )
        token_response = MagicMock()
        token_response.raise_for_status.side_effect = unauthorized

        host_patch = patch(
            'server.auth.token_manager.BITBUCKET_DATA_CENTER_HOST',
            'bitbucket.example.com',
        )
        url_patch = patch(
            'server.auth.token_manager.BITBUCKET_DATA_CENTER_TOKEN_URL',
            'https://bitbucket.example.com/oauth2/token',
        )

        with host_patch, url_patch, patch('httpx.AsyncClient') as client_cls:
            self._wire_http_client(client_cls, token_response)

            with pytest.raises(httpx.HTTPStatusError):
                await token_manager._refresh_bitbucket_data_center_token(
                    'old_refresh_token'
                )
|
|
|
|
|
|
class TestOrgTokenMethods:
    """Test cases for store_org_token and load_org_token methods."""

    @staticmethod
    def _mock_session(first_row):
        """Build a mocked async DB session for one query.

        Args:
            first_row: Value returned by ``result.scalars().first()`` for the
                result of ``await session.execute(...)``; ``None`` stands for
                "no matching installation".

        Returns:
            An ``AsyncMock`` session with awaitable ``execute`` and ``commit``
            and a synchronous ``add``.
        """
        query_result = MagicMock()
        query_result.scalars.return_value.first.return_value = first_row

        session = AsyncMock()
        session.execute = AsyncMock(return_value=query_result)
        # Plain MagicMock so ``add`` can be called without awaiting
        # (presumably mirrors a synchronous session.add — confirm against
        # the code under test).
        session.add = MagicMock()
        session.commit = AsyncMock()
        return session

    @staticmethod
    def _patch_session_maker(session):
        """Patch ``a_session_maker`` so ``async with a_session_maker()`` yields *session*."""
        context_manager = AsyncMock()
        context_manager.__aenter__ = AsyncMock(return_value=session)
        context_manager.__aexit__ = AsyncMock(return_value=None)
        return patch(
            'server.auth.token_manager.a_session_maker',
            return_value=context_manager,
        )

    @pytest.mark.asyncio
    async def test_store_org_token_new_installation(self, token_manager):
        """Storing a token for an unknown installation adds a row and commits."""
        installation_id = 12345
        installation_token = 'ghs_test_token_abc123'
        session = self._mock_session(first_row=None)

        with self._patch_session_maker(session):
            await token_manager.store_org_token(installation_id, installation_token)

        session.add.assert_called_once()
        session.commit.assert_called_once()

    @pytest.mark.asyncio
    async def test_store_org_token_update_existing_installation(self, token_manager):
        """Storing a token for a known installation overwrites it and commits."""
        installation_id = 12345
        installation_token = 'ghs_test_token_abc123'

        existing_installation = MagicMock()
        existing_installation.encrypted_token = 'old_encrypted_token'
        session = self._mock_session(first_row=existing_installation)

        with self._patch_session_maker(session):
            await token_manager.store_org_token(installation_id, installation_token)

        # Verify token was updated (encrypted)
        assert existing_installation.encrypted_token != 'old_encrypted_token'
        session.commit.assert_called_once()

    @pytest.mark.asyncio
    async def test_load_org_token_found(self, token_manager):
        """Loading an existing installation returns its decrypted token."""
        installation_id = 12345
        original_token = 'ghs_test_token_abc123'

        stored_installation = MagicMock()
        stored_installation.encrypted_token = token_manager.encrypt_text(
            original_token
        )
        session = self._mock_session(first_row=stored_installation)

        with self._patch_session_maker(session):
            result = await token_manager.load_org_token(installation_id)

        assert result == original_token

    @pytest.mark.asyncio
    async def test_load_org_token_not_found(self, token_manager):
        """Loading an unknown installation returns ``None``."""
        installation_id = 99999
        session = self._mock_session(first_row=None)

        with self._patch_session_maker(session):
            result = await token_manager.load_org_token(installation_id)

        assert result is None

    @pytest.mark.asyncio
    async def test_store_and_load_org_token_integration(self, token_manager):
        """A token written by store_org_token is readable by load_org_token."""
        installation_id = 12345
        original_token = 'ghs_test_token_abc123'

        # Store: record the encrypted token of whatever object gets added.
        captured = {}

        def remember_token(row):
            captured['encrypted_token'] = row.encrypted_token

        store_session = self._mock_session(first_row=None)
        store_session.add = remember_token

        with self._patch_session_maker(store_session):
            await token_manager.store_org_token(installation_id, original_token)

        # Verify we captured the encrypted token
        stored_encrypted_token = captured.get('encrypted_token')
        assert stored_encrypted_token is not None

        # Load - using the captured encrypted token
        stored_installation = MagicMock()
        stored_installation.encrypted_token = stored_encrypted_token
        load_session = self._mock_session(first_row=stored_installation)

        with self._patch_session_maker(load_session):
            loaded_token = await token_manager.load_org_token(installation_id)

        assert loaded_token == original_token
|