This commit is contained in:
Chuck Butkus 2025-10-28 14:43:26 -04:00
parent 4646439108
commit 9e7b74ea32
13 changed files with 5209 additions and 4880 deletions

View File

@ -13,8 +13,8 @@ from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '077'
down_revision: Union[str, None] = '076'
revision: str = '080'
down_revision: Union[str, None] = '079'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
@ -144,17 +144,19 @@ def upgrade() -> None:
'billing_sessions_org_fkey', 'billing_sessions', 'org', ['org_id'], ['id']
)
# conversation_metadata
op.add_column(
'conversation_metadata',
sa.Column('org_id', postgresql.UUID(as_uuid=True), nullable=True),
)
op.create_foreign_key(
'conversation_metadata_org_fkey',
'conversation_metadata',
'org',
['org_id'],
['id'],
# Create conversation_metadata_saas table
op.create_table(
'conversation_metadata_saas',
sa.Column('conversation_id', sa.String(), nullable=False),
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('org_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(
['user_id'], ['user.id'], name='conversation_metadata_saas_user_fkey'
),
sa.ForeignKeyConstraint(
['org_id'], ['org.id'], name='conversation_metadata_saas_org_fkey'
),
sa.PrimaryKeyConstraint('conversation_id'),
)
# custom_secrets
@ -235,10 +237,8 @@ def downgrade() -> None:
op.drop_constraint('custom_secrets_org_fkey', 'custom_secrets', type_='foreignkey')
op.drop_column('custom_secrets', 'org_id')
op.drop_constraint(
'conversation_metadata_org_fkey', 'conversation_metadata', type_='foreignkey'
)
op.drop_column('conversation_metadata', 'org_id')
# Drop conversation_metadata_saas table
op.drop_table('conversation_metadata_saas')
op.drop_constraint(
'billing_sessions_org_fkey', 'billing_sessions', type_='foreignkey'

View File

@ -1,81 +0,0 @@
"""separate saas fields from conversation metadata
Revision ID: 081
Revises: 080
Create Date: 2025-01-27 00:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '081'
down_revision: Union[str, None] = '080'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create conversation_metadata_saas table
op.create_table(
'conversation_metadata_saas',
sa.Column('conversation_id', sa.String(), nullable=False),
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('org_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], name='conversation_metadata_saas_user_fkey'),
sa.ForeignKeyConstraint(['org_id'], ['org.id'], name='conversation_metadata_saas_org_fkey'),
sa.PrimaryKeyConstraint('conversation_id'),
)
# Migrate existing data from conversation_metadata to conversation_metadata_saas
# First, we need to handle the case where user_id might be a string that needs to be converted to UUID
op.execute("""
INSERT INTO conversation_metadata_saas (conversation_id, user_id, org_id)
SELECT
conversation_id,
CASE
WHEN user_id ~ '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
THEN user_id::uuid
ELSE gen_random_uuid() -- Generate a new UUID for invalid user_id values
END as user_id,
COALESCE(org_id, gen_random_uuid()) as org_id -- Use existing org_id or generate new one
FROM conversation_metadata
WHERE user_id IS NOT NULL
""")
# Remove columns from conversation_metadata table
op.drop_constraint('conversation_metadata_org_fkey', 'conversation_metadata', type_='foreignkey')
op.drop_column('conversation_metadata', 'github_user_id')
op.drop_column('conversation_metadata', 'user_id')
op.drop_column('conversation_metadata', 'org_id')
def downgrade() -> None:
# Add columns back to conversation_metadata table
op.add_column('conversation_metadata', sa.Column('github_user_id', sa.String(), nullable=True))
op.add_column('conversation_metadata', sa.Column('user_id', sa.String(), nullable=False))
op.add_column('conversation_metadata', sa.Column('org_id', postgresql.UUID(as_uuid=True), nullable=True))
# Recreate foreign key constraint
op.create_foreign_key(
'conversation_metadata_org_fkey',
'conversation_metadata',
'org',
['org_id'],
['id'],
)
# Migrate data back from conversation_metadata_saas to conversation_metadata
op.execute("""
UPDATE conversation_metadata
SET user_id = cms.user_id::text, org_id = cms.org_id
FROM conversation_metadata_saas cms
WHERE conversation_metadata.conversation_id = cms.conversation_id
""")
# Drop conversation_metadata_saas table
op.drop_table('conversation_metadata_saas')

9793
enterprise/poetry.lock generated

File diff suppressed because one or more lines are too long

View File

@ -7,10 +7,9 @@ from uuid import uuid4
import socketio
from server.logger import logger
from server.utils.conversation_callback_utils import invoke_conversation_callbacks
from storage.conversation_metadata_saas import ConversationMetadataSaas
from storage.database import session_maker
from storage.saas_settings_store import SaasSettingsStore
from storage.stored_conversation_metadata import StoredConversationMetadata
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
from openhands.core.config import LLMConfig
from openhands.core.config.openhands_config import OpenHandsConfig
@ -527,15 +526,17 @@ class ClusteredConversationManager(StandaloneConversationManager):
# Look up the user_id from the database
with session_maker() as session:
conversation_metadata_saas = (
session.query(ConversationMetadataSaas)
session.query(StoredConversationMetadataSaas)
.filter(
ConversationMetadataSaas.conversation_id
StoredConversationMetadataSaas.conversation_id
== conversation_id
)
.first()
)
user_id = (
str(conversation_metadata_saas.user_id) if conversation_metadata_saas else None
str(conversation_metadata_saas.user_id)
if conversation_metadata_saas
else None
)
# Handle the stopped conversation asynchronously
asyncio.create_task(

View File

@ -20,9 +20,8 @@ from server.utils.conversation_callback_utils import (
update_conversation_metadata,
update_conversation_stats,
)
from storage.conversation_metadata_saas import ConversationMetadataSaas
from storage.database import session_maker
from storage.stored_conversation_metadata import StoredConversationMetadata
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
from openhands.server.shared import conversation_manager
@ -228,8 +227,8 @@ def _parse_conversation_id_and_subpath(path: str) -> Tuple[str, str]:
def _get_user_id(conversation_id: str) -> str:
with session_maker() as session:
conversation_metadata_saas = (
session.query(ConversationMetadataSaas)
.filter(ConversationMetadataSaas.conversation_id == conversation_id)
session.query(StoredConversationMetadataSaas)
.filter(StoredConversationMetadataSaas.conversation_id == conversation_id)
.first()
)
return str(conversation_metadata_saas.user_id)

View File

@ -3,10 +3,9 @@ from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy.future import select
from storage.conversation_metadata_saas import ConversationMetadataSaas
from storage.database import session_maker
from storage.feedback import ConversationFeedback
from storage.stored_conversation_metadata import StoredConversationMetadata
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
from openhands.events.event_store import EventStore
from openhands.server.shared import file_store
@ -34,10 +33,10 @@ async def get_event_ids(conversation_id: str, user_id: str) -> List[int]:
def _verify_conversation():
with session_maker() as session:
metadata = (
session.query(ConversationMetadataSaas)
session.query(StoredConversationMetadataSaas)
.filter(
ConversationMetadataSaas.conversation_id == conversation_id,
ConversationMetadataSaas.user_id == user_id,
StoredConversationMetadataSaas.conversation_id == conversation_id,
StoredConversationMetadataSaas.user_id == user_id,
)
.first()
)

View File

@ -20,8 +20,8 @@ from server.utils.conversation_callback_utils import (
from sqlalchemy import orm
from storage.api_key_store import ApiKeyStore
from storage.database import session_maker
from storage.conversation_metadata_saas import ConversationMetadataSaas
from storage.stored_conversation_metadata import StoredConversationMetadata
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
from openhands.controller.agent import Agent
from openhands.core.config import LLMConfig, OpenHandsConfig
@ -524,8 +524,10 @@ class SaasNestedConversationManager(ConversationManager):
with session_maker() as session:
conversation_metadata_saas = (
session.query(ConversationMetadataSaas)
.filter(ConversationMetadataSaas.conversation_id == conversation_id)
session.query(StoredConversationMetadataSaas)
.filter(
StoredConversationMetadataSaas.conversation_id == conversation_id
)
.first()
)
@ -854,12 +856,17 @@ class SaasNestedConversationManager(ConversationManager):
with session_maker() as session:
# Only include conversations updated in the past week
one_week_ago = datetime.now(UTC) - timedelta(days=7)
query = session.query(StoredConversationMetadata.conversation_id).join(
ConversationMetadataSaas,
StoredConversationMetadata.conversation_id == ConversationMetadataSaas.conversation_id
).filter(
ConversationMetadataSaas.user_id == user_id,
StoredConversationMetadata.last_updated_at >= one_week_ago,
query = (
session.query(StoredConversationMetadata.conversation_id)
.join(
StoredConversationMetadataSaas,
StoredConversationMetadata.conversation_id
== StoredConversationMetadataSaas.conversation_id,
)
.filter(
StoredConversationMetadataSaas.user_id == user_id,
StoredConversationMetadata.last_updated_at >= one_week_ago,
)
)
user_conversation_ids = set(query)
return user_conversation_ids
@ -933,11 +940,16 @@ class SaasNestedConversationManager(ConversationManager):
.filter(StoredConversationMetadata.conversation_id == conversation_id)
.first()
)
if conversation_metadata is None:
conversation_metadata_saas = (
session.query(StoredConversationMetadataSaas)
.filter(StoredConversationMetadataSaas.conversation_id == conversation_id)
.first()
)
if conversation_metadata is None or conversation_metadata_saas is None:
# Conversation is running in different server
return
user_id = conversation_metadata.user_id
user_id = conversation_metadata_saas.user_id
# Get the id of the next event which is not present
events_dir = get_conversation_events_dir(

View File

@ -3,7 +3,6 @@ from storage.auth_tokens import AuthTokens
from storage.billing_session import BillingSession
from storage.billing_session_type import BillingSessionType
from storage.conversation_callback import CallbackStatus, ConversationCallback
from storage.conversation_metadata_saas import ConversationMetadataSaas
from storage.conversation_work import ConversationWork
from storage.experiment_assignment import ExperimentAssignment
from storage.feedback import ConversationFeedback, Feedback
@ -28,9 +27,10 @@ from storage.slack_conversation import SlackConversation
from storage.slack_team import SlackTeam
from storage.slack_user import SlackUser
from storage.stored_conversation_metadata import StoredConversationMetadata
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
from storage.stored_custom_secrets import StoredCustomSecrets
from storage.stored_offline_token import StoredOfflineToken
from storage.stored_repository import StoredRepository
from storage.stored_user_secrets import StoredUserSecrets
from storage.stripe_customer import StripeCustomer
from storage.subscription_access import SubscriptionAccess
from storage.subscription_access_status import SubscriptionAccessStatus
@ -46,7 +46,7 @@ __all__ = [
'CallbackStatus',
'ConversationCallback',
'ConversationFeedback',
'ConversationMetadataSaas',
'StoredConversationMetadataSaas',
'ConversationWork',
'ExperimentAssignment',
'Feedback',
@ -74,7 +74,7 @@ __all__ = [
'StoredConversationMetadata',
'StoredOfflineToken',
'StoredRepository',
'StoredUserSecrets',
'StoredCustomSecrets',
'StripeCustomer',
'SubscriptionAccess',
'SubscriptionAccessStatus',

View File

@ -48,11 +48,10 @@ class Org(Base): # type: ignore
org_members = relationship('OrgMember', back_populates='org')
current_users = relationship('User', back_populates='current_org')
billing_sessions = relationship('BillingSession', back_populates='org')
conversation_metadata = relationship(
'StoredConversationMetadata', back_populates='org'
stored_conversation_metadata_saas = relationship(
'StoredConversationMetadataSaas', back_populates='org'
)
conversation_metadata_saas = relationship('ConversationMetadataSaas', back_populates='org')
user_secrets = relationship('StoredUserSecrets', back_populates='org')
user_secrets = relationship('StoredCustomSecrets', back_populates='org')
api_keys = relationship('ApiKey', back_populates='org')
slack_conversations = relationship('SlackConversation', back_populates='org')
slack_users = relationship('SlackUser', back_populates='org')

View File

@ -6,12 +6,6 @@ from dataclasses import dataclass
from datetime import UTC
from uuid import UUID
from sqlalchemy.orm import sessionmaker
from storage.conversation_metadata_saas import ConversationMetadataSaas
from storage.database import session_maker
from storage.stored_conversation_metadata import StoredConversationMetadata
from storage.user_store import UserStore
from openhands.core.config.openhands_config import OpenHandsConfig
from openhands.integrations.provider import ProviderType
from openhands.storage.conversation.conversation_store import ConversationStore
@ -24,6 +18,12 @@ from openhands.storage.data_models.conversation_metadata_result_set import (
)
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.search_utils import offset_to_page_id, page_id_to_offset
from sqlalchemy.orm import sessionmaker
from storage.database import session_maker
from storage.stored_conversation_metadata import StoredConversationMetadata
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
from storage.user_store import UserStore
logger = logging.getLogger(__name__)
@ -48,11 +48,12 @@ class SaasConversationStore(ConversationStore):
return (
session.query(StoredConversationMetadata)
.join(
ConversationMetadataSaas,
StoredConversationMetadata.conversation_id == ConversationMetadataSaas.conversation_id
StoredConversationMetadataSaas,
StoredConversationMetadata.conversation_id
== StoredConversationMetadataSaas.conversation_id,
)
.filter(ConversationMetadataSaas.user_id == self.user_id)
.filter(ConversationMetadataSaas.org_id == self.org_id)
.filter(StoredConversationMetadataSaas.user_id == self.user_id)
.filter(StoredConversationMetadataSaas.org_id == self.org_id)
.filter(StoredConversationMetadata.conversation_id == conversation_id)
)
@ -70,6 +71,8 @@ class SaasConversationStore(ConversationStore):
# Convert string to ProviderType enum
kwargs['git_provider'] = ProviderType(kwargs['git_provider'])
kwargs['user_id'] = self.user_id
# Remove V1 attributes
kwargs.pop('max_budget_per_task', None)
kwargs.pop('cache_read_tokens', None)
@ -82,7 +85,7 @@ class SaasConversationStore(ConversationStore):
async def save_metadata(self, metadata: ConversationMetadata):
kwargs = dataclasses.asdict(metadata)
# Remove user_id and org_id from kwargs since they're no longer in StoredConversationMetadata
kwargs.pop('user_id', None)
kwargs.pop('org_id', None)
@ -101,24 +104,29 @@ class SaasConversationStore(ConversationStore):
with self.session_maker() as session:
# Save the main conversation metadata
session.merge(stored_metadata)
# Create or update the SaaS metadata record
saas_metadata = session.query(ConversationMetadataSaas).filter(
ConversationMetadataSaas.conversation_id == stored_metadata.conversation_id
).first()
saas_metadata = (
session.query(StoredConversationMetadataSaas)
.filter(
StoredConversationMetadataSaas.conversation_id
== stored_metadata.conversation_id
)
.first()
)
if not saas_metadata:
saas_metadata = ConversationMetadataSaas(
saas_metadata = StoredConversationMetadataSaas(
conversation_id=stored_metadata.conversation_id,
user_id=self.user_id,
org_id=self.org_id
org_id=self.org_id,
)
session.add(saas_metadata)
else:
# Update existing record
saas_metadata.user_id = self.user_id
saas_metadata.org_id = self.org_id
session.commit()
await call_sync_from_async(_save_metadata)
@ -140,14 +148,14 @@ class SaasConversationStore(ConversationStore):
with self.session_maker() as session:
# Delete the main conversation metadata
self._select_by_id(session, conversation_id).delete()
# Delete the SaaS metadata record
session.query(ConversationMetadataSaas).filter(
ConversationMetadataSaas.conversation_id == conversation_id,
ConversationMetadataSaas.user_id == self.user_id,
ConversationMetadataSaas.org_id == self.org_id
session.query(StoredConversationMetadataSaas).filter(
StoredConversationMetadataSaas.conversation_id == conversation_id,
StoredConversationMetadataSaas.user_id == self.user_id,
StoredConversationMetadataSaas.org_id == self.org_id,
).delete()
session.commit()
await call_sync_from_async(_delete_metadata)
@ -172,11 +180,12 @@ class SaasConversationStore(ConversationStore):
conversations = (
session.query(StoredConversationMetadata)
.join(
ConversationMetadataSaas,
StoredConversationMetadata.conversation_id == ConversationMetadataSaas.conversation_id
StoredConversationMetadataSaas,
StoredConversationMetadata.conversation_id
== StoredConversationMetadataSaas.conversation_id,
)
.filter(ConversationMetadataSaas.user_id == self.user_id)
.filter(ConversationMetadataSaas.org_id == self.org_id)
.filter(StoredConversationMetadataSaas.user_id == self.user_id)
.filter(StoredConversationMetadataSaas.org_id == self.org_id)
.order_by(StoredConversationMetadata.created_at.desc())
.offset(offset)
.limit(limit + 1)

View File

@ -5,14 +5,14 @@ This model stores the SaaS-specific metadata for conversations,
containing only the conversation_id, user_id, and org_id.
"""
from uuid import UUID
from sqlalchemy import UUID as SQL_UUID, Column, ForeignKey, String
from sqlalchemy import UUID as SQL_UUID
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy.orm import relationship
from storage.base import Base
class ConversationMetadataSaas(Base): # type: ignore
class StoredConversationMetadataSaas(Base): # type: ignore
"""SaaS conversation metadata model containing user and org associations."""
__tablename__ = 'conversation_metadata_saas'
@ -22,8 +22,8 @@ class ConversationMetadataSaas(Base): # type: ignore
org_id = Column(SQL_UUID(as_uuid=True), ForeignKey('org.id'), nullable=False)
# Relationships
user = relationship('User', back_populates='conversation_metadata_saas')
org = relationship('Org', back_populates='conversation_metadata_saas')
user = relationship('User', back_populates='stored_conversation_metadata_saas')
org = relationship('Org', back_populates='stored_conversation_metadata_saas')
__all__ = ['ConversationMetadataSaas']
__all__ = ['StoredConversationMetadataSaas']

View File

@ -36,4 +36,6 @@ class User(Base): # type: ignore
role = relationship('Role', back_populates='users')
org_members = relationship('OrgMember', back_populates='user')
current_org = relationship('Org', back_populates='current_users')
conversation_metadata_saas = relationship('ConversationMetadataSaas', back_populates='user')
stored_conversation_metadata_saas = relationship(
'StoredConversationMetadataSaas', back_populates='user'
)

View File

@ -5,6 +5,8 @@ Store class for managing users.
import uuid
from typing import Optional
from sqlalchemy import text
from integrations.stripe_service import migrate_customer
from server.logger import logger
from sqlalchemy.orm import joinedload
@ -151,6 +153,24 @@ class UserStore:
# Mark the old user_settings as migrated instead of deleting
user_settings.migration_status = True
# need to migrate conversation metadata
session.execute(
text("""
INSERT INTO conversation_metadata_saas (conversation_id, user_id, org_id)
SELECT
conversation_id,
CASE
WHEN user_id ~ '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
THEN user_id::uuid
ELSE gen_random_uuid()
END AS user_id,
COALESCE(org_id, gen_random_uuid()) AS org_id
FROM conversation_metadata
WHERE user_id IS NOT NULL
""")
)
session.commit()
session.refresh(user)
user.org_members # load org_members