diff --git a/enterprise/server/auth/authorization.py b/enterprise/server/auth/authorization.py new file mode 100644 index 0000000000..522ef47631 --- /dev/null +++ b/enterprise/server/auth/authorization.py @@ -0,0 +1,306 @@ +""" +Permission-based authorization dependencies for API endpoints. + +This module provides FastAPI dependencies for checking user permissions +within organizations. It uses a permission-based authorization model where +roles (owner, admin, member) are mapped to specific permissions. + +Permissions are defined in the Permission enum and mapped to roles via +ROLE_PERMISSIONS. This allows fine-grained access control while maintaining +the familiar role-based hierarchy. + +Usage: + from server.auth.authorization import ( + Permission, + require_permission, + ) + + @router.get('/{org_id}/settings') + async def get_settings( + org_id: UUID, + user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)), + ): + # Only users with VIEW_LLM_SETTINGS permission can access + ... + + @router.patch('/{org_id}/settings') + async def update_settings( + org_id: UUID, + user_id: str = Depends(require_permission(Permission.EDIT_LLM_SETTINGS)), + ): + # Only users with EDIT_LLM_SETTINGS permission can access + ... +""" + +from enum import Enum +from uuid import UUID + +from fastapi import Depends, HTTPException, status +from storage.org_member_store import OrgMemberStore +from storage.role import Role +from storage.role_store import RoleStore + +from openhands.core.logger import openhands_logger as logger +from openhands.server.user_auth import get_user_id + + +class Permission(str, Enum): + """Permissions that can be assigned to roles.""" + + # Secrets + MANAGE_SECRETS = 'manage_secrets' + + # MCP + MANAGE_MCP = 'manage_mcp' + + # Integrations + MANAGE_INTEGRATIONS = 'manage_integrations' + + # Application Settings + MANAGE_APPLICATION_SETTINGS = 'manage_application_settings' + + # API Keys + MANAGE_API_KEYS = 'manage_api_keys' + + # LLM Settings + VIEW_LLM_SETTINGS = 'view_llm_settings' + EDIT_LLM_SETTINGS = 'edit_llm_settings' + + # Billing + VIEW_BILLING = 'view_billing' + ADD_CREDITS = 'add_credits' + + # Organization Members + INVITE_USER_TO_ORGANIZATION = 'invite_user_to_organization' + CHANGE_USER_ROLE_MEMBER = 'change_user_role:member' + CHANGE_USER_ROLE_ADMIN = 'change_user_role:admin' + CHANGE_USER_ROLE_OWNER = 'change_user_role:owner' + + # Organization Management + VIEW_ORG_SETTINGS = 'view_org_settings' + CHANGE_ORGANIZATION_NAME = 'change_organization_name' + DELETE_ORGANIZATION = 'delete_organization' + + # Temporary permissions until we finish the API updates. + EDIT_ORG_SETTINGS = 'edit_org_settings' + + +class RoleName(str, Enum): + """Role names used in the system.""" + + OWNER = 'owner' + ADMIN = 'admin' + MEMBER = 'member' + + +# Permission mappings for each role +ROLE_PERMISSIONS: dict[RoleName, frozenset[Permission]] = { + RoleName.OWNER: frozenset( + [ + # Settings (Full access) + Permission.MANAGE_SECRETS, + Permission.MANAGE_MCP, + Permission.MANAGE_INTEGRATIONS, + Permission.MANAGE_APPLICATION_SETTINGS, + Permission.MANAGE_API_KEYS, + Permission.VIEW_LLM_SETTINGS, + Permission.EDIT_LLM_SETTINGS, + Permission.VIEW_BILLING, + Permission.ADD_CREDITS, + # Organization Members + Permission.INVITE_USER_TO_ORGANIZATION, + Permission.CHANGE_USER_ROLE_MEMBER, + Permission.CHANGE_USER_ROLE_ADMIN, + Permission.CHANGE_USER_ROLE_OWNER, + # Organization Management + Permission.VIEW_ORG_SETTINGS, + Permission.EDIT_ORG_SETTINGS, + # Organization Management (Owner only) + Permission.CHANGE_ORGANIZATION_NAME, + Permission.DELETE_ORGANIZATION, + ] + ), + RoleName.ADMIN: frozenset( + [ + # Settings (Full access) + Permission.MANAGE_SECRETS, + Permission.MANAGE_MCP, + Permission.MANAGE_INTEGRATIONS, + Permission.MANAGE_APPLICATION_SETTINGS, + Permission.MANAGE_API_KEYS, + Permission.VIEW_LLM_SETTINGS, + Permission.EDIT_LLM_SETTINGS, + Permission.VIEW_BILLING, + Permission.ADD_CREDITS, + # Organization Members + Permission.INVITE_USER_TO_ORGANIZATION, + Permission.CHANGE_USER_ROLE_MEMBER, + Permission.CHANGE_USER_ROLE_ADMIN, + # Organization Management + Permission.VIEW_ORG_SETTINGS, + Permission.EDIT_ORG_SETTINGS, + ] + ), + RoleName.MEMBER: frozenset( + [ + # Settings (Full access) + Permission.MANAGE_SECRETS, + Permission.MANAGE_MCP, + Permission.MANAGE_INTEGRATIONS, + Permission.MANAGE_APPLICATION_SETTINGS, + Permission.MANAGE_API_KEYS, + # Settings (View only) + Permission.VIEW_ORG_SETTINGS, + Permission.VIEW_LLM_SETTINGS, + ] + ), +} + + +def get_user_org_role(user_id: str, org_id: UUID | None) -> Role | None: + """ + Get the user's role in an organization (synchronous version). + + Args: + user_id: User ID (string that will be converted to UUID) + org_id: Organization ID, or None to use the user's current organization + + Returns: + Role object if user is a member, None otherwise + """ + from uuid import UUID as parse_uuid + + if org_id is None: + org_member = OrgMemberStore.get_org_member_for_current_org(parse_uuid(user_id)) + else: + org_member = OrgMemberStore.get_org_member(org_id, parse_uuid(user_id)) + if not org_member: + return None + + return RoleStore.get_role_by_id(org_member.role_id) + + +async def get_user_org_role_async(user_id: str, org_id: UUID | None) -> Role | None: + """ + Get the user's role in an organization (async version). + + Args: + user_id: User ID (string that will be converted to UUID) + org_id: Organization ID, or None to use the user's current organization + + Returns: + Role object if user is a member, None otherwise + """ + from uuid import UUID as parse_uuid + + if org_id is None: + org_member = await OrgMemberStore.get_org_member_for_current_org_async( + parse_uuid(user_id) + ) + else: + org_member = await OrgMemberStore.get_org_member_async( + org_id, parse_uuid(user_id) + ) + if not org_member: + return None + + return await RoleStore.get_role_by_id_async(org_member.role_id) + + +def get_role_permissions(role_name: str) -> frozenset[Permission]: + """ + Get the permissions for a role. + + Args: + role_name: Name of the role + + Returns: + Set of permissions for the role + """ + try: + role_enum = RoleName(role_name) + return ROLE_PERMISSIONS.get(role_enum, frozenset()) + except ValueError: + return frozenset() + + +def has_permission(user_role: Role, permission: Permission) -> bool: + """ + Check if a role has a specific permission. + + Args: + user_role: User's Role object + permission: Permission to check + + Returns: + True if the role has the permission + """ + permissions = get_role_permissions(user_role.name) + return permission in permissions + + +def require_permission(permission: Permission): + """ + Factory function that creates a dependency to require a specific permission. + + This creates a FastAPI dependency that: + 1. Extracts org_id from the path parameter + 2. Gets the authenticated user_id + 3. Checks if the user has the required permission in the organization + 4. Returns the user_id if authorized, raises HTTPException otherwise + + Usage: + @router.get('/{org_id}/settings') + async def get_settings( + org_id: UUID, + user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)), + ): + ... + + Args: + permission: The permission required to access the endpoint + + Returns: + Dependency function that validates permission and returns user_id + """ + + async def permission_checker( + org_id: UUID | None = None, + user_id: str | None = Depends(get_user_id), + ) -> str: + if not user_id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail='User not authenticated', + ) + + user_role = await get_user_org_role_async(user_id, org_id) + + if not user_role: + logger.warning( + 'User not a member of organization', + extra={'user_id': user_id, 'org_id': str(org_id)}, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail='User is not a member of this organization', + ) + + if not has_permission(user_role, permission): + logger.warning( + 'Insufficient permissions', + extra={ + 'user_id': user_id, + 'org_id': str(org_id), + 'user_role': user_role.name, + 'required_permission': permission.value, + }, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f'Requires {permission.value} permission', + ) + + return user_id + + return permission_checker diff --git a/enterprise/server/routes/orgs.py b/enterprise/server/routes/orgs.py index 9421f4d56a..258f633d2d 100644 --- a/enterprise/server/routes/orgs.py +++ b/enterprise/server/routes/orgs.py @@ -2,6 +2,10 @@ from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status +from server.auth.authorization import ( + Permission, + require_permission, +) from server.email_validation import get_admin_user_id from server.routes.org_models import ( CannotModifySelfError, @@ -189,23 +193,26 @@ async def create_org( @org_router.get('/{org_id}', response_model=OrgResponse, status_code=status.HTTP_200_OK) async def get_org( org_id: UUID, - user_id: str = Depends(get_user_id), + user_id: str = Depends(require_permission(Permission.VIEW_ORG_SETTINGS)), ) -> OrgResponse: """Get organization details by ID. - This endpoint allows authenticated users who are members of an organization - to retrieve its details. Only members of the organization can access this endpoint. + This endpoint retrieves details for a specific organization. Access requires + the VIEW_ORG_SETTINGS permission, which is granted to all organization members + (member, admin, and owner roles). Args: org_id: Organization ID (UUID) - user_id: Authenticated user ID (injected by dependency) + user_id: Authenticated user ID (injected by require_permission dependency) Returns: OrgResponse: The organization details Raises: + HTTPException: 401 if user is not authenticated + HTTPException: 403 if user lacks VIEW_ORG_SETTINGS permission + HTTPException: 404 if organization not found HTTPException: 422 if org_id is not a valid UUID (handled by FastAPI) - HTTPException: 404 if organization not found or user is not a member HTTPException: 500 if retrieval fails """ logger.info( @@ -305,23 +312,24 @@ async def get_me( @org_router.delete('/{org_id}', status_code=status.HTTP_200_OK) async def delete_org( org_id: UUID, - user_id: str = Depends(get_user_id), + user_id: str = Depends(require_permission(Permission.DELETE_ORGANIZATION)), ) -> dict: """Delete an organization. - This endpoint allows authenticated organization owners to delete their organization. - All associated data including organization members, conversations, billing data, - and external LiteLLM team resources will be permanently removed. + This endpoint permanently deletes an organization and all associated data including + organization members, conversations, billing data, and external LiteLLM team resources. + Access requires the DELETE_ORGANIZATION permission, which is granted only to owners. Args: - org_id: Organization ID to delete - user_id: Authenticated user ID (injected by dependency) + org_id: Organization ID to delete (UUID) + user_id: Authenticated user ID (injected by require_permission dependency) Returns: dict: Confirmation message with deleted organization details Raises: - HTTPException: 403 if user is not the organization owner + HTTPException: 401 if user is not authenticated + HTTPException: 403 if user lacks DELETE_ORGANIZATION permission HTTPException: 404 if organization not found HTTPException: 500 if deletion fails """ @@ -414,25 +422,26 @@ async def delete_org( async def update_org( org_id: UUID, update_data: OrgUpdate, - user_id: str = Depends(get_user_id), + user_id: str = Depends(require_permission(Permission.EDIT_ORG_SETTINGS)), ) -> OrgResponse: """Update an existing organization. - This endpoint allows authenticated users to update organization settings. - LLM-related settings require admin or owner role in the organization. + This endpoint updates organization settings. Access requires the EDIT_ORG_SETTINGS + permission, which is granted to admin and owner roles. Args: - org_id: Organization ID to update (UUID validated by FastAPI) + org_id: Organization ID to update (UUID) update_data: Organization update data - user_id: Authenticated user ID (injected by dependency) + user_id: Authenticated user ID (injected by require_permission dependency) Returns: OrgResponse: The updated organization details Raises: - HTTPException: 400 if org_id is invalid UUID format (handled by FastAPI) - HTTPException: 403 if user lacks permission for LLM settings + HTTPException: 401 if user is not authenticated + HTTPException: 403 if user lacks EDIT_ORG_SETTINGS permission HTTPException: 404 if organization not found + HTTPException: 409 if organization name already exists HTTPException: 422 if validation errors occur (handled by FastAPI) HTTPException: 500 if update fails """ @@ -496,7 +505,7 @@ async def update_org( @org_router.get('/{org_id}/members') async def get_org_members( - org_id: str, + org_id: UUID, page_id: Annotated[ str | None, Query(title='Optional next_page_id from the previously returned page'), @@ -509,13 +518,33 @@ async def get_org_members( lte=100, ), ] = 100, - current_user_id: str = Depends(get_user_id), + user_id: str = Depends(require_permission(Permission.VIEW_ORG_SETTINGS)), ) -> OrgMemberPage: - """Get all members of an organization with cursor-based pagination.""" + """Get all members of an organization with cursor-based pagination. + + This endpoint retrieves a paginated list of organization members. Access requires + the VIEW_ORG_SETTINGS permission, which is granted to all organization members + (member, admin, and owner roles). + + Args: + org_id: Organization ID (UUID) + page_id: Optional page ID (offset) for pagination + limit: Maximum number of members to return (1-100, default 100) + user_id: Authenticated user ID (injected by require_permission dependency) + + Returns: + OrgMemberPage: Paginated list of organization members + + Raises: + HTTPException: 401 if user is not authenticated + HTTPException: 403 if user lacks VIEW_ORG_SETTINGS permission + HTTPException: 400 if org_id or page_id format is invalid + HTTPException: 500 if retrieval fails + """ try: success, error_code, data = await OrgMemberService.get_org_members( - org_id=UUID(org_id), - current_user_id=UUID(current_user_id), + org_id=org_id, + current_user_id=UUID(user_id), page_id=page_id, limit=limit, ) @@ -562,7 +591,7 @@ async def get_org_members( @org_router.delete('/{org_id}/members/{user_id}') async def remove_org_member( - org_id: str, + org_id: UUID, user_id: str, current_user_id: str = Depends(get_user_id), ): @@ -576,7 +605,7 @@ async def remove_org_member( """ try: success, error = await OrgMemberService.remove_org_member( - org_id=UUID(org_id), + org_id=org_id, target_user_id=UUID(user_id), current_user_id=UUID(current_user_id), ) @@ -708,7 +737,7 @@ async def switch_org( @org_router.patch('/{org_id}/members/{user_id}', response_model=OrgMemberResponse) async def update_org_member( - org_id: str, + org_id: UUID, user_id: str, update_data: OrgMemberUpdate, current_user_id: str = Depends(get_user_id), @@ -725,7 +754,7 @@ async def update_org_member( """ try: return await OrgMemberService.update_org_member( - org_id=UUID(org_id), + org_id=org_id, target_user_id=UUID(user_id), current_user_id=UUID(current_user_id), update_data=update_data, diff --git a/enterprise/storage/org_member_store.py b/enterprise/storage/org_member_store.py index bbac631de0..b0cecdad24 100644 --- a/enterprise/storage/org_member_store.py +++ b/enterprise/storage/org_member_store.py @@ -9,6 +9,7 @@ from sqlalchemy import select from sqlalchemy.orm import joinedload from storage.database import a_session_maker, session_maker from storage.org_member import OrgMember +from storage.user import User from storage.user_settings import UserSettings from openhands.storage.data_models.settings import Settings @@ -60,6 +61,51 @@ class OrgMemberStore: ) return result.scalars().first() + @staticmethod + def get_org_member_for_current_org(user_id: UUID) -> Optional[OrgMember]: + """Get the org member for a user's current organization. + + Args: + user_id: The user's UUID. + + Returns: + The OrgMember for the user's current organization, or None if not found. + """ + with session_maker() as session: + result = ( + session.query(OrgMember) + .join(User, User.id == OrgMember.user_id) + .filter( + User.id == user_id, + OrgMember.org_id == User.current_org_id, + ) + .first() + ) + return result + + @staticmethod + async def get_org_member_for_current_org_async( + user_id: UUID, + ) -> Optional[OrgMember]: + """Get the org member for a user's current organization (async version). + + Args: + user_id: The user's UUID. + + Returns: + The OrgMember for the user's current organization, or None if not found. + """ + async with a_session_maker() as session: + result = await session.execute( + select(OrgMember) + .join(User, User.id == OrgMember.user_id) + .filter( + User.id == user_id, + OrgMember.org_id == User.current_org_id, + ) + ) + return result.scalars().first() + @staticmethod def get_user_orgs(user_id: UUID) -> list[OrgMember]: """Get all organizations for a user.""" diff --git a/enterprise/storage/role_store.py b/enterprise/storage/role_store.py index 9bc220a5bf..fa35cc461f 100644 --- a/enterprise/storage/role_store.py +++ b/enterprise/storage/role_store.py @@ -29,6 +29,20 @@ class RoleStore: with session_maker() as session: return session.query(Role).filter(Role.id == role_id).first() + @staticmethod + async def get_role_by_id_async( + role_id: int, + session: Optional[AsyncSession] = None, + ) -> Optional[Role]: + """Get role by ID (async version).""" + if session is not None: + result = await session.execute(select(Role).where(Role.id == role_id)) + return result.scalars().first() + + async with a_session_maker() as session: + result = await session.execute(select(Role).where(Role.id == role_id)) + return result.scalars().first() + @staticmethod def get_role_by_name(name: str) -> Optional[Role]: """Get role by name.""" diff --git a/enterprise/tests/unit/server/routes/test_orgs.py b/enterprise/tests/unit/server/routes/test_orgs.py index a2a4aa2256..5d95e28892 100644 --- a/enterprise/tests/unit/server/routes/test_orgs.py +++ b/enterprise/tests/unit/server/routes/test_orgs.py @@ -47,6 +47,10 @@ with patch('storage.database.engine', create=True), patch( from openhands.server.user_auth import get_user_id +# Test user ID constant (must be a valid UUID string) +TEST_USER_ID = str(uuid.uuid4()) + + @pytest.fixture def mock_app(): """Create a test FastAPI app with organization routes and mocked auth.""" @@ -55,9 +59,13 @@ def mock_app(): # Override the auth dependency to return a test user def mock_get_admin_user_id(): - return 'test-user-123' + return TEST_USER_ID + + def mock_get_user_id(): + return TEST_USER_ID app.dependency_overrides[get_admin_user_id] = mock_get_admin_user_id + app.dependency_overrides[get_user_id] = mock_get_user_id return app @@ -870,17 +878,25 @@ def mock_app_with_get_user_id(): app = FastAPI() app.include_router(org_router) - # Override the auth dependency to return a test user + # Override the auth dependency to return a valid UUID test user def mock_get_user_id(): - return 'test-user-123' + return TEST_USER_ID app.dependency_overrides[get_user_id] = mock_get_user_id return app +@pytest.fixture +def mock_owner_role(): + """Create a mock owner role for authorization tests.""" + mock_role = MagicMock() + mock_role.name = 'owner' + return mock_role + + @pytest.mark.asyncio -async def test_get_org_success(mock_app_with_get_user_id): +async def test_get_org_success(mock_app_with_get_user_id, mock_owner_role): """ GIVEN: Valid org_id and authenticated user who is a member WHEN: GET /api/organizations/{org_id} is called @@ -900,6 +916,10 @@ async def test_get_org_success(mock_app_with_get_user_id): ) with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), patch( 'server.routes.orgs.OrgService.get_org_by_id', AsyncMock(return_value=mock_org), @@ -930,27 +950,28 @@ async def test_get_org_user_not_member(mock_app_with_get_user_id): """ GIVEN: User is not a member of the organization WHEN: GET /api/organizations/{org_id} is called - THEN: 404 Not Found error is returned + THEN: 403 Forbidden error is returned (permission check fails first) """ # Arrange org_id = uuid.uuid4() + # When user is not a member, get_user_org_role returns None with patch( - 'server.routes.orgs.OrgService.get_org_by_id', - AsyncMock(side_effect=OrgNotFoundError(str(org_id))), + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=None), ): client = TestClient(mock_app_with_get_user_id) # Act response = client.get(f'/api/organizations/{org_id}') - # Assert - assert response.status_code == status.HTTP_404_NOT_FOUND - assert 'not found' in response.json()['detail'].lower() + # Assert - Permission check now returns 403 before org lookup + assert response.status_code == status.HTTP_403_FORBIDDEN + assert 'not a member' in response.json()['detail'].lower() @pytest.mark.asyncio -async def test_get_org_not_found(mock_app_with_get_user_id): +async def test_get_org_not_found(mock_app_with_get_user_id, mock_owner_role): """ GIVEN: Organization does not exist WHEN: GET /api/organizations/{org_id} is called @@ -959,9 +980,15 @@ async def test_get_org_not_found(mock_app_with_get_user_id): # Arrange org_id = uuid.uuid4() - with patch( - 'server.routes.orgs.OrgService.get_org_by_id', - AsyncMock(side_effect=OrgNotFoundError(str(org_id))), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.get_org_by_id', + AsyncMock(side_effect=OrgNotFoundError(str(org_id))), + ), ): client = TestClient(mock_app_with_get_user_id) @@ -1019,7 +1046,7 @@ async def test_get_org_unauthorized(): @pytest.mark.asyncio -async def test_get_org_unexpected_error(mock_app_with_get_user_id): +async def test_get_org_unexpected_error(mock_app_with_get_user_id, mock_owner_role): """ GIVEN: Unexpected error occurs during retrieval WHEN: GET /api/organizations/{org_id} is called @@ -1028,9 +1055,15 @@ async def test_get_org_unexpected_error(mock_app_with_get_user_id): # Arrange org_id = uuid.uuid4() - with patch( - 'server.routes.orgs.OrgService.get_org_by_id', - AsyncMock(side_effect=RuntimeError('Unexpected database error')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.get_org_by_id', + AsyncMock(side_effect=RuntimeError('Unexpected database error')), + ), ): client = TestClient(mock_app_with_get_user_id) @@ -1070,7 +1103,14 @@ async def test_get_org_personal_workspace(): org_version=5, ) + mock_role = MagicMock() + mock_role.name = 'owner' + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ), patch( 'server.routes.orgs.OrgService.get_org_by_id', AsyncMock(return_value=mock_org), @@ -1092,7 +1132,7 @@ async def test_get_org_personal_workspace(): @pytest.mark.asyncio -async def test_get_org_team_workspace(mock_app_with_get_user_id): +async def test_get_org_team_workspace(mock_app_with_get_user_id, mock_owner_role): """ GIVEN: User retrieves a team organization (org.id != user_id) WHEN: GET /api/organizations/{org_id} is called @@ -1109,6 +1149,10 @@ async def test_get_org_team_workspace(mock_app_with_get_user_id): ) with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), patch( 'server.routes.orgs.OrgService.get_org_by_id', AsyncMock(return_value=mock_org), @@ -1130,7 +1174,7 @@ async def test_get_org_team_workspace(mock_app_with_get_user_id): @pytest.mark.asyncio -async def test_get_org_with_credits_none(mock_app_with_get_user_id): +async def test_get_org_with_credits_none(mock_app_with_get_user_id, mock_owner_role): """ GIVEN: Organization exists but credits retrieval returns None WHEN: GET /api/organizations/{org_id} is called @@ -1150,6 +1194,10 @@ async def test_get_org_with_credits_none(mock_app_with_get_user_id): ) with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), patch( 'server.routes.orgs.OrgService.get_org_by_id', AsyncMock(return_value=mock_org), @@ -1171,7 +1219,9 @@ async def test_get_org_with_credits_none(mock_app_with_get_user_id): @pytest.mark.asyncio -async def test_get_org_sensitive_fields_not_exposed(mock_app_with_get_user_id): +async def test_get_org_sensitive_fields_not_exposed( + mock_app_with_get_user_id, mock_owner_role +): """ GIVEN: Organization is retrieved successfully WHEN: Response is returned @@ -1193,6 +1243,10 @@ async def test_get_org_sensitive_fields_not_exposed(mock_app_with_get_user_id): ) with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), patch( 'server.routes.orgs.OrgService.get_org_by_id', AsyncMock(return_value=mock_org), @@ -1223,7 +1277,7 @@ async def test_get_org_sensitive_fields_not_exposed(mock_app_with_get_user_id): @pytest.mark.asyncio -async def test_delete_org_success(mock_app): +async def test_delete_org_success(mock_app, mock_owner_role): """ GIVEN: Valid organization deletion request by owner WHEN: DELETE /api/organizations/{org_id} is called @@ -1238,9 +1292,15 @@ async def test_delete_org_success(mock_app): contact_email='john@example.com', ) - with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock(return_value=mock_deleted_org), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.delete_org_with_cleanup', + AsyncMock(return_value=mock_deleted_org), + ), ): client = TestClient(mock_app) @@ -1258,7 +1318,7 @@ async def test_delete_org_success(mock_app): @pytest.mark.asyncio -async def test_delete_org_not_found(mock_app): +async def test_delete_org_not_found(mock_app, mock_owner_role): """ GIVEN: Organization does not exist WHEN: DELETE /api/organizations/{org_id} is called @@ -1267,9 +1327,15 @@ async def test_delete_org_not_found(mock_app): # Arrange org_id = uuid.uuid4() - with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock(side_effect=OrgNotFoundError(str(org_id))), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.delete_org_with_cleanup', + AsyncMock(side_effect=OrgNotFoundError(str(org_id))), + ), ): client = TestClient(mock_app) @@ -1282,7 +1348,7 @@ async def test_delete_org_not_found(mock_app): @pytest.mark.asyncio -async def test_delete_org_not_owner(mock_app): +async def test_delete_org_not_owner(mock_app, mock_owner_role): """ GIVEN: User is not the organization owner WHEN: DELETE /api/organizations/{org_id} is called @@ -1291,12 +1357,18 @@ async def test_delete_org_not_owner(mock_app): # Arrange org_id = uuid.uuid4() - with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock( - side_effect=OrgAuthorizationError( - 'Only organization owners can delete organizations' - ) + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.delete_org_with_cleanup', + AsyncMock( + side_effect=OrgAuthorizationError( + 'Only organization owners can delete organizations' + ) + ), ), ): client = TestClient(mock_app) @@ -1319,13 +1391,10 @@ async def test_delete_org_not_member(mock_app): # Arrange org_id = uuid.uuid4() + # When user is not a member, get_user_org_role returns None with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock( - side_effect=OrgAuthorizationError( - 'User is not a member of this organization' - ) - ), + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=None), ): client = TestClient(mock_app) @@ -1338,7 +1407,7 @@ async def test_delete_org_not_member(mock_app): @pytest.mark.asyncio -async def test_delete_org_database_failure(mock_app): +async def test_delete_org_database_failure(mock_app, mock_owner_role): """ GIVEN: Database operation fails during deletion WHEN: DELETE /api/organizations/{org_id} is called @@ -1347,9 +1416,15 @@ async def test_delete_org_database_failure(mock_app): # Arrange org_id = uuid.uuid4() - with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock(side_effect=OrgDatabaseError('Database connection failed')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.delete_org_with_cleanup', + AsyncMock(side_effect=OrgDatabaseError('Database connection failed')), + ), ): client = TestClient(mock_app) @@ -1362,7 +1437,7 @@ async def test_delete_org_database_failure(mock_app): @pytest.mark.asyncio -async def test_delete_org_unexpected_error(mock_app): +async def test_delete_org_unexpected_error(mock_app, mock_owner_role): """ GIVEN: Unexpected error occurs during deletion WHEN: DELETE /api/organizations/{org_id} is called @@ -1371,9 +1446,15 @@ async def test_delete_org_unexpected_error(mock_app): # Arrange org_id = uuid.uuid4() - with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock(side_effect=RuntimeError('Unexpected system error')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.delete_org_with_cleanup', + AsyncMock(side_effect=RuntimeError('Unexpected system error')), + ), ): client = TestClient(mock_app) @@ -1404,7 +1485,7 @@ async def test_delete_org_invalid_uuid(mock_app): @pytest.mark.asyncio -async def test_delete_org_unauthorized(mock_app): +async def test_delete_org_unauthorized(mock_app, mock_owner_role): """ GIVEN: User is not authenticated WHEN: DELETE /api/organizations/{org_id} is called @@ -1413,9 +1494,15 @@ async def test_delete_org_unauthorized(mock_app): # Arrange org_id = uuid.uuid4() - with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock(side_effect=OrgAuthorizationError('User not authorized')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.delete_org_with_cleanup', + AsyncMock(side_effect=OrgAuthorizationError('User not authorized')), + ), ): client = TestClient(mock_app) @@ -1427,7 +1514,7 @@ async def test_delete_org_unauthorized(mock_app): @pytest.mark.asyncio -async def test_delete_org_orphaned_users(mock_app): +async def test_delete_org_orphaned_users(mock_app, mock_owner_role): """ GIVEN: Deleting org would leave users without any organization WHEN: DELETE /api/organizations/{org_id} is called @@ -1437,9 +1524,15 @@ async def test_delete_org_orphaned_users(mock_app): org_id = uuid.uuid4() orphaned_user_ids = [str(uuid.uuid4()), str(uuid.uuid4())] - with patch( - 'server.routes.orgs.OrgService.delete_org_with_cleanup', - AsyncMock(side_effect=OrphanedUserError(orphaned_user_ids)), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.delete_org_with_cleanup', + AsyncMock(side_effect=OrphanedUserError(orphaned_user_ids)), + ), ): client = TestClient(mock_app) @@ -1460,7 +1553,7 @@ def mock_update_app(): # Override the auth dependency to return a test user async def mock_user_id(): - return 'test-user-123' + return TEST_USER_ID app.dependency_overrides[get_user_id] = mock_user_id @@ -1500,7 +1593,14 @@ async def test_update_org_personal_workspace_preserved(): update_data = {'contact_name': 'Updated Name'} + mock_role = MagicMock() + mock_role.name = 'owner' + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ), patch( 'server.routes.orgs.OrgService.update_org_with_permissions', AsyncMock(return_value=updated_org), @@ -1554,7 +1654,14 @@ async def test_update_org_team_workspace_preserved(): update_data = {'name': 'Updated Team Org'} + mock_role = MagicMock() + mock_role.name = 'owner' + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ), patch( 'server.routes.orgs.OrgService.update_org_with_permissions', AsyncMock(return_value=updated_org), @@ -1580,7 +1687,7 @@ async def test_update_org_team_workspace_preserved(): @pytest.mark.asyncio -async def test_update_org_not_found(mock_update_app): +async def test_update_org_not_found(mock_update_app, mock_owner_role): """ GIVEN: Organization ID does not exist WHEN: PATCH /api/organizations/{org_id} is called @@ -1590,9 +1697,17 @@ async def test_update_org_not_found(mock_update_app): org_id = uuid.uuid4() update_data = {'contact_name': 'Jane Doe'} - with patch( - 'server.routes.orgs.OrgService.update_org_with_permissions', - AsyncMock(side_effect=ValueError(f'Organization with ID {org_id} not found')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.update_org_with_permissions', + AsyncMock( + side_effect=ValueError(f'Organization with ID {org_id} not found') + ), + ), ): async with httpx.AsyncClient( transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' @@ -1618,13 +1733,10 @@ async def test_update_org_permission_denied_non_member(mock_update_app): org_id = uuid.uuid4() update_data = {'contact_name': 'Jane Doe'} + # When user is not a member, get_user_org_role returns None with patch( - 'server.routes.orgs.OrgService.update_org_with_permissions', - AsyncMock( - side_effect=PermissionError( - 'User must be a member of the organization to update it' - ) - ), + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=None), ): async with httpx.AsyncClient( transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' @@ -1640,7 +1752,9 @@ async def test_update_org_permission_denied_non_member(mock_update_app): @pytest.mark.asyncio -async def test_update_org_permission_denied_llm_settings(mock_update_app): +async def test_update_org_permission_denied_llm_settings( + mock_update_app, mock_owner_role +): """ GIVEN: User lacks admin/owner role but tries to update LLM settings WHEN: PATCH /api/organizations/{org_id} is called @@ -1650,12 +1764,18 @@ async def test_update_org_permission_denied_llm_settings(mock_update_app): org_id = uuid.uuid4() update_data = {'default_llm_model': 'claude-opus-4-5-20251101'} - with patch( - 'server.routes.orgs.OrgService.update_org_with_permissions', - AsyncMock( - side_effect=PermissionError( - 'Admin or owner role required to update LLM settings' - ) + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.update_org_with_permissions', + AsyncMock( + side_effect=PermissionError( + 'Admin or owner role required to update LLM settings' + ) + ), ), ): async with httpx.AsyncClient( @@ -1675,7 +1795,7 @@ async def test_update_org_permission_denied_llm_settings(mock_update_app): @pytest.mark.asyncio -async def test_update_org_duplicate_name_returns_409(mock_update_app): +async def test_update_org_duplicate_name_returns_409(mock_update_app, mock_owner_role): """ GIVEN: User updates organization name to one already used by another org WHEN: PATCH /api/organizations/{org_id} is called with that name @@ -1685,9 +1805,15 @@ async def test_update_org_duplicate_name_returns_409(mock_update_app): org_id = uuid.uuid4() update_data = {'name': 'Existing Organization'} - with patch( - 'server.routes.orgs.OrgService.update_org_with_permissions', - AsyncMock(side_effect=OrgNameExistsError('Existing Organization')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.update_org_with_permissions', + AsyncMock(side_effect=OrgNameExistsError('Existing Organization')), + ), ): async with httpx.AsyncClient( transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' @@ -1703,7 +1829,7 @@ async def test_update_org_duplicate_name_returns_409(mock_update_app): @pytest.mark.asyncio -async def test_update_org_database_error(mock_update_app): +async def test_update_org_database_error(mock_update_app, mock_owner_role): """ GIVEN: Database operation fails during update WHEN: PATCH /api/organizations/{org_id} is called @@ -1713,9 +1839,15 @@ async def test_update_org_database_error(mock_update_app): org_id = uuid.uuid4() update_data = {'contact_name': 'Jane Doe'} - with patch( - 'server.routes.orgs.OrgService.update_org_with_permissions', - AsyncMock(side_effect=OrgDatabaseError('Database connection failed')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.update_org_with_permissions', + AsyncMock(side_effect=OrgDatabaseError('Database connection failed')), + ), ): async with httpx.AsyncClient( transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' @@ -1731,7 +1863,7 @@ async def test_update_org_database_error(mock_update_app): @pytest.mark.asyncio -async def test_update_org_unexpected_error(mock_update_app): +async def test_update_org_unexpected_error(mock_update_app, mock_owner_role): """ GIVEN: Unexpected error occurs during update WHEN: PATCH /api/organizations/{org_id} is called @@ -1741,9 +1873,15 @@ async def test_update_org_unexpected_error(mock_update_app): org_id = uuid.uuid4() update_data = {'contact_name': 'Jane Doe'} - with patch( - 'server.routes.orgs.OrgService.update_org_with_permissions', - AsyncMock(side_effect=RuntimeError('Unexpected system error')), + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ), + patch( + 'server.routes.orgs.OrgService.update_org_with_permissions', + AsyncMock(side_effect=RuntimeError('Unexpected system error')), + ), ): async with httpx.AsyncClient( transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' @@ -1782,7 +1920,7 @@ async def test_update_org_invalid_uuid_format(mock_update_app): @pytest.mark.asyncio -async def test_update_org_invalid_field_values(mock_update_app): +async def test_update_org_invalid_field_values(mock_update_app, mock_owner_role): """ GIVEN: Update request with invalid field values (e.g., negative max_iterations) WHEN: PATCH /api/organizations/{org_id} is called @@ -1792,18 +1930,24 @@ async def test_update_org_invalid_field_values(mock_update_app): org_id = uuid.uuid4() update_data = {'default_max_iterations': -1} # Invalid: must be > 0 - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' - ) as client: - # Act - response = await client.patch(f'/api/organizations/{org_id}', json=update_data) + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' + ) as client: + # Act + response = await client.patch( + f'/api/organizations/{org_id}', json=update_data + ) - # Assert - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + # Assert + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY @pytest.mark.asyncio -async def test_update_org_empty_name_returns_422(mock_update_app): +async def test_update_org_empty_name_returns_422(mock_update_app, mock_owner_role): """ GIVEN: Update request with empty organization name (after strip) WHEN: PATCH /api/organizations/{org_id} is called @@ -1813,18 +1957,24 @@ async def test_update_org_empty_name_returns_422(mock_update_app): org_id = uuid.uuid4() update_data = {'name': ' '} - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' - ) as client: - # Act - response = await client.patch(f'/api/organizations/{org_id}', json=update_data) + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' + ) as client: + # Act + response = await client.patch( + f'/api/organizations/{org_id}', json=update_data + ) - # Assert - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + # Assert + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY @pytest.mark.asyncio -async def test_update_org_invalid_email_format(mock_update_app): +async def test_update_org_invalid_email_format(mock_update_app, mock_owner_role): """ GIVEN: Update request with invalid email format WHEN: PATCH /api/organizations/{org_id} is called @@ -1834,14 +1984,20 @@ async def test_update_org_invalid_email_format(mock_update_app): org_id = uuid.uuid4() update_data = {'contact_email': 'invalid-email'} # Missing @ - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' - ) as client: - # Act - response = await client.patch(f'/api/organizations/{org_id}', json=update_data) + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_owner_role), + ): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=mock_update_app), base_url='http://test' + ) as client: + # Act + response = await client.patch( + f'/api/organizations/{org_id}', json=update_data + ) - # Assert - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + # Assert + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY class TestGetOrgMembersEndpoint: @@ -1871,10 +2027,10 @@ class TestGetOrgMembersEndpoint: ) as mock_get: # Act result = await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id=None, limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) # Assert @@ -1894,10 +2050,10 @@ class TestGetOrgMembersEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id=None, limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN @@ -1914,32 +2070,29 @@ class TestGetOrgMembersEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id='invalid', limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST assert 'Invalid page_id format' in exc_info.value.detail @pytest.mark.asyncio - async def test_invalid_org_id_format_returns_400(self, current_user_id): - """Test that invalid org_id UUID format returns 400 Bad Request.""" + async def test_invalid_org_id_format_returns_422(self, mock_app, current_user_id): + """Test that invalid org_id UUID format returns 422 Unprocessable Entity. + + Note: FastAPI validates UUID path parameters at the routing level, + so this test uses the HTTP test client to verify the validation. + """ # Arrange invalid_org_id = 'not-a-uuid' - # Act & Assert - with pytest.raises(HTTPException) as exc_info: - await get_org_members( - org_id=invalid_org_id, - page_id=None, - limit=100, - current_user_id=current_user_id, - ) - - assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST - assert 'Invalid organization ID format' in exc_info.value.detail + # Act & Assert - Use test client to test route-level validation + with TestClient(mock_app) as client: + response = client.get(f'/api/organizations/{invalid_org_id}/members') + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY @pytest.mark.asyncio async def test_invalid_current_user_id_format_returns_400(self, org_id): @@ -1947,13 +2100,13 @@ class TestGetOrgMembersEndpoint: # Arrange invalid_current_user_id = 'not-a-uuid' - # Act & Assert + # Act & Assert - endpoint now expects UUID type for org_id with pytest.raises(HTTPException) as exc_info: await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id=None, limit=100, - current_user_id=invalid_current_user_id, + user_id=invalid_current_user_id, ) assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST @@ -1970,10 +2123,10 @@ class TestGetOrgMembersEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id=None, limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR @@ -1990,10 +2143,10 @@ class TestGetOrgMembersEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id=None, limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR @@ -2010,10 +2163,10 @@ class TestGetOrgMembersEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id=None, limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) assert exc_info.value.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR @@ -2035,10 +2188,10 @@ class TestGetOrgMembersEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id=None, limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) assert exc_info.value.status_code == status.HTTP_503_SERVICE_UNAVAILABLE @@ -2066,12 +2219,12 @@ class TestGetOrgMembersEndpoint: 'server.routes.orgs.OrgMemberService.get_org_members', AsyncMock(return_value=(True, None, mock_page)), ) as mock_get: - # Act + # Act - endpoint now expects UUID type for org_id result = await get_org_members( - org_id=org_id, + org_id=uuid.UUID(org_id), page_id='100', limit=100, - current_user_id=current_user_id, + user_id=current_user_id, ) # Assert @@ -2106,7 +2259,7 @@ class TestRemoveOrgMemberEndpoint: # Act result = await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2130,7 +2283,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2152,7 +2305,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=current_user_id, current_user_id=current_user_id, ) @@ -2174,7 +2327,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2196,7 +2349,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2218,7 +2371,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2240,7 +2393,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2262,7 +2415,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2271,23 +2424,23 @@ class TestRemoveOrgMemberEndpoint: assert 'An error occurred' in exc_info.value.detail @pytest.mark.asyncio - async def test_invalid_org_id_format_returns_400( - self, mock_request, current_user_id, target_user_id + async def test_invalid_org_id_format_returns_422( + self, mock_app, current_user_id, target_user_id ): - """Test that invalid org_id UUID format returns 400 Bad Request.""" + """Test that invalid org_id UUID format returns 422 Unprocessable Entity. + + Note: FastAPI validates UUID path parameters at the routing level, + so this test uses the HTTP test client to verify the validation. + """ # Arrange invalid_org_id = 'not-a-uuid' - # Act & Assert - with pytest.raises(HTTPException) as exc_info: - await remove_org_member( - org_id=invalid_org_id, - user_id=target_user_id, - current_user_id=current_user_id, + # Act & Assert - Use test client to test route-level validation + with TestClient(mock_app) as client: + response = client.delete( + f'/api/organizations/{invalid_org_id}/members/{target_user_id}' ) - - assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST - assert 'Invalid organization or user ID format' in exc_info.value.detail + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY @pytest.mark.asyncio async def test_invalid_user_id_format_returns_400( @@ -2297,10 +2450,10 @@ class TestRemoveOrgMemberEndpoint: # Arrange invalid_user_id = 'not-a-uuid' - # Act & Assert + # Act & Assert - endpoint now expects UUID type for org_id with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=invalid_user_id, current_user_id=current_user_id, ) @@ -2319,7 +2472,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=invalid_current_user_id, ) @@ -2341,7 +2494,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2368,7 +2521,7 @@ class TestRemoveOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await remove_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, current_user_id=current_user_id, ) @@ -2399,9 +2552,9 @@ class TestUpdateOrgMemberEndpoint: ) as mock_update: mock_update.return_value = updated - # Act + # Act - endpoint now expects UUID type for org_id result = await update_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, update_data=OrgMemberUpdate(role='admin'), current_user_id=current_user_id, @@ -2430,7 +2583,7 @@ class TestUpdateOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await update_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, update_data=OrgMemberUpdate(role='user'), current_user_id=current_user_id, @@ -2450,7 +2603,7 @@ class TestUpdateOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await update_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=current_user_id, update_data=OrgMemberUpdate(role='admin'), current_user_id=current_user_id, @@ -2472,7 +2625,7 @@ class TestUpdateOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await update_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, update_data=OrgMemberUpdate(role='user'), current_user_id=current_user_id, @@ -2494,7 +2647,7 @@ class TestUpdateOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await update_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, update_data=OrgMemberUpdate(role='superuser'), current_user_id=current_user_id, @@ -2518,7 +2671,7 @@ class TestUpdateOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await update_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, update_data=OrgMemberUpdate(role='admin'), current_user_id=current_user_id, @@ -2540,7 +2693,7 @@ class TestUpdateOrgMemberEndpoint: # Act & Assert with pytest.raises(HTTPException) as exc_info: await update_org_member( - org_id=org_id, + org_id=uuid.UUID(org_id), user_id=target_user_id, update_data=OrgMemberUpdate(role='admin'), current_user_id=current_user_id, @@ -2549,21 +2702,24 @@ class TestUpdateOrgMemberEndpoint: assert 'Cannot demote the last owner' in exc_info.value.detail @pytest.mark.asyncio - async def test_invalid_org_id_returns_400(self, current_user_id, target_user_id): - """GIVEN invalid org_id UUID WHEN PATCH is called THEN returns 400.""" + async def test_invalid_org_id_returns_422( + self, mock_app, current_user_id, target_user_id + ): + """GIVEN invalid org_id UUID WHEN PATCH is called THEN returns 422. + + Note: FastAPI validates UUID path parameters at the routing level, + so this test uses the HTTP test client to verify the validation. + """ # Arrange invalid_org_id = 'not-a-uuid' - # Act & Assert - with pytest.raises(HTTPException) as exc_info: - await update_org_member( - org_id=invalid_org_id, - user_id=target_user_id, - update_data=OrgMemberUpdate(role='user'), - current_user_id=current_user_id, + # Act & Assert - Use test client to test route-level validation + with TestClient(mock_app) as client: + response = client.patch( + f'/api/organizations/{invalid_org_id}/members/{target_user_id}', + json={'role': 'user'}, ) - assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST - assert 'Invalid organization or user ID format' in exc_info.value.detail + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY class TestGetMeEndpoint: diff --git a/enterprise/tests/unit/test_authorization.py b/enterprise/tests/unit/test_authorization.py new file mode 100644 index 0000000000..237f34c6f3 --- /dev/null +++ b/enterprise/tests/unit/test_authorization.py @@ -0,0 +1,756 @@ +""" +Unit tests for permission-based authorization (authorization.py). + +Tests the FastAPI dependencies that validate user permissions within organizations. +""" + +from unittest.mock import AsyncMock, MagicMock, patch +from uuid import uuid4 + +import pytest +from fastapi import HTTPException +from server.auth.authorization import ( + ROLE_PERMISSIONS, + Permission, + RoleName, + get_role_permissions, + get_user_org_role, + has_permission, + require_permission, +) + +# ============================================================================= +# Tests for Permission enum +# ============================================================================= + + +class TestPermission: + """Tests for Permission enum.""" + + def test_permission_values(self): + """ + GIVEN: Permission enum + WHEN: Accessing permission values + THEN: All expected permissions exist with correct string values + """ + assert Permission.MANAGE_SECRETS.value == 'manage_secrets' + assert Permission.MANAGE_MCP.value == 'manage_mcp' + assert Permission.MANAGE_INTEGRATIONS.value == 'manage_integrations' + assert ( + Permission.MANAGE_APPLICATION_SETTINGS.value + == 'manage_application_settings' + ) + assert Permission.MANAGE_API_KEYS.value == 'manage_api_keys' + assert Permission.VIEW_LLM_SETTINGS.value == 'view_llm_settings' + assert Permission.EDIT_LLM_SETTINGS.value == 'edit_llm_settings' + assert Permission.VIEW_BILLING.value == 'view_billing' + assert Permission.ADD_CREDITS.value == 'add_credits' + assert ( + Permission.INVITE_USER_TO_ORGANIZATION.value + == 'invite_user_to_organization' + ) + assert Permission.CHANGE_USER_ROLE_MEMBER.value == 'change_user_role:member' + assert Permission.CHANGE_USER_ROLE_ADMIN.value == 'change_user_role:admin' + assert Permission.CHANGE_USER_ROLE_OWNER.value == 'change_user_role:owner' + assert Permission.VIEW_ORG_SETTINGS.value == 'view_org_settings' + assert Permission.CHANGE_ORGANIZATION_NAME.value == 'change_organization_name' + assert Permission.DELETE_ORGANIZATION.value == 'delete_organization' + + def test_permission_from_string(self): + """ + GIVEN: Valid permission string + WHEN: Creating Permission from string + THEN: Correct enum value is returned + """ + assert Permission('manage_secrets') == Permission.MANAGE_SECRETS + assert Permission('view_llm_settings') == Permission.VIEW_LLM_SETTINGS + assert Permission('delete_organization') == Permission.DELETE_ORGANIZATION + + def test_permission_invalid_string(self): + """ + GIVEN: Invalid permission string + WHEN: Creating Permission from string + THEN: ValueError is raised + """ + with pytest.raises(ValueError): + Permission('invalid_permission') + + +# ============================================================================= +# Tests for RoleName enum +# ============================================================================= + + +class TestRoleName: + """Tests for RoleName enum.""" + + def test_role_name_values(self): + """ + GIVEN: RoleName enum + WHEN: Accessing role name values + THEN: All expected roles exist with correct string values + """ + assert RoleName.OWNER.value == 'owner' + assert RoleName.ADMIN.value == 'admin' + assert RoleName.MEMBER.value == 'member' + + def test_role_name_from_string(self): + """ + GIVEN: Valid role name string + WHEN: Creating RoleName from string + THEN: Correct enum value is returned + """ + assert RoleName('owner') == RoleName.OWNER + assert RoleName('admin') == RoleName.ADMIN + assert RoleName('member') == RoleName.MEMBER + + def test_role_name_invalid_string(self): + """ + GIVEN: Invalid role name string + WHEN: Creating RoleName from string + THEN: ValueError is raised + """ + with pytest.raises(ValueError): + RoleName('invalid_role') + + +# ============================================================================= +# Tests for ROLE_PERMISSIONS mapping +# ============================================================================= + + +class TestRolePermissions: + """Tests for role permission mappings.""" + + def test_owner_has_all_permissions(self): + """ + GIVEN: ROLE_PERMISSIONS mapping + WHEN: Checking owner permissions + THEN: Owner has all permissions including owner-only permissions + """ + owner_perms = ROLE_PERMISSIONS[RoleName.OWNER] + assert Permission.MANAGE_SECRETS in owner_perms + assert Permission.MANAGE_MCP in owner_perms + assert Permission.VIEW_LLM_SETTINGS in owner_perms + assert Permission.EDIT_LLM_SETTINGS in owner_perms + assert Permission.VIEW_BILLING in owner_perms + assert Permission.ADD_CREDITS in owner_perms + assert Permission.INVITE_USER_TO_ORGANIZATION in owner_perms + assert Permission.CHANGE_USER_ROLE_MEMBER in owner_perms + assert Permission.CHANGE_USER_ROLE_ADMIN in owner_perms + assert Permission.CHANGE_USER_ROLE_OWNER in owner_perms + assert Permission.CHANGE_ORGANIZATION_NAME in owner_perms + assert Permission.DELETE_ORGANIZATION in owner_perms + + def test_admin_has_admin_permissions(self): + """ + GIVEN: ROLE_PERMISSIONS mapping + WHEN: Checking admin permissions + THEN: Admin has admin permissions but not owner-only permissions + """ + admin_perms = ROLE_PERMISSIONS[RoleName.ADMIN] + assert Permission.MANAGE_SECRETS in admin_perms + assert Permission.MANAGE_MCP in admin_perms + assert Permission.VIEW_LLM_SETTINGS in admin_perms + assert Permission.EDIT_LLM_SETTINGS in admin_perms + assert Permission.VIEW_BILLING in admin_perms + assert Permission.ADD_CREDITS in admin_perms + assert Permission.INVITE_USER_TO_ORGANIZATION in admin_perms + assert Permission.CHANGE_USER_ROLE_MEMBER in admin_perms + assert Permission.CHANGE_USER_ROLE_ADMIN in admin_perms + # Admin should NOT have owner-only permissions + assert Permission.CHANGE_USER_ROLE_OWNER not in admin_perms + assert Permission.CHANGE_ORGANIZATION_NAME not in admin_perms + assert Permission.DELETE_ORGANIZATION not in admin_perms + + def test_member_has_limited_permissions(self): + """ + GIVEN: ROLE_PERMISSIONS mapping + WHEN: Checking member permissions + THEN: Member has limited permissions + """ + member_perms = ROLE_PERMISSIONS[RoleName.MEMBER] + # Member has basic settings permissions + assert Permission.MANAGE_SECRETS in member_perms + assert Permission.MANAGE_MCP in member_perms + assert Permission.MANAGE_INTEGRATIONS in member_perms + assert Permission.MANAGE_APPLICATION_SETTINGS in member_perms + assert Permission.MANAGE_API_KEYS in member_perms + assert Permission.VIEW_LLM_SETTINGS in member_perms + assert Permission.VIEW_ORG_SETTINGS in member_perms + # Member should NOT have admin/owner permissions + assert Permission.EDIT_LLM_SETTINGS not in member_perms + assert Permission.VIEW_BILLING not in member_perms + assert Permission.ADD_CREDITS not in member_perms + assert Permission.INVITE_USER_TO_ORGANIZATION not in member_perms + assert Permission.CHANGE_USER_ROLE_MEMBER not in member_perms + assert Permission.CHANGE_USER_ROLE_ADMIN not in member_perms + assert Permission.CHANGE_USER_ROLE_OWNER not in member_perms + assert Permission.CHANGE_ORGANIZATION_NAME not in member_perms + assert Permission.DELETE_ORGANIZATION not in member_perms + + +# ============================================================================= +# Tests for get_role_permissions function +# ============================================================================= + + +class TestGetRolePermissions: + """Tests for get_role_permissions function.""" + + def test_get_owner_permissions(self): + """ + GIVEN: Role name 'owner' + WHEN: get_role_permissions is called + THEN: Owner permissions are returned + """ + perms = get_role_permissions('owner') + assert Permission.DELETE_ORGANIZATION in perms + assert Permission.CHANGE_ORGANIZATION_NAME in perms + + def test_get_admin_permissions(self): + """ + GIVEN: Role name 'admin' + WHEN: get_role_permissions is called + THEN: Admin permissions are returned + """ + perms = get_role_permissions('admin') + assert Permission.EDIT_LLM_SETTINGS in perms + assert Permission.DELETE_ORGANIZATION not in perms + + def test_get_member_permissions(self): + """ + GIVEN: Role name 'member' + WHEN: get_role_permissions is called + THEN: Member permissions are returned + """ + perms = get_role_permissions('member') + assert Permission.VIEW_LLM_SETTINGS in perms + assert Permission.EDIT_LLM_SETTINGS not in perms + + def test_get_invalid_role_permissions(self): + """ + GIVEN: Invalid role name + WHEN: get_role_permissions is called + THEN: Empty frozenset is returned + """ + perms = get_role_permissions('invalid_role') + assert perms == frozenset() + + +# ============================================================================= +# Tests for has_permission function +# ============================================================================= + + +class TestHasPermission: + """Tests for has_permission function.""" + + def test_owner_has_delete_organization_permission(self): + """ + GIVEN: User with owner role + WHEN: Checking for DELETE_ORGANIZATION permission + THEN: Returns True + """ + mock_role = MagicMock() + mock_role.name = 'owner' + assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is True + + def test_owner_has_view_llm_settings_permission(self): + """ + GIVEN: User with owner role + WHEN: Checking for VIEW_LLM_SETTINGS permission + THEN: Returns True + """ + mock_role = MagicMock() + mock_role.name = 'owner' + assert has_permission(mock_role, Permission.VIEW_LLM_SETTINGS) is True + + def test_admin_has_edit_llm_settings_permission(self): + """ + GIVEN: User with admin role + WHEN: Checking for EDIT_LLM_SETTINGS permission + THEN: Returns True + """ + mock_role = MagicMock() + mock_role.name = 'admin' + assert has_permission(mock_role, Permission.EDIT_LLM_SETTINGS) is True + + def test_admin_lacks_delete_organization_permission(self): + """ + GIVEN: User with admin role + WHEN: Checking for DELETE_ORGANIZATION permission + THEN: Returns False + """ + mock_role = MagicMock() + mock_role.name = 'admin' + assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is False + + def test_member_has_view_llm_settings_permission(self): + """ + GIVEN: User with member role + WHEN: Checking for VIEW_LLM_SETTINGS permission + THEN: Returns True + """ + mock_role = MagicMock() + mock_role.name = 'member' + assert has_permission(mock_role, Permission.VIEW_LLM_SETTINGS) is True + + def test_member_lacks_edit_llm_settings_permission(self): + """ + GIVEN: User with member role + WHEN: Checking for EDIT_LLM_SETTINGS permission + THEN: Returns False + """ + mock_role = MagicMock() + mock_role.name = 'member' + assert has_permission(mock_role, Permission.EDIT_LLM_SETTINGS) is False + + def test_member_lacks_delete_organization_permission(self): + """ + GIVEN: User with member role + WHEN: Checking for DELETE_ORGANIZATION permission + THEN: Returns False + """ + mock_role = MagicMock() + mock_role.name = 'member' + assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is False + + def test_invalid_role_has_no_permissions(self): + """ + GIVEN: User with invalid role + WHEN: Checking for any permission + THEN: Returns False + """ + mock_role = MagicMock() + mock_role.name = 'invalid_role' + assert has_permission(mock_role, Permission.VIEW_LLM_SETTINGS) is False + assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is False + + +# ============================================================================= +# Tests for get_user_org_role function +# ============================================================================= + + +class TestGetUserOrgRole: + """Tests for get_user_org_role function.""" + + def test_returns_role_when_member_exists(self): + """ + GIVEN: User is a member of organization with role + WHEN: get_user_org_role is called + THEN: Role object is returned + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_org_member = MagicMock() + mock_org_member.role_id = 1 + + mock_role = MagicMock() + mock_role.name = 'admin' + + with ( + patch( + 'server.auth.authorization.OrgMemberStore.get_org_member', + return_value=mock_org_member, + ), + patch( + 'server.auth.authorization.RoleStore.get_role_by_id', + return_value=mock_role, + ), + ): + result = get_user_org_role(user_id, org_id) + assert result == mock_role + + def test_returns_none_when_not_member(self): + """ + GIVEN: User is not a member of organization + WHEN: get_user_org_role is called + THEN: None is returned + """ + user_id = str(uuid4()) + org_id = uuid4() + + with patch( + 'server.auth.authorization.OrgMemberStore.get_org_member', + return_value=None, + ): + result = get_user_org_role(user_id, org_id) + assert result is None + + def test_returns_role_when_org_id_is_none(self): + """ + GIVEN: User with a current organization + WHEN: get_user_org_role is called with org_id=None + THEN: Role object is returned using get_org_member_for_current_org + """ + user_id = str(uuid4()) + + mock_org_member = MagicMock() + mock_org_member.role_id = 1 + + mock_role = MagicMock() + mock_role.name = 'admin' + + with ( + patch( + 'server.auth.authorization.OrgMemberStore.get_org_member_for_current_org', + return_value=mock_org_member, + ) as mock_get_current, + patch( + 'server.auth.authorization.OrgMemberStore.get_org_member', + ) as mock_get_org_member, + patch( + 'server.auth.authorization.RoleStore.get_role_by_id', + return_value=mock_role, + ), + ): + result = get_user_org_role(user_id, None) + assert result == mock_role + mock_get_current.assert_called_once() + mock_get_org_member.assert_not_called() + + def test_returns_none_when_org_id_is_none_and_no_current_org(self): + """ + GIVEN: User with no current organization membership + WHEN: get_user_org_role is called with org_id=None + THEN: None is returned + """ + user_id = str(uuid4()) + + with patch( + 'server.auth.authorization.OrgMemberStore.get_org_member_for_current_org', + return_value=None, + ): + result = get_user_org_role(user_id, None) + assert result is None + + +# ============================================================================= +# Tests for require_permission dependency +# ============================================================================= + + +class TestRequirePermission: + """Tests for require_permission dependency factory.""" + + @pytest.mark.asyncio + async def test_returns_user_id_when_authorized(self): + """ + GIVEN: User with required permission + WHEN: Permission checker is called + THEN: User ID is returned + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'admin' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS) + result = await permission_checker(org_id=org_id, user_id=user_id) + assert result == user_id + + @pytest.mark.asyncio + async def test_raises_401_when_not_authenticated(self): + """ + GIVEN: No user ID (not authenticated) + WHEN: Permission checker is called + THEN: 401 Unauthorized is raised + """ + org_id = uuid4() + + permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS) + with pytest.raises(HTTPException) as exc_info: + await permission_checker(org_id=org_id, user_id=None) + + assert exc_info.value.status_code == 401 + assert 'not authenticated' in exc_info.value.detail.lower() + + @pytest.mark.asyncio + async def test_raises_403_when_not_member(self): + """ + GIVEN: User is not a member of organization + WHEN: Permission checker is called + THEN: 403 Forbidden is raised + """ + user_id = str(uuid4()) + org_id = uuid4() + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=None), + ): + permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS) + with pytest.raises(HTTPException) as exc_info: + await permission_checker(org_id=org_id, user_id=user_id) + + assert exc_info.value.status_code == 403 + assert 'not a member' in exc_info.value.detail.lower() + + @pytest.mark.asyncio + async def test_raises_403_when_insufficient_permission(self): + """ + GIVEN: User without required permission + WHEN: Permission checker is called + THEN: 403 Forbidden is raised + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'member' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission(Permission.DELETE_ORGANIZATION) + with pytest.raises(HTTPException) as exc_info: + await permission_checker(org_id=org_id, user_id=user_id) + + assert exc_info.value.status_code == 403 + assert 'delete_organization' in exc_info.value.detail.lower() + + @pytest.mark.asyncio + async def test_owner_can_delete_organization(self): + """ + GIVEN: User with owner role + WHEN: DELETE_ORGANIZATION permission is required + THEN: User ID is returned + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'owner' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission(Permission.DELETE_ORGANIZATION) + result = await permission_checker(org_id=org_id, user_id=user_id) + assert result == user_id + + @pytest.mark.asyncio + async def test_admin_cannot_delete_organization(self): + """ + GIVEN: User with admin role + WHEN: DELETE_ORGANIZATION permission is required + THEN: 403 Forbidden is raised + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'admin' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission(Permission.DELETE_ORGANIZATION) + with pytest.raises(HTTPException) as exc_info: + await permission_checker(org_id=org_id, user_id=user_id) + + assert exc_info.value.status_code == 403 + + @pytest.mark.asyncio + async def test_logs_warning_on_insufficient_permission(self): + """ + GIVEN: User without required permission + WHEN: Permission checker is called + THEN: Warning is logged with details + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'member' + + with ( + patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ), + patch('server.auth.authorization.logger') as mock_logger, + ): + permission_checker = require_permission(Permission.DELETE_ORGANIZATION) + with pytest.raises(HTTPException): + await permission_checker(org_id=org_id, user_id=user_id) + + mock_logger.warning.assert_called() + call_args = mock_logger.warning.call_args + assert call_args[1]['extra']['user_id'] == user_id + assert call_args[1]['extra']['user_role'] == 'member' + assert call_args[1]['extra']['required_permission'] == 'delete_organization' + + @pytest.mark.asyncio + async def test_returns_user_id_when_org_id_is_none(self): + """ + GIVEN: User with required permission in their current org + WHEN: Permission checker is called with org_id=None + THEN: User ID is returned + """ + user_id = str(uuid4()) + + mock_role = MagicMock() + mock_role.name = 'admin' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ) as mock_get_role: + permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS) + result = await permission_checker(org_id=None, user_id=user_id) + assert result == user_id + mock_get_role.assert_called_once_with(user_id, None) + + @pytest.mark.asyncio + async def test_raises_403_when_org_id_is_none_and_not_member(self): + """ + GIVEN: User not a member of their current organization + WHEN: Permission checker is called with org_id=None + THEN: HTTPException with 403 status is raised + """ + user_id = str(uuid4()) + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=None), + ): + permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS) + with pytest.raises(HTTPException) as exc_info: + await permission_checker(org_id=None, user_id=user_id) + + assert exc_info.value.status_code == 403 + assert 'not a member' in exc_info.value.detail + + +# ============================================================================= +# Tests for permission-based access control scenarios +# ============================================================================= + + +class TestPermissionScenarios: + """Tests for real-world permission scenarios.""" + + @pytest.mark.asyncio + async def test_member_can_manage_secrets(self): + """ + GIVEN: User with member role + WHEN: MANAGE_SECRETS permission is required + THEN: User ID is returned + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'member' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission(Permission.MANAGE_SECRETS) + result = await permission_checker(org_id=org_id, user_id=user_id) + assert result == user_id + + @pytest.mark.asyncio + async def test_member_cannot_invite_users(self): + """ + GIVEN: User with member role + WHEN: INVITE_USER_TO_ORGANIZATION permission is required + THEN: 403 Forbidden is raised + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'member' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission( + Permission.INVITE_USER_TO_ORGANIZATION + ) + with pytest.raises(HTTPException) as exc_info: + await permission_checker(org_id=org_id, user_id=user_id) + + assert exc_info.value.status_code == 403 + + @pytest.mark.asyncio + async def test_admin_can_invite_users(self): + """ + GIVEN: User with admin role + WHEN: INVITE_USER_TO_ORGANIZATION permission is required + THEN: User ID is returned + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'admin' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission( + Permission.INVITE_USER_TO_ORGANIZATION + ) + result = await permission_checker(org_id=org_id, user_id=user_id) + assert result == user_id + + @pytest.mark.asyncio + async def test_admin_cannot_change_owner_role(self): + """ + GIVEN: User with admin role + WHEN: CHANGE_USER_ROLE_OWNER permission is required + THEN: 403 Forbidden is raised + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'admin' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission(Permission.CHANGE_USER_ROLE_OWNER) + with pytest.raises(HTTPException) as exc_info: + await permission_checker(org_id=org_id, user_id=user_id) + + assert exc_info.value.status_code == 403 + + @pytest.mark.asyncio + async def test_owner_can_change_owner_role(self): + """ + GIVEN: User with owner role + WHEN: CHANGE_USER_ROLE_OWNER permission is required + THEN: User ID is returned + """ + user_id = str(uuid4()) + org_id = uuid4() + + mock_role = MagicMock() + mock_role.name = 'owner' + + with patch( + 'server.auth.authorization.get_user_org_role_async', + AsyncMock(return_value=mock_role), + ): + permission_checker = require_permission(Permission.CHANGE_USER_ROLE_OWNER) + result = await permission_checker(org_id=org_id, user_id=user_id) + assert result == user_id diff --git a/enterprise/tests/unit/test_org_member_store.py b/enterprise/tests/unit/test_org_member_store.py index 1d14673ceb..1bf12bfa2d 100644 --- a/enterprise/tests/unit/test_org_member_store.py +++ b/enterprise/tests/unit/test_org_member_store.py @@ -158,6 +158,57 @@ def test_get_org_member(session_maker): assert retrieved_org_member.llm_api_key.get_secret_value() == 'test-key' +def test_get_org_member_for_current_org(session_maker): + # Test getting org_member for user's current organization + with session_maker() as session: + # Create test data - user belongs to two orgs but current_org is org1 + org1 = Org(name='test-org-1') + org2 = Org(name='test-org-2') + session.add_all([org1, org2]) + session.flush() + + user = User(id=uuid.uuid4(), current_org_id=org1.id) + role = Role(name='admin', rank=1) + session.add_all([user, role]) + session.flush() + + org_member1 = OrgMember( + org_id=org1.id, + user_id=user.id, + role_id=role.id, + llm_api_key='test-key-1', + status='active', + ) + org_member2 = OrgMember( + org_id=org2.id, + user_id=user.id, + role_id=role.id, + llm_api_key='test-key-2', + status='active', + ) + session.add_all([org_member1, org_member2]) + session.commit() + user_id = user.id + org1_id = org1.id + + # Test retrieval - should return org_member for current_org (org1) + with patch('storage.org_member_store.session_maker', session_maker): + retrieved_org_member = OrgMemberStore.get_org_member_for_current_org(user_id) + assert retrieved_org_member is not None + assert retrieved_org_member.org_id == org1_id + assert retrieved_org_member.user_id == user_id + assert retrieved_org_member.llm_api_key.get_secret_value() == 'test-key-1' + + +def test_get_org_member_for_current_org_user_not_found(session_maker): + # Test getting org_member for non-existent user + with patch('storage.org_member_store.session_maker', session_maker): + retrieved_org_member = OrgMemberStore.get_org_member_for_current_org( + uuid.uuid4() + ) + assert retrieved_org_member is None + + def test_add_user_to_org(session_maker): # Test adding a user to an org with session_maker() as session: