diff --git a/enterprise/server/routes/orgs.py b/enterprise/server/routes/orgs.py index 07bb884097..0eea36d727 100644 --- a/enterprise/server/routes/orgs.py +++ b/enterprise/server/routes/orgs.py @@ -470,3 +470,74 @@ async def get_org_members( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail='Failed to retrieve members', ) + + +@org_router.delete('/{org_id}/members/{user_id}') +async def remove_org_member( + org_id: str, + user_id: str, + current_user_id: str = Depends(get_user_id), +): + """Remove a member from an organization. + + Only owners and admins can remove members: + - Owners can remove admins and regular users + - Admins can only remove regular users + + Users cannot remove themselves. The last owner cannot be removed. + """ + try: + success, error = await OrgMemberService.remove_org_member( + org_id=UUID(org_id), + target_user_id=UUID(user_id), + current_user_id=UUID(current_user_id), + ) + + if not success: + error_map = { + 'not_a_member': ( + status.HTTP_403_FORBIDDEN, + 'You are not a member of this organization', + ), + 'cannot_remove_self': ( + status.HTTP_403_FORBIDDEN, + 'Cannot remove yourself from an organization', + ), + 'member_not_found': ( + status.HTTP_404_NOT_FOUND, + 'Member not found in this organization', + ), + 'insufficient_permission': ( + status.HTTP_403_FORBIDDEN, + 'You do not have permission to remove this member', + ), + 'cannot_remove_last_owner': ( + status.HTTP_400_BAD_REQUEST, + 'Cannot remove the last owner of an organization', + ), + 'removal_failed': ( + status.HTTP_500_INTERNAL_SERVER_ERROR, + 'Failed to remove member', + ), + } + status_code, detail = error_map.get( + error, (status.HTTP_500_INTERNAL_SERVER_ERROR, 'An error occurred') + ) + raise HTTPException(status_code=status_code, detail=detail) + + return {'message': 'Member removed successfully'} + + except HTTPException: + raise + except ValueError: + logger.exception('Invalid UUID format') + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail='Invalid organization or user ID format', + ) + except Exception: + logger.exception('Error removing organization member') + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail='Failed to remove member', + ) diff --git a/enterprise/server/services/org_member_service.py b/enterprise/server/services/org_member_service.py index 273ebc81df..8c6c5d7a53 100644 --- a/enterprise/server/services/org_member_service.py +++ b/enterprise/server/services/org_member_service.py @@ -4,6 +4,13 @@ from uuid import UUID from server.routes.org_models import OrgMemberPage, OrgMemberResponse from storage.org_member_store import OrgMemberStore +from storage.role_store import RoleStore + +from openhands.utils.async_utils import call_sync_from_async + +# Role rank constants +OWNER_RANK = 10 +ADMIN_RANK = 20 class OrgMemberService: @@ -65,3 +72,79 @@ class OrgMemberService: next_page_id = str(offset + limit) return True, None, OrgMemberPage(items=items, next_page_id=next_page_id) + + @staticmethod + async def remove_org_member( + org_id: UUID, + target_user_id: UUID, + current_user_id: UUID, + ) -> tuple[bool, str | None]: + """Remove a member from an organization. + + Returns: + Tuple of (success, error_message). If success is True, error_message is None. + """ + + def _remove_member(): + # Get current user's membership in the org + requester_membership = OrgMemberStore.get_org_member( + org_id, current_user_id + ) + if not requester_membership: + return False, 'not_a_member' + + # Check if trying to remove self + if str(current_user_id) == str(target_user_id): + return False, 'cannot_remove_self' + + # Get target user's membership + target_membership = OrgMemberStore.get_org_member(org_id, target_user_id) + if not target_membership: + return False, 'member_not_found' + + requester_role = RoleStore.get_role_by_id(requester_membership.role_id) + target_role = RoleStore.get_role_by_id(target_membership.role_id) + + if not requester_role or not target_role: + return False, 'role_not_found' + + # Check permission based on role ranks + if not OrgMemberService._can_remove_member( + requester_role.rank, target_role.rank + ): + return False, 'insufficient_permission' + + # Check if removing the last owner + if target_role.rank == OWNER_RANK: + if OrgMemberService._is_last_owner(org_id, target_user_id): + return False, 'cannot_remove_last_owner' + + # Perform the removal + success = OrgMemberStore.remove_user_from_org(org_id, target_user_id) + if not success: + return False, 'removal_failed' + + return True, None + + return await call_sync_from_async(_remove_member) + + @staticmethod + def _can_remove_member(requester_rank: int, target_rank: int) -> bool: + """Check if requester can remove target based on role ranks.""" + if requester_rank == OWNER_RANK: + return True + elif requester_rank == ADMIN_RANK: + return target_rank > ADMIN_RANK + return False + + @staticmethod + def _is_last_owner(org_id: UUID, user_id: UUID) -> bool: + """Check if user is the last owner of the organization.""" + members = OrgMemberStore.get_org_members(org_id) + owners = [] + for m in members: + # Use role_id (column) instead of role (relationship) to avoid DetachedInstanceError + role = RoleStore.get_role_by_id(m.role_id) + if role and role.rank == OWNER_RANK: + owners.append(m) + return len(owners) == 1 and str(owners[0].user_id) == str(user_id) diff --git a/enterprise/tests/unit/server/routes/test_orgs.py b/enterprise/tests/unit/server/routes/test_orgs.py index 1bccb80e34..bee6928aae 100644 --- a/enterprise/tests/unit/server/routes/test_orgs.py +++ b/enterprise/tests/unit/server/routes/test_orgs.py @@ -5,11 +5,11 @@ Tests the POST /api/organizations endpoint with various scenarios. """ import uuid -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest -from fastapi import FastAPI, HTTPException, status +from fastapi import FastAPI, HTTPException, Request, status from fastapi.testclient import TestClient # Mock database before imports @@ -26,7 +26,7 @@ with patch('storage.database.engine', create=True), patch( OrgNameExistsError, OrgNotFoundError, ) - from server.routes.orgs import get_org_members, org_router + from server.routes.orgs import get_org_members, org_router, remove_org_member from storage.org import Org from openhands.server.user_auth import get_user_id @@ -47,6 +47,13 @@ def mock_app(): return app +@pytest.fixture +def mock_request(): + """Create a mock request object.""" + request = MagicMock(spec=Request) + return request + + @pytest.fixture def org_id(): """Create a test organization ID.""" @@ -59,6 +66,12 @@ def current_user_id(): return str(uuid.uuid4()) +@pytest.fixture +def target_user_id(): + """Create a test target user ID.""" + return str(uuid.uuid4()) + + @pytest.mark.asyncio async def test_create_org_success(mock_app): """ @@ -1984,3 +1997,295 @@ class TestGetOrgMembersEndpoint: page_id='100', limit=100, ) + + +class TestRemoveOrgMemberEndpoint: + """Test cases for DELETE /api/organizations/{org_id}/members/{user_id} endpoint.""" + + @pytest.mark.asyncio + async def test_remove_member_succeeds_returns_200( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that successful removal returns 200 with success message.""" + # Arrange + with ( + patch( + 'server.routes.orgs.get_user_id', return_value=current_user_id + ) as mock_get_user_id, + patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove, + ): + mock_remove.return_value = (True, None) + + # Act + result = await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + # Assert + assert result == {'message': 'Member removed successfully'} + mock_get_user_id.assert_not_called() # current_user_id is passed directly + mock_remove.assert_called_once() + + @pytest.mark.asyncio + async def test_not_a_member_returns_403( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that not being a member returns 403 Forbidden.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.return_value = (False, 'not_a_member') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN + assert 'not a member of this organization' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_cannot_remove_self_returns_403( + self, mock_request, org_id, current_user_id + ): + """Test that trying to remove oneself returns 403 Forbidden.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.return_value = (False, 'cannot_remove_self') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=current_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN + assert 'Cannot remove yourself' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_member_not_found_returns_404( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that member not found returns 404 Not Found.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.return_value = (False, 'member_not_found') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND + assert 'Member not found' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_insufficient_permission_returns_403( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that insufficient permission returns 403 Forbidden.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.return_value = (False, 'insufficient_permission') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN + assert 'do not have permission' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_cannot_remove_last_owner_returns_400( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that removing last owner returns 400 Bad Request.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.return_value = (False, 'cannot_remove_last_owner') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST + assert 'Cannot remove the last owner' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_removal_failed_returns_500( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that removal failure returns 500 Internal Server Error.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.return_value = (False, 'removal_failed') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert 'Failed to remove member' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_unknown_error_returns_500( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that unknown error returns 500 Internal Server Error.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.return_value = (False, 'unknown_error') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert 'An error occurred' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_invalid_org_id_format_returns_400( + self, mock_request, current_user_id, target_user_id + ): + """Test that invalid org_id UUID format returns 400 Bad Request.""" + # Arrange + invalid_org_id = 'not-a-uuid' + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=invalid_org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST + assert 'Invalid organization or user ID format' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_invalid_user_id_format_returns_400( + self, mock_request, org_id, current_user_id + ): + """Test that invalid user_id UUID format returns 400 Bad Request.""" + # Arrange + invalid_user_id = 'not-a-uuid' + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=invalid_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST + assert 'Invalid organization or user ID format' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_invalid_current_user_id_format_returns_400( + self, mock_request, org_id, target_user_id + ): + """Test that invalid current_user_id UUID format returns 400 Bad Request.""" + # Arrange + invalid_current_user_id = 'not-a-uuid' + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=invalid_current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST + assert 'Invalid organization or user ID format' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_service_exception_returns_500( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that service exception returns 500 Internal Server Error.""" + # Arrange + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.side_effect = Exception('Database connection failed') + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert 'Failed to remove member' in exc_info.value.detail + + @pytest.mark.asyncio + async def test_http_exception_is_re_raised( + self, mock_request, org_id, current_user_id, target_user_id + ): + """Test that HTTPException from service is re-raised.""" + # Arrange + original_exception = HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail='Service temporarily unavailable', + ) + + with patch( + 'server.routes.orgs.OrgMemberService.remove_org_member' + ) as mock_remove: + mock_remove.side_effect = original_exception + + # Act & Assert + with pytest.raises(HTTPException) as exc_info: + await remove_org_member( + org_id=org_id, + user_id=target_user_id, + current_user_id=current_user_id, + ) + + assert exc_info.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE + assert exc_info.value.detail == 'Service temporarily unavailable' diff --git a/enterprise/tests/unit/server/services/test_org_member_service.py b/enterprise/tests/unit/server/services/test_org_member_service.py index 881fc05567..f7b241f538 100644 --- a/enterprise/tests/unit/server/services/test_org_member_service.py +++ b/enterprise/tests/unit/server/services/test_org_member_service.py @@ -498,3 +498,643 @@ class TestOrgMemberServiceGetOrgMembers: assert success is True assert data is not None assert len(data.items) == 2 + + +@pytest.fixture +def target_membership_owner(org_id, target_user_id, owner_role): + """Create a mock target membership with owner role.""" + membership = MagicMock(spec=OrgMember) + membership.org_id = org_id + membership.user_id = target_user_id + membership.role_id = owner_role.id + return membership + + +class TestOrgMemberServiceRemoveOrgMember: + """Test cases for OrgMemberService.remove_org_member.""" + + @pytest.mark.asyncio + async def test_owner_removes_user_succeeds( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_owner, + target_membership_user, + owner_role, + user_role, + ): + """Test that an owner can successfully remove a regular user.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + patch( + 'server.services.org_member_service.OrgMemberStore.remove_user_from_org' + ) as mock_remove, + ): + mock_get_member.side_effect = [ + requester_membership_owner, + target_membership_user, + ] + mock_get_role.side_effect = [owner_role, user_role] + mock_remove.return_value = True + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is True + assert error is None + mock_remove.assert_called_once_with(org_id, target_user_id) + + @pytest.mark.asyncio + async def test_owner_removes_admin_succeeds( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_owner, + target_membership_admin, + owner_role, + admin_role, + ): + """Test that an owner can successfully remove an admin.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + patch( + 'server.services.org_member_service.OrgMemberStore.remove_user_from_org' + ) as mock_remove, + ): + mock_get_member.side_effect = [ + requester_membership_owner, + target_membership_admin, + ] + mock_get_role.side_effect = [owner_role, admin_role] + mock_remove.return_value = True + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is True + assert error is None + + @pytest.mark.asyncio + async def test_admin_removes_user_succeeds( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_admin, + target_membership_user, + admin_role, + user_role, + ): + """Test that an admin can successfully remove a regular user.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + patch( + 'server.services.org_member_service.OrgMemberStore.remove_user_from_org' + ) as mock_remove, + ): + mock_get_member.side_effect = [ + requester_membership_admin, + target_membership_user, + ] + mock_get_role.side_effect = [admin_role, user_role] + mock_remove.return_value = True + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is True + assert error is None + + @pytest.mark.asyncio + async def test_requester_not_a_member_returns_error( + self, org_id, current_user_id, target_user_id + ): + """Test that removing fails when requester is not a member of the organization.""" + # Arrange + with patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member: + mock_get_member.return_value = None + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'not_a_member' + + @pytest.mark.asyncio + async def test_cannot_remove_self_returns_error( + self, org_id, current_user_id, requester_membership_owner, owner_role + ): + """Test that removing fails when trying to remove oneself.""" + # Arrange + with patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member: + mock_get_member.return_value = requester_membership_owner + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, current_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'cannot_remove_self' + + @pytest.mark.asyncio + async def test_target_member_not_found_returns_error( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_owner, + owner_role, + ): + """Test that removing fails when target member is not found.""" + # Arrange + with patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member: + mock_get_member.side_effect = [requester_membership_owner, None] + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'member_not_found' + + @pytest.mark.asyncio + async def test_role_not_found_returns_error( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_owner, + target_membership_user, + owner_role, + ): + """Test that removing fails when role is not found.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + ): + mock_get_member.side_effect = [ + requester_membership_owner, + target_membership_user, + ] + mock_get_role.side_effect = [owner_role, None] + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'role_not_found' + + @pytest.mark.asyncio + async def test_admin_cannot_remove_admin_returns_error( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_admin, + target_membership_admin, + admin_role, + ): + """Test that an admin cannot remove another admin.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + ): + mock_get_member.side_effect = [ + requester_membership_admin, + target_membership_admin, + ] + mock_get_role.side_effect = [admin_role, admin_role] + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'insufficient_permission' + + @pytest.mark.asyncio + async def test_admin_cannot_remove_owner_returns_error( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_admin, + target_membership_owner, + admin_role, + owner_role, + ): + """Test that an admin cannot remove an owner.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + ): + mock_get_member.side_effect = [ + requester_membership_admin, + target_membership_owner, + ] + mock_get_role.side_effect = [admin_role, owner_role] + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'insufficient_permission' + + @pytest.mark.asyncio + async def test_user_cannot_remove_anyone_returns_error( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_admin, + target_membership_user, + user_role, + ): + """Test that a regular user cannot remove anyone.""" + # Arrange + requester_membership_user = MagicMock(spec=OrgMember) + requester_membership_user.org_id = org_id + requester_membership_user.user_id = current_user_id + requester_membership_user.role_id = user_role.id + + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + ): + mock_get_member.side_effect = [ + requester_membership_user, + target_membership_user, + ] + mock_get_role.side_effect = [user_role, user_role] + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'insufficient_permission' + + @pytest.mark.asyncio + async def test_cannot_remove_last_owner_returns_error( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_owner, + target_membership_owner, + owner_role, + ): + """Test that removing the last owner fails.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_members' + ) as mock_get_members, + ): + mock_get_member.side_effect = [ + requester_membership_owner, + target_membership_owner, + ] + mock_get_role.return_value = owner_role + # Only one owner (the target) + mock_get_members.return_value = [target_membership_owner] + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'cannot_remove_last_owner' + + @pytest.mark.asyncio + async def test_can_remove_owner_when_multiple_owners_exist( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_owner, + target_membership_owner, + owner_role, + ): + """Test that an owner can be removed when there are multiple owners.""" + # Arrange + another_owner = MagicMock(spec=OrgMember) + another_owner.user_id = uuid.uuid4() + another_owner.role_id = owner_role.id + + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_members' + ) as mock_get_members, + patch( + 'server.services.org_member_service.OrgMemberStore.remove_user_from_org' + ) as mock_remove, + ): + mock_get_member.side_effect = [ + requester_membership_owner, + target_membership_owner, + ] + mock_get_role.return_value = owner_role + # Multiple owners exist + mock_get_members.return_value = [ + requester_membership_owner, + target_membership_owner, + another_owner, + ] + mock_remove.return_value = True + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is True + assert error is None + + @pytest.mark.asyncio + async def test_removal_failed_returns_error( + self, + org_id, + current_user_id, + target_user_id, + requester_membership_owner, + target_membership_user, + owner_role, + user_role, + ): + """Test that removing fails when store removal returns False.""" + # Arrange + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_member' + ) as mock_get_member, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + patch( + 'server.services.org_member_service.OrgMemberStore.remove_user_from_org' + ) as mock_remove, + ): + mock_get_member.side_effect = [ + requester_membership_owner, + target_membership_user, + ] + mock_get_role.side_effect = [owner_role, user_role] + mock_remove.return_value = False + + # Act + success, error = await OrgMemberService.remove_org_member( + org_id, target_user_id, current_user_id + ) + + # Assert + assert success is False + assert error == 'removal_failed' + + +class TestOrgMemberServiceCanRemoveMember: + """Test cases for OrgMemberService._can_remove_member.""" + + def test_owner_can_remove_admin(self): + """Test that owner (rank 10) can remove admin (rank 20).""" + # Arrange + requester_rank = 10 + target_rank = 20 + + # Act + result = OrgMemberService._can_remove_member(requester_rank, target_rank) + + # Assert + assert result is True + + def test_owner_can_remove_user(self): + """Test that owner (rank 10) can remove user (rank 1000).""" + # Arrange + requester_rank = 10 + target_rank = 1000 + + # Act + result = OrgMemberService._can_remove_member(requester_rank, target_rank) + + # Assert + assert result is True + + def test_admin_can_remove_user(self): + """Test that admin (rank 20) can remove user (rank 1000).""" + # Arrange + requester_rank = 20 + target_rank = 1000 + + # Act + result = OrgMemberService._can_remove_member(requester_rank, target_rank) + + # Assert + assert result is True + + def test_admin_cannot_remove_admin(self): + """Test that admin (rank 20) cannot remove another admin (rank 20).""" + # Arrange + requester_rank = 20 + target_rank = 20 + + # Act + result = OrgMemberService._can_remove_member(requester_rank, target_rank) + + # Assert + assert result is False + + def test_admin_cannot_remove_owner(self): + """Test that admin (rank 20) cannot remove owner (rank 10).""" + # Arrange + requester_rank = 20 + target_rank = 10 + + # Act + result = OrgMemberService._can_remove_member(requester_rank, target_rank) + + # Assert + assert result is False + + def test_user_cannot_remove_anyone(self): + """Test that user (rank 1000) cannot remove anyone.""" + # Arrange + requester_rank = 1000 + target_rank = 1000 + + # Act + result = OrgMemberService._can_remove_member(requester_rank, target_rank) + + # Assert + assert result is False + + +class TestOrgMemberServiceIsLastOwner: + """Test cases for OrgMemberService._is_last_owner.""" + + def test_is_last_owner_when_only_one_owner( + self, org_id, target_user_id, owner_role + ): + """Test that returns True when user is the only owner.""" + # Arrange + target_membership = MagicMock(spec=OrgMember) + target_membership.user_id = target_user_id + target_membership.role_id = owner_role.id + + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_members' + ) as mock_get_members, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + ): + mock_get_members.return_value = [target_membership] + mock_get_role.return_value = owner_role + + # Act + result = OrgMemberService._is_last_owner(org_id, target_user_id) + + # Assert + assert result is True + + def test_is_not_last_owner_when_multiple_owners( + self, org_id, target_user_id, owner_role + ): + """Test that returns False when there are multiple owners.""" + # Arrange + target_membership = MagicMock(spec=OrgMember) + target_membership.user_id = target_user_id + target_membership.role_id = owner_role.id + + another_owner = MagicMock(spec=OrgMember) + another_owner.user_id = uuid.uuid4() + another_owner.role_id = owner_role.id + + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_members' + ) as mock_get_members, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + ): + mock_get_members.return_value = [target_membership, another_owner] + mock_get_role.return_value = owner_role + + # Act + result = OrgMemberService._is_last_owner(org_id, target_user_id) + + # Assert + assert result is False + + def test_is_not_last_owner_when_user_is_not_owner( + self, org_id, target_user_id, user_role + ): + """Test that returns False when user is not an owner.""" + # Arrange + target_membership = MagicMock(spec=OrgMember) + target_membership.user_id = target_user_id + target_membership.role_id = user_role.id + + with ( + patch( + 'server.services.org_member_service.OrgMemberStore.get_org_members' + ) as mock_get_members, + patch( + 'server.services.org_member_service.RoleStore.get_role_by_id' + ) as mock_get_role, + ): + mock_get_members.return_value = [target_membership] + mock_get_role.return_value = user_role + + # Act + result = OrgMemberService._is_last_owner(org_id, target_user_id) + + # Assert + assert result is False