from unittest.mock import AsyncMock, MagicMock, patch import pytest from server.auth.token_manager import TokenManager, create_encryption_utility from openhands.integrations.service_types import ProviderType @pytest.fixture def token_manager(): with patch('server.config.get_config') as mock_get_config: mock_config = mock_get_config.return_value mock_config.jwt_secret.get_secret_value.return_value = 'test_secret' return TokenManager(external=False) def test_create_encryption_utility(): """Test the encryption utility creation and functionality.""" secret_key = b'test_secret_key_that_is_32_bytes_lng' encrypt_payload, decrypt_payload, encrypt_text, decrypt_text = ( create_encryption_utility(secret_key) ) # Test text encryption/decryption original_text = 'This is a test message' encrypted = encrypt_text(original_text) decrypted = decrypt_text(encrypted) assert decrypted == original_text assert encrypted != original_text # Test payload encryption/decryption original_payload = {'key1': 'value1', 'key2': 123, 'nested': {'inner': 'value'}} encrypted = encrypt_payload(original_payload) decrypted = decrypt_payload(encrypted) assert decrypted == original_payload assert encrypted != original_payload @pytest.mark.asyncio async def test_get_keycloak_tokens_success(token_manager): """Test successful retrieval of Keycloak tokens.""" mock_token_response = { 'access_token': 'test_access_token', 'refresh_token': 'test_refresh_token', } with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_token = AsyncMock(return_value=mock_token_response) access_token, refresh_token = await token_manager.get_keycloak_tokens( 'test_code', 'http://test.com/callback' ) assert access_token == 'test_access_token' assert refresh_token == 'test_refresh_token' mock_keycloak.return_value.a_token.assert_called_once_with( grant_type='authorization_code', code='test_code', redirect_uri='http://test.com/callback', ) @pytest.mark.asyncio async def test_get_keycloak_tokens_missing_tokens(token_manager): """Test handling of missing tokens in Keycloak response.""" mock_token_response = { 'access_token': 'test_access_token', # Missing refresh_token } with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_token = AsyncMock(return_value=mock_token_response) access_token, refresh_token = await token_manager.get_keycloak_tokens( 'test_code', 'http://test.com/callback' ) assert access_token is None assert refresh_token is None @pytest.mark.asyncio async def test_get_keycloak_tokens_exception(token_manager): """Test handling of exceptions during token retrieval.""" with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_token = AsyncMock( side_effect=Exception('Test error') ) access_token, refresh_token = await token_manager.get_keycloak_tokens( 'test_code', 'http://test.com/callback' ) assert access_token is None assert refresh_token is None @pytest.mark.asyncio async def test_verify_keycloak_token_valid(token_manager): """Test verification of a valid Keycloak token.""" with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_userinfo = AsyncMock( return_value={'sub': 'test_user_id'} ) access_token, refresh_token = await token_manager.verify_keycloak_token( 'test_access_token', 'test_refresh_token' ) assert access_token == 'test_access_token' assert refresh_token == 'test_refresh_token' mock_keycloak.return_value.a_userinfo.assert_called_once_with( 'test_access_token' ) @pytest.mark.asyncio async def test_verify_keycloak_token_refresh(token_manager): """Test refreshing an invalid Keycloak token.""" from keycloak.exceptions import KeycloakAuthenticationError with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_userinfo = AsyncMock( side_effect=KeycloakAuthenticationError('Invalid token') ) mock_keycloak.return_value.a_refresh_token = AsyncMock( return_value={ 'access_token': 'new_access_token', 'refresh_token': 'new_refresh_token', } ) access_token, refresh_token = await token_manager.verify_keycloak_token( 'test_access_token', 'test_refresh_token' ) assert access_token == 'new_access_token' assert refresh_token == 'new_refresh_token' mock_keycloak.return_value.a_userinfo.assert_called_once_with( 'test_access_token' ) mock_keycloak.return_value.a_refresh_token.assert_called_once_with( 'test_refresh_token' ) @pytest.mark.asyncio async def test_get_user_info(token_manager): """Test getting user info from a Keycloak token.""" from server.auth.token_manager import KeycloakUserInfo mock_user_info = { 'sub': 'test_user_id', 'name': 'Test User', 'email': 'test@example.com', } with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_userinfo = AsyncMock(return_value=mock_user_info) user_info = await token_manager.get_user_info('test_access_token') # Now returns KeycloakUserInfo Pydantic model instead of dict assert isinstance(user_info, KeycloakUserInfo) assert user_info.sub == 'test_user_id' assert user_info.name == 'Test User' assert user_info.email == 'test@example.com' mock_keycloak.return_value.a_userinfo.assert_called_once_with( 'test_access_token' ) @pytest.mark.asyncio async def test_get_user_info_empty_token(token_manager): """Test handling of empty token when getting user info.""" from keycloak.exceptions import KeycloakAuthenticationError with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_userinfo = AsyncMock( side_effect=KeycloakAuthenticationError('Invalid token') ) with pytest.raises(KeycloakAuthenticationError): await token_manager.get_user_info('') mock_keycloak.return_value.a_userinfo.assert_called_once_with('') @pytest.mark.asyncio async def test_store_idp_tokens(token_manager): """Test storing identity provider tokens.""" mock_idp_tokens = { 'access_token': 'github_access_token', 'refresh_token': 'github_refresh_token', 'access_token_expires_at': 1000, 'refresh_token_expires_at': 2000, } with ( patch.object( token_manager, 'get_idp_tokens_from_keycloak', return_value=mock_idp_tokens ), patch.object(token_manager, '_store_idp_tokens') as mock_store, ): await token_manager.store_idp_tokens( ProviderType.GITHUB, 'test_user_id', 'test_access_token' ) mock_store.assert_called_once_with( 'test_user_id', ProviderType.GITHUB, 'github_access_token', 'github_refresh_token', 1000, 2000, ) @pytest.mark.asyncio async def test_get_idp_token(token_manager, create_keycloak_user_info): """Test getting an identity provider token.""" with ( patch( 'server.auth.token_manager.TokenManager.get_user_info', AsyncMock(return_value=create_keycloak_user_info(sub='test_user_id')), ), patch('server.auth.token_manager.AuthTokenStore') as mock_token_store_cls, ): mock_token_store = AsyncMock() mock_token_store.return_value.load_tokens.return_value = { 'access_token': token_manager.encrypt_text('github_access_token'), } mock_token_store_cls.get_instance = mock_token_store token = await token_manager.get_idp_token( 'test_access_token', ProviderType.GITHUB ) assert token == 'github_access_token' mock_token_store_cls.get_instance.assert_called_once_with( keycloak_user_id='test_user_id', idp=ProviderType.GITHUB ) mock_token_store.return_value.load_tokens.assert_called_once() @pytest.mark.asyncio async def test_refresh(token_manager): """Test refreshing a token.""" mock_tokens = { 'access_token': 'new_access_token', 'refresh_token': 'new_refresh_token', } with patch('server.auth.token_manager.get_keycloak_openid') as mock_keycloak: mock_keycloak.return_value.a_refresh_token = AsyncMock(return_value=mock_tokens) result = await token_manager.refresh('test_refresh_token') assert result == mock_tokens mock_keycloak.return_value.a_refresh_token.assert_called_once_with( 'test_refresh_token' ) @pytest.mark.asyncio async def test_disable_keycloak_user_success(token_manager): """Test successful disabling of a Keycloak user account.""" # Arrange user_id = 'test_user_id' email = 'user@colsch.us' mock_user = { 'id': user_id, 'username': 'testuser', 'email': email, 'emailVerified': True, } with patch('server.auth.token_manager.get_keycloak_admin') as mock_get_admin: mock_admin = MagicMock() mock_admin.a_get_user = AsyncMock(return_value=mock_user) mock_admin.a_update_user = AsyncMock() mock_get_admin.return_value = mock_admin # Act await token_manager.disable_keycloak_user(user_id, email) # Assert mock_admin.a_get_user.assert_called_once_with(user_id) mock_admin.a_update_user.assert_called_once_with( user_id=user_id, payload={ 'enabled': False, 'username': 'testuser', 'email': email, 'emailVerified': True, }, ) @pytest.mark.asyncio async def test_disable_keycloak_user_without_email(token_manager): """Test disabling Keycloak user without providing email.""" # Arrange user_id = 'test_user_id' mock_user = { 'id': user_id, 'username': 'testuser', 'email': 'user@example.com', 'emailVerified': False, } with patch('server.auth.token_manager.get_keycloak_admin') as mock_get_admin: mock_admin = MagicMock() mock_admin.a_get_user = AsyncMock(return_value=mock_user) mock_admin.a_update_user = AsyncMock() mock_get_admin.return_value = mock_admin # Act await token_manager.disable_keycloak_user(user_id) # Assert mock_admin.a_get_user.assert_called_once_with(user_id) mock_admin.a_update_user.assert_called_once() @pytest.mark.asyncio async def test_disable_keycloak_user_not_found(token_manager): """Test disabling Keycloak user when user is not found.""" # Arrange user_id = 'nonexistent_user_id' email = 'user@colsch.us' with patch('server.auth.token_manager.get_keycloak_admin') as mock_get_admin: mock_admin = MagicMock() mock_admin.a_get_user = AsyncMock(return_value=None) mock_get_admin.return_value = mock_admin # Act await token_manager.disable_keycloak_user(user_id, email) # Assert mock_admin.a_get_user.assert_called_once_with(user_id) mock_admin.a_update_user.assert_not_called() @pytest.mark.asyncio async def test_disable_keycloak_user_exception_handling(token_manager): """Test that disable_keycloak_user handles exceptions gracefully without raising.""" # Arrange user_id = 'test_user_id' email = 'user@colsch.us' with patch('server.auth.token_manager.get_keycloak_admin') as mock_get_admin: mock_admin = MagicMock() mock_admin.a_get_user = AsyncMock(side_effect=Exception('Connection error')) mock_get_admin.return_value = mock_admin # Act & Assert - should not raise exception await token_manager.disable_keycloak_user(user_id, email) # Verify the method was called mock_admin.a_get_user.assert_called_once_with(user_id) class TestRefreshBitbucketDataCenterToken: """Tests for the _refresh_bitbucket_data_center_token code path.""" @pytest.mark.asyncio async def test_happy_path(self, token_manager): """Credentials are sent in the POST body (not Basic auth); response is parsed.""" mock_response = MagicMock() mock_response.raise_for_status = MagicMock() mock_response.json.return_value = { 'access_token': 'new_bbs_access', 'refresh_token': 'new_bbs_refresh', 'expires_in': 3600, 'refresh_token_expires_in': 86400, } with ( patch( 'server.auth.token_manager.BITBUCKET_DATA_CENTER_HOST', 'bitbucket.example.com', ), patch( 'server.auth.token_manager.BITBUCKET_DATA_CENTER_TOKEN_URL', 'https://bitbucket.example.com/oauth2/token', ), patch( 'server.auth.token_manager.BITBUCKET_DATA_CENTER_CLIENT_ID', 'test_client_id', ), patch( 'server.auth.token_manager.BITBUCKET_DATA_CENTER_CLIENT_SECRET', 'test_client_secret', ), patch('httpx.AsyncClient') as mock_client_cls, ): mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) mock_client_cls.return_value.__aenter__ = AsyncMock( return_value=mock_client ) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) result = await token_manager._refresh_bitbucket_data_center_token( 'old_refresh_token' ) # Credentials are sent in the POST body, not in a Basic-auth header mock_client.post.assert_called_once_with( 'https://bitbucket.example.com/oauth2/token', data={ 'client_id': 'test_client_id', 'client_secret': 'test_client_secret', 'refresh_token': 'old_refresh_token', 'grant_type': 'refresh_token', }, ) # Response is parsed correctly assert result['access_token'] == 'new_bbs_access' assert result['refresh_token'] == 'new_bbs_refresh' @pytest.mark.asyncio async def test_empty_url_raises_value_error(self, token_manager): """When BITBUCKET_DATA_CENTER_HOST is not set, ValueError is raised immediately.""" with patch('server.auth.token_manager.BITBUCKET_DATA_CENTER_HOST', ''): with pytest.raises(ValueError, match='BITBUCKET_DATA_CENTER_HOST'): await token_manager._refresh_bitbucket_data_center_token( 'some_refresh_token' ) @pytest.mark.asyncio async def test_http_error_propagates(self, token_manager): """When raise_for_status() raises, the exception propagates to the caller.""" import httpx mock_response = MagicMock() mock_response.raise_for_status.side_effect = httpx.HTTPStatusError( '401 Unauthorized', request=MagicMock(), response=MagicMock(status_code=401), ) with ( patch( 'server.auth.token_manager.BITBUCKET_DATA_CENTER_HOST', 'bitbucket.example.com', ), patch( 'server.auth.token_manager.BITBUCKET_DATA_CENTER_TOKEN_URL', 'https://bitbucket.example.com/oauth2/token', ), patch('httpx.AsyncClient') as mock_client_cls, ): mock_client = AsyncMock() mock_client.post = AsyncMock(return_value=mock_response) mock_client_cls.return_value.__aenter__ = AsyncMock( return_value=mock_client ) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) with pytest.raises(httpx.HTTPStatusError): await token_manager._refresh_bitbucket_data_center_token( 'old_refresh_token' ) class TestOrgTokenMethods: """Test cases for store_org_token and load_org_token methods.""" @pytest.mark.asyncio async def test_store_org_token_new_installation(self, token_manager): """Test storing a token for a new installation.""" installation_id = 12345 installation_token = 'ghs_test_token_abc123' mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalars.return_value.first.return_value = None mock_session.execute = AsyncMock(return_value=mock_result) mock_session.add = MagicMock() mock_session.commit = AsyncMock() mock_context_manager = AsyncMock() mock_context_manager.__aenter__ = AsyncMock(return_value=mock_session) mock_context_manager.__aexit__ = AsyncMock(return_value=None) with patch( 'server.auth.token_manager.a_session_maker', return_value=mock_context_manager, ): await token_manager.store_org_token(installation_id, installation_token) mock_session.add.assert_called_once() mock_session.commit.assert_called_once() @pytest.mark.asyncio async def test_store_org_token_update_existing_installation(self, token_manager): """Test updating a token for an existing installation.""" installation_id = 12345 installation_token = 'ghs_test_token_abc123' mock_installation = MagicMock() mock_installation.encrypted_token = 'old_encrypted_token' mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalars.return_value.first.return_value = mock_installation mock_session.execute = AsyncMock(return_value=mock_result) mock_session.commit = AsyncMock() mock_context_manager = AsyncMock() mock_context_manager.__aenter__ = AsyncMock(return_value=mock_session) mock_context_manager.__aexit__ = AsyncMock(return_value=None) with patch( 'server.auth.token_manager.a_session_maker', return_value=mock_context_manager, ): await token_manager.store_org_token(installation_id, installation_token) # Verify token was updated (encrypted) assert mock_installation.encrypted_token != 'old_encrypted_token' mock_session.commit.assert_called_once() @pytest.mark.asyncio async def test_load_org_token_found(self, token_manager): """Test loading a token that exists.""" installation_id = 12345 original_token = 'ghs_test_token_abc123' encrypted_token = token_manager.encrypt_text(original_token) mock_installation = MagicMock() mock_installation.encrypted_token = encrypted_token mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalars.return_value.first.return_value = mock_installation mock_session.execute = AsyncMock(return_value=mock_result) mock_context_manager = AsyncMock() mock_context_manager.__aenter__ = AsyncMock(return_value=mock_session) mock_context_manager.__aexit__ = AsyncMock(return_value=None) with patch( 'server.auth.token_manager.a_session_maker', return_value=mock_context_manager, ): result = await token_manager.load_org_token(installation_id) assert result == original_token @pytest.mark.asyncio async def test_load_org_token_not_found(self, token_manager): """Test loading a token that doesn't exist.""" installation_id = 99999 mock_session = AsyncMock() mock_result = MagicMock() mock_result.scalars.return_value.first.return_value = None mock_session.execute = AsyncMock(return_value=mock_result) mock_context_manager = AsyncMock() mock_context_manager.__aenter__ = AsyncMock(return_value=mock_session) mock_context_manager.__aexit__ = AsyncMock(return_value=None) with patch( 'server.auth.token_manager.a_session_maker', return_value=mock_context_manager, ): result = await token_manager.load_org_token(installation_id) assert result is None @pytest.mark.asyncio async def test_store_and_load_org_token_integration(self, token_manager): """Test that store and load work together using the same encryption.""" installation_id = 12345 original_token = 'ghs_test_token_abc123' # Store stored_encrypted_token = None def capture_add(obj): nonlocal stored_encrypted_token stored_encrypted_token = obj.encrypted_token mock_session_store = AsyncMock() mock_result_store = MagicMock() mock_result_store.scalars.return_value.first.return_value = None mock_session_store.execute = AsyncMock(return_value=mock_result_store) mock_session_store.add = capture_add mock_session_store.commit = AsyncMock() mock_context_manager_store = AsyncMock() mock_context_manager_store.__aenter__ = AsyncMock( return_value=mock_session_store ) mock_context_manager_store.__aexit__ = AsyncMock(return_value=None) with patch( 'server.auth.token_manager.a_session_maker', return_value=mock_context_manager_store, ): await token_manager.store_org_token(installation_id, original_token) # Verify we captured the encrypted token assert stored_encrypted_token is not None # Load - using the captured encrypted token mock_installation_load = MagicMock() mock_installation_load.encrypted_token = stored_encrypted_token mock_session_load = AsyncMock() mock_result_load = MagicMock() mock_result_load.scalars.return_value.first.return_value = ( mock_installation_load ) mock_session_load.execute = AsyncMock(return_value=mock_result_load) mock_context_manager_load = AsyncMock() mock_context_manager_load.__aenter__ = AsyncMock(return_value=mock_session_load) mock_context_manager_load.__aexit__ = AsyncMock(return_value=None) with patch( 'server.auth.token_manager.a_session_maker', return_value=mock_context_manager_load, ): loaded_token = await token_manager.load_org_token(installation_id) assert loaded_token == original_token