Add downgrade script and methods for reverting user migration (#12629)

Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: chuckbutkus <chuck@all-hands.dev>
This commit is contained in:
Tim O'Farrell
2026-01-27 13:41:34 -08:00
committed by GitHub
parent b6ce45b474
commit 102095affb
4 changed files with 820 additions and 0 deletions

View File

@@ -0,0 +1,205 @@
#!/usr/bin/env python
"""
Downgrade script for migrated users.
This script identifies users who have been migrated (already_migrated=True)
and reverts them back to the pre-migration state.
Usage:
# Dry run - just list the users that would be downgraded
python downgrade_migrated_users.py --dry-run
# Downgrade a specific user by their keycloak_user_id
python downgrade_migrated_users.py --user-id <user_id>
# Downgrade all migrated users (with confirmation)
python downgrade_migrated_users.py --all
# Downgrade all migrated users without confirmation (dangerous!)
python downgrade_migrated_users.py --all --no-confirm
"""
import argparse
import asyncio
import sys
# Add the enterprise directory to the path
sys.path.insert(0, '/workspace/project/OpenHands/enterprise')
from server.logger import logger
from sqlalchemy import select, text
from storage.database import session_maker
from storage.user_settings import UserSettings
from storage.user_store import UserStore
def get_migrated_users() -> list[str]:
"""Get list of keycloak_user_ids for users who have been migrated.
This includes:
1. Users with already_migrated=True in user_settings (migrated users)
2. Users in the 'user' table who don't have a user_settings entry (new sign-ups)
"""
with session_maker() as session:
# Get users from user_settings with already_migrated=True
migrated_result = session.execute(
select(UserSettings.keycloak_user_id).where(
UserSettings.already_migrated.is_(True)
)
)
migrated_users = {row[0] for row in migrated_result.fetchall() if row[0]}
# Get users from the 'user' table (new sign-ups won't have user_settings)
# These are users who signed up after the migration was deployed
new_signup_result = session.execute(
text("""
SELECT CAST(u.id AS VARCHAR)
FROM "user" u
WHERE NOT EXISTS (
SELECT 1 FROM user_settings us
WHERE us.keycloak_user_id = CAST(u.id AS VARCHAR)
)
""")
)
new_signups = {row[0] for row in new_signup_result.fetchall() if row[0]}
# Combine both sets
all_users = migrated_users | new_signups
return list(all_users)
async def downgrade_user(user_id: str) -> bool:
"""Downgrade a single user.
Args:
user_id: The keycloak_user_id to downgrade
Returns:
True if successful, False otherwise
"""
try:
result = await UserStore.downgrade_user(user_id)
if result:
print(f'✓ Successfully downgraded user: {user_id}')
return True
else:
print(f'✗ Failed to downgrade user: {user_id}')
return False
except Exception as e:
print(f'✗ Error downgrading user {user_id}: {e}')
logger.exception(
'downgrade_script:error',
extra={'user_id': user_id, 'error': str(e)},
)
return False
async def main():
parser = argparse.ArgumentParser(
description='Downgrade migrated users back to pre-migration state'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Just list users that would be downgraded, without making changes',
)
parser.add_argument(
'--user-id',
type=str,
help='Downgrade a specific user by keycloak_user_id',
)
parser.add_argument(
'--all',
action='store_true',
help='Downgrade all migrated users',
)
parser.add_argument(
'--no-confirm',
action='store_true',
help='Skip confirmation prompt (use with caution!)',
)
args = parser.parse_args()
# Get list of migrated users
migrated_users = get_migrated_users()
print(f'\nFound {len(migrated_users)} migrated user(s).')
if args.dry_run:
print('\n--- DRY RUN MODE ---')
print('The following users would be downgraded:')
for user_id in migrated_users:
print(f' - {user_id}')
print('\nNo changes were made.')
return
if args.user_id:
# Downgrade a specific user
if args.user_id not in migrated_users:
print(f'\nUser {args.user_id} is not in the migrated users list.')
print('Either the user was not migrated, or the user_id is incorrect.')
return
print(f'\nDowngrading user: {args.user_id}')
if not args.no_confirm:
confirm = input('Are you sure? (yes/no): ')
if confirm.lower() != 'yes':
print('Cancelled.')
return
success = await downgrade_user(args.user_id)
if success:
print('\nDowngrade completed successfully.')
else:
print('\nDowngrade failed. Check logs for details.')
sys.exit(1)
elif args.all:
# Downgrade all migrated users
if not migrated_users:
print('\nNo migrated users to downgrade.')
return
print(f'\n⚠️ About to downgrade {len(migrated_users)} user(s).')
if not args.no_confirm:
print('\nThis will:')
print(' - Revert LiteLLM team/user budget settings')
print(' - Delete organization entries')
print(' - Delete user entries in the new schema')
print(' - Reset the already_migrated flag')
print('\nUsers to downgrade:')
for user_id in migrated_users[:10]: # Show first 10
print(f' - {user_id}')
if len(migrated_users) > 10:
print(f' ... and {len(migrated_users) - 10} more')
confirm = input('\nType "yes" to proceed: ')
if confirm.lower() != 'yes':
print('Cancelled.')
return
print('\nStarting downgrade...\n')
success_count = 0
fail_count = 0
for user_id in migrated_users:
success = await downgrade_user(user_id)
if success:
success_count += 1
else:
fail_count += 1
print('\n--- Summary ---')
print(f'Successful: {success_count}')
print(f'Failed: {fail_count}')
if fail_count > 0:
sys.exit(1)
else:
parser.print_help()
print('\nPlease specify --dry-run, --user-id, or --all')
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -195,6 +195,163 @@ class LiteLlmManager:
)
return user_settings
@staticmethod
async def downgrade_entries(
org_id: str,
keycloak_user_id: str,
user_settings: UserSettings,
) -> UserSettings | None:
"""Downgrade a migrated user's LiteLLM entries back to the pre-migration state.
This reverses the migrate_entries operation:
1. Get the user max budget from their org team in litellm
2. Set the max budget in the user in litellm (restore from team)
3. Add the user back to the default team in litellm
4. Update keys to remove org team association
5. Remove the user from their org team in litellm
6. Delete the user org team in litellm
Note: The database changes (already_migrated flag, org/org_member deletion)
should be handled separately by the caller.
Args:
org_id: The organization ID (which is also the team_id in litellm)
keycloak_user_id: The user's Keycloak ID
user_settings: The user's settings object
Returns:
The user_settings if downgrade was successful, None otherwise
"""
logger.info(
'LiteLlmManager:downgrade_entries:start',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
logger.warning('LiteLLM API configuration not found')
return None
local_deploy = os.environ.get('LOCAL_DEPLOYMENT', None)
if not local_deploy:
async with httpx.AsyncClient(
headers={
'x-goog-api-key': LITE_LLM_API_KEY,
}
) as client:
# Step 1: Get the team info to retrieve the budget
logger.debug(
'LiteLlmManager:downgrade_entries:get_team',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
team_info = await LiteLlmManager._get_team(client, org_id)
if not team_info:
logger.error(
'LiteLlmManager:downgrade_entries:team_not_found',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
return None
# Get team budget (max_budget) and spend to calculate current credits
team_data = team_info.get('team_info', {})
max_budget = team_data.get('max_budget', 0.0)
spend = team_data.get('spend', 0.0)
# Get user membership info for budget in team
user_membership = await LiteLlmManager._get_user_team_info(
client, keycloak_user_id, org_id
)
if user_membership:
# Use user's budget in team if available
user_max_budget_in_team = user_membership.get('max_budget_in_team')
user_spend_in_team = user_membership.get('spend', 0.0)
if user_max_budget_in_team is not None:
max_budget = user_max_budget_in_team
spend = user_spend_in_team
# Calculate total budget to restore (credits + spend = max_budget)
# We restore the full max_budget that was on the team/user-in-team
restored_budget = max_budget if max_budget else 0.0
logger.debug(
'LiteLlmManager:downgrade_entries:budget_info',
extra={
'org_id': org_id,
'user_id': keycloak_user_id,
'max_budget': max_budget,
'spend': spend,
'restored_budget': restored_budget,
},
)
# Step 2: Update user to set their max_budget back from unlimited
logger.debug(
'LiteLlmManager:downgrade_entries:update_user',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._update_user(
client, keycloak_user_id, max_budget=restored_budget, spend=spend
)
# Step 3: Add user back to the default team
if LITE_LLM_TEAM_ID:
logger.debug(
'LiteLlmManager:downgrade_entries:add_to_default_team',
extra={
'org_id': org_id,
'user_id': keycloak_user_id,
'default_team_id': LITE_LLM_TEAM_ID,
},
)
await LiteLlmManager._add_user_to_team(
client, keycloak_user_id, LITE_LLM_TEAM_ID, restored_budget
)
# Step 4: Update keys to remove org team association (set team_id to default)
if user_settings.llm_api_key:
logger.debug(
'LiteLlmManager:downgrade_entries:update_key',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._update_key(
client,
keycloak_user_id,
user_settings.llm_api_key,
team_id=LITE_LLM_TEAM_ID,
)
if user_settings.llm_api_key_for_byor:
logger.debug(
'LiteLlmManager:downgrade_entries:update_byor_key',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._update_key(
client,
keycloak_user_id,
user_settings.llm_api_key_for_byor,
team_id=LITE_LLM_TEAM_ID,
)
# Step 5: Remove user from their org team
logger.debug(
'LiteLlmManager:downgrade_entries:remove_from_org_team',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._remove_user_from_team(
client, keycloak_user_id, org_id
)
# Step 6: Delete the org team
logger.debug(
'LiteLlmManager:downgrade_entries:delete_team',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._delete_team(client, org_id)
logger.info(
'LiteLlmManager:downgrade_entries:complete',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
return user_settings
@staticmethod
async def update_team_and_users_budget(
team_id: str,
@@ -637,6 +794,45 @@ class LiteLlmManager:
)
response.raise_for_status()
@staticmethod
async def _remove_user_from_team(
client: httpx.AsyncClient,
keycloak_user_id: str,
team_id: str,
):
if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
logger.warning('LiteLLM API configuration not found')
return
response = await client.post(
f'{LITE_LLM_API_URL}/team/member_delete',
json={
'team_id': team_id,
'user_id': keycloak_user_id,
},
)
if not response.is_success:
if response.status_code == 404:
# User not in team, that's fine for downgrade
logger.info(
'User not in team during removal',
extra={'user_id': keycloak_user_id, 'team_id': team_id},
)
return
logger.error(
'error_removing_litellm_user_from_team',
extra={
'status_code': response.status_code,
'text': response.text,
'user_id': keycloak_user_id,
'team_id': team_id,
},
)
response.raise_for_status()
logger.info(
'LiteLlmManager:_remove_user_from_team:user_removed',
extra={'user_id': keycloak_user_id, 'team_id': team_id},
)
@staticmethod
async def _generate_key(
client: httpx.AsyncClient,
@@ -880,6 +1076,7 @@ class LiteLlmManager:
delete_user = staticmethod(with_http_client(_delete_user))
delete_team = staticmethod(with_http_client(_delete_team))
add_user_to_team = staticmethod(with_http_client(_add_user_to_team))
remove_user_from_team = staticmethod(with_http_client(_remove_user_from_team))
get_user_team_info = staticmethod(with_http_client(_get_user_team_info))
update_user_in_team = staticmethod(with_http_client(_update_user_in_team))
generate_key = staticmethod(with_http_client(_generate_key))

View File

@@ -330,6 +330,253 @@ class UserStore:
)
return user
@staticmethod
async def downgrade_user(user_id: str) -> UserSettings | None:
"""Downgrade a migrated user back to the pre-migration state.
This reverses the migrate_user operation:
1. Get the user's settings from user_settings table (migrated users) or
create new user_settings from org_members table (new sign-ups)
2. Call LiteLlmManager.downgrade_entries to revert LiteLLM state
3. Copy user_id from conversation_metadata_saas to conversation_metadata
4. Delete conversation_metadata_saas entries
5. Reset org_id columns in related tables (stripe_customers, slack_users, etc.)
6. Delete the org_member and org entries
7. Delete the user entry
8. Set already_migrated=False on user_settings
For new sign-ups (users who registered after migration was deployed),
there won't be an existing user_settings entry. In this case, we fall back
to the org_members table to get the user's API keys and settings, and create
a new user_settings entry for them.
Args:
user_id: The Keycloak user ID to downgrade
Returns:
The user_settings if downgrade was successful, None otherwise.
Returns None if the org has multiple members (not a personal org).
"""
logger.info(
'user_store:downgrade_user:start',
extra={'user_id': user_id},
)
with session_maker() as session:
# Get the user and their org_member
user = (
session.query(User)
.options(joinedload(User.org_members))
.filter(User.id == uuid.UUID(user_id))
.first()
)
if not user:
logger.warning(
'user_store:downgrade_user:user_not_found',
extra={'user_id': user_id},
)
return None
# Get the user's personal org (org_id == user_id)
org = session.query(Org).filter(Org.id == uuid.UUID(user_id)).first()
if not org:
logger.warning(
'user_store:downgrade_user:org_not_found',
extra={'user_id': user_id},
)
return None
# Get the user_settings (for migrated users)
user_settings = (
session.query(UserSettings)
.filter(
UserSettings.keycloak_user_id == user_id,
UserSettings.already_migrated.is_(True),
)
.first()
)
# For new sign-ups after migration, user_settings won't exist
# Fall back to getting data from org_members
is_new_signup = False
if not user_settings:
logger.info(
'user_store:downgrade_user:user_settings_not_found_checking_org_members',
extra={'user_id': user_id},
)
# Get org_members for this org - should only be one for personal orgs
org_members = (
session.query(OrgMember).filter(OrgMember.org_id == org.id).all()
)
if len(org_members) != 1:
logger.error(
'user_store:downgrade_user:unexpected_org_members_count',
extra={
'user_id': user_id,
'org_id': str(org.id),
'org_members_count': len(org_members),
},
)
return None
org_member = org_members[0]
is_new_signup = True
# Create a new user_settings entry from org_member data
# This is needed for new sign-ups who don't have user_settings
user_settings = UserSettings(
keycloak_user_id=user_id,
llm_api_key=org_member.llm_api_key.get_secret_value()
if org_member.llm_api_key
else None,
llm_api_key_for_byor=org_member.llm_api_key_for_byor.get_secret_value()
if org_member.llm_api_key_for_byor
else None,
llm_model=org_member.llm_model,
llm_base_url=org_member.llm_base_url,
max_iterations=org_member.max_iterations,
already_migrated=False, # Will be set correctly below
)
session.add(user_settings)
session.flush()
logger.info(
'user_store:downgrade_user:created_user_settings_from_org_member',
extra={'user_id': user_id},
)
# Call LiteLLM downgrade
from storage.lite_llm_manager import LiteLlmManager
logger.debug(
'user_store:downgrade_user:calling_litellm_downgrade_entries',
extra={'user_id': user_id},
)
# Get the API keys for LiteLLM downgrade
if is_new_signup:
# For new signups, we already have decrypted values in user_settings
decrypted_user_settings = user_settings
else:
# For migrated users, decrypt the legacy model
kwargs = decrypt_legacy_model(
[
'llm_api_key',
'llm_api_key_for_byor',
'search_api_key',
'sandbox_api_key',
],
user_settings,
)
decrypted_user_settings = UserSettings(**kwargs)
await LiteLlmManager.downgrade_entries(
str(org.id),
user_id,
decrypted_user_settings,
)
logger.debug(
'user_store:downgrade_user:done_litellm_downgrade_entries',
extra={'user_id': user_id},
)
user_uuid = uuid.UUID(user_id)
# Step 3: Copy user_id from conversation_metadata_saas to conversation_metadata
# This ensures any conversations created after migration have their user_id
# preserved in the original table before we delete the saas entries
session.execute(
text("""
UPDATE conversation_metadata
SET user_id = :user_id
WHERE conversation_id IN (
SELECT conversation_id
FROM conversation_metadata_saas
WHERE user_id = :user_uuid
)
"""),
{'user_id': user_id, 'user_uuid': user_uuid},
)
# Step 4: Delete conversation_metadata_saas entries
session.execute(
text('DELETE FROM conversation_metadata_saas WHERE user_id = :user_id'),
{'user_id': user_uuid},
)
# Step 5: Reset org_id columns in related tables
# Reset stripe_customers
session.execute(
text(
'UPDATE stripe_customers SET org_id = NULL WHERE org_id = :org_id'
),
{'org_id': user_uuid},
)
# Reset slack_users
session.execute(
text('UPDATE slack_users SET org_id = NULL WHERE org_id = :org_id'),
{'org_id': user_uuid},
)
# Reset slack_conversation
session.execute(
text(
'UPDATE slack_conversation SET org_id = NULL WHERE org_id = :org_id'
),
{'org_id': user_uuid},
)
# Reset api_keys
session.execute(
text('UPDATE api_keys SET org_id = NULL WHERE org_id = :org_id'),
{'org_id': user_uuid},
)
# Reset custom_secrets
session.execute(
text('UPDATE custom_secrets SET org_id = NULL WHERE org_id = :org_id'),
{'org_id': user_uuid},
)
# Reset billing_sessions
session.execute(
text(
'UPDATE billing_sessions SET org_id = NULL WHERE org_id = :org_id'
),
{'org_id': user_uuid},
)
# Step 6: Delete org_member entries for this org
session.execute(
text('DELETE FROM org_member WHERE org_id = :org_id'),
{'org_id': user_uuid},
)
# Step 7: Delete the user entry
session.execute(
text('DELETE FROM "user" WHERE id = :user_id'),
{'user_id': user_uuid},
)
# Delete the org entry
session.execute(
text('DELETE FROM org WHERE id = :org_id'),
{'org_id': user_uuid},
)
# Step 8: Set already_migrated=False on user_settings
user_settings.already_migrated = False
session.merge(user_settings)
session.commit()
logger.info(
'user_store:downgrade_user:complete',
extra={'user_id': user_id},
)
return user_settings
@staticmethod
def get_user_by_id(user_id: str) -> Optional[User]:
"""Get user by Keycloak user ID (sync version).

View File

@@ -1126,3 +1126,174 @@ class TestLiteLlmManager:
'http://test.url/team/delete',
json={'team_ids': [team_id]},
)
@pytest.mark.asyncio
async def test_remove_user_from_team_successful(self):
"""
GIVEN: Valid user_id and team_id
WHEN: _remove_user_from_team is called
THEN: HTTP POST is made to remove user from team
"""
mock_response = AsyncMock()
mock_response.is_success = True
mock_response.status_code = 200
with (
patch('storage.lite_llm_manager.LITE_LLM_API_KEY', 'test-key'),
patch('storage.lite_llm_manager.LITE_LLM_API_URL', 'http://test.url'),
):
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
await LiteLlmManager._remove_user_from_team(
mock_client, 'test-user-id', 'test-team-id'
)
mock_client.post.assert_called_once_with(
'http://test.url/team/member_delete',
json={
'team_id': 'test-team-id',
'user_id': 'test-user-id',
},
)
@pytest.mark.asyncio
async def test_remove_user_from_team_not_found(self):
"""
GIVEN: User not in team
WHEN: _remove_user_from_team is called
THEN: 404 response is handled gracefully without raising
"""
mock_response = AsyncMock()
mock_response.is_success = False
mock_response.status_code = 404
mock_response.text = 'User not found in team'
mock_response.raise_for_status = MagicMock()
with (
patch('storage.lite_llm_manager.LITE_LLM_API_KEY', 'test-key'),
patch('storage.lite_llm_manager.LITE_LLM_API_URL', 'http://test.url'),
):
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
# Should not raise an exception
await LiteLlmManager._remove_user_from_team(
mock_client, 'test-user-id', 'test-team-id'
)
@pytest.mark.asyncio
async def test_downgrade_entries_missing_config(self, mock_user_settings):
"""Test downgrade_entries when LiteLLM config is missing."""
with patch('storage.lite_llm_manager.LITE_LLM_API_KEY', None):
with patch('storage.lite_llm_manager.LITE_LLM_API_URL', None):
result = await LiteLlmManager.downgrade_entries(
'test-org-id',
'test-user-id',
mock_user_settings,
)
assert result is None
@pytest.mark.asyncio
async def test_downgrade_entries_team_not_found(self, mock_user_settings):
"""Test downgrade_entries when team is not found."""
with patch.dict(os.environ, {'LOCAL_DEPLOYMENT': ''}):
with patch('storage.lite_llm_manager.LITE_LLM_API_KEY', 'test-key'):
with patch(
'storage.lite_llm_manager.LITE_LLM_API_URL', 'http://test.com'
):
with patch.object(
LiteLlmManager, '_get_team', new_callable=AsyncMock
) as mock_get_team:
mock_get_team.return_value = None
result = await LiteLlmManager.downgrade_entries(
'test-org-id',
'test-user-id',
mock_user_settings,
)
assert result is None
@pytest.mark.asyncio
async def test_downgrade_entries_successful(self, mock_user_settings):
"""Test successful downgrade_entries operation."""
mock_response = MagicMock()
mock_response.is_success = True
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
mock_team_info_response = MagicMock()
mock_team_info_response.is_success = True
mock_team_info_response.status_code = 200
mock_team_info_response.json.return_value = {
'team_info': {
'max_budget': 100.0,
'spend': 20.0,
},
'team_memberships': [
{
'user_id': 'test-user-id',
'team_id': 'test-org-id',
'max_budget_in_team': 100.0,
'spend': 20.0,
}
],
}
mock_team_info_response.raise_for_status = MagicMock()
with patch.dict(os.environ, {'LOCAL_DEPLOYMENT': ''}):
with patch('storage.lite_llm_manager.LITE_LLM_API_KEY', 'test-key'):
with patch(
'storage.lite_llm_manager.LITE_LLM_API_URL', 'http://test.com'
):
with patch(
'storage.lite_llm_manager.LITE_LLM_TEAM_ID', 'default-team'
):
with patch('httpx.AsyncClient') as mock_client_class:
mock_client = AsyncMock()
mock_client_class.return_value.__aenter__.return_value = (
mock_client
)
mock_client.get.return_value = mock_team_info_response
mock_client.post.return_value = mock_response
result = await LiteLlmManager.downgrade_entries(
'test-org-id',
'test-user-id',
mock_user_settings,
)
# downgrade_entries returns the user_settings
assert result is not None
assert result.agent == 'TestAgent'
# Verify downgrade steps were called:
# 1. get_team (GET)
# 2. get_user_team_info (GET via _get_team)
# 3. update_user (POST)
# 4. add_user_to_team (POST)
# 5. update_key (POST)
# 6. remove_user_from_team (POST)
# 7. delete_team (POST)
assert mock_client.get.call_count >= 1
assert mock_client.post.call_count >= 4
@pytest.mark.asyncio
async def test_downgrade_entries_local_deployment(self, mock_user_settings):
"""Test downgrade_entries in local deployment mode (skips LiteLLM calls)."""
with patch.dict(os.environ, {'LOCAL_DEPLOYMENT': 'true'}):
with patch('storage.lite_llm_manager.LITE_LLM_API_KEY', 'test-key'):
with patch(
'storage.lite_llm_manager.LITE_LLM_API_URL', 'http://test.com'
):
result = await LiteLlmManager.downgrade_entries(
'test-org-id',
'test-user-id',
mock_user_settings,
)
# In local deployment, should return user_settings without
# making any LiteLLM calls
assert result is not None
assert result.agent == 'TestAgent'