mirror of
https://github.com/OpenHands/OpenHands.git
synced 2026-03-22 05:37:20 +08:00
Feature/permission based authorization (#12906)
Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
306
enterprise/server/auth/authorization.py
Normal file
306
enterprise/server/auth/authorization.py
Normal file
@@ -0,0 +1,306 @@
|
||||
"""
|
||||
Permission-based authorization dependencies for API endpoints.
|
||||
|
||||
This module provides FastAPI dependencies for checking user permissions
|
||||
within organizations. It uses a permission-based authorization model where
|
||||
roles (owner, admin, member) are mapped to specific permissions.
|
||||
|
||||
Permissions are defined in the Permission enum and mapped to roles via
|
||||
ROLE_PERMISSIONS. This allows fine-grained access control while maintaining
|
||||
the familiar role-based hierarchy.
|
||||
|
||||
Usage:
|
||||
from server.auth.authorization import (
|
||||
Permission,
|
||||
require_permission,
|
||||
)
|
||||
|
||||
@router.get('/{org_id}/settings')
|
||||
async def get_settings(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)),
|
||||
):
|
||||
# Only users with VIEW_LLM_SETTINGS permission can access
|
||||
...
|
||||
|
||||
@router.patch('/{org_id}/settings')
|
||||
async def update_settings(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(require_permission(Permission.EDIT_LLM_SETTINGS)),
|
||||
):
|
||||
# Only users with EDIT_LLM_SETTINGS permission can access
|
||||
...
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from storage.org_member_store import OrgMemberStore
|
||||
from storage.role import Role
|
||||
from storage.role_store import RoleStore
|
||||
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.server.user_auth import get_user_id
|
||||
|
||||
|
||||
class Permission(str, Enum):
|
||||
"""Permissions that can be assigned to roles."""
|
||||
|
||||
# Secrets
|
||||
MANAGE_SECRETS = 'manage_secrets'
|
||||
|
||||
# MCP
|
||||
MANAGE_MCP = 'manage_mcp'
|
||||
|
||||
# Integrations
|
||||
MANAGE_INTEGRATIONS = 'manage_integrations'
|
||||
|
||||
# Application Settings
|
||||
MANAGE_APPLICATION_SETTINGS = 'manage_application_settings'
|
||||
|
||||
# API Keys
|
||||
MANAGE_API_KEYS = 'manage_api_keys'
|
||||
|
||||
# LLM Settings
|
||||
VIEW_LLM_SETTINGS = 'view_llm_settings'
|
||||
EDIT_LLM_SETTINGS = 'edit_llm_settings'
|
||||
|
||||
# Billing
|
||||
VIEW_BILLING = 'view_billing'
|
||||
ADD_CREDITS = 'add_credits'
|
||||
|
||||
# Organization Members
|
||||
INVITE_USER_TO_ORGANIZATION = 'invite_user_to_organization'
|
||||
CHANGE_USER_ROLE_MEMBER = 'change_user_role:member'
|
||||
CHANGE_USER_ROLE_ADMIN = 'change_user_role:admin'
|
||||
CHANGE_USER_ROLE_OWNER = 'change_user_role:owner'
|
||||
|
||||
# Organization Management
|
||||
VIEW_ORG_SETTINGS = 'view_org_settings'
|
||||
CHANGE_ORGANIZATION_NAME = 'change_organization_name'
|
||||
DELETE_ORGANIZATION = 'delete_organization'
|
||||
|
||||
# Temporary permissions until we finish the API updates.
|
||||
EDIT_ORG_SETTINGS = 'edit_org_settings'
|
||||
|
||||
|
||||
class RoleName(str, Enum):
|
||||
"""Role names used in the system."""
|
||||
|
||||
OWNER = 'owner'
|
||||
ADMIN = 'admin'
|
||||
MEMBER = 'member'
|
||||
|
||||
|
||||
# Permission mappings for each role
|
||||
ROLE_PERMISSIONS: dict[RoleName, frozenset[Permission]] = {
|
||||
RoleName.OWNER: frozenset(
|
||||
[
|
||||
# Settings (Full access)
|
||||
Permission.MANAGE_SECRETS,
|
||||
Permission.MANAGE_MCP,
|
||||
Permission.MANAGE_INTEGRATIONS,
|
||||
Permission.MANAGE_APPLICATION_SETTINGS,
|
||||
Permission.MANAGE_API_KEYS,
|
||||
Permission.VIEW_LLM_SETTINGS,
|
||||
Permission.EDIT_LLM_SETTINGS,
|
||||
Permission.VIEW_BILLING,
|
||||
Permission.ADD_CREDITS,
|
||||
# Organization Members
|
||||
Permission.INVITE_USER_TO_ORGANIZATION,
|
||||
Permission.CHANGE_USER_ROLE_MEMBER,
|
||||
Permission.CHANGE_USER_ROLE_ADMIN,
|
||||
Permission.CHANGE_USER_ROLE_OWNER,
|
||||
# Organization Management
|
||||
Permission.VIEW_ORG_SETTINGS,
|
||||
Permission.EDIT_ORG_SETTINGS,
|
||||
# Organization Management (Owner only)
|
||||
Permission.CHANGE_ORGANIZATION_NAME,
|
||||
Permission.DELETE_ORGANIZATION,
|
||||
]
|
||||
),
|
||||
RoleName.ADMIN: frozenset(
|
||||
[
|
||||
# Settings (Full access)
|
||||
Permission.MANAGE_SECRETS,
|
||||
Permission.MANAGE_MCP,
|
||||
Permission.MANAGE_INTEGRATIONS,
|
||||
Permission.MANAGE_APPLICATION_SETTINGS,
|
||||
Permission.MANAGE_API_KEYS,
|
||||
Permission.VIEW_LLM_SETTINGS,
|
||||
Permission.EDIT_LLM_SETTINGS,
|
||||
Permission.VIEW_BILLING,
|
||||
Permission.ADD_CREDITS,
|
||||
# Organization Members
|
||||
Permission.INVITE_USER_TO_ORGANIZATION,
|
||||
Permission.CHANGE_USER_ROLE_MEMBER,
|
||||
Permission.CHANGE_USER_ROLE_ADMIN,
|
||||
# Organization Management
|
||||
Permission.VIEW_ORG_SETTINGS,
|
||||
Permission.EDIT_ORG_SETTINGS,
|
||||
]
|
||||
),
|
||||
RoleName.MEMBER: frozenset(
|
||||
[
|
||||
# Settings (Full access)
|
||||
Permission.MANAGE_SECRETS,
|
||||
Permission.MANAGE_MCP,
|
||||
Permission.MANAGE_INTEGRATIONS,
|
||||
Permission.MANAGE_APPLICATION_SETTINGS,
|
||||
Permission.MANAGE_API_KEYS,
|
||||
# Settings (View only)
|
||||
Permission.VIEW_ORG_SETTINGS,
|
||||
Permission.VIEW_LLM_SETTINGS,
|
||||
]
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def get_user_org_role(user_id: str, org_id: UUID | None) -> Role | None:
|
||||
"""
|
||||
Get the user's role in an organization (synchronous version).
|
||||
|
||||
Args:
|
||||
user_id: User ID (string that will be converted to UUID)
|
||||
org_id: Organization ID, or None to use the user's current organization
|
||||
|
||||
Returns:
|
||||
Role object if user is a member, None otherwise
|
||||
"""
|
||||
from uuid import UUID as parse_uuid
|
||||
|
||||
if org_id is None:
|
||||
org_member = OrgMemberStore.get_org_member_for_current_org(parse_uuid(user_id))
|
||||
else:
|
||||
org_member = OrgMemberStore.get_org_member(org_id, parse_uuid(user_id))
|
||||
if not org_member:
|
||||
return None
|
||||
|
||||
return RoleStore.get_role_by_id(org_member.role_id)
|
||||
|
||||
|
||||
async def get_user_org_role_async(user_id: str, org_id: UUID | None) -> Role | None:
|
||||
"""
|
||||
Get the user's role in an organization (async version).
|
||||
|
||||
Args:
|
||||
user_id: User ID (string that will be converted to UUID)
|
||||
org_id: Organization ID, or None to use the user's current organization
|
||||
|
||||
Returns:
|
||||
Role object if user is a member, None otherwise
|
||||
"""
|
||||
from uuid import UUID as parse_uuid
|
||||
|
||||
if org_id is None:
|
||||
org_member = await OrgMemberStore.get_org_member_for_current_org_async(
|
||||
parse_uuid(user_id)
|
||||
)
|
||||
else:
|
||||
org_member = await OrgMemberStore.get_org_member_async(
|
||||
org_id, parse_uuid(user_id)
|
||||
)
|
||||
if not org_member:
|
||||
return None
|
||||
|
||||
return await RoleStore.get_role_by_id_async(org_member.role_id)
|
||||
|
||||
|
||||
def get_role_permissions(role_name: str) -> frozenset[Permission]:
|
||||
"""
|
||||
Get the permissions for a role.
|
||||
|
||||
Args:
|
||||
role_name: Name of the role
|
||||
|
||||
Returns:
|
||||
Set of permissions for the role
|
||||
"""
|
||||
try:
|
||||
role_enum = RoleName(role_name)
|
||||
return ROLE_PERMISSIONS.get(role_enum, frozenset())
|
||||
except ValueError:
|
||||
return frozenset()
|
||||
|
||||
|
||||
def has_permission(user_role: Role, permission: Permission) -> bool:
|
||||
"""
|
||||
Check if a role has a specific permission.
|
||||
|
||||
Args:
|
||||
user_role: User's Role object
|
||||
permission: Permission to check
|
||||
|
||||
Returns:
|
||||
True if the role has the permission
|
||||
"""
|
||||
permissions = get_role_permissions(user_role.name)
|
||||
return permission in permissions
|
||||
|
||||
|
||||
def require_permission(permission: Permission):
|
||||
"""
|
||||
Factory function that creates a dependency to require a specific permission.
|
||||
|
||||
This creates a FastAPI dependency that:
|
||||
1. Extracts org_id from the path parameter
|
||||
2. Gets the authenticated user_id
|
||||
3. Checks if the user has the required permission in the organization
|
||||
4. Returns the user_id if authorized, raises HTTPException otherwise
|
||||
|
||||
Usage:
|
||||
@router.get('/{org_id}/settings')
|
||||
async def get_settings(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(require_permission(Permission.VIEW_LLM_SETTINGS)),
|
||||
):
|
||||
...
|
||||
|
||||
Args:
|
||||
permission: The permission required to access the endpoint
|
||||
|
||||
Returns:
|
||||
Dependency function that validates permission and returns user_id
|
||||
"""
|
||||
|
||||
async def permission_checker(
|
||||
org_id: UUID | None = None,
|
||||
user_id: str | None = Depends(get_user_id),
|
||||
) -> str:
|
||||
if not user_id:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail='User not authenticated',
|
||||
)
|
||||
|
||||
user_role = await get_user_org_role_async(user_id, org_id)
|
||||
|
||||
if not user_role:
|
||||
logger.warning(
|
||||
'User not a member of organization',
|
||||
extra={'user_id': user_id, 'org_id': str(org_id)},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail='User is not a member of this organization',
|
||||
)
|
||||
|
||||
if not has_permission(user_role, permission):
|
||||
logger.warning(
|
||||
'Insufficient permissions',
|
||||
extra={
|
||||
'user_id': user_id,
|
||||
'org_id': str(org_id),
|
||||
'user_role': user_role.name,
|
||||
'required_permission': permission.value,
|
||||
},
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f'Requires {permission.value} permission',
|
||||
)
|
||||
|
||||
return user_id
|
||||
|
||||
return permission_checker
|
||||
@@ -2,6 +2,10 @@ from typing import Annotated
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||
from server.auth.authorization import (
|
||||
Permission,
|
||||
require_permission,
|
||||
)
|
||||
from server.email_validation import get_admin_user_id
|
||||
from server.routes.org_models import (
|
||||
CannotModifySelfError,
|
||||
@@ -189,23 +193,26 @@ async def create_org(
|
||||
@org_router.get('/{org_id}', response_model=OrgResponse, status_code=status.HTTP_200_OK)
|
||||
async def get_org(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(get_user_id),
|
||||
user_id: str = Depends(require_permission(Permission.VIEW_ORG_SETTINGS)),
|
||||
) -> OrgResponse:
|
||||
"""Get organization details by ID.
|
||||
|
||||
This endpoint allows authenticated users who are members of an organization
|
||||
to retrieve its details. Only members of the organization can access this endpoint.
|
||||
This endpoint retrieves details for a specific organization. Access requires
|
||||
the VIEW_ORG_SETTINGS permission, which is granted to all organization members
|
||||
(member, admin, and owner roles).
|
||||
|
||||
Args:
|
||||
org_id: Organization ID (UUID)
|
||||
user_id: Authenticated user ID (injected by dependency)
|
||||
user_id: Authenticated user ID (injected by require_permission dependency)
|
||||
|
||||
Returns:
|
||||
OrgResponse: The organization details
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if user is not authenticated
|
||||
HTTPException: 403 if user lacks VIEW_ORG_SETTINGS permission
|
||||
HTTPException: 404 if organization not found
|
||||
HTTPException: 422 if org_id is not a valid UUID (handled by FastAPI)
|
||||
HTTPException: 404 if organization not found or user is not a member
|
||||
HTTPException: 500 if retrieval fails
|
||||
"""
|
||||
logger.info(
|
||||
@@ -305,23 +312,24 @@ async def get_me(
|
||||
@org_router.delete('/{org_id}', status_code=status.HTTP_200_OK)
|
||||
async def delete_org(
|
||||
org_id: UUID,
|
||||
user_id: str = Depends(get_user_id),
|
||||
user_id: str = Depends(require_permission(Permission.DELETE_ORGANIZATION)),
|
||||
) -> dict:
|
||||
"""Delete an organization.
|
||||
|
||||
This endpoint allows authenticated organization owners to delete their organization.
|
||||
All associated data including organization members, conversations, billing data,
|
||||
and external LiteLLM team resources will be permanently removed.
|
||||
This endpoint permanently deletes an organization and all associated data including
|
||||
organization members, conversations, billing data, and external LiteLLM team resources.
|
||||
Access requires the DELETE_ORGANIZATION permission, which is granted only to owners.
|
||||
|
||||
Args:
|
||||
org_id: Organization ID to delete
|
||||
user_id: Authenticated user ID (injected by dependency)
|
||||
org_id: Organization ID to delete (UUID)
|
||||
user_id: Authenticated user ID (injected by require_permission dependency)
|
||||
|
||||
Returns:
|
||||
dict: Confirmation message with deleted organization details
|
||||
|
||||
Raises:
|
||||
HTTPException: 403 if user is not the organization owner
|
||||
HTTPException: 401 if user is not authenticated
|
||||
HTTPException: 403 if user lacks DELETE_ORGANIZATION permission
|
||||
HTTPException: 404 if organization not found
|
||||
HTTPException: 500 if deletion fails
|
||||
"""
|
||||
@@ -414,25 +422,26 @@ async def delete_org(
|
||||
async def update_org(
|
||||
org_id: UUID,
|
||||
update_data: OrgUpdate,
|
||||
user_id: str = Depends(get_user_id),
|
||||
user_id: str = Depends(require_permission(Permission.EDIT_ORG_SETTINGS)),
|
||||
) -> OrgResponse:
|
||||
"""Update an existing organization.
|
||||
|
||||
This endpoint allows authenticated users to update organization settings.
|
||||
LLM-related settings require admin or owner role in the organization.
|
||||
This endpoint updates organization settings. Access requires the EDIT_ORG_SETTINGS
|
||||
permission, which is granted to admin and owner roles.
|
||||
|
||||
Args:
|
||||
org_id: Organization ID to update (UUID validated by FastAPI)
|
||||
org_id: Organization ID to update (UUID)
|
||||
update_data: Organization update data
|
||||
user_id: Authenticated user ID (injected by dependency)
|
||||
user_id: Authenticated user ID (injected by require_permission dependency)
|
||||
|
||||
Returns:
|
||||
OrgResponse: The updated organization details
|
||||
|
||||
Raises:
|
||||
HTTPException: 400 if org_id is invalid UUID format (handled by FastAPI)
|
||||
HTTPException: 403 if user lacks permission for LLM settings
|
||||
HTTPException: 401 if user is not authenticated
|
||||
HTTPException: 403 if user lacks EDIT_ORG_SETTINGS permission
|
||||
HTTPException: 404 if organization not found
|
||||
HTTPException: 409 if organization name already exists
|
||||
HTTPException: 422 if validation errors occur (handled by FastAPI)
|
||||
HTTPException: 500 if update fails
|
||||
"""
|
||||
@@ -496,7 +505,7 @@ async def update_org(
|
||||
|
||||
@org_router.get('/{org_id}/members')
|
||||
async def get_org_members(
|
||||
org_id: str,
|
||||
org_id: UUID,
|
||||
page_id: Annotated[
|
||||
str | None,
|
||||
Query(title='Optional next_page_id from the previously returned page'),
|
||||
@@ -509,13 +518,33 @@ async def get_org_members(
|
||||
lte=100,
|
||||
),
|
||||
] = 100,
|
||||
current_user_id: str = Depends(get_user_id),
|
||||
user_id: str = Depends(require_permission(Permission.VIEW_ORG_SETTINGS)),
|
||||
) -> OrgMemberPage:
|
||||
"""Get all members of an organization with cursor-based pagination."""
|
||||
"""Get all members of an organization with cursor-based pagination.
|
||||
|
||||
This endpoint retrieves a paginated list of organization members. Access requires
|
||||
the VIEW_ORG_SETTINGS permission, which is granted to all organization members
|
||||
(member, admin, and owner roles).
|
||||
|
||||
Args:
|
||||
org_id: Organization ID (UUID)
|
||||
page_id: Optional page ID (offset) for pagination
|
||||
limit: Maximum number of members to return (1-100, default 100)
|
||||
user_id: Authenticated user ID (injected by require_permission dependency)
|
||||
|
||||
Returns:
|
||||
OrgMemberPage: Paginated list of organization members
|
||||
|
||||
Raises:
|
||||
HTTPException: 401 if user is not authenticated
|
||||
HTTPException: 403 if user lacks VIEW_ORG_SETTINGS permission
|
||||
HTTPException: 400 if org_id or page_id format is invalid
|
||||
HTTPException: 500 if retrieval fails
|
||||
"""
|
||||
try:
|
||||
success, error_code, data = await OrgMemberService.get_org_members(
|
||||
org_id=UUID(org_id),
|
||||
current_user_id=UUID(current_user_id),
|
||||
org_id=org_id,
|
||||
current_user_id=UUID(user_id),
|
||||
page_id=page_id,
|
||||
limit=limit,
|
||||
)
|
||||
@@ -562,7 +591,7 @@ async def get_org_members(
|
||||
|
||||
@org_router.delete('/{org_id}/members/{user_id}')
|
||||
async def remove_org_member(
|
||||
org_id: str,
|
||||
org_id: UUID,
|
||||
user_id: str,
|
||||
current_user_id: str = Depends(get_user_id),
|
||||
):
|
||||
@@ -576,7 +605,7 @@ async def remove_org_member(
|
||||
"""
|
||||
try:
|
||||
success, error = await OrgMemberService.remove_org_member(
|
||||
org_id=UUID(org_id),
|
||||
org_id=org_id,
|
||||
target_user_id=UUID(user_id),
|
||||
current_user_id=UUID(current_user_id),
|
||||
)
|
||||
@@ -708,7 +737,7 @@ async def switch_org(
|
||||
|
||||
@org_router.patch('/{org_id}/members/{user_id}', response_model=OrgMemberResponse)
|
||||
async def update_org_member(
|
||||
org_id: str,
|
||||
org_id: UUID,
|
||||
user_id: str,
|
||||
update_data: OrgMemberUpdate,
|
||||
current_user_id: str = Depends(get_user_id),
|
||||
@@ -725,7 +754,7 @@ async def update_org_member(
|
||||
"""
|
||||
try:
|
||||
return await OrgMemberService.update_org_member(
|
||||
org_id=UUID(org_id),
|
||||
org_id=org_id,
|
||||
target_user_id=UUID(user_id),
|
||||
current_user_id=UUID(current_user_id),
|
||||
update_data=update_data,
|
||||
|
||||
@@ -9,6 +9,7 @@ from sqlalchemy import select
|
||||
from sqlalchemy.orm import joinedload
|
||||
from storage.database import a_session_maker, session_maker
|
||||
from storage.org_member import OrgMember
|
||||
from storage.user import User
|
||||
from storage.user_settings import UserSettings
|
||||
|
||||
from openhands.storage.data_models.settings import Settings
|
||||
@@ -60,6 +61,51 @@ class OrgMemberStore:
|
||||
)
|
||||
return result.scalars().first()
|
||||
|
||||
@staticmethod
|
||||
def get_org_member_for_current_org(user_id: UUID) -> Optional[OrgMember]:
|
||||
"""Get the org member for a user's current organization.
|
||||
|
||||
Args:
|
||||
user_id: The user's UUID.
|
||||
|
||||
Returns:
|
||||
The OrgMember for the user's current organization, or None if not found.
|
||||
"""
|
||||
with session_maker() as session:
|
||||
result = (
|
||||
session.query(OrgMember)
|
||||
.join(User, User.id == OrgMember.user_id)
|
||||
.filter(
|
||||
User.id == user_id,
|
||||
OrgMember.org_id == User.current_org_id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
async def get_org_member_for_current_org_async(
|
||||
user_id: UUID,
|
||||
) -> Optional[OrgMember]:
|
||||
"""Get the org member for a user's current organization (async version).
|
||||
|
||||
Args:
|
||||
user_id: The user's UUID.
|
||||
|
||||
Returns:
|
||||
The OrgMember for the user's current organization, or None if not found.
|
||||
"""
|
||||
async with a_session_maker() as session:
|
||||
result = await session.execute(
|
||||
select(OrgMember)
|
||||
.join(User, User.id == OrgMember.user_id)
|
||||
.filter(
|
||||
User.id == user_id,
|
||||
OrgMember.org_id == User.current_org_id,
|
||||
)
|
||||
)
|
||||
return result.scalars().first()
|
||||
|
||||
@staticmethod
|
||||
def get_user_orgs(user_id: UUID) -> list[OrgMember]:
|
||||
"""Get all organizations for a user."""
|
||||
|
||||
@@ -29,6 +29,20 @@ class RoleStore:
|
||||
with session_maker() as session:
|
||||
return session.query(Role).filter(Role.id == role_id).first()
|
||||
|
||||
@staticmethod
|
||||
async def get_role_by_id_async(
|
||||
role_id: int,
|
||||
session: Optional[AsyncSession] = None,
|
||||
) -> Optional[Role]:
|
||||
"""Get role by ID (async version)."""
|
||||
if session is not None:
|
||||
result = await session.execute(select(Role).where(Role.id == role_id))
|
||||
return result.scalars().first()
|
||||
|
||||
async with a_session_maker() as session:
|
||||
result = await session.execute(select(Role).where(Role.id == role_id))
|
||||
return result.scalars().first()
|
||||
|
||||
@staticmethod
|
||||
def get_role_by_name(name: str) -> Optional[Role]:
|
||||
"""Get role by name."""
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
756
enterprise/tests/unit/test_authorization.py
Normal file
756
enterprise/tests/unit/test_authorization.py
Normal file
@@ -0,0 +1,756 @@
|
||||
"""
|
||||
Unit tests for permission-based authorization (authorization.py).
|
||||
|
||||
Tests the FastAPI dependencies that validate user permissions within organizations.
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
from server.auth.authorization import (
|
||||
ROLE_PERMISSIONS,
|
||||
Permission,
|
||||
RoleName,
|
||||
get_role_permissions,
|
||||
get_user_org_role,
|
||||
has_permission,
|
||||
require_permission,
|
||||
)
|
||||
|
||||
# =============================================================================
|
||||
# Tests for Permission enum
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestPermission:
|
||||
"""Tests for Permission enum."""
|
||||
|
||||
def test_permission_values(self):
|
||||
"""
|
||||
GIVEN: Permission enum
|
||||
WHEN: Accessing permission values
|
||||
THEN: All expected permissions exist with correct string values
|
||||
"""
|
||||
assert Permission.MANAGE_SECRETS.value == 'manage_secrets'
|
||||
assert Permission.MANAGE_MCP.value == 'manage_mcp'
|
||||
assert Permission.MANAGE_INTEGRATIONS.value == 'manage_integrations'
|
||||
assert (
|
||||
Permission.MANAGE_APPLICATION_SETTINGS.value
|
||||
== 'manage_application_settings'
|
||||
)
|
||||
assert Permission.MANAGE_API_KEYS.value == 'manage_api_keys'
|
||||
assert Permission.VIEW_LLM_SETTINGS.value == 'view_llm_settings'
|
||||
assert Permission.EDIT_LLM_SETTINGS.value == 'edit_llm_settings'
|
||||
assert Permission.VIEW_BILLING.value == 'view_billing'
|
||||
assert Permission.ADD_CREDITS.value == 'add_credits'
|
||||
assert (
|
||||
Permission.INVITE_USER_TO_ORGANIZATION.value
|
||||
== 'invite_user_to_organization'
|
||||
)
|
||||
assert Permission.CHANGE_USER_ROLE_MEMBER.value == 'change_user_role:member'
|
||||
assert Permission.CHANGE_USER_ROLE_ADMIN.value == 'change_user_role:admin'
|
||||
assert Permission.CHANGE_USER_ROLE_OWNER.value == 'change_user_role:owner'
|
||||
assert Permission.VIEW_ORG_SETTINGS.value == 'view_org_settings'
|
||||
assert Permission.CHANGE_ORGANIZATION_NAME.value == 'change_organization_name'
|
||||
assert Permission.DELETE_ORGANIZATION.value == 'delete_organization'
|
||||
|
||||
def test_permission_from_string(self):
|
||||
"""
|
||||
GIVEN: Valid permission string
|
||||
WHEN: Creating Permission from string
|
||||
THEN: Correct enum value is returned
|
||||
"""
|
||||
assert Permission('manage_secrets') == Permission.MANAGE_SECRETS
|
||||
assert Permission('view_llm_settings') == Permission.VIEW_LLM_SETTINGS
|
||||
assert Permission('delete_organization') == Permission.DELETE_ORGANIZATION
|
||||
|
||||
def test_permission_invalid_string(self):
|
||||
"""
|
||||
GIVEN: Invalid permission string
|
||||
WHEN: Creating Permission from string
|
||||
THEN: ValueError is raised
|
||||
"""
|
||||
with pytest.raises(ValueError):
|
||||
Permission('invalid_permission')
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for RoleName enum
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestRoleName:
|
||||
"""Tests for RoleName enum."""
|
||||
|
||||
def test_role_name_values(self):
|
||||
"""
|
||||
GIVEN: RoleName enum
|
||||
WHEN: Accessing role name values
|
||||
THEN: All expected roles exist with correct string values
|
||||
"""
|
||||
assert RoleName.OWNER.value == 'owner'
|
||||
assert RoleName.ADMIN.value == 'admin'
|
||||
assert RoleName.MEMBER.value == 'member'
|
||||
|
||||
def test_role_name_from_string(self):
|
||||
"""
|
||||
GIVEN: Valid role name string
|
||||
WHEN: Creating RoleName from string
|
||||
THEN: Correct enum value is returned
|
||||
"""
|
||||
assert RoleName('owner') == RoleName.OWNER
|
||||
assert RoleName('admin') == RoleName.ADMIN
|
||||
assert RoleName('member') == RoleName.MEMBER
|
||||
|
||||
def test_role_name_invalid_string(self):
|
||||
"""
|
||||
GIVEN: Invalid role name string
|
||||
WHEN: Creating RoleName from string
|
||||
THEN: ValueError is raised
|
||||
"""
|
||||
with pytest.raises(ValueError):
|
||||
RoleName('invalid_role')
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for ROLE_PERMISSIONS mapping
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestRolePermissions:
|
||||
"""Tests for role permission mappings."""
|
||||
|
||||
def test_owner_has_all_permissions(self):
|
||||
"""
|
||||
GIVEN: ROLE_PERMISSIONS mapping
|
||||
WHEN: Checking owner permissions
|
||||
THEN: Owner has all permissions including owner-only permissions
|
||||
"""
|
||||
owner_perms = ROLE_PERMISSIONS[RoleName.OWNER]
|
||||
assert Permission.MANAGE_SECRETS in owner_perms
|
||||
assert Permission.MANAGE_MCP in owner_perms
|
||||
assert Permission.VIEW_LLM_SETTINGS in owner_perms
|
||||
assert Permission.EDIT_LLM_SETTINGS in owner_perms
|
||||
assert Permission.VIEW_BILLING in owner_perms
|
||||
assert Permission.ADD_CREDITS in owner_perms
|
||||
assert Permission.INVITE_USER_TO_ORGANIZATION in owner_perms
|
||||
assert Permission.CHANGE_USER_ROLE_MEMBER in owner_perms
|
||||
assert Permission.CHANGE_USER_ROLE_ADMIN in owner_perms
|
||||
assert Permission.CHANGE_USER_ROLE_OWNER in owner_perms
|
||||
assert Permission.CHANGE_ORGANIZATION_NAME in owner_perms
|
||||
assert Permission.DELETE_ORGANIZATION in owner_perms
|
||||
|
||||
def test_admin_has_admin_permissions(self):
|
||||
"""
|
||||
GIVEN: ROLE_PERMISSIONS mapping
|
||||
WHEN: Checking admin permissions
|
||||
THEN: Admin has admin permissions but not owner-only permissions
|
||||
"""
|
||||
admin_perms = ROLE_PERMISSIONS[RoleName.ADMIN]
|
||||
assert Permission.MANAGE_SECRETS in admin_perms
|
||||
assert Permission.MANAGE_MCP in admin_perms
|
||||
assert Permission.VIEW_LLM_SETTINGS in admin_perms
|
||||
assert Permission.EDIT_LLM_SETTINGS in admin_perms
|
||||
assert Permission.VIEW_BILLING in admin_perms
|
||||
assert Permission.ADD_CREDITS in admin_perms
|
||||
assert Permission.INVITE_USER_TO_ORGANIZATION in admin_perms
|
||||
assert Permission.CHANGE_USER_ROLE_MEMBER in admin_perms
|
||||
assert Permission.CHANGE_USER_ROLE_ADMIN in admin_perms
|
||||
# Admin should NOT have owner-only permissions
|
||||
assert Permission.CHANGE_USER_ROLE_OWNER not in admin_perms
|
||||
assert Permission.CHANGE_ORGANIZATION_NAME not in admin_perms
|
||||
assert Permission.DELETE_ORGANIZATION not in admin_perms
|
||||
|
||||
def test_member_has_limited_permissions(self):
|
||||
"""
|
||||
GIVEN: ROLE_PERMISSIONS mapping
|
||||
WHEN: Checking member permissions
|
||||
THEN: Member has limited permissions
|
||||
"""
|
||||
member_perms = ROLE_PERMISSIONS[RoleName.MEMBER]
|
||||
# Member has basic settings permissions
|
||||
assert Permission.MANAGE_SECRETS in member_perms
|
||||
assert Permission.MANAGE_MCP in member_perms
|
||||
assert Permission.MANAGE_INTEGRATIONS in member_perms
|
||||
assert Permission.MANAGE_APPLICATION_SETTINGS in member_perms
|
||||
assert Permission.MANAGE_API_KEYS in member_perms
|
||||
assert Permission.VIEW_LLM_SETTINGS in member_perms
|
||||
assert Permission.VIEW_ORG_SETTINGS in member_perms
|
||||
# Member should NOT have admin/owner permissions
|
||||
assert Permission.EDIT_LLM_SETTINGS not in member_perms
|
||||
assert Permission.VIEW_BILLING not in member_perms
|
||||
assert Permission.ADD_CREDITS not in member_perms
|
||||
assert Permission.INVITE_USER_TO_ORGANIZATION not in member_perms
|
||||
assert Permission.CHANGE_USER_ROLE_MEMBER not in member_perms
|
||||
assert Permission.CHANGE_USER_ROLE_ADMIN not in member_perms
|
||||
assert Permission.CHANGE_USER_ROLE_OWNER not in member_perms
|
||||
assert Permission.CHANGE_ORGANIZATION_NAME not in member_perms
|
||||
assert Permission.DELETE_ORGANIZATION not in member_perms
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for get_role_permissions function
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestGetRolePermissions:
|
||||
"""Tests for get_role_permissions function."""
|
||||
|
||||
def test_get_owner_permissions(self):
|
||||
"""
|
||||
GIVEN: Role name 'owner'
|
||||
WHEN: get_role_permissions is called
|
||||
THEN: Owner permissions are returned
|
||||
"""
|
||||
perms = get_role_permissions('owner')
|
||||
assert Permission.DELETE_ORGANIZATION in perms
|
||||
assert Permission.CHANGE_ORGANIZATION_NAME in perms
|
||||
|
||||
def test_get_admin_permissions(self):
|
||||
"""
|
||||
GIVEN: Role name 'admin'
|
||||
WHEN: get_role_permissions is called
|
||||
THEN: Admin permissions are returned
|
||||
"""
|
||||
perms = get_role_permissions('admin')
|
||||
assert Permission.EDIT_LLM_SETTINGS in perms
|
||||
assert Permission.DELETE_ORGANIZATION not in perms
|
||||
|
||||
def test_get_member_permissions(self):
|
||||
"""
|
||||
GIVEN: Role name 'member'
|
||||
WHEN: get_role_permissions is called
|
||||
THEN: Member permissions are returned
|
||||
"""
|
||||
perms = get_role_permissions('member')
|
||||
assert Permission.VIEW_LLM_SETTINGS in perms
|
||||
assert Permission.EDIT_LLM_SETTINGS not in perms
|
||||
|
||||
def test_get_invalid_role_permissions(self):
|
||||
"""
|
||||
GIVEN: Invalid role name
|
||||
WHEN: get_role_permissions is called
|
||||
THEN: Empty frozenset is returned
|
||||
"""
|
||||
perms = get_role_permissions('invalid_role')
|
||||
assert perms == frozenset()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for has_permission function
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestHasPermission:
|
||||
"""Tests for has_permission function."""
|
||||
|
||||
def test_owner_has_delete_organization_permission(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: Checking for DELETE_ORGANIZATION permission
|
||||
THEN: Returns True
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'owner'
|
||||
assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is True
|
||||
|
||||
def test_owner_has_view_llm_settings_permission(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: Checking for VIEW_LLM_SETTINGS permission
|
||||
THEN: Returns True
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'owner'
|
||||
assert has_permission(mock_role, Permission.VIEW_LLM_SETTINGS) is True
|
||||
|
||||
def test_admin_has_edit_llm_settings_permission(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: Checking for EDIT_LLM_SETTINGS permission
|
||||
THEN: Returns True
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
assert has_permission(mock_role, Permission.EDIT_LLM_SETTINGS) is True
|
||||
|
||||
def test_admin_lacks_delete_organization_permission(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: Checking for DELETE_ORGANIZATION permission
|
||||
THEN: Returns False
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is False
|
||||
|
||||
def test_member_has_view_llm_settings_permission(self):
|
||||
"""
|
||||
GIVEN: User with member role
|
||||
WHEN: Checking for VIEW_LLM_SETTINGS permission
|
||||
THEN: Returns True
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'member'
|
||||
assert has_permission(mock_role, Permission.VIEW_LLM_SETTINGS) is True
|
||||
|
||||
def test_member_lacks_edit_llm_settings_permission(self):
|
||||
"""
|
||||
GIVEN: User with member role
|
||||
WHEN: Checking for EDIT_LLM_SETTINGS permission
|
||||
THEN: Returns False
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'member'
|
||||
assert has_permission(mock_role, Permission.EDIT_LLM_SETTINGS) is False
|
||||
|
||||
def test_member_lacks_delete_organization_permission(self):
|
||||
"""
|
||||
GIVEN: User with member role
|
||||
WHEN: Checking for DELETE_ORGANIZATION permission
|
||||
THEN: Returns False
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'member'
|
||||
assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is False
|
||||
|
||||
def test_invalid_role_has_no_permissions(self):
|
||||
"""
|
||||
GIVEN: User with invalid role
|
||||
WHEN: Checking for any permission
|
||||
THEN: Returns False
|
||||
"""
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'invalid_role'
|
||||
assert has_permission(mock_role, Permission.VIEW_LLM_SETTINGS) is False
|
||||
assert has_permission(mock_role, Permission.DELETE_ORGANIZATION) is False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for get_user_org_role function
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestGetUserOrgRole:
|
||||
"""Tests for get_user_org_role function."""
|
||||
|
||||
def test_returns_role_when_member_exists(self):
|
||||
"""
|
||||
GIVEN: User is a member of organization with role
|
||||
WHEN: get_user_org_role is called
|
||||
THEN: Role object is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_org_member = MagicMock()
|
||||
mock_org_member.role_id = 1
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with (
|
||||
patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member',
|
||||
return_value=mock_org_member,
|
||||
),
|
||||
patch(
|
||||
'server.auth.authorization.RoleStore.get_role_by_id',
|
||||
return_value=mock_role,
|
||||
),
|
||||
):
|
||||
result = get_user_org_role(user_id, org_id)
|
||||
assert result == mock_role
|
||||
|
||||
def test_returns_none_when_not_member(self):
|
||||
"""
|
||||
GIVEN: User is not a member of organization
|
||||
WHEN: get_user_org_role is called
|
||||
THEN: None is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member',
|
||||
return_value=None,
|
||||
):
|
||||
result = get_user_org_role(user_id, org_id)
|
||||
assert result is None
|
||||
|
||||
def test_returns_role_when_org_id_is_none(self):
|
||||
"""
|
||||
GIVEN: User with a current organization
|
||||
WHEN: get_user_org_role is called with org_id=None
|
||||
THEN: Role object is returned using get_org_member_for_current_org
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
|
||||
mock_org_member = MagicMock()
|
||||
mock_org_member.role_id = 1
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with (
|
||||
patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member_for_current_org',
|
||||
return_value=mock_org_member,
|
||||
) as mock_get_current,
|
||||
patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member',
|
||||
) as mock_get_org_member,
|
||||
patch(
|
||||
'server.auth.authorization.RoleStore.get_role_by_id',
|
||||
return_value=mock_role,
|
||||
),
|
||||
):
|
||||
result = get_user_org_role(user_id, None)
|
||||
assert result == mock_role
|
||||
mock_get_current.assert_called_once()
|
||||
mock_get_org_member.assert_not_called()
|
||||
|
||||
def test_returns_none_when_org_id_is_none_and_no_current_org(self):
|
||||
"""
|
||||
GIVEN: User with no current organization membership
|
||||
WHEN: get_user_org_role is called with org_id=None
|
||||
THEN: None is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.OrgMemberStore.get_org_member_for_current_org',
|
||||
return_value=None,
|
||||
):
|
||||
result = get_user_org_role(user_id, None)
|
||||
assert result is None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for require_permission dependency
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestRequirePermission:
|
||||
"""Tests for require_permission dependency factory."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_user_id_when_authorized(self):
|
||||
"""
|
||||
GIVEN: User with required permission
|
||||
WHEN: Permission checker is called
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS)
|
||||
result = await permission_checker(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_401_when_not_authenticated(self):
|
||||
"""
|
||||
GIVEN: No user ID (not authenticated)
|
||||
WHEN: Permission checker is called
|
||||
THEN: 401 Unauthorized is raised
|
||||
"""
|
||||
org_id = uuid4()
|
||||
|
||||
permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await permission_checker(org_id=org_id, user_id=None)
|
||||
|
||||
assert exc_info.value.status_code == 401
|
||||
assert 'not authenticated' in exc_info.value.detail.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_403_when_not_member(self):
|
||||
"""
|
||||
GIVEN: User is not a member of organization
|
||||
WHEN: Permission checker is called
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=None),
|
||||
):
|
||||
permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await permission_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert 'not a member' in exc_info.value.detail.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_403_when_insufficient_permission(self):
|
||||
"""
|
||||
GIVEN: User without required permission
|
||||
WHEN: Permission checker is called
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'member'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(Permission.DELETE_ORGANIZATION)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await permission_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert 'delete_organization' in exc_info.value.detail.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_owner_can_delete_organization(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: DELETE_ORGANIZATION permission is required
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'owner'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(Permission.DELETE_ORGANIZATION)
|
||||
result = await permission_checker(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_cannot_delete_organization(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: DELETE_ORGANIZATION permission is required
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(Permission.DELETE_ORGANIZATION)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await permission_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_logs_warning_on_insufficient_permission(self):
|
||||
"""
|
||||
GIVEN: User without required permission
|
||||
WHEN: Permission checker is called
|
||||
THEN: Warning is logged with details
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'member'
|
||||
|
||||
with (
|
||||
patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
),
|
||||
patch('server.auth.authorization.logger') as mock_logger,
|
||||
):
|
||||
permission_checker = require_permission(Permission.DELETE_ORGANIZATION)
|
||||
with pytest.raises(HTTPException):
|
||||
await permission_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
mock_logger.warning.assert_called()
|
||||
call_args = mock_logger.warning.call_args
|
||||
assert call_args[1]['extra']['user_id'] == user_id
|
||||
assert call_args[1]['extra']['user_role'] == 'member'
|
||||
assert call_args[1]['extra']['required_permission'] == 'delete_organization'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_user_id_when_org_id_is_none(self):
|
||||
"""
|
||||
GIVEN: User with required permission in their current org
|
||||
WHEN: Permission checker is called with org_id=None
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
) as mock_get_role:
|
||||
permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS)
|
||||
result = await permission_checker(org_id=None, user_id=user_id)
|
||||
assert result == user_id
|
||||
mock_get_role.assert_called_once_with(user_id, None)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_403_when_org_id_is_none_and_not_member(self):
|
||||
"""
|
||||
GIVEN: User not a member of their current organization
|
||||
WHEN: Permission checker is called with org_id=None
|
||||
THEN: HTTPException with 403 status is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=None),
|
||||
):
|
||||
permission_checker = require_permission(Permission.VIEW_LLM_SETTINGS)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await permission_checker(org_id=None, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
assert 'not a member' in exc_info.value.detail
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for permission-based access control scenarios
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestPermissionScenarios:
|
||||
"""Tests for real-world permission scenarios."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_member_can_manage_secrets(self):
|
||||
"""
|
||||
GIVEN: User with member role
|
||||
WHEN: MANAGE_SECRETS permission is required
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'member'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(Permission.MANAGE_SECRETS)
|
||||
result = await permission_checker(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_member_cannot_invite_users(self):
|
||||
"""
|
||||
GIVEN: User with member role
|
||||
WHEN: INVITE_USER_TO_ORGANIZATION permission is required
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'member'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(
|
||||
Permission.INVITE_USER_TO_ORGANIZATION
|
||||
)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await permission_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_can_invite_users(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: INVITE_USER_TO_ORGANIZATION permission is required
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(
|
||||
Permission.INVITE_USER_TO_ORGANIZATION
|
||||
)
|
||||
result = await permission_checker(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_cannot_change_owner_role(self):
|
||||
"""
|
||||
GIVEN: User with admin role
|
||||
WHEN: CHANGE_USER_ROLE_OWNER permission is required
|
||||
THEN: 403 Forbidden is raised
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'admin'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(Permission.CHANGE_USER_ROLE_OWNER)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await permission_checker(org_id=org_id, user_id=user_id)
|
||||
|
||||
assert exc_info.value.status_code == 403
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_owner_can_change_owner_role(self):
|
||||
"""
|
||||
GIVEN: User with owner role
|
||||
WHEN: CHANGE_USER_ROLE_OWNER permission is required
|
||||
THEN: User ID is returned
|
||||
"""
|
||||
user_id = str(uuid4())
|
||||
org_id = uuid4()
|
||||
|
||||
mock_role = MagicMock()
|
||||
mock_role.name = 'owner'
|
||||
|
||||
with patch(
|
||||
'server.auth.authorization.get_user_org_role_async',
|
||||
AsyncMock(return_value=mock_role),
|
||||
):
|
||||
permission_checker = require_permission(Permission.CHANGE_USER_ROLE_OWNER)
|
||||
result = await permission_checker(org_id=org_id, user_id=user_id)
|
||||
assert result == user_id
|
||||
@@ -158,6 +158,57 @@ def test_get_org_member(session_maker):
|
||||
assert retrieved_org_member.llm_api_key.get_secret_value() == 'test-key'
|
||||
|
||||
|
||||
def test_get_org_member_for_current_org(session_maker):
|
||||
# Test getting org_member for user's current organization
|
||||
with session_maker() as session:
|
||||
# Create test data - user belongs to two orgs but current_org is org1
|
||||
org1 = Org(name='test-org-1')
|
||||
org2 = Org(name='test-org-2')
|
||||
session.add_all([org1, org2])
|
||||
session.flush()
|
||||
|
||||
user = User(id=uuid.uuid4(), current_org_id=org1.id)
|
||||
role = Role(name='admin', rank=1)
|
||||
session.add_all([user, role])
|
||||
session.flush()
|
||||
|
||||
org_member1 = OrgMember(
|
||||
org_id=org1.id,
|
||||
user_id=user.id,
|
||||
role_id=role.id,
|
||||
llm_api_key='test-key-1',
|
||||
status='active',
|
||||
)
|
||||
org_member2 = OrgMember(
|
||||
org_id=org2.id,
|
||||
user_id=user.id,
|
||||
role_id=role.id,
|
||||
llm_api_key='test-key-2',
|
||||
status='active',
|
||||
)
|
||||
session.add_all([org_member1, org_member2])
|
||||
session.commit()
|
||||
user_id = user.id
|
||||
org1_id = org1.id
|
||||
|
||||
# Test retrieval - should return org_member for current_org (org1)
|
||||
with patch('storage.org_member_store.session_maker', session_maker):
|
||||
retrieved_org_member = OrgMemberStore.get_org_member_for_current_org(user_id)
|
||||
assert retrieved_org_member is not None
|
||||
assert retrieved_org_member.org_id == org1_id
|
||||
assert retrieved_org_member.user_id == user_id
|
||||
assert retrieved_org_member.llm_api_key.get_secret_value() == 'test-key-1'
|
||||
|
||||
|
||||
def test_get_org_member_for_current_org_user_not_found(session_maker):
|
||||
# Test getting org_member for non-existent user
|
||||
with patch('storage.org_member_store.session_maker', session_maker):
|
||||
retrieved_org_member = OrgMemberStore.get_org_member_for_current_org(
|
||||
uuid.uuid4()
|
||||
)
|
||||
assert retrieved_org_member is None
|
||||
|
||||
|
||||
def test_add_user_to_org(session_maker):
|
||||
# Test adding a user to an org
|
||||
with session_maker() as session:
|
||||
|
||||
Reference in New Issue
Block a user