""" Store class for managing organizational settings. """ import functools import os from typing import Any, Awaitable, Callable import httpx from pydantic import SecretStr from server.auth.token_manager import TokenManager from server.constants import ( LITE_LLM_API_KEY, LITE_LLM_API_URL, LITE_LLM_TEAM_ID, ORG_SETTINGS_VERSION, get_default_litellm_model, ) from server.logger import logger from storage.encrypt_utils import decrypt_legacy_value from storage.user_settings import UserSettings from openhands.server.settings import Settings from openhands.utils.http_session import httpx_verify_option # Timeout in seconds for key verification requests to LiteLLM KEY_VERIFICATION_TIMEOUT = 5.0 # A very large number to represent "unlimited" until LiteLLM fixes their unlimited update bug. UNLIMITED_BUDGET_SETTING = 1000000000.0 try: DEFAULT_INITIAL_BUDGET = float(os.environ.get('DEFAULT_INITIAL_BUDGET', 0.0)) if DEFAULT_INITIAL_BUDGET < 0: raise ValueError( f'DEFAULT_INITIAL_BUDGET must be non-negative, got {DEFAULT_INITIAL_BUDGET}' ) except ValueError as e: raise ValueError(f'Invalid DEFAULT_INITIAL_BUDGET environment variable: {e}') from e def get_openhands_cloud_key_alias(keycloak_user_id: str, org_id: str) -> str: """Generate the key alias for OpenHands Cloud managed keys.""" return f'OpenHands Cloud - user {keycloak_user_id} - org {org_id}' def get_byor_key_alias(keycloak_user_id: str, org_id: str) -> str: """Generate the key alias for BYOR (Bring Your Own Runtime) keys.""" return f'BYOR Key - user {keycloak_user_id}, org {org_id}' class LiteLlmManager: """Manage LiteLLM interactions.""" @staticmethod def get_budget_from_team_info( user_team_info: dict | None, user_id: str, org_id: str ) -> tuple[float, float]: """Extract max_budget and spend from user team info. For personal orgs (user_id == org_id), uses litellm_budget_table.max_budget. For team orgs, uses max_budget_in_team (populated by get_user_team_info). Args: user_team_info: The response from get_user_team_info user_id: The user's ID org_id: The organization's ID Returns: Tuple of (max_budget, spend) """ if not user_team_info: return 0, 0 spend = user_team_info.get('spend', 0) if user_id == org_id: max_budget = (user_team_info.get('litellm_budget_table') or {}).get( 'max_budget', 0 ) else: max_budget = user_team_info.get('max_budget_in_team') or 0 return max_budget, spend @staticmethod async def create_entries( org_id: str, keycloak_user_id: str, oss_settings: Settings, create_user: bool, ) -> Settings | None: logger.info( 'SettingsStore:update_settings_with_litellm_default:start', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return None local_deploy = os.environ.get('LOCAL_DEPLOYMENT', None) key = LITE_LLM_API_KEY if not local_deploy: # Get user info to add to litellm token_manager = TokenManager() keycloak_user_info = ( await token_manager.get_user_info_from_user_id(keycloak_user_id) or {} ) async with httpx.AsyncClient( headers={ 'x-goog-api-key': LITE_LLM_API_KEY, } ) as client: # Check if team already exists and get its budget # New users joining existing orgs should inherit the team's budget team_budget: float = DEFAULT_INITIAL_BUDGET try: existing_team = await LiteLlmManager._get_team(client, org_id) if existing_team: team_info = existing_team.get('team_info', {}) team_budget = team_info.get('max_budget', 0.0) or 0.0 logger.info( 'LiteLlmManager:create_entries:existing_team_budget', extra={ 'org_id': org_id, 'user_id': keycloak_user_id, 'team_budget': team_budget, }, ) except httpx.HTTPStatusError as e: # Team doesn't exist yet (404) - this is expected for first user if e.response.status_code != 404: raise logger.info( 'LiteLlmManager:create_entries:no_existing_team', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._create_team( client, keycloak_user_id, org_id, team_budget ) if create_user: await LiteLlmManager._create_user( client, keycloak_user_info.get('email'), keycloak_user_id ) await LiteLlmManager._add_user_to_team( client, keycloak_user_id, org_id, team_budget ) # We delete the key if it already exists. In environments where multiple # installations are using the same keycloak and litellm instance, this # will mean other installations will have their key invalidated. key_alias = get_openhands_cloud_key_alias(keycloak_user_id, org_id) try: await LiteLlmManager._delete_key_by_alias(client, key_alias) except httpx.HTTPStatusError as ex: if ex.status_code == 404: logger.debug(f'Key "{key_alias}" did not exist - continuing') else: raise key = await LiteLlmManager._generate_key( client, keycloak_user_id, org_id, key_alias, None, ) oss_settings.agent = 'CodeActAgent' # Use the model corresponding to the current user settings version oss_settings.llm_model = get_default_litellm_model() oss_settings.llm_api_key = SecretStr(key) oss_settings.llm_base_url = LITE_LLM_API_URL return oss_settings @staticmethod async def migrate_entries( org_id: str, keycloak_user_id: str, user_settings: UserSettings, ) -> UserSettings | None: logger.info( 'LiteLlmManager:migrate_lite_llm_entries:start', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return None local_deploy = os.environ.get('LOCAL_DEPLOYMENT', None) if not local_deploy: # Get user info to add to litellm async with httpx.AsyncClient( headers={ 'x-goog-api-key': LITE_LLM_API_KEY, } ) as client: user_json = await LiteLlmManager._get_user(client, keycloak_user_id) if not user_json: return None user_info = user_json['user_info'] # Log original user values before any modifications for debugging original_max_budget = user_info.get('max_budget') original_spend = user_info.get('spend') logger.info( 'LiteLlmManager:migrate_lite_llm_entries:original_user_values', extra={ 'org_id': org_id, 'user_id': keycloak_user_id, 'original_max_budget': original_max_budget, 'original_spend': original_spend, }, ) max_budget = ( original_max_budget if original_max_budget is not None else 0.0 ) spend = original_spend if original_spend is not None else 0.0 # In upgrade to V4, we no longer use billing margin, but instead apply this directly # in litellm. The default billing marign was 2 before this (hence the magic numbers below) if ( user_settings and user_settings.user_version < 4 and user_settings.billing_margin and user_settings.billing_margin != 1.0 ): billing_margin = user_settings.billing_margin logger.info( 'user_settings_v4_budget_upgrade', extra={ 'max_budget': max_budget, 'billing_margin': billing_margin, 'spend': spend, }, ) max_budget *= billing_margin spend *= billing_margin # Check if max_budget is None (not 0.0) or set to unlimited to determine if already migrated # A user with max_budget=0.0 is different from max_budget=None if ( original_max_budget is None or original_max_budget == UNLIMITED_BUDGET_SETTING ): # if max_budget is None or UNLIMITED, then we've already migrated the User logger.info( 'LiteLlmManager:migrate_lite_llm_entries:already_migrated', extra={ 'org_id': org_id, 'user_id': keycloak_user_id, 'original_max_budget': original_max_budget, }, ) return None credits = max(max_budget - spend, 0.0) # Log calculated migration values before performing updates logger.info( 'LiteLlmManager:migrate_lite_llm_entries:calculated_values', extra={ 'org_id': org_id, 'user_id': keycloak_user_id, 'adjusted_max_budget': max_budget, 'adjusted_spend': spend, 'calculated_credits': credits, 'new_user_max_budget': UNLIMITED_BUDGET_SETTING, }, ) logger.debug( 'LiteLlmManager:migrate_lite_llm_entries:create_team', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._create_team( client, keycloak_user_id, org_id, credits ) logger.debug( 'LiteLlmManager:migrate_lite_llm_entries:update_user', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._update_user( client, keycloak_user_id, max_budget=UNLIMITED_BUDGET_SETTING ) logger.debug( 'LiteLlmManager:migrate_lite_llm_entries:add_user_to_team', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._add_user_to_team( client, keycloak_user_id, org_id, credits ) logger.debug( 'LiteLlmManager:migrate_lite_llm_entries:update_user_keys', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._update_user_keys( client, keycloak_user_id, team_id=org_id, ) # Check if the database key exists in LiteLLM # If not, generate a new key to prevent verification failures later db_key = None if ( user_settings and user_settings.llm_api_key and user_settings.llm_base_url == LITE_LLM_API_URL ): db_key = user_settings.llm_api_key if hasattr(db_key, 'get_secret_value'): db_key = db_key.get_secret_value() if db_key: # Verify the database key exists in LiteLLM key_valid = await LiteLlmManager.verify_key( db_key, keycloak_user_id ) if not key_valid: logger.warning( 'LiteLlmManager:migrate_lite_llm_entries:db_key_not_in_litellm', extra={ 'org_id': org_id, 'user_id': keycloak_user_id, 'key_prefix': db_key[:10] + '...' if len(db_key) > 10 else db_key, }, ) # Generate a new key for the user new_key = await LiteLlmManager._generate_key( client, keycloak_user_id, org_id, get_openhands_cloud_key_alias(keycloak_user_id, org_id), None, ) logger.info( 'LiteLlmManager:migrate_lite_llm_entries:generated_new_key', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) # Update user_settings with the new key so it gets stored in org_member user_settings.llm_api_key = SecretStr(new_key) user_settings.llm_api_key_for_byor = SecretStr(new_key) logger.info( 'LiteLlmManager:migrate_lite_llm_entries:complete', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) return user_settings @staticmethod async def downgrade_entries( org_id: str, keycloak_user_id: str, user_settings: UserSettings, ) -> UserSettings | None: """Downgrade a migrated user's LiteLLM entries back to the pre-migration state. This reverses the migrate_entries operation: 1. Get the user max budget from their org team in litellm 2. Set the max budget in the user in litellm (restore from team) 3. Add the user back to the default team in litellm 4. Update keys to remove org team association 5. Remove the user from their org team in litellm 6. Delete the user org team in litellm Note: The database changes (already_migrated flag, org/org_member deletion) should be handled separately by the caller. Args: org_id: The organization ID (which is also the team_id in litellm) keycloak_user_id: The user's Keycloak ID user_settings: The user's settings object Returns: The user_settings if downgrade was successful, None otherwise """ logger.info( 'LiteLlmManager:downgrade_entries:start', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return None local_deploy = os.environ.get('LOCAL_DEPLOYMENT', None) if not local_deploy: async with httpx.AsyncClient( headers={ 'x-goog-api-key': LITE_LLM_API_KEY, } ) as client: # Step 1: Get the team info to retrieve the budget logger.debug( 'LiteLlmManager:downgrade_entries:get_team', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) team_info = await LiteLlmManager._get_team(client, org_id) if not team_info: logger.error( 'LiteLlmManager:downgrade_entries:team_not_found', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) return None # Get team budget (max_budget) and spend to calculate current credits team_data = team_info.get('team_info', {}) max_budget = team_data.get('max_budget', 0.0) spend = team_data.get('spend', 0.0) # Get user membership info for budget in team user_membership = await LiteLlmManager._get_user_team_info( client, keycloak_user_id, org_id ) if user_membership: # Use user's budget in team if available user_max_budget_in_team = user_membership.get('max_budget_in_team') user_spend_in_team = user_membership.get('spend', 0.0) if user_max_budget_in_team is not None: max_budget = user_max_budget_in_team spend = user_spend_in_team # Calculate total budget to restore (credits + spend = max_budget) # We restore the full max_budget that was on the team/user-in-team restored_budget = max_budget if max_budget else 0.0 logger.debug( 'LiteLlmManager:downgrade_entries:budget_info', extra={ 'org_id': org_id, 'user_id': keycloak_user_id, 'max_budget': max_budget, 'spend': spend, 'restored_budget': restored_budget, }, ) # Step 2: Update user to set their max_budget back from unlimited logger.debug( 'LiteLlmManager:downgrade_entries:update_user', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._update_user( client, keycloak_user_id, max_budget=restored_budget, spend=spend ) # Step 3: Add user back to the default team if LITE_LLM_TEAM_ID: logger.debug( 'LiteLlmManager:downgrade_entries:add_to_default_team', extra={ 'org_id': org_id, 'user_id': keycloak_user_id, 'default_team_id': LITE_LLM_TEAM_ID, }, ) await LiteLlmManager._add_user_to_team( client, keycloak_user_id, LITE_LLM_TEAM_ID, restored_budget ) # Step 4: Update all user keys to remove org team association (set team_id to default) logger.debug( 'LiteLlmManager:downgrade_entries:update_user_keys', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._update_user_keys( client, keycloak_user_id, team_id=LITE_LLM_TEAM_ID, ) # Step 5: Remove user from their org team logger.debug( 'LiteLlmManager:downgrade_entries:remove_from_org_team', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._remove_user_from_team( client, keycloak_user_id, org_id ) # Step 6: Delete the org team logger.debug( 'LiteLlmManager:downgrade_entries:delete_team', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) await LiteLlmManager._delete_team(client, org_id) logger.info( 'LiteLlmManager:downgrade_entries:complete', extra={'org_id': org_id, 'user_id': keycloak_user_id}, ) return user_settings @staticmethod async def update_team_and_users_budget( team_id: str, max_budget: float, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return async with httpx.AsyncClient( headers={ 'x-goog-api-key': LITE_LLM_API_KEY, } ) as client: await LiteLlmManager._update_team(client, team_id, None, max_budget) team_info = await LiteLlmManager._get_team(client, team_id) if not team_info: return None # TODO: change to use bulk update endpoint for membership in team_info.get('team_memberships', []): user_id = membership.get('user_id') if not user_id: continue await LiteLlmManager._update_user_in_team( client, user_id, team_id, max_budget ) @staticmethod async def _create_team( client: httpx.AsyncClient, team_alias: str, team_id: str, max_budget: float, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return json_data: dict[str, Any] = { 'team_id': team_id, 'team_alias': team_alias, 'models': [], 'spend': 0, 'metadata': { 'version': ORG_SETTINGS_VERSION, 'model': get_default_litellm_model(), }, } if max_budget is not None: json_data['max_budget'] = max_budget response = await client.post( f'{LITE_LLM_API_URL}/team/new', json=json_data, ) # Team failed to create in litellm - this is an unforseen error state... if not response.is_success: if ( response.status_code == 400 and 'already exists. Please use a different team id' in response.text ): # team already exists, so update, then return await LiteLlmManager._update_team( client, team_id, team_alias, max_budget ) return logger.error( 'error_creating_litellm_team', extra={ 'status_code': response.status_code, 'text': response.text, 'team_id': team_id, 'max_budget': max_budget, }, ) response.raise_for_status() @staticmethod async def _get_team(client: httpx.AsyncClient, team_id: str) -> dict | None: if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return None """Get a team from litellm with the id matching that given.""" response = await client.get( f'{LITE_LLM_API_URL}/team/info?team_id={team_id}', ) response.raise_for_status() return response.json() @staticmethod async def _update_team( client: httpx.AsyncClient, team_id: str, team_alias: str | None, max_budget: float | None, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return json_data: dict[str, Any] = { 'team_id': team_id, 'metadata': { 'version': ORG_SETTINGS_VERSION, 'model': get_default_litellm_model(), }, } if max_budget is not None: json_data['max_budget'] = max_budget if team_alias is not None: json_data['team_alias'] = team_alias response = await client.post( f'{LITE_LLM_API_URL}/team/update', json=json_data, ) # Team failed to update in litellm - this is an unforseen error state... if not response.is_success: logger.error( 'error_updating_litellm_team', extra={ 'status_code': response.status_code, 'text': response.text, 'team_id': [team_id], 'max_budget': max_budget, }, ) response.raise_for_status() @staticmethod async def _create_user( client: httpx.AsyncClient, email: str | None, keycloak_user_id: str, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return response = await client.post( f'{LITE_LLM_API_URL}/user/new', json={ 'user_email': email, 'models': [], 'user_id': keycloak_user_id, 'teams': [LITE_LLM_TEAM_ID], 'auto_create_key': False, 'send_invite_email': False, 'metadata': { 'version': ORG_SETTINGS_VERSION, 'model': get_default_litellm_model(), }, }, ) if not response.is_success: logger.warning( 'duplicate_user_email', extra={ 'user_id': keycloak_user_id, 'email': email, }, ) # Litellm insists on unique email addresses - it is possible the email address was registered with a different user. response = await client.post( f'{LITE_LLM_API_URL}/user/new', json={ 'user_email': None, 'models': [], 'user_id': keycloak_user_id, 'teams': [LITE_LLM_TEAM_ID], 'auto_create_key': False, 'send_invite_email': False, 'metadata': { 'version': ORG_SETTINGS_VERSION, 'model': get_default_litellm_model(), }, }, ) # User failed to create in litellm - this is an unforseen error state... if not response.is_success: if ( response.status_code in (400, 409) and 'already exists' in response.text ): logger.warning( 'litellm_user_already_exists', extra={ 'user_id': keycloak_user_id, }, ) return logger.error( 'error_creating_litellm_user', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': [keycloak_user_id], 'email': None, }, ) response.raise_for_status() @staticmethod async def _get_user(client: httpx.AsyncClient, user_id: str) -> dict | None: if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return None """Get a user from litellm with the id matching that given.""" response = await client.get( f'{LITE_LLM_API_URL}/user/info?user_id={user_id}', ) response.raise_for_status() return response.json() @staticmethod async def _update_user( client: httpx.AsyncClient, keycloak_user_id: str, **kwargs, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return payload = { 'user_id': keycloak_user_id, } payload.update(kwargs) response = await client.post( f'{LITE_LLM_API_URL}/user/update', json=payload, ) if not response.is_success: logger.error( 'error_updating_litellm_user', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': keycloak_user_id, }, ) response.raise_for_status() @staticmethod async def _update_key( client: httpx.AsyncClient, keycloak_user_id: str, key: str, **kwargs, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return try: # Sometimes the key we get is encrypted - attempt to decrypt. key = decrypt_legacy_value(key) except Exception: # The key was not encrypted pass payload = { 'key': key, } payload.update(kwargs) response = await client.post( f'{LITE_LLM_API_URL}/key/update', json=payload, ) if not response.is_success: if response.status_code == 401: logger.warning( 'invalid_litellm_key_during_update', extra={ 'user_id': keycloak_user_id, 'text': response.text, }, ) return logger.error( 'error_updating_litellm_key', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': keycloak_user_id, }, ) response.raise_for_status() @staticmethod async def _get_user_keys( client: httpx.AsyncClient, keycloak_user_id: str, ) -> list[str]: """Get all keys for a user from LiteLLM. Args: client: The HTTP client to use for the request keycloak_user_id: The user's Keycloak ID Returns: A list of key strings belonging to the user """ if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return [] response = await client.get( f'{LITE_LLM_API_URL}/key/list', params={'user_id': keycloak_user_id}, ) if not response.is_success: logger.error( 'error_getting_user_keys', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': keycloak_user_id, }, ) return [] response_json = response.json() keys = response_json.get('keys', []) logger.debug( 'LiteLlmManager:_get_user_keys:keys_retrieved', extra={ 'user_id': keycloak_user_id, 'key_count': len(keys), }, ) return keys @staticmethod async def _update_user_keys( client: httpx.AsyncClient, keycloak_user_id: str, **kwargs, ): """Update all keys belonging to a user with the given parameters. Args: client: The HTTP client to use for the request keycloak_user_id: The user's Keycloak ID **kwargs: Parameters to update on each key (e.g., team_id) """ keys = await LiteLlmManager._get_user_keys(client, keycloak_user_id) logger.debug( 'LiteLlmManager:_update_user_keys:updating_keys', extra={ 'user_id': keycloak_user_id, 'key_count': len(keys), }, ) for key in keys: await LiteLlmManager._update_key(client, keycloak_user_id, key, **kwargs) @staticmethod async def _delete_user( client: httpx.AsyncClient, keycloak_user_id: str, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return response = await client.post( f'{LITE_LLM_API_URL}/user/delete', json={'user_ids': [keycloak_user_id]} ) if not response.is_success: logger.error( 'error_deleting_litellm_user', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': [keycloak_user_id], }, ) response.raise_for_status() @staticmethod async def _delete_team( client: httpx.AsyncClient, team_id: str, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return response = await client.post( f'{LITE_LLM_API_URL}/team/delete', json={'team_ids': [team_id]}, ) if not response.is_success: if response.status_code == 404: # Team doesn't exist, that's fine logger.info( 'Team already deleted or does not exist', extra={'team_id': team_id}, ) return logger.error( 'error_deleting_litellm_team', extra={ 'status_code': response.status_code, 'text': response.text, 'team_id': team_id, }, ) response.raise_for_status() logger.info( 'LiteLlmManager:_delete_team:team_deleted', extra={'team_id': team_id}, ) @staticmethod async def _add_user_to_team( client: httpx.AsyncClient, keycloak_user_id: str, team_id: str, max_budget: float, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return json_data: dict[str, Any] = { 'team_id': team_id, 'member': {'user_id': keycloak_user_id, 'role': 'user'}, } if max_budget is not None: json_data['max_budget_in_team'] = max_budget response = await client.post( f'{LITE_LLM_API_URL}/team/member_add', json=json_data, ) # Failed to add user to team - this is an unforseen error state... if not response.is_success: if ( response.status_code == 400 and 'already in team' in response.text.lower() ): logger.warning( 'user_already_in_team', extra={ 'user_id': keycloak_user_id, 'team_id': team_id, }, ) return logger.error( 'error_adding_litellm_user_to_team', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': [keycloak_user_id], 'team_id': [team_id], 'max_budget': max_budget, }, ) response.raise_for_status() @staticmethod async def _get_user_team_info( client: httpx.AsyncClient, keycloak_user_id: str, team_id: str, ) -> dict | None: if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return None team_response = await LiteLlmManager._get_team(client, team_id) if not team_response: return None # Filter team_memberships based on team_id and keycloak_user_id user_membership = next( ( membership for membership in team_response.get('team_memberships', []) if membership.get('user_id') == keycloak_user_id and membership.get('team_id') == team_id ), None, ) if not user_membership: return None # For team orgs (user_id != team_id), include team-level budget info # The team's max_budget and spend are shared across all members if keycloak_user_id != team_id: team_info = team_response.get('team_info', {}) user_membership['max_budget_in_team'] = team_info.get('max_budget') user_membership['spend'] = team_info.get('spend', 0) return user_membership @staticmethod async def _update_user_in_team( client: httpx.AsyncClient, keycloak_user_id: str, team_id: str, max_budget: float, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return json_data: dict[str, Any] = { 'team_id': team_id, 'user_id': keycloak_user_id, } if max_budget is not None: json_data['max_budget_in_team'] = max_budget response = await client.post( f'{LITE_LLM_API_URL}/team/member_update', json=json_data, ) # Failed to update user in team - this is an unforseen error state... if not response.is_success: logger.error( 'error_updating_litellm_user_in_team', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': [keycloak_user_id], 'team_id': [team_id], 'max_budget': max_budget, }, ) response.raise_for_status() @staticmethod async def _remove_user_from_team( client: httpx.AsyncClient, keycloak_user_id: str, team_id: str, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return response = await client.post( f'{LITE_LLM_API_URL}/team/member_delete', json={ 'team_id': team_id, 'user_id': keycloak_user_id, }, ) if not response.is_success: if response.status_code == 404: # User not in team, that's fine for downgrade logger.info( 'User not in team during removal', extra={'user_id': keycloak_user_id, 'team_id': team_id}, ) return logger.error( 'error_removing_litellm_user_from_team', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': keycloak_user_id, 'team_id': team_id, }, ) response.raise_for_status() logger.info( 'LiteLlmManager:_remove_user_from_team:user_removed', extra={'user_id': keycloak_user_id, 'team_id': team_id}, ) @staticmethod async def _generate_key( client: httpx.AsyncClient, keycloak_user_id: str, team_id: str | None, key_alias: str | None, metadata: dict | None, ) -> str: if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: raise ValueError('LiteLLM API configuration not found') json_data: dict[str, Any] = { 'user_id': keycloak_user_id, 'models': [], } if team_id is not None: json_data['team_id'] = team_id if key_alias is not None: json_data['key_alias'] = key_alias if metadata is not None: json_data['metadata'] = metadata response = await client.post( f'{LITE_LLM_API_URL}/key/generate', json=json_data, ) # Failed to generate user key for team - this is an unforseen error state... if not response.is_success: logger.error( 'error_generate_user_team_key', extra={ 'status_code': response.status_code, 'text': response.text, 'user_id': keycloak_user_id, 'team_id': team_id, 'key_alias': key_alias, }, ) response.raise_for_status() response_json = response.json() key = response_json['key'] logger.info( 'LiteLlmManager:_lite_llm_generate_user_team_key:key_created', extra={ 'user_id': keycloak_user_id, 'team_id': team_id, 'key_alias': key_alias, }, ) return key @staticmethod async def verify_key(key: str, user_id: str) -> bool: """Verify that a key is valid in LiteLLM by making a lightweight API call. Args: key: The key to verify user_id: The user ID for logging purposes Returns: True if the key is verified as valid, False if verification fails or key is invalid. Returns False on network errors/timeouts to ensure we don't return potentially invalid keys. """ if not (LITE_LLM_API_URL and key): return False try: async with httpx.AsyncClient( verify=httpx_verify_option(), timeout=KEY_VERIFICATION_TIMEOUT, ) as client: # Make a lightweight request to verify the key # Using /v1/models endpoint as it's lightweight and requires authentication response = await client.get( f'{LITE_LLM_API_URL}/v1/models', headers={ 'Authorization': f'Bearer {key}', }, ) # Only 200 status code indicates valid key if response.status_code == 200: logger.debug( 'Key verification successful', extra={'user_id': user_id}, ) return True # All other status codes (401, 403, 500, etc.) are treated as invalid # This includes authentication errors and server errors logger.warning( 'Key verification failed - treating as invalid', extra={ 'user_id': user_id, 'status_code': response.status_code, 'key_prefix': key[:10] + '...' if len(key) > 10 else key, }, ) return False except (httpx.TimeoutException, Exception) as e: # Any exception (timeout, network error, etc.) means we can't verify # Return False to trigger regeneration rather than returning potentially invalid key logger.warning( 'Key verification error - treating as invalid to ensure key validity', extra={ 'user_id': user_id, 'error': str(e), 'error_type': type(e).__name__, }, ) return False @staticmethod async def _get_key_info( client: httpx.AsyncClient, org_id: str, keycloak_user_id: str, ) -> dict | None: from storage.user_store import UserStore if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return None user = await UserStore.get_user_by_id(keycloak_user_id) if not user: return {} org_member = None for om in user.org_members: if om.org_id == org_id: org_member = om break if not org_member or not org_member.llm_api_key: return {} response = await client.get( f'{LITE_LLM_API_URL}/key/info?key={org_member.llm_api_key}' ) response.raise_for_status() response_json = response.json() key_info = response_json.get('info') if not key_info: return {} return { 'key_max_budget': key_info.get('max_budget'), 'key_spend': key_info.get('spend'), } @staticmethod async def _get_all_keys_for_user( client: httpx.AsyncClient, keycloak_user_id: str, ) -> list[dict]: """Get all keys for a user from LiteLLM. Returns a list of key info dictionaries containing: - token: the key value (hashed or partial) - key_alias: the alias for the key - key_name: the name of the key - spend: the amount spent on this key - max_budget: the max budget for this key - team_id: the team the key belongs to - metadata: any metadata associated with the key Returns an empty list if no keys found or on error. """ if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return [] try: response = await client.get( f'{LITE_LLM_API_URL}/user/info?user_id={keycloak_user_id}', headers={'x-goog-api-key': LITE_LLM_API_KEY}, ) response.raise_for_status() user_json = response.json() # The user/info endpoint returns keys in the 'keys' field return user_json.get('keys', []) except Exception as e: logger.warning( 'LiteLlmManager:_get_all_keys_for_user:error', extra={ 'user_id': keycloak_user_id, 'error': str(e), }, ) return [] @staticmethod async def _verify_existing_key( client: httpx.AsyncClient, key_value: str, keycloak_user_id: str, org_id: str, openhands_type: bool = False, ) -> bool: """Check if an existing key exists for the user/org in LiteLLM. Verifies the provided key_value matches a key registered in LiteLLM for the given user and organization. For openhands_type=True, looks for keys with metadata type='openhands' and matching team_id. For openhands_type=False, looks for keys with matching alias and team_id. Returns True if the key is found and valid, False otherwise. """ found = False keys = await LiteLlmManager._get_all_keys_for_user(client, keycloak_user_id) for key_info in keys: metadata = key_info.get('metadata') or {} team_id = key_info.get('team_id') key_alias = key_info.get('key_alias') token = None if ( openhands_type and metadata.get('type') == 'openhands' and team_id == org_id ): # Found an existing OpenHands key for this org key_name = key_info.get('key_name') token = key_name[-4:] if key_name else None # last 4 digits of key if token and key_value.endswith( token ): # check if this is our current key found = True break if ( not openhands_type and team_id == org_id and ( key_alias == get_openhands_cloud_key_alias(keycloak_user_id, org_id) or key_alias == get_byor_key_alias(keycloak_user_id, org_id) ) ): # Found an existing key for this org (regardless of type) key_name = key_info.get('key_name') token = key_name[-4:] if key_name else None # last 4 digits of key if token and key_value.endswith( token ): # check if this is our current key found = True break return found @staticmethod async def _delete_key_by_alias( client: httpx.AsyncClient, key_alias: str, ): """Delete a key from LiteLLM by its alias. This is a best-effort operation that logs but does not raise on failure. """ if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return response = await client.post( f'{LITE_LLM_API_URL}/key/delete', json={ 'key_aliases': [key_alias], }, ) if response.is_success: logger.info( 'LiteLlmManager:_delete_key_by_alias:key_deleted', extra={'key_alias': key_alias}, ) elif response.status_code != 404: # Log non-404 errors but don't fail logger.warning( 'error_deleting_key_by_alias', extra={ 'key_alias': key_alias, 'status_code': response.status_code, 'text': response.text, }, ) @staticmethod async def _delete_key( client: httpx.AsyncClient, key_id: str, key_alias: str | None = None, ): if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None: logger.warning('LiteLLM API configuration not found') return response = await client.post( f'{LITE_LLM_API_URL}/key/delete', json={ 'keys': [key_id], }, ) # Failed to delete key... if not response.is_success: if response.status_code == 404: # Key doesn't exist by key_id. If we have a key_alias, # try deleting by alias to clean up any orphaned alias. if key_alias: await LiteLlmManager._delete_key_by_alias(client, key_alias) return logger.error( 'error_deleting_key', extra={ 'status_code': response.status_code, 'text': response.text, }, ) response.raise_for_status() logger.info( 'LiteLlmManager:_delete_key:key_deleted', ) @staticmethod def with_http_client( internal_fn: Callable[..., Awaitable[Any]], ) -> Callable[..., Awaitable[Any]]: @functools.wraps(internal_fn) async def wrapper(*args, **kwargs): async with httpx.AsyncClient( headers={'x-goog-api-key': LITE_LLM_API_KEY} ) as client: return await internal_fn(client, *args, **kwargs) return wrapper # Public methods with injected client create_team = staticmethod(with_http_client(_create_team)) get_team = staticmethod(with_http_client(_get_team)) update_team = staticmethod(with_http_client(_update_team)) create_user = staticmethod(with_http_client(_create_user)) get_user = staticmethod(with_http_client(_get_user)) update_user = staticmethod(with_http_client(_update_user)) delete_user = staticmethod(with_http_client(_delete_user)) delete_team = staticmethod(with_http_client(_delete_team)) add_user_to_team = staticmethod(with_http_client(_add_user_to_team)) remove_user_from_team = staticmethod(with_http_client(_remove_user_from_team)) get_user_team_info = staticmethod(with_http_client(_get_user_team_info)) update_user_in_team = staticmethod(with_http_client(_update_user_in_team)) generate_key = staticmethod(with_http_client(_generate_key)) get_key_info = staticmethod(with_http_client(_get_key_info)) verify_existing_key = staticmethod(with_http_client(_verify_existing_key)) delete_key = staticmethod(with_http_client(_delete_key)) get_user_keys = staticmethod(with_http_client(_get_user_keys)) delete_key_by_alias = staticmethod(with_http_client(_delete_key_by_alias)) update_user_keys = staticmethod(with_http_client(_update_user_keys))