""" Store class for managing users. """ import asyncio import uuid from typing import Optional from uuid import UUID from server.auth.token_manager import TokenManager from server.constants import ( LITE_LLM_API_URL, ORG_SETTINGS_VERSION, PERSONAL_WORKSPACE_VERSION_TO_MODEL, get_default_litellm_model, ) from server.logger import logger from sqlalchemy import select, text from sqlalchemy.orm import joinedload from storage.database import a_session_maker, session_maker from storage.encrypt_utils import ( decrypt_legacy_model, decrypt_legacy_value, encrypt_legacy_value, ) from storage.org import Org from storage.org_member import OrgMember from storage.role_store import RoleStore from storage.user import User from storage.user_settings import UserSettings from utils.identity import resolve_display_name from openhands.utils.async_utils import GENERAL_TIMEOUT, call_async_from_sync # The max possible time to wait for another process to finish creating a user before retrying _REDIS_CREATE_TIMEOUT_SECONDS = 30 # The delay to wait for another process to finish creating a user before trying to load again _RETRY_LOAD_DELAY_SECONDS = 2 # Redis key prefix for user creation locks _REDIS_USER_CREATION_KEY_PREFIX = 'create_user:' class UserStore: """Store for managing users.""" @staticmethod async def create_user( user_id: str, user_info: dict, role_id: Optional[int] = None, ) -> User | None: """Create a new user.""" with session_maker() as session: # create personal org org = Org( id=uuid.UUID(user_id), name=f'user_{user_id}_org', contact_name=resolve_display_name(user_info) or user_info.get('preferred_username', ''), contact_email=user_info['email'], v1_enabled=True, ) session.add(org) settings = await UserStore.create_default_settings( org_id=str(org.id), user_id=user_id ) if not settings: return None from storage.org_store import OrgStore org_kwargs = OrgStore.get_kwargs_from_settings(settings) for key, value in org_kwargs.items(): if hasattr(org, key): setattr(org, key, value) user_kwargs = UserStore.get_kwargs_from_settings(settings) user = User( id=uuid.UUID(user_id), current_org_id=org.id, role_id=role_id, **user_kwargs, ) user.email = user_info.get('email') user.email_verified = user_info.get('email_verified') session.add(user) role = RoleStore.get_role_by_name('owner') if role is None: raise ValueError('Owner role not found in database') from storage.org_member_store import OrgMemberStore org_member_kwargs = OrgMemberStore.get_kwargs_from_settings(settings) # avoid setting org member llm fields to use org defaults on user creation del org_member_kwargs['llm_model'] del org_member_kwargs['llm_base_url'] org_member = OrgMember( org_id=org.id, user_id=user.id, role_id=role.id, # owner of your own org. status='active', **org_member_kwargs, ) session.add(org_member) session.commit() session.refresh(user) user.org_members # load org_members return user @staticmethod def _get_redis_client(): """Get the Redis client from the Socket.IO manager.""" from openhands.server.shared import sio return getattr(sio.manager, 'redis', None) @staticmethod async def _acquire_user_creation_lock(user_id: str) -> bool: """Attempt to acquire a distributed lock for user creation. Returns True if the lock was acquired or if Redis is unavailable (fallback to no locking). Returns False if another process holds the lock. """ redis_client = UserStore._get_redis_client() if redis_client is None: logger.warning( 'user_store:_acquire_user_creation_lock:no_redis_client', extra={'user_id': user_id}, ) return True # Proceed without locking if Redis is unavailable user_key = f'{_REDIS_USER_CREATION_KEY_PREFIX}{user_id}' lock_acquired = await redis_client.set( user_key, 1, nx=True, ex=_REDIS_CREATE_TIMEOUT_SECONDS ) return bool(lock_acquired) @staticmethod async def _release_user_creation_lock(user_id: str) -> bool: """Release the distributed lock for user creation. Returns True if the lock was released or if Redis is unavailable. Returns False if the lock could not be released. """ redis_client = UserStore._get_redis_client() if redis_client is None: logger.warning( 'user_store:_release_user_creation_lock:no_redis_client', extra={'user_id': user_id}, ) return True # Nothing to release if Redis is unavailable user_key = f'{_REDIS_USER_CREATION_KEY_PREFIX}{user_id}' deleted = await redis_client.delete(user_key) return bool(deleted) @staticmethod async def migrate_user( user_id: str, user_settings: UserSettings, user_info: dict, ) -> User: if not user_id or not user_settings: return None kwargs = decrypt_legacy_model( [ 'llm_api_key', 'llm_api_key_for_byor', 'search_api_key', 'sandbox_api_key', ], user_settings, ) decrypted_user_settings = UserSettings(**kwargs) with session_maker() as session: # Check if user has completed billing sessions to enable BYOR export from storage.billing_session import BillingSession has_completed_billing = ( session.query(BillingSession) .filter( BillingSession.user_id == user_id, BillingSession.status == 'completed', ) .first() is not None ) # create personal org org = Org( id=uuid.UUID(user_id), name=f'user_{user_id}_org', org_version=user_settings.user_version, contact_name=resolve_display_name(user_info) or user_info.get('username', ''), contact_email=user_info['email'], byor_export_enabled=has_completed_billing, ) session.add(org) from storage.lite_llm_manager import LiteLlmManager logger.debug( 'user_store:migrate_user:calling_litellm_migrate_entries', extra={'user_id': user_id}, ) await LiteLlmManager.migrate_entries( str(org.id), user_id, decrypted_user_settings, ) logger.debug( 'user_store:migrate_user:done_litellm_migrate_entries', extra={'user_id': user_id}, ) custom_settings = UserStore._has_custom_settings( decrypted_user_settings, user_settings.user_version ) # avoids circular reference. This migrate method is temprorary until all users are migrated. from integrations.stripe_service import migrate_customer logger.debug( 'user_store:migrate_user:calling_stripe_migrate_customer', extra={'user_id': user_id}, ) await migrate_customer(user_id, org) logger.debug( 'user_store:migrate_user:done_stripe_migrate_customer', extra={'user_id': user_id}, ) from storage.org_store import OrgStore org_kwargs = OrgStore.get_kwargs_from_user_settings(decrypted_user_settings) org_kwargs.pop('id', None) # if user has custom settings, set org defaults to current version if custom_settings: org_kwargs['default_llm_model'] = get_default_litellm_model() org_kwargs['llm_base_url'] = LITE_LLM_API_URL org_kwargs['org_version'] = ORG_SETTINGS_VERSION for key, value in org_kwargs.items(): if hasattr(org, key): setattr(org, key, value) user_kwargs = UserStore.get_kwargs_from_user_settings( decrypted_user_settings ) user_kwargs.pop('id', None) user = User( id=uuid.UUID(user_id), current_org_id=org.id, role_id=None, **user_kwargs, ) session.add(user) logger.debug( 'user_store:migrate_user:calling_get_role_by_name', extra={'user_id': user_id}, ) role = await RoleStore.get_role_by_name_async('owner') logger.debug( 'user_store:migrate_user:done_get_role_by_name', extra={'user_id': user_id}, ) if role is None: raise ValueError('Owner role not found in database') from storage.org_member_store import OrgMemberStore org_member_kwargs = OrgMemberStore.get_kwargs_from_user_settings( decrypted_user_settings ) # if the user did not have custom settings in the old model, # then use the org defaults by not setting org_member fields if not custom_settings: del org_member_kwargs['llm_model'] del org_member_kwargs['llm_base_url'] org_member = OrgMember( org_id=org.id, user_id=user.id, role_id=role.id, # owner of your own org. status='active', **org_member_kwargs, ) session.add(org_member) # Mark the old user_settings as migrated instead of deleting user_settings.already_migrated = True session.merge(user_settings) session.flush() logger.debug( 'user_store:migrate_user:session_flush_complete', extra={'user_id': user_id}, ) # need to migrate conversation metadata session.execute( text(""" INSERT INTO conversation_metadata_saas (conversation_id, user_id, org_id) SELECT conversation_id, :user_id, :user_id FROM conversation_metadata WHERE user_id = :user_id """), {'user_id': user_id}, ) # Update org_id for tables that had org_id added user_uuid = uuid.UUID(user_id) # Update stripe_customers session.execute( text( 'UPDATE stripe_customers SET org_id = :org_id WHERE keycloak_user_id = :user_id' ), {'org_id': user_uuid, 'user_id': user_uuid}, ) # Update slack_users session.execute( text( 'UPDATE slack_users SET org_id = :org_id WHERE keycloak_user_id = :user_id' ), {'org_id': user_uuid, 'user_id': user_uuid}, ) # Update slack_conversation session.execute( text( 'UPDATE slack_conversation SET org_id = :org_id WHERE keycloak_user_id = :user_id' ), {'org_id': user_uuid, 'user_id': user_uuid}, ) # Update api_keys session.execute( text('UPDATE api_keys SET org_id = :org_id WHERE user_id = :user_id'), {'org_id': user_uuid, 'user_id': user_uuid}, ) # Update custom_secrets session.execute( text( 'UPDATE custom_secrets SET org_id = :org_id WHERE keycloak_user_id = :user_id' ), {'org_id': user_uuid, 'user_id': user_uuid}, ) # Update billing_sessions session.execute( text( 'UPDATE billing_sessions SET org_id = :org_id WHERE user_id = :user_id' ), {'org_id': user_uuid, 'user_id': user_uuid}, ) session.commit() session.refresh(user) user.org_members # load org_members logger.debug( 'user_store:migrate_user:session_committed', extra={'user_id': user_id}, ) return user @staticmethod async def downgrade_user(user_id: str) -> UserSettings | None: """ This method can be removed once orgs is established - probably after Feb 15 2026 Downgrade a migrated user back to the pre-migration state. This reverses the migrate_user operation: 1. Get the user's settings from user_settings table (migrated users) or create new user_settings from org_members table (new sign-ups) 2. Call LiteLlmManager.downgrade_entries to revert LiteLLM state 3. Copy user_id from conversation_metadata_saas to conversation_metadata 4. Delete conversation_metadata_saas entries 5. Reset org_id columns in related tables (stripe_customers, slack_users, etc.) 6. Delete the org_member and org entries 7. Delete the user entry 8. Set already_migrated=False on user_settings For new sign-ups (users who registered after migration was deployed), there won't be an existing user_settings entry. In this case, we fall back to the org_members table to get the user's API keys and settings, and create a new user_settings entry for them. Args: user_id: The Keycloak user ID to downgrade Returns: The user_settings if downgrade was successful, None otherwise. Returns None if the org has multiple members (not a personal org). """ logger.info( 'user_store:downgrade_user:start', extra={'user_id': user_id}, ) with session_maker() as session: # Get the user and their org_member user = ( session.query(User) .options(joinedload(User.org_members)) .filter(User.id == uuid.UUID(user_id)) .first() ) if not user: logger.warning( 'user_store:downgrade_user:user_not_found', extra={'user_id': user_id}, ) return None # Get the user's personal org (org_id == user_id) org = session.query(Org).filter(Org.id == uuid.UUID(user_id)).first() if not org: logger.warning( 'user_store:downgrade_user:org_not_found', extra={'user_id': user_id}, ) return None # Get org_members for this org - should only be one for personal orgs org_members = ( session.query(OrgMember).filter(OrgMember.org_id == org.id).all() ) if len(org_members) != 1: logger.error( 'user_store:downgrade_user:unexpected_org_members_count', extra={ 'user_id': user_id, 'org_id': str(org.id), 'org_members_count': len(org_members), }, ) return None org_member = org_members[0] # Get the user_settings (for migrated users) user_settings = ( session.query(UserSettings) .filter( UserSettings.keycloak_user_id == user_id, UserSettings.already_migrated.is_(True), ) .first() ) # For new sign-ups after migration, user_settings won't exist # Fall back to getting data from org_members if user_settings: if org_member.llm_api_key and org_member.llm_api_key.get_secret_value(): user_settings.llm_api_key = encrypt_legacy_value( org_member.llm_api_key.get_secret_value() ) if ( org_member.llm_api_key_for_byor and org_member.llm_api_key_for_byor.get_secret_value() ): user_settings.llm_api_key_for_byor = encrypt_legacy_value( org_member.llm_api_key_for_byor.get_secret_value() ) logger.info( 'user_store:downgrade_user:updated_user_settings_from_org_member', extra={'user_id': user_id}, ) else: # Create a new user_settings entry from OrgMember, User, and Org data # This is needed for new sign-ups who don't have user_settings user_settings = UserStore._create_user_settings_from_entities( user_id, org_member, user, org ) session.add(user_settings) logger.info( 'user_store:downgrade_user:created_user_settings_from_org_member', extra={'user_id': user_id}, ) session.flush() # Call LiteLLM downgrade from storage.lite_llm_manager import LiteLlmManager logger.debug( 'user_store:downgrade_user:calling_litellm_downgrade_entries', extra={'user_id': user_id}, ) encrypted_fields = [ 'llm_api_key', 'llm_api_key_for_byor', 'search_api_key', 'sandbox_api_key', ] for field in encrypted_fields: value = getattr(user_settings, field, None) if value: try: value = decrypt_legacy_value(value) setattr(user_settings, field, value) except Exception: pass await LiteLlmManager.downgrade_entries( str(org.id), user_id, user_settings, ) logger.debug( 'user_store:downgrade_user:done_litellm_downgrade_entries', extra={'user_id': user_id}, ) user_uuid = uuid.UUID(user_id) # Step 3: Copy user_id from conversation_metadata_saas to conversation_metadata # This ensures any conversations created after migration have their user_id # preserved in the original table before we delete the saas entries session.execute( text(""" UPDATE conversation_metadata SET user_id = :user_id WHERE conversation_id IN ( SELECT conversation_id FROM conversation_metadata_saas WHERE user_id = :user_uuid ) """), {'user_id': user_id, 'user_uuid': user_uuid}, ) # Step 4: Delete conversation_metadata_saas entries session.execute( text('DELETE FROM conversation_metadata_saas WHERE user_id = :user_id'), {'user_id': user_uuid}, ) # Step 5: Reset org_id columns in related tables # Reset stripe_customers session.execute( text( 'UPDATE stripe_customers SET org_id = NULL WHERE org_id = :org_id' ), {'org_id': user_uuid}, ) # Reset slack_users session.execute( text('UPDATE slack_users SET org_id = NULL WHERE org_id = :org_id'), {'org_id': user_uuid}, ) # Reset slack_conversation session.execute( text( 'UPDATE slack_conversation SET org_id = NULL WHERE org_id = :org_id' ), {'org_id': user_uuid}, ) # Reset api_keys session.execute( text('UPDATE api_keys SET org_id = NULL WHERE org_id = :org_id'), {'org_id': user_uuid}, ) # Reset custom_secrets session.execute( text('UPDATE custom_secrets SET org_id = NULL WHERE org_id = :org_id'), {'org_id': user_uuid}, ) # Reset billing_sessions session.execute( text( 'UPDATE billing_sessions SET org_id = NULL WHERE org_id = :org_id' ), {'org_id': user_uuid}, ) # Step 6: Delete org_member entries for this org session.execute( text('DELETE FROM org_member WHERE org_id = :org_id'), {'org_id': user_uuid}, ) # Step 7: Delete the user entry session.execute( text('DELETE FROM "user" WHERE id = :user_id'), {'user_id': user_uuid}, ) # Delete the org entry session.execute( text('DELETE FROM org WHERE id = :org_id'), {'org_id': user_uuid}, ) # Step 8: Set already_migrated=False on user_settings and encrypt fields user_settings.already_migrated = False # Re-encrypt the sensitive fields before storing in the DB encrypt_keys = [ 'llm_api_key', 'llm_api_key_for_byor', 'search_api_key', 'sandbox_api_key', ] for key in encrypt_keys: value = getattr(user_settings, key, None) if value is not None and not _is_legacy_value_encrypted(value): setattr(user_settings, key, encrypt_legacy_value(value)) session.merge(user_settings) session.commit() logger.info( 'user_store:downgrade_user:complete', extra={'user_id': user_id}, ) return user_settings @staticmethod def get_user_by_id(user_id: str) -> Optional[User]: """Get user by Keycloak user ID (sync version). Note: This method uses call_async_from_sync internally which creates a new event loop. If you're already in an async context, use get_user_by_id_async instead to avoid event loop conflicts. """ with session_maker() as session: user = ( session.query(User) .options(joinedload(User.org_members)) .filter(User.id == uuid.UUID(user_id)) .first() ) if user: return user # Check if we need to migrate from user_settings while not call_async_from_sync( UserStore._acquire_user_creation_lock, GENERAL_TIMEOUT, user_id ): # The user is already being created in another thread / process logger.info( 'user_store:create_default_settings:waiting_for_lock', extra={'user_id': user_id}, ) call_async_from_sync( asyncio.sleep, GENERAL_TIMEOUT, _RETRY_LOAD_DELAY_SECONDS ) try: # Check for user again as migration could have happened while trying to get the lock. user = ( session.query(User) .options(joinedload(User.org_members)) .filter(User.id == uuid.UUID(user_id)) .first() ) if user: return user user_settings = ( session.query(UserSettings) .filter( UserSettings.keycloak_user_id == user_id, UserSettings.already_migrated.is_(False), ) .first() ) if user_settings: token_manager = TokenManager() user_info = call_async_from_sync( token_manager.get_user_info_from_user_id, GENERAL_TIMEOUT, user_id, ) user = call_async_from_sync( UserStore.migrate_user, GENERAL_TIMEOUT, user_id, user_settings, user_info, ) return user else: return None finally: call_async_from_sync( UserStore._release_user_creation_lock, GENERAL_TIMEOUT, user_id ) @staticmethod async def get_user_by_id_async(user_id: str) -> Optional[User]: """Get user by Keycloak user ID (async version). This is the preferred method when calling from an async context as it avoids event loop conflicts that can occur with the sync version. """ async with a_session_maker() as session: result = await session.execute( select(User) .options(joinedload(User.org_members)) .filter(User.id == uuid.UUID(user_id)) ) user = result.scalars().first() if user: return user # Check if we need to migrate from user_settings while not await UserStore._acquire_user_creation_lock(user_id): # The user is already being created in another thread / process logger.info( 'user_store:get_user_by_id_async:waiting_for_lock', extra={'user_id': user_id}, ) await asyncio.sleep(_RETRY_LOAD_DELAY_SECONDS) try: # Check for user again as migration could have happened while trying to get the lock. result = await session.execute( select(User) .options(joinedload(User.org_members)) .filter(User.id == uuid.UUID(user_id)) ) user = result.scalars().first() if user: return user logger.info( 'user_store:get_user_by_id_async:start_migration', extra={'user_id': user_id}, ) result = await session.execute( select(UserSettings).filter( UserSettings.keycloak_user_id == user_id, UserSettings.already_migrated.is_(False), ) ) user_settings = result.scalars().first() if user_settings: token_manager = TokenManager() user_info = await token_manager.get_user_info_from_user_id(user_id) logger.info( 'user_store:get_user_by_id_async:calling_migrate_user', extra={'user_id': user_id}, ) user = await UserStore.migrate_user( user_id, user_settings, user_info, ) return user else: return None finally: await UserStore._release_user_creation_lock(user_id) @staticmethod async def get_user_by_email_async(email: str) -> Optional[User]: """Get user by email address (async version). This method looks up a user by their email address. Note that email addresses may not be unique across all users in rare cases. Args: email: The email address to search for Returns: User: The user with the matching email, or None if not found """ if not email: return None async with a_session_maker() as session: result = await session.execute( select(User) .options(joinedload(User.org_members)) .filter(User.email == email.lower().strip()) ) return result.scalars().first() @staticmethod def list_users() -> list[User]: """List all users.""" with session_maker() as session: return session.query(User).all() @staticmethod def update_current_org(user_id: str, org_id: UUID) -> Optional[User]: """Update the user's current organization. Args: user_id: The user's ID (Keycloak user ID) org_id: The organization ID to set as current Returns: User: The updated user object, or None if user not found """ with session_maker() as session: user = ( session.query(User) .filter(User.id == uuid.UUID(user_id)) .with_for_update() .first() ) if not user: return None user.current_org_id = org_id session.commit() session.refresh(user) return user @staticmethod async def backfill_contact_name(user_id: str, user_info: dict) -> None: """Update contact_name on the personal org if it still has a username-style value. Called during login to gradually fix existing users whose contact_name was stored as their username (before the resolve_display_name fix). Preserves custom values that were set via the PATCH endpoint. """ real_name = resolve_display_name(user_info) if not real_name: logger.debug( 'backfill_contact_name:no_real_name', extra={'user_id': user_id}, ) return preferred_username = user_info.get('preferred_username', '') username = user_info.get('username', '') async with a_session_maker() as session: result = await session.execute( select(Org).filter(Org.id == uuid.UUID(user_id)) ) org = result.scalars().first() if not org: logger.debug( 'backfill_contact_name:org_not_found', extra={'user_id': user_id}, ) return if org.contact_name in (preferred_username, username): logger.info( 'backfill_contact_name:updated', extra={ 'user_id': user_id, 'old': org.contact_name, 'new': real_name, }, ) org.contact_name = real_name await session.commit() @staticmethod async def update_user_email( user_id: str, email: str | None = None, email_verified: bool | None = None, ) -> None: """Unconditionally update User.email and/or email_verified. Unlike backfill_user_email(), this overwrites existing values. No-op when both arguments are None. Missing user is logged as a warning and ignored. """ if email is None and email_verified is None: return async with a_session_maker() as session: result = await session.execute( select(User).filter(User.id == uuid.UUID(user_id)) ) user = result.scalars().first() if not user: logger.warning( 'update_user_email:user_not_found', extra={'user_id': user_id}, ) return if email is not None: user.email = email if email_verified is not None: user.email_verified = email_verified logger.info( 'update_user_email:updated', extra={ 'user_id': user_id, 'email_set': email is not None, 'email_verified_set': email_verified is not None, }, ) await session.commit() @staticmethod async def backfill_user_email(user_id: str, user_info: dict) -> None: """Set User.email and email_verified from IDP if they are still NULL. Called during login to gradually fix existing users whose email was never persisted on the User record. Preserves non-NULL values (e.g. if a user manually changed their email). """ async with a_session_maker() as session: result = await session.execute( select(User).filter(User.id == uuid.UUID(user_id)) ) user = result.scalars().first() if not user: logger.debug( 'backfill_user_email:user_not_found', extra={'user_id': user_id}, ) return updated = False if user.email is None: user.email = user_info.get('email') updated = True if user.email_verified is None: user.email_verified = user_info.get('email_verified', False) updated = True if updated: logger.info( 'backfill_user_email:updated', extra={ 'user_id': user_id, 'email_set': user.email is not None, 'email_verified_set': user.email_verified is not None, }, ) await session.commit() # Prevent circular imports from typing import TYPE_CHECKING if TYPE_CHECKING: from openhands.storage.data_models.settings import Settings @staticmethod async def create_default_settings( org_id: str, user_id: str, create_user: bool = True ) -> Optional['Settings']: logger.info( 'UserStore:create_default_settings:start', extra={'org_id': org_id, 'user_id': user_id}, ) # You must log in before you get default settings if not org_id: return None from openhands.storage.data_models.settings import Settings settings = Settings(language='en', enable_proactive_conversation_starters=True) from storage.lite_llm_manager import LiteLlmManager settings = await LiteLlmManager.create_entries( org_id, user_id, settings, create_user ) if not settings: logger.info( 'UserStore:create_default_settings:litellm_create_failed', extra={'org_id': org_id}, ) return None return settings @staticmethod def get_kwargs_from_settings(settings: 'Settings'): kwargs = { normalized: getattr(settings, normalized) for c in User.__table__.columns if (normalized := c.name.lstrip('_')) and hasattr(settings, normalized) } return kwargs @staticmethod def get_kwargs_from_user_settings(user_settings: UserSettings): kwargs = { normalized: getattr(user_settings, normalized) for c in User.__table__.columns if (normalized := c.name.lstrip('_')) and hasattr(user_settings, normalized) } return kwargs @staticmethod def _create_user_settings_from_entities( user_id: str, org_member: OrgMember, user: User, org: Org ) -> UserSettings: """Create UserSettings from OrgMember, User, and Org data. Uses OrgMember values first. If an OrgMember field is None and there's a corresponding "default_" field in Org, use the Org value. Also pulls relevant fields from User. Args: user_id: The Keycloak user ID org_member: The OrgMember entity user: The User entity org: The Org entity Returns: A new UserSettings object populated from the entities """ # Mapping from OrgMember fields to corresponding Org "default_" fields org_member_to_org_default = { 'llm_model': 'default_llm_model', 'llm_base_url': 'default_llm_base_url', 'max_iterations': 'default_max_iterations', } def get_value_with_org_fallback(field_name: str, org_member_value): """Get value from OrgMember, falling back to Org default if None.""" if org_member_value is not None: return org_member_value org_default_field = org_member_to_org_default.get(field_name) if org_default_field and hasattr(org, org_default_field): return getattr(org, org_default_field) return None # Get values from OrgMember with Org fallback for fields with default_ prefix llm_model = get_value_with_org_fallback('llm_model', org_member.llm_model) llm_base_url = get_value_with_org_fallback( 'llm_base_url', org_member.llm_base_url ) max_iterations = get_value_with_org_fallback( 'max_iterations', org_member.max_iterations ) return UserSettings( keycloak_user_id=user_id, # OrgMember fields llm_api_key=org_member.llm_api_key.get_secret_value() if org_member.llm_api_key else None, llm_api_key_for_byor=org_member.llm_api_key_for_byor.get_secret_value() if org_member.llm_api_key_for_byor else None, llm_model=llm_model, llm_base_url=llm_base_url, max_iterations=max_iterations, # User fields accepted_tos=user.accepted_tos, enable_sound_notifications=user.enable_sound_notifications, language=user.language, user_consents_to_analytics=user.user_consents_to_analytics, email=user.email, email_verified=user.email_verified, git_user_name=user.git_user_name, git_user_email=user.git_user_email, # Org fields agent=org.agent, security_analyzer=org.security_analyzer, confirmation_mode=org.confirmation_mode, remote_runtime_resource_factor=org.remote_runtime_resource_factor, enable_default_condenser=org.enable_default_condenser, billing_margin=org.billing_margin, enable_proactive_conversation_starters=org.enable_proactive_conversation_starters, sandbox_base_container_image=org.sandbox_base_container_image, sandbox_runtime_container_image=org.sandbox_runtime_container_image, user_version=org.org_version, mcp_config=org.mcp_config, search_api_key=org.search_api_key.get_secret_value() if org.search_api_key else None, sandbox_api_key=org.sandbox_api_key.get_secret_value() if org.sandbox_api_key else None, max_budget_per_task=org.max_budget_per_task, enable_solvability_analysis=org.enable_solvability_analysis, v1_enabled=org.v1_enabled, condenser_max_size=org.condenser_max_size, already_migrated=False, ) @staticmethod def _has_custom_settings( user_settings: UserSettings, old_user_version: int | None ) -> bool: """ Check if user has custom LLM settings that should be preserved. Returns True if user customized either model or base_url. Args: settings: The user's current settings old_user_version: The user's old settings version, if any Returns: True if user has custom settings, False if using old defaults """ # Normalize values user_model = ( user_settings.llm_model.strip() or None if user_settings.llm_model else None ) user_base_url = ( user_settings.llm_base_url.strip() or None if user_settings.llm_base_url else None ) # Custom base_url = definitely custom settings (BYOK) if user_base_url and user_base_url != LITE_LLM_API_URL: return True # No model set = using defaults if not user_model: return False # Check if model matches old version's default if ( old_user_version and old_user_version <= ORG_SETTINGS_VERSION and old_user_version in PERSONAL_WORKSPACE_VERSION_TO_MODEL ): old_default_base = PERSONAL_WORKSPACE_VERSION_TO_MODEL[old_user_version] user_model_base = user_model.split('/')[-1] if user_model_base == old_default_base: return False # Matches old default return True # Custom model def _is_legacy_value_encrypted(value: str) -> bool: """Check if a legacy value is encrypted by trying to decrypt it""" try: decrypt_legacy_value(value) return True except Exception: return False