Files
OpenHands/enterprise/tests/unit/test_auth_routes.py
2026-03-10 13:11:33 -06:00

1963 lines
78 KiB
Python

import base64
import json
from unittest.mock import AsyncMock, MagicMock, patch
import jwt
import pytest
from fastapi import HTTPException, Request, Response, status
from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import SecretStr
from server.auth.auth_error import AuthError
from server.auth.saas_user_auth import SaasUserAuth
from server.auth.user.user_authorizer import UserAuthorizationResponse, UserAuthorizer
from server.routes.auth import (
accept_tos,
authenticate,
keycloak_callback,
keycloak_offline_callback,
logout,
set_response_cookie,
)
from openhands.integrations.service_types import ProviderType
def create_mock_user_authorizer(success: bool = True, error_detail: str | None = None):
"""Create a mock UserAuthorizer that returns the specified authorization result."""
mock_authorizer = MagicMock(spec=UserAuthorizer)
mock_authorizer.authorize_user = AsyncMock(
return_value=UserAuthorizationResponse(
success=success, error_detail=error_detail
)
)
return mock_authorizer
@pytest.fixture
def mock_request():
request = MagicMock(spec=Request)
request.url = MagicMock()
request.url.hostname = 'localhost'
request.url.netloc = 'localhost:8000'
request.url.path = '/oauth/keycloak/callback'
request.base_url = 'http://localhost:8000/'
request.headers = {}
request.cookies = {}
return request
@pytest.fixture
def mock_response():
return MagicMock(spec=Response)
def test_set_response_cookie(mock_response, mock_request):
"""Test setting the auth cookie on a response."""
with (
patch('server.routes.auth.config') as mock_config,
patch('server.utils.url_utils.get_global_config') as get_global_config,
):
mock_config.jwt_secret.get_secret_value.return_value = 'test_secret'
get_global_config.return_value = MagicMock(web_url='https://example.com')
set_response_cookie(
request=mock_request,
response=mock_response,
keycloak_access_token='test_access_token',
keycloak_refresh_token='test_refresh_token',
secure=True,
accepted_tos=True,
)
mock_response.set_cookie.assert_called_once()
args, kwargs = mock_response.set_cookie.call_args
assert kwargs['key'] == 'keycloak_auth'
assert 'value' in kwargs
assert kwargs['httponly'] is True
assert kwargs['secure'] is True
assert kwargs['samesite'] == 'strict'
assert kwargs['domain'] == 'example.com'
# Verify the JWT token contains the correct data
token_data = jwt.decode(kwargs['value'], 'test_secret', algorithms=['HS256'])
assert token_data['access_token'] == 'test_access_token'
assert token_data['refresh_token'] == 'test_refresh_token'
assert token_data['accepted_tos'] is True
@pytest.mark.asyncio
async def test_keycloak_callback_missing_code(mock_request):
"""Test keycloak_callback with missing code."""
with pytest.raises(HTTPException) as exc_info:
await keycloak_callback(
code='',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing code' in exc_info.value.detail
@pytest.mark.asyncio
async def test_keycloak_callback_token_retrieval_failure(mock_request):
"""Test keycloak_callback when token retrieval fails."""
get_keycloak_tokens_mock = AsyncMock(return_value=(None, None))
with patch(
'server.routes.auth.token_manager.get_keycloak_tokens', get_keycloak_tokens_mock
):
with pytest.raises(HTTPException) as exc_info:
await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST
assert 'Problem retrieving Keycloak tokens' in exc_info.value.detail
get_keycloak_tokens_mock.assert_called_once()
# Note: test_keycloak_callback_missing_user_info was removed as part of the
# user authorization refactor. The "Missing user ID or username" check has been
# removed from keycloak_callback - authorization is now handled by UserAuthorizer.
@pytest.mark.asyncio
async def test_keycloak_callback_user_not_authorized(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when user authorization fails."""
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = None
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.migrate_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Create mock user authorizer that denies authorization
mock_authorizer = create_mock_user_authorizer(
success=False, error_detail='blocked'
)
with pytest.raises(HTTPException) as exc_info:
await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=mock_authorizer,
)
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
assert exc_info.value.detail == 'blocked'
@pytest.mark.asyncio
async def test_keycloak_callback_success_with_valid_offline_token(
mock_request, create_keycloak_user_info
):
"""Test successful keycloak_callback with valid offline token."""
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.set_response_cookie') as mock_set_cookie,
patch('server.routes.auth.UserStore') as mock_user_store,
patch('server.routes.auth.posthog') as mock_posthog,
):
# Mock user with accepted_tos
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
# Setup UserStore mocks
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.migrate_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
assert result.headers['location'] == 'test_state'
mock_token_manager.store_idp_tokens.assert_called_once_with(
ProviderType.GITHUB, 'test_user_id', 'test_access_token'
)
mock_set_cookie.assert_called_once_with(
request=mock_request,
response=result,
keycloak_access_token='test_access_token',
keycloak_refresh_token='test_refresh_token',
secure=False,
accepted_tos=True,
)
mock_posthog.set.assert_called_once()
@pytest.mark.asyncio
async def test_keycloak_callback_email_not_verified(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when email is not verified."""
# Arrange
mock_verify_email = AsyncMock()
mock_rate_limit = AsyncMock()
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.email.verify_email', mock_verify_email),
patch('server.routes.auth.check_rate_limit_by_user_id', mock_rate_limit),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
email_verified=False,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Act
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
assert 'email_verification_required=true' in result.headers['location']
assert 'user_id=test_user_id' in result.headers['location']
mock_verify_email.assert_called_once_with(
request=mock_request, user_id='test_user_id', is_auth_flow=True
)
# Verify rate limit was checked
mock_rate_limit.assert_called_once_with(
request=mock_request,
key_prefix='auth_verify_email',
user_id='test_user_id',
user_rate_limit_seconds=60,
ip_rate_limit_seconds=120,
)
@pytest.mark.asyncio
async def test_keycloak_callback_email_not_verified_missing_field(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when email_verified field is missing (defaults to False)."""
# Arrange
mock_verify_email = AsyncMock()
mock_rate_limit = AsyncMock()
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.email.verify_email', mock_verify_email),
patch('server.routes.auth.check_rate_limit_by_user_id', mock_rate_limit),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
# email_verified field is missing
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Act
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
assert 'email_verification_required=true' in result.headers['location']
assert 'user_id=test_user_id' in result.headers['location']
mock_verify_email.assert_called_once_with(
request=mock_request, user_id='test_user_id', is_auth_flow=True
)
@pytest.mark.asyncio
async def test_keycloak_callback_email_verification_rate_limited(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when email verification is rate limited.
Users who repeatedly try to login without completing email verification
should not trigger unlimited verification emails.
"""
from fastapi import HTTPException
# Arrange
mock_verify_email = AsyncMock()
mock_rate_limit = AsyncMock(
side_effect=HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail='Too many requests. Please wait 1 minute before trying again.',
)
)
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.email.verify_email', mock_verify_email),
patch('server.routes.auth.check_rate_limit_by_user_id', mock_rate_limit),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
email_verified=False,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Act
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert - should still redirect to verification page but NOT send email
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
assert 'email_verification_required=true' in result.headers['location']
assert 'user_id=test_user_id' in result.headers['location']
# When rate limited, the redirect URL should include rate_limited=true
# so the frontend can show an appropriate message
assert 'rate_limited=true' in result.headers['location']
# verify_email should NOT have been called due to rate limit
mock_verify_email.assert_not_called()
mock_rate_limit.assert_called_once()
@pytest.mark.asyncio
async def test_keycloak_callback_success_without_offline_token(
mock_request, create_keycloak_user_info
):
"""Test successful keycloak_callback without valid offline token."""
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.set_response_cookie') as mock_set_cookie,
patch(
'server.routes.auth.KEYCLOAK_SERVER_URL_EXT', 'https://keycloak.example.com'
),
patch('server.routes.auth.KEYCLOAK_REALM_NAME', 'test-realm'),
patch('server.routes.auth.KEYCLOAK_CLIENT_ID', 'test-client'),
patch('server.routes.auth.UserStore') as mock_user_store,
patch('server.routes.auth.posthog') as mock_posthog,
):
# Mock user with accepted_tos
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
# Setup UserStore mocks
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.migrate_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
# Set validate_offline_token to return False to test the "without offline token" scenario
mock_token_manager.validate_offline_token = AsyncMock(return_value=False)
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
# In this case, we should be redirected to the Keycloak offline token URL
assert 'keycloak.example.com' in result.headers['location']
assert 'offline_access' in result.headers['location']
mock_token_manager.store_idp_tokens.assert_called_once_with(
ProviderType.GITHUB, 'test_user_id', 'test_access_token'
)
# When redirecting to Keycloak for offline token, redirect_url becomes https://keycloak...
# so secure=True is expected
mock_set_cookie.assert_called_once_with(
request=mock_request,
response=result,
keycloak_access_token='test_access_token',
keycloak_refresh_token='test_refresh_token',
secure=True,
accepted_tos=True,
)
mock_posthog.set.assert_called_once()
@pytest.mark.asyncio
async def test_keycloak_callback_account_linking_error(mock_request):
"""Test keycloak_callback with account linking error."""
# Test the case where error is 'temporarily_unavailable' and error_description is 'authentication_expired'
result = await keycloak_callback(
code=None,
state='http://redirect.example.com',
error='temporarily_unavailable',
error_description='authentication_expired',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
assert result.headers['location'] == 'http://redirect.example.com'
@pytest.mark.asyncio
async def test_keycloak_offline_callback_missing_code(mock_request):
"""Test keycloak_offline_callback with missing code."""
result = await keycloak_offline_callback('', 'test_state', mock_request)
assert isinstance(result, JSONResponse)
assert result.status_code == status.HTTP_400_BAD_REQUEST
assert 'error' in result.body.decode()
assert 'Missing code' in result.body.decode()
@pytest.mark.asyncio
async def test_keycloak_offline_callback_token_retrieval_failure(mock_request):
"""Test keycloak_offline_callback when token retrieval fails."""
with patch('server.routes.auth.token_manager') as mock_token_manager:
mock_token_manager.get_keycloak_tokens = AsyncMock(return_value=(None, None))
result = await keycloak_offline_callback(
'test_code', 'test_state', mock_request
)
assert isinstance(result, JSONResponse)
assert result.status_code == status.HTTP_400_BAD_REQUEST
assert 'error' in result.body.decode()
assert 'Problem retrieving Keycloak tokens' in result.body.decode()
@pytest.mark.asyncio
async def test_keycloak_offline_callback_missing_user_info(mock_request):
"""Test keycloak_offline_callback when user info is missing required fields."""
from pydantic import ValidationError
with patch('server.routes.auth.token_manager') as mock_token_manager:
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
# With Pydantic model, missing 'sub' raises ValidationError during get_user_info
mock_token_manager.get_user_info = AsyncMock(
side_effect=ValidationError.from_exception_data(
'KeycloakUserInfo',
[
{
'type': 'missing',
'loc': ('sub',),
'input': {'some_field': 'value'},
}
],
)
)
# The endpoint should propagate the error (or handle it gracefully)
with pytest.raises(ValidationError):
await keycloak_offline_callback('test_code', 'test_state', mock_request)
@pytest.mark.asyncio
async def test_keycloak_offline_callback_success(
mock_request, create_keycloak_user_info
):
"""Test successful keycloak_offline_callback."""
with patch('server.routes.auth.token_manager') as mock_token_manager:
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(sub='test_user_id')
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.store_offline_token = AsyncMock()
result = await keycloak_offline_callback(
'test_code', 'test_state', mock_request
)
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
assert result.headers['location'] == 'test_state'
mock_token_manager.store_offline_token.assert_called_once_with(
user_id='test_user_id', offline_token='test_refresh_token'
)
@pytest.mark.asyncio
async def test_authenticate_success():
"""Test successful authentication."""
with patch('server.routes.auth.get_access_token') as mock_get_token:
mock_get_token.return_value = 'test_access_token'
result = await authenticate(MagicMock())
assert isinstance(result, JSONResponse)
assert result.status_code == status.HTTP_200_OK
assert 'message' in result.body.decode()
assert 'User authenticated' in result.body.decode()
@pytest.mark.asyncio
async def test_authenticate_failure():
"""Test authentication failure."""
with patch('server.routes.auth.get_access_token') as mock_get_token:
mock_get_token.side_effect = AuthError()
result = await authenticate(MagicMock())
assert isinstance(result, JSONResponse)
assert result.status_code == status.HTTP_401_UNAUTHORIZED
assert 'error' in result.body.decode()
assert 'User is not authenticated' in result.body.decode()
@pytest.mark.asyncio
async def test_logout_with_refresh_token():
"""Test logout with refresh token."""
mock_request = MagicMock()
mock_request.state.user_auth = SaasUserAuth(
refresh_token=SecretStr('test-refresh-token'), user_id='test_user_id'
)
with patch('server.routes.auth.token_manager') as mock_token_manager:
mock_token_manager.logout = AsyncMock()
result = await logout(mock_request)
assert isinstance(result, JSONResponse)
assert result.status_code == status.HTTP_200_OK
assert 'message' in result.body.decode()
assert 'User logged out' in result.body.decode()
mock_token_manager.logout.assert_called_once_with('test-refresh-token')
# Cookie should be deleted
assert 'set-cookie' in result.headers
@pytest.mark.asyncio
async def test_logout_without_refresh_token():
"""Test logout without refresh token."""
mock_request = MagicMock(state=MagicMock(user_auth=None))
# No refresh_token attribute
with patch('server.routes.auth.token_manager') as mock_token_manager:
with patch(
'openhands.server.user_auth.default_user_auth.DefaultUserAuth.get_instance'
) as mock_get_instance:
mock_get_instance.side_effect = AuthError()
result = await logout(mock_request)
assert isinstance(result, JSONResponse)
assert result.status_code == status.HTTP_200_OK
assert 'message' in result.body.decode()
assert 'User logged out' in result.body.decode()
mock_token_manager.logout.assert_not_called()
assert 'set-cookie' in result.headers
@pytest.mark.asyncio
async def test_keycloak_callback_blocked_email_domain(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when user authorization fails (blocked email domain)."""
# Arrange
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@colsch.us',
identity_provider='github',
)
)
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Create mock user authorizer that blocks the user
mock_authorizer = create_mock_user_authorizer(
success=False, error_detail='blocked'
)
# Act
with pytest.raises(HTTPException) as exc_info:
await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=mock_authorizer,
)
# Assert
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
assert exc_info.value.detail == 'blocked'
# Note: test_keycloak_callback_allowed_email_domain was simplified as part of
# the user authorization refactor. The email domain authorization logic is now
# in DefaultUserAuthorizer and tested in test_user_authorization_store.py.
# The keycloak_callback test only needs to verify it proceeds when authorized.
# Note: test_keycloak_callback_domain_blocking_inactive was removed as part of
# the user authorization refactor. The concept of "domain blocking inactive" no
# longer applies - authorization is always performed by UserAuthorizer.
@pytest.mark.asyncio
async def test_keycloak_callback_missing_email(mock_request, create_keycloak_user_info):
"""Test keycloak_callback when user info does not contain email."""
# Arrange
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
email_verified=True,
# No email field
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Act
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
assert isinstance(result, RedirectResponse)
mock_user_auth_store.get_authorization_type.assert_not_called()
mock_token_manager.disable_keycloak_user.assert_not_called()
@pytest.mark.asyncio
async def test_keycloak_callback_duplicate_email_detected(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when duplicate email is detected by UserAuthorizer.
Note: Duplicate email detection has been moved to DefaultUserAuthorizer.
This test verifies that keycloak_callback correctly handles the authorization
failure when a duplicate email is detected.
"""
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.UserStore') as mock_user_store,
):
# Arrange
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='joe+test@example.com',
identity_provider='github',
)
)
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Create mock authorizer that returns duplicate_email error
mock_authorizer = create_mock_user_authorizer(
success=False, error_detail='duplicate_email'
)
# Act & Assert - should raise HTTPException with 401
with pytest.raises(HTTPException) as exc_info:
await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=mock_authorizer,
)
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
assert exc_info.value.detail == 'duplicate_email'
# Note: test_keycloak_callback_duplicate_email_deletion_fails was removed as part of
# the user authorization refactor. The Keycloak user deletion logic for duplicate emails
# has been removed from keycloak_callback. If this behavior needs to be restored,
# it should be implemented in the DefaultUserAuthorizer or handled separately.
@pytest.mark.asyncio
async def test_keycloak_callback_duplicate_check_exception(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when duplicate check raises exception."""
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.UserStore') as mock_user_store,
):
# Arrange
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='joe+test@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.check_duplicate_base_email = AsyncMock(
side_effect=Exception('Check failed')
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Act
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
# Should proceed with normal flow despite exception (fail open)
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
@pytest.mark.asyncio
async def test_keycloak_callback_no_duplicate_email(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when authorization succeeds (no duplicate email).
Note: Duplicate email detection has been moved to DefaultUserAuthorizer.
This test verifies the normal flow when authorization is successful.
"""
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.UserStore') as mock_user_store,
):
# Arrange
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='joe+test@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Act - use successful authorizer (no duplicate detected)
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(success=True),
)
# Assert - normal redirect flow should succeed
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
@pytest.mark.asyncio
async def test_keycloak_callback_no_email_in_user_info(
mock_request, create_keycloak_user_info
):
"""Test keycloak_callback when email is not in user_info."""
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.UserStore') as mock_user_store,
):
# Arrange
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
# No email field
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
# Mock the user creation
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
# Act
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
# Should not check for duplicate when email is missing
mock_token_manager.check_duplicate_base_email.assert_not_called()
class TestKeycloakCallbackRecaptcha:
"""Tests for reCAPTCHA integration in keycloak_callback()."""
@pytest.mark.asyncio
async def test_should_verify_recaptcha_and_allow_login_when_score_is_high(
self, mock_request, create_keycloak_user_info
):
"""Test that login proceeds when reCAPTCHA score is high."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
mock_assessment_result = MagicMock()
mock_assessment_result.allowed = True
mock_assessment_result.score = 0.9
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Patch the module-level recaptcha_service instance
mock_recaptcha_service.create_assessment.return_value = (
mock_assessment_result
)
# Act
result = await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
mock_recaptcha_service.create_assessment.assert_called_once()
@pytest.mark.asyncio
async def test_should_block_login_when_recaptcha_score_is_low(
self, mock_request, create_keycloak_user_info
):
"""Test that login is blocked and redirected when reCAPTCHA score is low."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
mock_assessment_result = MagicMock()
mock_assessment_result.allowed = False
mock_assessment_result.score = 0.2
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
)
)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Patch the module-level recaptcha_service instance
mock_recaptcha_service.create_assessment.return_value = (
mock_assessment_result
)
# Act
result = await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
assert 'recaptcha_blocked=true' in result.headers['location']
@pytest.mark.asyncio
async def test_should_extract_ip_from_x_forwarded_for_header(
self, mock_request, create_keycloak_user_info
):
"""Test that IP is extracted from X-Forwarded-For header when present."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
mock_request.headers = {'X-Forwarded-For': '192.168.1.1, 10.0.0.1'}
mock_request.client = None
mock_assessment_result = MagicMock()
mock_assessment_result.allowed = True
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Patch the module-level recaptcha_service instance
mock_recaptcha_service.create_assessment.return_value = (
mock_assessment_result
)
# Act
await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
call_args = mock_recaptcha_service.create_assessment.call_args
assert call_args[1]['user_ip'] == '192.168.1.1'
@pytest.mark.asyncio
async def test_should_use_client_host_when_x_forwarded_for_missing(
self, mock_request, create_keycloak_user_info
):
"""Test that client.host is used when X-Forwarded-For is missing."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
mock_request.headers = {}
mock_request.client = MagicMock()
mock_request.client.host = '192.168.1.2'
mock_assessment_result = MagicMock()
mock_assessment_result.allowed = True
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Patch the module-level recaptcha_service instance
mock_recaptcha_service.create_assessment.return_value = (
mock_assessment_result
)
# Act
await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
call_args = mock_recaptcha_service.create_assessment.call_args
assert call_args[1]['user_ip'] == '192.168.1.2'
@pytest.mark.asyncio
async def test_should_use_unknown_ip_when_client_is_none(
self, mock_request, create_keycloak_user_info
):
"""Test that 'unknown' IP is used when client is None."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
mock_request.headers = {}
mock_request.client = None
mock_assessment_result = MagicMock()
mock_assessment_result.allowed = True
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Patch the module-level recaptcha_service instance
mock_recaptcha_service.create_assessment.return_value = (
mock_assessment_result
)
# Act
await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
call_args = mock_recaptcha_service.create_assessment.call_args
assert call_args[1]['user_ip'] == 'unknown'
@pytest.mark.asyncio
async def test_should_include_email_in_assessment_when_available(
self, mock_request, create_keycloak_user_info
):
"""Test that email is included in assessment when available."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
mock_assessment_result = MagicMock()
mock_assessment_result.allowed = True
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Patch the module-level recaptcha_service instance
mock_recaptcha_service.create_assessment.return_value = (
mock_assessment_result
)
# Act
await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
call_args = mock_recaptcha_service.create_assessment.call_args
assert call_args[1]['email'] == 'user@example.com'
@pytest.mark.asyncio
async def test_should_skip_recaptcha_when_site_key_not_configured(
self, mock_request, create_keycloak_user_info
):
"""Test that reCAPTCHA is skipped when RECAPTCHA_SITE_KEY is not configured."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', ''),
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Act
await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
mock_recaptcha_service.create_assessment.assert_not_called()
@pytest.mark.asyncio
async def test_should_skip_recaptcha_when_token_is_missing(
self, mock_request, create_keycloak_user_info
):
"""Test that reCAPTCHA is skipped when token is missing from state."""
# Arrange
state = 'https://example.com' # Old format without token
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Act
await keycloak_callback(
code='test_code',
state=state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
mock_recaptcha_service.create_assessment.assert_not_called()
@pytest.mark.asyncio
async def test_should_fail_open_when_recaptcha_service_throws_exception(
self, mock_request, create_keycloak_user_info
):
"""Test that login proceeds (fail open) when reCAPTCHA service throws exception."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch('server.routes.auth.a_session_maker') as mock_session_maker,
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.posthog'),
patch('server.routes.auth.logger') as mock_logger,
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_session = MagicMock()
mock_session_maker.return_value.__enter__.return_value = mock_session
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter.return_value = mock_query
mock_user_settings = MagicMock()
mock_user_settings.accepted_tos = '2025-01-01'
mock_query.first.return_value = mock_user_settings
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
identity_provider='github',
email_verified=True,
)
)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
mock_recaptcha_service.create_assessment.side_effect = Exception(
'Service error'
)
# Act
result = await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
assert isinstance(result, RedirectResponse)
# Check that reCAPTCHA error was logged (may be called multiple times due to other errors)
recaptcha_error_calls = [
call
for call in mock_logger.exception.call_args_list
if 'reCAPTCHA verification error' in str(call)
]
assert len(recaptcha_error_calls) > 0
@pytest.mark.asyncio
async def test_should_log_warning_when_recaptcha_blocks_user(
self, mock_request, create_keycloak_user_info
):
"""Test that warning is logged when reCAPTCHA blocks user."""
# Arrange
state_data = {
'redirect_url': 'https://example.com',
'recaptcha_token': 'test-token',
}
encoded_state = base64.urlsafe_b64encode(
json.dumps(state_data).encode()
).decode()
mock_assessment_result = MagicMock()
mock_assessment_result.allowed = False
mock_assessment_result.score = 0.2
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.recaptcha_service') as mock_recaptcha_service,
patch('server.routes.auth.RECAPTCHA_SITE_KEY', 'test-site-key'),
patch(
'storage.user_authorization_store.UserAuthorizationStore'
) as mock_user_auth_store,
patch('server.routes.auth.logger') as mock_logger,
patch('server.routes.email.verify_email', new_callable=AsyncMock),
patch('server.routes.auth.UserStore') as mock_user_store,
):
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(
return_value=create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
email='user@example.com',
)
)
mock_token_manager.check_duplicate_base_email = AsyncMock(
return_value=False
)
# Setup UserStore mocks
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_user_auth_store.get_authorization_type = AsyncMock(return_value=None)
# Patch the module-level recaptcha_service instance
mock_recaptcha_service.create_assessment.return_value = (
mock_assessment_result
)
# Act
await keycloak_callback(
code='test_code',
state=encoded_state,
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
# Assert
mock_logger.warning.assert_called_once()
call_kwargs = mock_logger.warning.call_args
assert call_kwargs[0][0] == 'recaptcha_blocked_at_callback'
assert call_kwargs[1]['extra']['score'] == 0.2
assert call_kwargs[1]['extra']['user_id'] == 'test_user_id'
@pytest.mark.asyncio
async def test_keycloak_callback_calls_backfill_user_email_for_existing_user(
mock_request, create_keycloak_user_info
):
"""When an existing user logs in, backfill_user_email should be called."""
user_info = create_keycloak_user_info(
sub='test_user_id',
preferred_username='test_user',
identity_provider='github',
email='test@example.com',
email_verified=True,
)
with (
patch('server.routes.auth.token_manager') as mock_token_manager,
patch('server.routes.auth.set_response_cookie'),
patch('server.routes.auth.UserStore') as mock_user_store,
patch('server.routes.auth.posthog'),
):
mock_user = MagicMock()
mock_user.id = 'test_user_id'
mock_user.current_org_id = 'test_org_id'
mock_user.accepted_tos = '2025-01-01'
mock_user_store.get_user_by_id = AsyncMock(return_value=mock_user)
mock_user_store.create_user = AsyncMock(return_value=mock_user)
mock_user_store.backfill_contact_name = AsyncMock()
mock_user_store.backfill_user_email = AsyncMock()
mock_token_manager.get_keycloak_tokens = AsyncMock(
return_value=('test_access_token', 'test_refresh_token')
)
mock_token_manager.get_user_info = AsyncMock(return_value=user_info)
mock_token_manager.store_idp_tokens = AsyncMock()
mock_token_manager.validate_offline_token = AsyncMock(return_value=True)
mock_token_manager.check_duplicate_base_email = AsyncMock(return_value=False)
result = await keycloak_callback(
code='test_code',
state='test_state',
request=mock_request,
user_authorizer=create_mock_user_authorizer(),
)
assert isinstance(result, RedirectResponse)
assert result.status_code == 302
# backfill_user_email should have been called with the user_id and user_info dict
mock_user_store.backfill_user_email.assert_called_once_with(
'test_user_id', user_info.model_dump(exclude_none=True)
)
@pytest.mark.asyncio
async def test_accept_tos_stores_timezone_naive_datetime(mock_request):
"""Test that accept_tos stores a timezone-naive datetime for database compatibility."""
# Arrange
test_user_id = '12345678-1234-5678-1234-567812345678'
mock_user = MagicMock()
mock_user.id = test_user_id
mock_result = MagicMock()
mock_result.scalar_one_or_none.return_value = mock_user
mock_session = AsyncMock()
mock_session.execute.return_value = mock_result
mock_session.commit = AsyncMock()
mock_session_context = AsyncMock()
mock_session_context.__aenter__.return_value = mock_session
mock_session_context.__aexit__.return_value = None
mock_user_auth = MagicMock(spec=SaasUserAuth)
mock_user_auth.get_access_token = AsyncMock(
return_value=SecretStr('test_access_token')
)
mock_user_auth.refresh_token = SecretStr('test_refresh_token')
mock_user_auth.get_user_id = AsyncMock(return_value=test_user_id)
mock_request.json = AsyncMock(return_value={'redirect_url': 'http://example.com'})
with (
patch(
'server.routes.auth.get_user_auth', AsyncMock(return_value=mock_user_auth)
),
patch('server.routes.auth.a_session_maker', return_value=mock_session_context),
patch('server.routes.auth.set_response_cookie'),
):
# Act
result = await accept_tos(mock_request)
# Assert
assert isinstance(result, JSONResponse)
assert result.status_code == status.HTTP_200_OK
# The datetime assigned to user.accepted_tos must be timezone-naive
# (compatible with TIMESTAMP WITHOUT TIME ZONE database column)
assert mock_user.accepted_tos.tzinfo is None