mirror of
https://github.com/OpenHands/OpenHands.git
synced 2026-03-22 13:47:19 +08:00
1561 lines
57 KiB
Python
1561 lines
57 KiB
Python
"""
|
|
Store class for managing organizational settings.
|
|
"""
|
|
|
|
import functools
|
|
import os
|
|
from typing import Any, Awaitable, Callable
|
|
|
|
import httpx
|
|
from pydantic import SecretStr
|
|
from server.auth.token_manager import TokenManager
|
|
from server.constants import (
|
|
LITE_LLM_API_KEY,
|
|
LITE_LLM_API_URL,
|
|
LITE_LLM_TEAM_ID,
|
|
ORG_SETTINGS_VERSION,
|
|
get_default_litellm_model,
|
|
)
|
|
from server.logger import logger
|
|
from storage.encrypt_utils import decrypt_legacy_value
|
|
from storage.user_settings import UserSettings
|
|
|
|
from openhands.server.settings import Settings
|
|
from openhands.utils.http_session import httpx_verify_option
|
|
|
|
# Timeout in seconds for key verification requests to LiteLLM
KEY_VERIFICATION_TIMEOUT = 5.0

# A very large number to represent "unlimited" until LiteLLM fixes their unlimited update bug.
UNLIMITED_BUDGET_SETTING = 1000000000.0

# Billing is enabled only when the ENABLE_BILLING environment variable equals
# 'true' (case-insensitive); when unset it defaults to 'false' (the default
# for enterprise deployments).
ENABLE_BILLING = os.environ.get('ENABLE_BILLING', 'false').lower() == 'true'
|
|
|
|
|
|
def _get_default_initial_budget() -> float | None:
    """Resolve the starting budget that newly created teams receive.

    With billing switched off (ENABLE_BILLING is false) there is no budget at
    all and None is returned. Otherwise the DEFAULT_INITIAL_BUDGET environment
    variable is parsed as a float, falling back to 0.0 when it is unset.

    Returns:
        float | None: The parsed budget, or None when billing is disabled.

    Raises:
        ValueError: If the variable cannot be parsed as a float or is negative.
    """
    if not ENABLE_BILLING:
        return None

    raw_value = os.environ.get('DEFAULT_INITIAL_BUDGET', 0.0)
    try:
        parsed = float(raw_value)
        if parsed < 0:
            # Raised inside the try on purpose: the handler below wraps it
            # with the same prefix as a parse failure.
            raise ValueError(
                f'DEFAULT_INITIAL_BUDGET must be non-negative, got {parsed}'
            )
    except ValueError as err:
        raise ValueError(
            f'Invalid DEFAULT_INITIAL_BUDGET environment variable: {err}'
        ) from err
    return parsed
|
|
|
|
|
|
# Computed once when the module is imported; None when billing is disabled.
DEFAULT_INITIAL_BUDGET: float | None = _get_default_initial_budget()
|
|
|
|
|
|
def get_openhands_cloud_key_alias(keycloak_user_id: str, org_id: str) -> str:
    """Build the alias used for an OpenHands Cloud managed key.

    Args:
        keycloak_user_id: The user's Keycloak ID, embedded in the alias.
        org_id: The organization ID, embedded in the alias.

    Returns:
        The alias string for this user/org pair.
    """
    alias = f'OpenHands Cloud - user {keycloak_user_id} - org {org_id}'
    return alias
|
|
|
|
|
|
def get_byor_key_alias(keycloak_user_id: str, org_id: str) -> str:
    """Build the alias used for a BYOR key.

    Args:
        keycloak_user_id: The user's Keycloak ID, embedded in the alias.
        org_id: The organization ID, embedded in the alias.

    Returns:
        The alias string for this user/org pair.
    """
    alias = f'BYOR Key - user {keycloak_user_id}, org {org_id}'
    return alias
|
|
|
|
|
|
class LiteLlmManager:
|
|
"""Manage LiteLLM interactions."""
|
|
|
|
@staticmethod
|
|
def get_budget_from_team_info(
|
|
user_team_info: dict | None, user_id: str, org_id: str
|
|
) -> tuple[float, float]:
|
|
"""Extract max_budget and spend from user team info.
|
|
|
|
For personal orgs (user_id == org_id), uses litellm_budget_table.max_budget.
|
|
For team orgs, uses max_budget_in_team (populated by get_user_team_info).
|
|
|
|
Args:
|
|
user_team_info: The response from get_user_team_info
|
|
user_id: The user's ID
|
|
org_id: The organization's ID
|
|
|
|
Returns:
|
|
Tuple of (max_budget, spend)
|
|
"""
|
|
if not user_team_info:
|
|
return 0, 0
|
|
spend = user_team_info.get('spend', 0)
|
|
if user_id == org_id:
|
|
max_budget = (user_team_info.get('litellm_budget_table') or {}).get(
|
|
'max_budget', 0
|
|
)
|
|
else:
|
|
max_budget = user_team_info.get('max_budget_in_team') or 0
|
|
return max_budget, spend
|
|
|
|
@staticmethod
|
|
async def create_entries(
    org_id: str,
    keycloak_user_id: str,
    oss_settings: Settings,
    create_user: bool,
) -> Settings | None:
    """Provision the LiteLLM team, user, membership and key for a user in an org.

    Unless the LOCAL_DEPLOYMENT environment variable is set, this looks up the
    Keycloak user info, creates (or updates) the team for ``org_id``,
    optionally creates the LiteLLM user, checks that the user exists, adds the
    user to the team, deletes any key that already uses the OpenHands Cloud
    alias and generates a fresh key. When LOCAL_DEPLOYMENT is set, none of
    that happens and LITE_LLM_API_KEY itself is stored as the user's key.

    Args:
        org_id: The organization ID, used as the LiteLLM team ID.
        keycloak_user_id: The user's Keycloak ID, used as the LiteLLM user ID.
        oss_settings: Settings object that is mutated in place and returned.
        create_user: Whether to create the LiteLLM user before verifying it.

    Returns:
        ``oss_settings`` with agent, llm_model, llm_api_key and llm_base_url
        populated, or None when LiteLLM is not configured, user creation
        fails, or the user cannot be found before key generation.

    Raises:
        httpx.HTTPStatusError: If the team lookup or the key deletion fails
            with a status other than 404.
    """
    logger.info(
        'SettingsStore:update_settings_with_litellm_default:start',
        extra={'org_id': org_id, 'user_id': keycloak_user_id},
    )
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return None
    local_deploy = os.environ.get('LOCAL_DEPLOYMENT', None)
    key = LITE_LLM_API_KEY
    if not local_deploy:
        # Get user info to add to litellm
        token_manager = TokenManager()
        keycloak_user_info = (
            await token_manager.get_user_info_from_user_id(keycloak_user_id) or {}
        )

        async with httpx.AsyncClient(
            headers={
                'x-goog-api-key': LITE_LLM_API_KEY,
            }
        ) as client:
            # Check if team already exists and get its budget
            # New users joining existing orgs should inherit the team's budget
            # When billing is disabled, DEFAULT_INITIAL_BUDGET is None
            team_budget: float | None = DEFAULT_INITIAL_BUDGET
            try:
                existing_team = await LiteLlmManager._get_team(client, org_id)
                if existing_team:
                    team_info = existing_team.get('team_info', {})
                    # Preserve None from existing team (no budget enforcement)
                    existing_budget = team_info.get('max_budget')
                    team_budget = existing_budget
                    logger.info(
                        'LiteLlmManager:create_entries:existing_team_budget',
                        extra={
                            'org_id': org_id,
                            'user_id': keycloak_user_id,
                            'team_budget': team_budget,
                        },
                    )
            except httpx.HTTPStatusError as e:
                # Team doesn't exist yet (404) - this is expected for first user
                if e.response.status_code != 404:
                    raise
                logger.info(
                    'LiteLlmManager:create_entries:no_existing_team',
                    extra={'org_id': org_id, 'user_id': keycloak_user_id},
                )

            await LiteLlmManager._create_team(
                client, keycloak_user_id, org_id, team_budget
            )

            if create_user:
                user_created = await LiteLlmManager._create_user(
                    client, keycloak_user_info.get('email'), keycloak_user_id
                )
                if not user_created:
                    logger.error(
                        'create_entries_failed_user_creation',
                        extra={
                            'org_id': org_id,
                            'user_id': keycloak_user_id,
                        },
                    )
                    return None

            # Verify user exists before proceeding with key generation
            user_exists = await LiteLlmManager._user_exists(
                client, keycloak_user_id
            )
            if not user_exists:
                logger.error(
                    'create_entries_user_not_found_before_key_generation',
                    extra={
                        'org_id': org_id,
                        'user_id': keycloak_user_id,
                        'create_user_flag': create_user,
                    },
                )
                return None

            await LiteLlmManager._add_user_to_team(
                client, keycloak_user_id, org_id, team_budget
            )

            # We delete the key if it already exists. In environments where multiple
            # installations are using the same keycloak and litellm instance, this
            # will mean other installations will have their key invalidated.
            key_alias = get_openhands_cloud_key_alias(keycloak_user_id, org_id)
            try:
                await LiteLlmManager._delete_key_by_alias(client, key_alias)
            except httpx.HTTPStatusError as ex:
                # The status code lives on the attached response;
                # HTTPStatusError has no `status_code` attribute of its own.
                if ex.response.status_code == 404:
                    logger.debug(f'Key "{key_alias}" did not exist - continuing')
                else:
                    raise

            key = await LiteLlmManager._generate_key(
                client,
                keycloak_user_id,
                org_id,
                key_alias,
                None,
            )

    oss_settings.agent = 'CodeActAgent'
    # Use the model corresponding to the current user settings version
    oss_settings.llm_model = get_default_litellm_model()
    oss_settings.llm_api_key = SecretStr(key)
    oss_settings.llm_base_url = LITE_LLM_API_URL
    return oss_settings
|
|
|
|
@staticmethod
|
|
async def migrate_entries(
    org_id: str,
    keycloak_user_id: str,
    user_settings: UserSettings,
) -> UserSettings | None:
    """Move a user's LiteLLM budget from the user record onto an org team.

    Unless the LOCAL_DEPLOYMENT environment variable is set, this:

    1. Reads the user's current max_budget and spend from LiteLLM.
    2. For settings with user_version < 4 and a billing margin other than
       1.0, multiplies both values by that margin.
    3. Returns early if the original max_budget is None or equals
       UNLIMITED_BUDGET_SETTING (treated as already migrated).
    4. Creates the team with a budget of max(max_budget - spend, 0), sets the
       user's max_budget to UNLIMITED_BUDGET_SETTING, adds the user to the
       team and re-points the user's keys at the team.
    5. Verifies the key stored in user_settings (when it targets
       LITE_LLM_API_URL) and replaces it with a new key if verification fails.

    Args:
        org_id: The organization ID, used as the LiteLLM team ID.
        keycloak_user_id: The user's Keycloak ID.
        user_settings: The user's settings; its key fields may be replaced.

    Returns:
        ``user_settings``, or None when LiteLLM is not configured, the user
        lookup returns nothing, or the user is already migrated.
    """
    logger.info(
        'LiteLlmManager:migrate_lite_llm_entries:start',
        extra={'org_id': org_id, 'user_id': keycloak_user_id},
    )
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return None
    local_deploy = os.environ.get('LOCAL_DEPLOYMENT', None)
    if not local_deploy:
        # Get user info to add to litellm
        async with httpx.AsyncClient(
            headers={
                'x-goog-api-key': LITE_LLM_API_KEY,
            }
        ) as client:
            user_json = await LiteLlmManager._get_user(client, keycloak_user_id)
            if not user_json:
                return None
            user_info = user_json['user_info']

            # Log original user values before any modifications for debugging
            original_max_budget = user_info.get('max_budget')
            original_spend = user_info.get('spend')
            logger.info(
                'LiteLlmManager:migrate_lite_llm_entries:original_user_values',
                extra={
                    'org_id': org_id,
                    'user_id': keycloak_user_id,
                    'original_max_budget': original_max_budget,
                    'original_spend': original_spend,
                },
            )

            # Work on None-safe copies; the originals are kept for the
            # already-migrated check below.
            max_budget = (
                original_max_budget if original_max_budget is not None else 0.0
            )
            spend = original_spend if original_spend is not None else 0.0
            # In upgrade to V4, we no longer use billing margin, but instead apply this directly
            # in litellm. The default billing margin was 2 before this (hence the magic numbers below)
            if (
                user_settings
                and user_settings.user_version < 4
                and user_settings.billing_margin
                and user_settings.billing_margin != 1.0
            ):
                billing_margin = user_settings.billing_margin
                logger.info(
                    'user_settings_v4_budget_upgrade',
                    extra={
                        'max_budget': max_budget,
                        'billing_margin': billing_margin,
                        'spend': spend,
                    },
                )
                max_budget *= billing_margin
                spend *= billing_margin

            # Check if max_budget is None (not 0.0) or set to unlimited to determine if already migrated
            # A user with max_budget=0.0 is different from max_budget=None
            if (
                original_max_budget is None
                or original_max_budget == UNLIMITED_BUDGET_SETTING
            ):
                # if max_budget is None or UNLIMITED, then we've already migrated the User
                logger.info(
                    'LiteLlmManager:migrate_lite_llm_entries:already_migrated',
                    extra={
                        'org_id': org_id,
                        'user_id': keycloak_user_id,
                        'original_max_budget': original_max_budget,
                    },
                )
                return None
            # Remaining credits become the team budget; never negative.
            credits = max(max_budget - spend, 0.0)

            # Log calculated migration values before performing updates
            logger.info(
                'LiteLlmManager:migrate_lite_llm_entries:calculated_values',
                extra={
                    'org_id': org_id,
                    'user_id': keycloak_user_id,
                    'adjusted_max_budget': max_budget,
                    'adjusted_spend': spend,
                    'calculated_credits': credits,
                    'new_user_max_budget': UNLIMITED_BUDGET_SETTING,
                },
            )

            logger.debug(
                'LiteLlmManager:migrate_lite_llm_entries:create_team',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._create_team(
                client, keycloak_user_id, org_id, credits
            )

            logger.debug(
                'LiteLlmManager:migrate_lite_llm_entries:update_user',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._update_user(
                client, keycloak_user_id, max_budget=UNLIMITED_BUDGET_SETTING
            )

            logger.debug(
                'LiteLlmManager:migrate_lite_llm_entries:add_user_to_team',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._add_user_to_team(
                client, keycloak_user_id, org_id, credits
            )

            logger.debug(
                'LiteLlmManager:migrate_lite_llm_entries:update_user_keys',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._update_user_keys(
                client,
                keycloak_user_id,
                team_id=org_id,
            )

            # Check if the database key exists in LiteLLM
            # If not, generate a new key to prevent verification failures later
            db_key = None
            if (
                user_settings
                and user_settings.llm_api_key
                and user_settings.llm_base_url == LITE_LLM_API_URL
            ):
                db_key = user_settings.llm_api_key
                # The stored key may be a SecretStr-like wrapper or a plain string.
                if hasattr(db_key, 'get_secret_value'):
                    db_key = db_key.get_secret_value()

            if db_key:
                # Verify the database key exists in LiteLLM
                key_valid = await LiteLlmManager.verify_key(
                    db_key, keycloak_user_id
                )
                if not key_valid:
                    # NOTE(review): a key of 10 characters or fewer is logged
                    # in full by the 'key_prefix' expression below - confirm
                    # that this is acceptable.
                    logger.warning(
                        'LiteLlmManager:migrate_lite_llm_entries:db_key_not_in_litellm',
                        extra={
                            'org_id': org_id,
                            'user_id': keycloak_user_id,
                            'key_prefix': db_key[:10] + '...'
                            if len(db_key) > 10
                            else db_key,
                        },
                    )
                    # Generate a new key for the user
                    new_key = await LiteLlmManager._generate_key(
                        client,
                        keycloak_user_id,
                        org_id,
                        get_openhands_cloud_key_alias(keycloak_user_id, org_id),
                        None,
                    )
                    logger.info(
                        'LiteLlmManager:migrate_lite_llm_entries:generated_new_key',
                        extra={'org_id': org_id, 'user_id': keycloak_user_id},
                    )
                    # Update user_settings with the new key so it gets stored in org_member
                    user_settings.llm_api_key = SecretStr(new_key)
                    user_settings.llm_api_key_for_byor = SecretStr(new_key)

    logger.info(
        'LiteLlmManager:migrate_lite_llm_entries:complete',
        extra={'org_id': org_id, 'user_id': keycloak_user_id},
    )
    return user_settings
|
|
|
|
@staticmethod
|
|
async def downgrade_entries(
    org_id: str,
    keycloak_user_id: str,
    user_settings: UserSettings,
) -> UserSettings | None:
    """Downgrade a migrated user's LiteLLM entries back to the pre-migration state.

    This reverses the migrate_entries operation:
    1. Get the user max budget from their org team in litellm
    2. Set the max budget in the user in litellm (restore from team)
    3. Add the user back to the default team in litellm
    4. Update keys to remove org team association
    5. Remove the user from their org team in litellm
    6. Delete the user org team in litellm

    All of these steps are skipped when the LOCAL_DEPLOYMENT environment
    variable is set.

    Note: The database changes (already_migrated flag, org/org_member deletion)
    should be handled separately by the caller.

    Args:
        org_id: The organization ID (which is also the team_id in litellm)
        keycloak_user_id: The user's Keycloak ID
        user_settings: The user's settings object

    Returns:
        The user_settings if downgrade was successful, None when LiteLLM is
        not configured or the org team cannot be found
    """
    logger.info(
        'LiteLlmManager:downgrade_entries:start',
        extra={'org_id': org_id, 'user_id': keycloak_user_id},
    )
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return None

    local_deploy = os.environ.get('LOCAL_DEPLOYMENT', None)
    if not local_deploy:
        async with httpx.AsyncClient(
            headers={
                'x-goog-api-key': LITE_LLM_API_KEY,
            }
        ) as client:
            # Step 1: Get the team info to retrieve the budget
            logger.debug(
                'LiteLlmManager:downgrade_entries:get_team',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            team_info = await LiteLlmManager._get_team(client, org_id)
            if not team_info:
                logger.error(
                    'LiteLlmManager:downgrade_entries:team_not_found',
                    extra={'org_id': org_id, 'user_id': keycloak_user_id},
                )
                return None

            # Get team budget (max_budget) and spend to calculate current credits
            team_data = team_info.get('team_info', {})
            max_budget = team_data.get('max_budget', 0.0)
            spend = team_data.get('spend', 0.0)

            # Get user membership info for budget in team
            user_membership = await LiteLlmManager._get_user_team_info(
                client, keycloak_user_id, org_id
            )
            if user_membership:
                # Use user's budget in team if available
                user_max_budget_in_team = user_membership.get('max_budget_in_team')
                user_spend_in_team = user_membership.get('spend', 0.0)
                # Budget and spend are overridden together, and only when the
                # membership actually carries a budget.
                if user_max_budget_in_team is not None:
                    max_budget = user_max_budget_in_team
                    spend = user_spend_in_team

            # Calculate total budget to restore (credits + spend = max_budget)
            # We restore the full max_budget that was on the team/user-in-team
            restored_budget = max_budget if max_budget else 0.0

            logger.debug(
                'LiteLlmManager:downgrade_entries:budget_info',
                extra={
                    'org_id': org_id,
                    'user_id': keycloak_user_id,
                    'max_budget': max_budget,
                    'spend': spend,
                    'restored_budget': restored_budget,
                },
            )

            # Step 2: Update user to set their max_budget back from unlimited
            logger.debug(
                'LiteLlmManager:downgrade_entries:update_user',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._update_user(
                client, keycloak_user_id, max_budget=restored_budget, spend=spend
            )

            # Step 3: Add user back to the default team
            if LITE_LLM_TEAM_ID:
                logger.debug(
                    'LiteLlmManager:downgrade_entries:add_to_default_team',
                    extra={
                        'org_id': org_id,
                        'user_id': keycloak_user_id,
                        'default_team_id': LITE_LLM_TEAM_ID,
                    },
                )
                await LiteLlmManager._add_user_to_team(
                    client, keycloak_user_id, LITE_LLM_TEAM_ID, restored_budget
                )

            # Step 4: Update all user keys to remove org team association (set team_id to default)
            # NOTE(review): unlike step 3 this is not guarded by
            # `if LITE_LLM_TEAM_ID`, so a falsy default team ID is still sent
            # as team_id - confirm this is intended.
            logger.debug(
                'LiteLlmManager:downgrade_entries:update_user_keys',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._update_user_keys(
                client,
                keycloak_user_id,
                team_id=LITE_LLM_TEAM_ID,
            )

            # Step 5: Remove user from their org team
            logger.debug(
                'LiteLlmManager:downgrade_entries:remove_from_org_team',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._remove_user_from_team(
                client, keycloak_user_id, org_id
            )

            # Step 6: Delete the org team
            logger.debug(
                'LiteLlmManager:downgrade_entries:delete_team',
                extra={'org_id': org_id, 'user_id': keycloak_user_id},
            )
            await LiteLlmManager._delete_team(client, org_id)

    logger.info(
        'LiteLlmManager:downgrade_entries:complete',
        extra={'org_id': org_id, 'user_id': keycloak_user_id},
    )
    return user_settings
|
|
|
|
@staticmethod
|
|
async def update_team_and_users_budget(
    team_id: str,
    max_budget: float,
):
    """Apply a new budget to a team and to every member listed on it.

    The team is updated first, then re-read, and each membership that carries
    a user_id gets the same budget as its in-team budget.

    Args:
        team_id: The LiteLLM team ID.
        max_budget: The budget to set on the team and on each member.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    auth_headers = {'x-goog-api-key': LITE_LLM_API_KEY}
    async with httpx.AsyncClient(headers=auth_headers) as client:
        await LiteLlmManager._update_team(client, team_id, None, max_budget)
        refreshed_team = await LiteLlmManager._get_team(client, team_id)
        if not refreshed_team:
            return None

        # TODO: change to use bulk update endpoint
        member_ids = (
            entry.get('user_id')
            for entry in refreshed_team.get('team_memberships', [])
        )
        for member_id in member_ids:
            # Memberships without a user_id are skipped.
            if member_id:
                await LiteLlmManager._update_user_in_team(
                    client, member_id, team_id, max_budget
                )
|
|
|
|
@staticmethod
|
|
async def _create_team(
    client: httpx.AsyncClient,
    team_alias: str,
    team_id: str,
    max_budget: float | None,
):
    """Create a LiteLLM team, falling back to an update if it already exists.

    Args:
        client: The HTTP client to use.
        team_alias: The alias for the team.
        team_id: The ID for the team.
        max_budget: The maximum budget for the team. When None, no max_budget
            is sent with the request.

    Raises:
        httpx.HTTPStatusError: If the create request fails for any reason
            other than the team already existing.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    team_metadata = {
        'version': ORG_SETTINGS_VERSION,
        'model': get_default_litellm_model(),
    }
    body: dict[str, Any] = {
        'team_id': team_id,
        'team_alias': team_alias,
        'models': [],
        'spend': 0,
        'metadata': team_metadata,
    }
    if max_budget is not None:
        body['max_budget'] = max_budget

    response = await client.post(f'{LITE_LLM_API_URL}/team/new', json=body)

    # Team failed to create in litellm - this is an unforeseen error state...
    if not response.is_success:
        team_already_exists = (
            response.status_code == 400
            and 'already exists. Please use a different team id' in response.text
        )
        if team_already_exists:
            # team already exists, so update, then return
            await LiteLlmManager._update_team(
                client, team_id, team_alias, max_budget
            )
            return
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'team_id': team_id,
            'max_budget': max_budget,
        }
        logger.error('error_creating_litellm_team', extra=failure_details)
    response.raise_for_status()
|
|
|
|
@staticmethod
|
|
async def _get_team(client: httpx.AsyncClient, team_id: str) -> dict | None:
    """Get a team from litellm with the id matching that given.

    Args:
        client: The HTTP client to use.
        team_id: The ID of the team to fetch.

    Returns:
        The decoded JSON response, or None when LiteLLM is not configured.

    Raises:
        httpx.HTTPStatusError: If LiteLLM responds with an error status.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return None
    # Pass the ID via `params` so it is URL-encoded rather than interpolated
    # raw into the query string.
    response = await client.get(
        f'{LITE_LLM_API_URL}/team/info',
        params={'team_id': team_id},
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
@staticmethod
|
|
async def _update_team(
    client: httpx.AsyncClient,
    team_id: str,
    team_alias: str | None,
    max_budget: float | None,
):
    """Update an existing LiteLLM team's metadata, budget and alias.

    Args:
        client: The HTTP client to use.
        team_id: The ID of the team to update.
        team_alias: New alias; omitted from the request when None.
        max_budget: New budget; omitted from the request when None.

    Raises:
        httpx.HTTPStatusError: If LiteLLM responds with an error status.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    body: dict[str, Any] = {
        'team_id': team_id,
        'metadata': {
            'version': ORG_SETTINGS_VERSION,
            'model': get_default_litellm_model(),
        },
    }
    # Only fields that were actually supplied are sent.
    optional_fields = {'max_budget': max_budget, 'team_alias': team_alias}
    body.update(
        {name: value for name, value in optional_fields.items() if value is not None}
    )

    response = await client.post(f'{LITE_LLM_API_URL}/team/update', json=body)

    # Team failed to update in litellm - this is an unforeseen error state...
    if not response.is_success:
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'team_id': [team_id],
            'max_budget': max_budget,
        }
        logger.error('error_updating_litellm_team', extra=failure_details)
    response.raise_for_status()
|
|
|
|
@staticmethod
|
|
async def _user_exists(
    client: httpx.AsyncClient,
    user_id: str,
) -> bool:
    """Check if a user exists in LiteLLM.

    This is a best-effort check: any exception raised while querying or
    decoding the response is logged as a warning and reported as "does not
    exist".

    Args:
        client: The HTTP client to use.
        user_id: The user ID to look up.

    Returns:
        True if LiteLLM returns a user_info whose user_id matches, False
        otherwise (including when LiteLLM is not configured).
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        return False
    try:
        # Pass the ID via `params` so it is URL-encoded rather than
        # interpolated raw into the query string.
        response = await client.get(
            f'{LITE_LLM_API_URL}/user/info',
            params={'user_id': user_id},
        )
        if response.is_success:
            user_data = response.json()
            # Check that user_info exists and has the user_id
            user_info = user_data.get('user_info', {})
            return user_info.get('user_id') == user_id
        return False
    except Exception as e:
        logger.warning(
            'litellm_user_exists_check_failed',
            extra={'user_id': user_id, 'error': str(e)},
        )
        return False
|
|
|
|
@staticmethod
|
|
async def _create_user(
    client: httpx.AsyncClient,
    email: str | None,
    keycloak_user_id: str,
) -> bool:
    """Create a LiteLLM user, retrying without an email if the first try fails.

    Args:
        client: The HTTP client to use.
        email: The user's email, sent on the first attempt only.
        keycloak_user_id: The user's Keycloak ID, used as the LiteLLM user ID.

    Returns:
        True if the user was created, or LiteLLM reports it already exists and
        a follow-up lookup confirms that; False if LiteLLM is not configured
        or both attempts fail.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return False

    def _new_user_payload(user_email: str | None) -> dict[str, Any]:
        # Both attempts send the same payload apart from the email field.
        return {
            'user_email': user_email,
            'models': [],
            'user_id': keycloak_user_id,
            'teams': [LITE_LLM_TEAM_ID],
            'auto_create_key': False,
            'send_invite_email': False,
            'metadata': {
                'version': ORG_SETTINGS_VERSION,
                'model': get_default_litellm_model(),
            },
        }

    response = await client.post(
        f'{LITE_LLM_API_URL}/user/new',
        json=_new_user_payload(email),
    )
    if not response.is_success:
        logger.warning(
            'duplicate_user_email',
            extra={'user_id': keycloak_user_id, 'email': email},
        )
        # Litellm insists on unique email addresses - it is possible the email address was registered with a different user.
        response = await client.post(
            f'{LITE_LLM_API_URL}/user/new',
            json=_new_user_payload(None),
        )

        # User failed to create in litellm - this is an unforeseen error state...
        if not response.is_success:
            reported_as_existing = (
                response.status_code in (400, 409)
                and 'already exists' in response.text
            )
            if not reported_as_existing:
                logger.error(
                    'error_creating_litellm_user',
                    extra={
                        'status_code': response.status_code,
                        'text': response.text,
                        'user_id': keycloak_user_id,
                        'email': None,
                    },
                )
                return False

            logger.warning(
                'litellm_user_already_exists',
                extra={'user_id': keycloak_user_id},
            )
            # Verify the user actually exists before returning success
            confirmed = await LiteLlmManager._user_exists(client, keycloak_user_id)
            if confirmed:
                return True
            logger.error(
                'litellm_user_claimed_exists_but_not_found',
                extra={
                    'user_id': keycloak_user_id,
                    'status_code': response.status_code,
                    'text': response.text,
                },
            )
            return False

    response.raise_for_status()
    return True
|
|
|
|
@staticmethod
|
|
async def _get_user(client: httpx.AsyncClient, user_id: str) -> dict | None:
    """Get a user from litellm with the id matching that given.

    Args:
        client: The HTTP client to use.
        user_id: The ID of the user to fetch.

    Returns:
        The decoded JSON response, or None when LiteLLM is not configured.

    Raises:
        httpx.HTTPStatusError: If LiteLLM responds with an error status.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return None
    # Pass the ID via `params` so it is URL-encoded rather than interpolated
    # raw into the query string.
    response = await client.get(
        f'{LITE_LLM_API_URL}/user/info',
        params={'user_id': user_id},
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
@staticmethod
|
|
async def _update_user(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    **kwargs,
):
    """Update a LiteLLM user with arbitrary fields.

    Args:
        client: The HTTP client to use.
        keycloak_user_id: The user's Keycloak ID.
        **kwargs: Extra fields merged into the request body after user_id.

    Raises:
        httpx.HTTPStatusError: If LiteLLM responds with an error status.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    body = {'user_id': keycloak_user_id, **kwargs}
    response = await client.post(f'{LITE_LLM_API_URL}/user/update', json=body)

    if not response.is_success:
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'user_id': keycloak_user_id,
        }
        logger.error('error_updating_litellm_user', extra=failure_details)
    response.raise_for_status()
|
|
|
|
@staticmethod
|
|
async def _update_key(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    key: str,
    **kwargs,
):
    """Update a single LiteLLM key with arbitrary fields.

    A 401 response is logged as a warning and otherwise ignored; any other
    failure is logged as an error and raised.

    Args:
        client: The HTTP client to use.
        keycloak_user_id: The user's Keycloak ID (used for logging only).
        key: The key to update; decryption is attempted before sending.
        **kwargs: Extra fields merged into the request body after the key.

    Raises:
        httpx.HTTPStatusError: If LiteLLM responds with an error status
            other than 401.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    try:
        # Sometimes the key we get is encrypted - attempt to decrypt.
        key = decrypt_legacy_value(key)
    except Exception:
        # The key was not encrypted
        pass

    response = await client.post(
        f'{LITE_LLM_API_URL}/key/update',
        json={'key': key, **kwargs},
    )

    if not response.is_success:
        rejected_as_invalid = response.status_code == 401
        if rejected_as_invalid:
            logger.warning(
                'invalid_litellm_key_during_update',
                extra={'user_id': keycloak_user_id, 'text': response.text},
            )
            return
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'user_id': keycloak_user_id,
        }
        logger.error('error_updating_litellm_key', extra=failure_details)
    response.raise_for_status()
|
|
|
|
@staticmethod
|
|
async def _get_user_keys(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
) -> list[str]:
    """List the keys LiteLLM holds for a user.

    Failures are not raised: a non-success response is logged as an error and
    an empty list is returned.

    Args:
        client: The HTTP client to use for the request
        keycloak_user_id: The user's Keycloak ID

    Returns:
        The 'keys' entry of the response, or an empty list when LiteLLM is
        not configured, the request fails, or the entry is absent
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return []

    query = {'user_id': keycloak_user_id}
    response = await client.get(f'{LITE_LLM_API_URL}/key/list', params=query)

    if not response.is_success:
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'user_id': keycloak_user_id,
        }
        logger.error('error_getting_user_keys', extra=failure_details)
        return []

    found_keys = response.json().get('keys', [])
    logger.debug(
        'LiteLlmManager:_get_user_keys:keys_retrieved',
        extra={'user_id': keycloak_user_id, 'key_count': len(found_keys)},
    )
    return found_keys
|
|
|
|
@staticmethod
|
|
async def _update_user_keys(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    **kwargs,
):
    """Apply the same update to every key belonging to a user.

    Keys are fetched once and then updated one at a time, in the order
    returned.

    Args:
        client: The HTTP client to use for the request
        keycloak_user_id: The user's Keycloak ID
        **kwargs: Parameters to update on each key (e.g., team_id)
    """
    user_keys = await LiteLlmManager._get_user_keys(client, keycloak_user_id)

    log_context = {'user_id': keycloak_user_id, 'key_count': len(user_keys)}
    logger.debug(
        'LiteLlmManager:_update_user_keys:updating_keys',
        extra=log_context,
    )

    for user_key in user_keys:
        await LiteLlmManager._update_key(
            client, keycloak_user_id, user_key, **kwargs
        )
|
|
|
|
@staticmethod
|
|
async def _delete_user(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
):
    """Delete a user from LiteLLM.

    Args:
        client: The HTTP client to use.
        keycloak_user_id: The Keycloak ID of the user to delete.

    Raises:
        httpx.HTTPStatusError: If LiteLLM responds with an error status.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    body = {'user_ids': [keycloak_user_id]}
    response = await client.post(f'{LITE_LLM_API_URL}/user/delete', json=body)

    if not response.is_success:
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'user_id': [keycloak_user_id],
        }
        logger.error('error_deleting_litellm_user', extra=failure_details)
    response.raise_for_status()
|
|
|
|
@staticmethod
|
|
async def _delete_team(
    client: httpx.AsyncClient,
    team_id: str,
):
    """Delete a team from LiteLLM, treating a 404 as already deleted.

    Args:
        client: The HTTP client to use.
        team_id: The ID of the team to delete.

    Raises:
        httpx.HTTPStatusError: If LiteLLM responds with an error status
            other than 404.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    body = {'team_ids': [team_id]}
    response = await client.post(f'{LITE_LLM_API_URL}/team/delete', json=body)

    if not response.is_success:
        team_missing = response.status_code == 404
        if team_missing:
            # Team doesn't exist, that's fine
            logger.info(
                'Team already deleted or does not exist',
                extra={'team_id': team_id},
            )
            return
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'team_id': team_id,
        }
        logger.error('error_deleting_litellm_team', extra=failure_details)
    response.raise_for_status()

    # Only reached when the delete request succeeded.
    logger.info(
        'LiteLlmManager:_delete_team:team_deleted',
        extra={'team_id': team_id},
    )
|
|
|
|
@staticmethod
|
|
async def _add_user_to_team(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    team_id: str,
    max_budget: float | None,
):
    """Add a user to a LiteLLM team with the 'user' role.

    A 400 response saying the user is already in the team is logged as a
    warning and treated as success.

    Args:
        client: The HTTP client to use.
        keycloak_user_id: The user's Keycloak ID.
        team_id: The team ID.
        max_budget: The maximum budget for the user in the team. When None,
            no max_budget_in_team is sent with the request.

    Raises:
        httpx.HTTPStatusError: If the request fails for any other reason.
    """
    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return

    member = {'user_id': keycloak_user_id, 'role': 'user'}
    body: dict[str, Any] = {'team_id': team_id, 'member': member}
    if max_budget is not None:
        body['max_budget_in_team'] = max_budget

    response = await client.post(
        f'{LITE_LLM_API_URL}/team/member_add',
        json=body,
    )

    # Failed to add user to team - this is an unforeseen error state...
    if not response.is_success:
        already_member = (
            response.status_code == 400
            and 'already in team' in response.text.lower()
        )
        if already_member:
            logger.warning(
                'user_already_in_team',
                extra={'user_id': keycloak_user_id, 'team_id': team_id},
            )
            return
        failure_details = {
            'status_code': response.status_code,
            'text': response.text,
            'user_id': [keycloak_user_id],
            'team_id': [team_id],
            'max_budget': max_budget,
        }
        logger.error('error_adding_litellm_user_to_team', extra=failure_details)
    response.raise_for_status()
|
|
|
|
@staticmethod
async def _get_user_team_info(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    team_id: str,
) -> dict | None:
    """Return the membership record of a user within a LiteLLM team.

    The team is fetched through ``LiteLlmManager._get_team`` and its
    ``team_memberships`` list is searched for the first entry whose
    ``user_id`` and ``team_id`` both match.

    Args:
        client: The HTTP client used to fetch the team.
        keycloak_user_id: The user's Keycloak ID.
        team_id: The team ID.

    Returns:
        The matching membership dict, or None when LiteLLM is not
        configured, the team lookup yields nothing, or no membership
        matches. When ``keycloak_user_id`` differs from ``team_id`` the
        returned dict's ``max_budget_in_team`` and ``spend`` are overwritten
        with the values found under the response's ``team_info``.
    """
    if LITE_LLM_API_URL is None or LITE_LLM_API_KEY is None:
        logger.warning('LiteLLM API configuration not found')
        return None

    team_payload = await LiteLlmManager._get_team(client, team_id)
    if not team_payload:
        return None

    # Locate the first membership entry for this exact user/team pair.
    found = None
    for candidate in team_payload.get('team_memberships', []):
        if (
            candidate.get('user_id') == keycloak_user_id
            and candidate.get('team_id') == team_id
        ):
            found = candidate
            break

    if not found:
        return None

    if keycloak_user_id != team_id:
        # Team org: report the team-level budget and spend, which are
        # shared by every member, instead of the per-member figures.
        shared = team_payload.get('team_info', {})
        found['max_budget_in_team'] = shared.get('max_budget')
        found['spend'] = shared.get('spend', 0)

    return found
|
|
|
|
@staticmethod
async def _update_user_in_team(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    team_id: str,
    max_budget: float | None,
):
    """Update a team member's settings in LiteLLM.

    Posts to ``/team/member_update``. If the LiteLLM URL or API key is not
    configured, a warning is logged and nothing is sent.

    Args:
        client: The HTTP client used to send the request.
        keycloak_user_id: The user's Keycloak ID.
        team_id: The team ID.
        max_budget: Per-member budget inside the team. When None, the
            ``max_budget_in_team`` field is left out of the request.

    Raises:
        httpx.HTTPStatusError: If LiteLLM answers with a non-success status.
    """
    if LITE_LLM_API_URL is None or LITE_LLM_API_KEY is None:
        logger.warning('LiteLLM API configuration not found')
        return

    payload: dict[str, Any] = {
        'team_id': team_id,
        'user_id': keycloak_user_id,
    }
    if max_budget is not None:
        payload['max_budget_in_team'] = max_budget

    response = await client.post(
        f'{LITE_LLM_API_URL}/team/member_update',
        json=payload,
    )
    if response.is_success:
        return

    # An update failure is unexpected: record the details and propagate.
    logger.error(
        'error_updating_litellm_user_in_team',
        extra={
            'status_code': response.status_code,
            'text': response.text,
            'user_id': [keycloak_user_id],
            'team_id': [team_id],
            'max_budget': max_budget,
        },
    )
    response.raise_for_status()
|
|
|
|
@staticmethod
async def _remove_user_from_team(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    team_id: str,
):
    """Remove a user from a LiteLLM team.

    Posts to ``/team/member_delete``. A 404 response is logged at info
    level and treated as nothing to do. If the LiteLLM URL or API key is
    not configured, a warning is logged and nothing is sent.

    Args:
        client: The HTTP client used to send the request.
        keycloak_user_id: The user's Keycloak ID.
        team_id: The team ID.

    Raises:
        httpx.HTTPStatusError: If LiteLLM answers with a failure other
            than 404.
    """
    if LITE_LLM_API_URL is None or LITE_LLM_API_KEY is None:
        logger.warning('LiteLLM API configuration not found')
        return

    response = await client.post(
        f'{LITE_LLM_API_URL}/team/member_delete',
        json={
            'team_id': team_id,
            'user_id': keycloak_user_id,
        },
    )
    failed = not response.is_success

    if failed and response.status_code == 404:
        # The user was not a member; acceptable (e.g. during a downgrade).
        logger.info(
            'User not in team during removal',
            extra={'user_id': keycloak_user_id, 'team_id': team_id},
        )
        return

    if failed:
        logger.error(
            'error_removing_litellm_user_from_team',
            extra={
                'status_code': response.status_code,
                'text': response.text,
                'user_id': keycloak_user_id,
                'team_id': team_id,
            },
        )
        response.raise_for_status()

    logger.info(
        'LiteLlmManager:_remove_user_from_team:user_removed',
        extra={'user_id': keycloak_user_id, 'team_id': team_id},
    )
|
|
|
|
@staticmethod
async def _generate_key(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
    team_id: str | None,
    key_alias: str | None,
    metadata: dict | None,
) -> str:
    """Create a new LiteLLM key for a user and return it.

    Posts to ``/key/generate`` with an empty ``models`` list. ``team_id``,
    ``key_alias`` and ``metadata`` are only included in the request when
    they are not None.

    Args:
        client: The HTTP client used to send the request.
        keycloak_user_id: The user's Keycloak ID; owner of the new key.
        team_id: Optional team to attach the key to.
        key_alias: Optional alias for the key.
        metadata: Optional metadata to store with the key.

    Returns:
        The ``key`` value from the LiteLLM response.

    Raises:
        ValueError: If the LiteLLM URL or API key is not configured.
        httpx.HTTPStatusError: If LiteLLM answers with a non-success status.
    """
    if LITE_LLM_API_URL is None or LITE_LLM_API_KEY is None:
        raise ValueError('LiteLLM API configuration not found')

    payload: dict[str, Any] = {
        'user_id': keycloak_user_id,
        'models': [],
    }
    optional_fields = {
        'team_id': team_id,
        'key_alias': key_alias,
        'metadata': metadata,
    }
    # Only forward the optional fields the caller actually supplied.
    payload.update(
        {name: value for name, value in optional_fields.items() if value is not None}
    )

    response = await client.post(
        f'{LITE_LLM_API_URL}/key/generate',
        json=payload,
    )
    if not response.is_success:
        # Key generation failing is unexpected: record details and propagate.
        logger.error(
            'error_generate_user_team_key',
            extra={
                'status_code': response.status_code,
                'text': response.text,
                'user_id': keycloak_user_id,
                'team_id': team_id,
                'key_alias': key_alias,
            },
        )
        response.raise_for_status()

    generated_key = response.json()['key']
    logger.info(
        'LiteLlmManager:_lite_llm_generate_user_team_key:key_created',
        extra={
            'user_id': keycloak_user_id,
            'team_id': team_id,
            'key_alias': key_alias,
        },
    )
    return generated_key
|
|
|
|
@staticmethod
async def verify_key(key: str, user_id: str) -> bool:
    """Verify that a key is valid in LiteLLM by making a lightweight API call.

    Sends ``GET /v1/models`` authenticated with the key, using a dedicated
    client bounded by ``KEY_VERIFICATION_TIMEOUT``.

    Args:
        key: The key to verify.
        user_id: The user ID, used only for logging.

    Returns:
        True only when LiteLLM answers with status 200. False when the
        LiteLLM URL or the key is empty, when any other status is returned,
        or when the request raises (timeout, network error, ...), so that
        callers never keep a key that could not be confirmed.
    """
    if not (LITE_LLM_API_URL and key):
        return False

    try:
        async with httpx.AsyncClient(
            verify=httpx_verify_option(),
            timeout=KEY_VERIFICATION_TIMEOUT,
        ) as client:
            # /v1/models is cheap and requires authentication.
            response = await client.get(
                f'{LITE_LLM_API_URL}/v1/models',
                headers={
                    'Authorization': f'Bearer {key}',
                },
            )
    except Exception as e:
        # `Exception` already covers httpx.TimeoutException; any failure
        # means the key could not be verified, so report it as invalid
        # rather than returning a potentially invalid key.
        logger.warning(
            'Key verification error - treating as invalid to ensure key validity',
            extra={
                'user_id': user_id,
                'error': str(e),
                'error_type': type(e).__name__,
            },
        )
        return False

    if response.status_code == 200:
        logger.debug(
            'Key verification successful',
            extra={'user_id': user_id},
        )
        return True

    # Every other status (401, 403, 500, ...) is treated as an invalid key.
    # Only a truncated prefix is logged; a key too short to truncate is
    # redacted entirely so the full secret never reaches the logs.
    key_prefix = f'{key[:10]}...' if len(key) > 10 else '<redacted>'
    logger.warning(
        'Key verification failed - treating as invalid',
        extra={
            'user_id': user_id,
            'status_code': response.status_code,
            'key_prefix': key_prefix,
        },
    )
    return False
|
|
|
|
@staticmethod
async def _get_key_info(
    client: httpx.AsyncClient,
    org_id: str,
    keycloak_user_id: str,
) -> dict | None:
    """Fetch budget and spend for the user's LiteLLM key in an organization.

    The key is read from the user's org-member record for ``org_id`` and
    looked up through LiteLLM's ``/key/info`` endpoint.

    Args:
        client: The HTTP client used to send the request.
        org_id: The organization whose member record holds the key.
        keycloak_user_id: The user's Keycloak ID.

    Returns:
        None when the LiteLLM URL or API key is not configured. An empty
        dict when the user is not found, has no member record for the org,
        has no stored key, or the response carries no ``info``. Otherwise a
        dict with ``key_max_budget`` and ``key_spend``.

    Raises:
        httpx.HTTPStatusError: If LiteLLM answers with a non-success status.
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from storage.user_store import UserStore

    if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
        logger.warning('LiteLLM API configuration not found')
        return None

    user = await UserStore.get_user_by_id(keycloak_user_id)
    if not user:
        return {}

    org_member = next(
        (om for om in user.org_members if om.org_id == org_id),
        None,
    )
    if not org_member or not org_member.llm_api_key:
        return {}

    # Pass the key via `params` so httpx URL-encodes it instead of splicing
    # the raw secret into the query string by hand.
    response = await client.get(
        f'{LITE_LLM_API_URL}/key/info',
        params={'key': org_member.llm_api_key},
    )
    response.raise_for_status()

    key_info = response.json().get('info')
    if not key_info:
        return {}

    return {
        'key_max_budget': key_info.get('max_budget'),
        'key_spend': key_info.get('spend'),
    }
|
|
|
|
@staticmethod
async def _get_all_keys_for_user(
    client: httpx.AsyncClient,
    keycloak_user_id: str,
) -> list[dict]:
    """Return every key LiteLLM reports for a user.

    Reads the ``keys`` field of the ``/user/info`` response. According to
    the original documentation each entry may carry ``token``,
    ``key_alias``, ``key_name``, ``spend``, ``max_budget``, ``team_id`` and
    ``metadata``; the entries are returned as received.

    This is best-effort: any exception raised while requesting or decoding
    the response is logged as a warning and an empty list is returned.

    Args:
        client: The HTTP client used to send the request.
        keycloak_user_id: The user's Keycloak ID.

    Returns:
        The list under ``keys``, or an empty list when LiteLLM is not
        configured, the field is absent, or an error occurred.
    """
    if LITE_LLM_API_URL is None or LITE_LLM_API_KEY is None:
        logger.warning('LiteLLM API configuration not found')
        return []

    auth_headers = {'x-goog-api-key': LITE_LLM_API_KEY}
    try:
        response = await client.get(
            f'{LITE_LLM_API_URL}/user/info?user_id={keycloak_user_id}',
            headers=auth_headers,
        )
        response.raise_for_status()
        user_payload = response.json()
        # /user/info exposes the user's keys under the 'keys' field.
        return user_payload.get('keys', [])
    except Exception as e:
        logger.warning(
            'LiteLlmManager:_get_all_keys_for_user:error',
            extra={
                'user_id': keycloak_user_id,
                'error': str(e),
            },
        )
        return []
|
|
|
|
@staticmethod
async def _verify_existing_key(
    client: httpx.AsyncClient,
    key_value: str,
    keycloak_user_id: str,
    org_id: str,
    openhands_type: bool = False,
) -> bool:
    """Check whether ``key_value`` matches a key LiteLLM holds for the user/org.

    Each key returned for the user is first tested for eligibility:

    * ``openhands_type=True``: metadata ``type`` is ``'openhands'`` and the
      key's ``team_id`` equals ``org_id``.
    * ``openhands_type=False``: the key's ``team_id`` equals ``org_id`` and
      its alias equals either the OpenHands Cloud alias or the BYOR alias
      for this user and org.

    An eligible key matches when ``key_value`` ends with the last four
    characters of that key's ``key_name``.

    Args:
        client: The HTTP client used to list the user's keys.
        key_value: The full key to look for.
        keycloak_user_id: The user's Keycloak ID.
        org_id: The organization (team) the key must belong to.
        openhands_type: Selects which eligibility rule above is applied.

    Returns:
        True if a matching key is found, False otherwise.
    """
    candidates = await LiteLlmManager._get_all_keys_for_user(client, keycloak_user_id)

    for entry in candidates:
        entry_metadata = entry.get('metadata') or {}
        entry_team = entry.get('team_id')
        entry_alias = entry.get('key_alias')

        if openhands_type:
            eligible = (
                entry_metadata.get('type') == 'openhands' and entry_team == org_id
            )
        else:
            eligible = entry_team == org_id and (
                entry_alias == get_openhands_cloud_key_alias(keycloak_user_id, org_id)
                or entry_alias == get_byor_key_alias(keycloak_user_id, org_id)
            )
        if not eligible:
            continue

        # Compare on the trailing four characters of the key's name.
        key_name = entry.get('key_name')
        suffix = key_name[-4:] if key_name else None
        if suffix and key_value.endswith(suffix):
            return True

    return False
|
|
|
|
@staticmethod
async def _delete_key_by_alias(
    client: httpx.AsyncClient,
    key_alias: str,
):
    """Delete a LiteLLM key identified by its alias.

    Best-effort: a failed response never raises. A 404 is ignored silently
    and any other failure is logged as a warning. If the LiteLLM URL or API
    key is not configured, a warning is logged and nothing is sent.

    Args:
        client: The HTTP client used to send the request.
        key_alias: The alias of the key to delete.
    """
    if LITE_LLM_API_URL is None or LITE_LLM_API_KEY is None:
        logger.warning('LiteLLM API configuration not found')
        return

    response = await client.post(
        f'{LITE_LLM_API_URL}/key/delete',
        json={
            'key_aliases': [key_alias],
        },
    )

    if response.is_success:
        logger.info(
            'LiteLlmManager:_delete_key_by_alias:key_deleted',
            extra={'key_alias': key_alias},
        )
        return

    if response.status_code == 404:
        # Nothing registered under this alias; nothing to report.
        return

    # Record other failures without interrupting the caller.
    logger.warning(
        'error_deleting_key_by_alias',
        extra={
            'key_alias': key_alias,
            'status_code': response.status_code,
            'text': response.text,
        },
    )
|
|
|
|
@staticmethod
async def _delete_key(
    client: httpx.AsyncClient,
    key_id: str,
    key_alias: str | None = None,
):
    """Delete a LiteLLM key by its id, falling back to its alias on 404.

    If the LiteLLM URL or API key is not configured, a warning is logged
    and nothing is sent.

    Args:
        client: The HTTP client used to send the request.
        key_id: The key to delete.
        key_alias: Optional alias; when deletion by ``key_id`` returns 404,
            a deletion by this alias is attempted to clean up an orphaned
            alias.

    Raises:
        httpx.HTTPStatusError: If LiteLLM answers with a failure other
            than 404.
    """
    if LITE_LLM_API_URL is None or LITE_LLM_API_KEY is None:
        logger.warning('LiteLLM API configuration not found')
        return

    response = await client.post(
        f'{LITE_LLM_API_URL}/key/delete',
        json={
            'keys': [key_id],
        },
    )
    failed = not response.is_success

    if failed and response.status_code == 404:
        # No key with this id exists; an alias may still be registered.
        if key_alias:
            await LiteLlmManager._delete_key_by_alias(client, key_alias)
        return

    if failed:
        logger.error(
            'error_deleting_key',
            extra={
                'status_code': response.status_code,
                'text': response.text,
            },
        )
        response.raise_for_status()

    logger.info(
        'LiteLlmManager:_delete_key:key_deleted',
    )
|
|
|
|
@staticmethod
def with_http_client(
    internal_fn: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
    """Wrap ``internal_fn`` so a fresh HTTP client is injected as its first argument.

    Each call to the returned coroutine function opens an
    ``httpx.AsyncClient``, passes it to ``internal_fn`` ahead of the
    caller's own arguments, and closes it when the call finishes.

    Args:
        internal_fn: Coroutine function whose first parameter is the client.

    Returns:
        A coroutine function taking the remaining arguments of
        ``internal_fn`` and returning its result.
    """

    @functools.wraps(internal_fn)
    async def wrapper(*args, **kwargs):
        # httpx cannot encode a None header value, so the auth header is
        # attached only when a key is configured. This lets the wrapped
        # function's own "configuration not found" handling run instead of
        # failing while the client is being constructed.
        headers = (
            {'x-goog-api-key': LITE_LLM_API_KEY}
            if LITE_LLM_API_KEY is not None
            else {}
        )
        async with httpx.AsyncClient(headers=headers) as client:
            return await internal_fn(client, *args, **kwargs)

    return wrapper
|
|
|
|
# Public methods with injected client.
# Each public name below is the private implementation of the same name
# (with a leading underscore) wrapped by `with_http_client`, so callers omit
# the `client` argument and a new HTTP client is supplied for every call.
|
|
create_team = staticmethod(with_http_client(_create_team))
|
|
get_team = staticmethod(with_http_client(_get_team))
|
|
update_team = staticmethod(with_http_client(_update_team))
|
|
user_exists = staticmethod(with_http_client(_user_exists))
|
|
create_user = staticmethod(with_http_client(_create_user))
|
|
get_user = staticmethod(with_http_client(_get_user))
|
|
update_user = staticmethod(with_http_client(_update_user))
|
|
delete_user = staticmethod(with_http_client(_delete_user))
|
|
delete_team = staticmethod(with_http_client(_delete_team))
|
|
add_user_to_team = staticmethod(with_http_client(_add_user_to_team))
|
|
remove_user_from_team = staticmethod(with_http_client(_remove_user_from_team))
|
|
get_user_team_info = staticmethod(with_http_client(_get_user_team_info))
|
|
update_user_in_team = staticmethod(with_http_client(_update_user_in_team))
|
|
generate_key = staticmethod(with_http_client(_generate_key))
|
|
get_key_info = staticmethod(with_http_client(_get_key_info))
|
|
verify_existing_key = staticmethod(with_http_client(_verify_existing_key))
|
|
delete_key = staticmethod(with_http_client(_delete_key))
|
|
get_user_keys = staticmethod(with_http_client(_get_user_keys))
|
|
delete_key_by_alias = staticmethod(with_http_client(_delete_key_by_alias))
|
|
update_user_keys = staticmethod(with_http_client(_update_user_keys))
|