mirror of
https://github.com/OpenHands/OpenHands.git
synced 2026-03-22 13:47:19 +08:00
fix(backend): ensure conversation events are written back to google cloud (#12571)
This commit is contained in:
@@ -26,6 +26,7 @@ from server.sharing.shared_conversation_models import (
|
||||
)
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
|
||||
|
||||
from openhands.app_server.app_conversation.sql_app_conversation_info_service import (
|
||||
StoredConversationMetadata,
|
||||
@@ -57,7 +58,7 @@ class SQLSharedConversationInfoService(SharedConversationInfoService):
|
||||
include_sub_conversations: bool = False,
|
||||
) -> SharedConversationPage:
|
||||
"""Search for shared conversations."""
|
||||
query = self._public_select()
|
||||
query = self._public_select_with_saas_metadata()
|
||||
|
||||
# Conditionally exclude sub-conversations based on the parameter
|
||||
if not include_sub_conversations:
|
||||
@@ -104,14 +105,17 @@ class SQLSharedConversationInfoService(SharedConversationInfoService):
|
||||
query = query.limit(limit + 1)
|
||||
|
||||
result = await self.db_session.execute(query)
|
||||
rows = result.scalars().all()
|
||||
rows = result.all()
|
||||
|
||||
# Check if there are more results
|
||||
has_more = len(rows) > limit
|
||||
if has_more:
|
||||
rows = rows[:limit]
|
||||
|
||||
items = [self._to_shared_conversation(row) for row in rows]
|
||||
items = [
|
||||
self._to_shared_conversation(stored, saas_metadata=saas_metadata)
|
||||
for stored, saas_metadata in rows
|
||||
]
|
||||
|
||||
# Calculate next page ID
|
||||
next_page_id = None
|
||||
@@ -152,17 +156,18 @@ class SQLSharedConversationInfoService(SharedConversationInfoService):
|
||||
self, conversation_id: UUID
|
||||
) -> SharedConversation | None:
|
||||
"""Get a single public conversation info, returning None if missing or not shared."""
|
||||
query = self._public_select().where(
|
||||
query = self._public_select_with_saas_metadata().where(
|
||||
StoredConversationMetadata.conversation_id == str(conversation_id)
|
||||
)
|
||||
|
||||
result = await self.db_session.execute(query)
|
||||
stored = result.scalar_one_or_none()
|
||||
row = result.first()
|
||||
|
||||
if stored is None:
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
return self._to_shared_conversation(stored)
|
||||
stored, saas_metadata = row
|
||||
return self._to_shared_conversation(stored, saas_metadata=saas_metadata)
|
||||
|
||||
def _public_select(self):
|
||||
"""Create a select query that only returns public conversations."""
|
||||
@@ -173,6 +178,25 @@ class SQLSharedConversationInfoService(SharedConversationInfoService):
|
||||
query = query.where(StoredConversationMetadata.public == True) # noqa: E712
|
||||
return query
|
||||
|
||||
def _public_select_with_saas_metadata(self):
|
||||
"""Create a select query that returns public conversations with SAAS metadata.
|
||||
|
||||
This joins with conversation_metadata_saas to retrieve the user_id needed
|
||||
for constructing the correct event storage path. Uses LEFT OUTER JOIN to
|
||||
support conversations that may not have SAAS metadata (e.g., in tests).
|
||||
"""
|
||||
query = (
|
||||
select(StoredConversationMetadata, StoredConversationMetadataSaas)
|
||||
.outerjoin(
|
||||
StoredConversationMetadataSaas,
|
||||
StoredConversationMetadata.conversation_id
|
||||
== StoredConversationMetadataSaas.conversation_id,
|
||||
)
|
||||
.where(StoredConversationMetadata.conversation_version == 'V1')
|
||||
.where(StoredConversationMetadata.public == True) # noqa: E712
|
||||
)
|
||||
return query
|
||||
|
||||
def _apply_filters(
|
||||
self,
|
||||
query,
|
||||
@@ -211,9 +235,16 @@ class SQLSharedConversationInfoService(SharedConversationInfoService):
|
||||
def _to_shared_conversation(
|
||||
self,
|
||||
stored: StoredConversationMetadata,
|
||||
saas_metadata: StoredConversationMetadataSaas | None = None,
|
||||
sub_conversation_ids: list[UUID] | None = None,
|
||||
) -> SharedConversation:
|
||||
"""Convert StoredConversationMetadata to SharedConversation."""
|
||||
"""Convert StoredConversationMetadata to SharedConversation.
|
||||
|
||||
Args:
|
||||
stored: The base conversation metadata from conversation_metadata table.
|
||||
saas_metadata: Optional SAAS metadata containing user_id and org_id.
|
||||
sub_conversation_ids: Optional list of sub-conversation IDs.
|
||||
"""
|
||||
# V1 conversations should always have a sandbox_id
|
||||
sandbox_id = stored.sandbox_id
|
||||
assert sandbox_id is not None
|
||||
@@ -239,9 +270,16 @@ class SQLSharedConversationInfoService(SharedConversationInfoService):
|
||||
created_at = self._fix_timezone(stored.created_at)
|
||||
updated_at = self._fix_timezone(stored.last_updated_at)
|
||||
|
||||
# Get user_id from SAAS metadata if available
|
||||
created_by_user_id = (
|
||||
str(saas_metadata.user_id)
|
||||
if saas_metadata and saas_metadata.user_id
|
||||
else None
|
||||
)
|
||||
|
||||
return SharedConversation(
|
||||
id=UUID(stored.conversation_id),
|
||||
created_by_user_id=None, # user_id is no longer stored in conversation metadata
|
||||
created_by_user_id=created_by_user_id,
|
||||
sandbox_id=stored.sandbox_id,
|
||||
selected_repository=stored.selected_repository,
|
||||
selected_branch=stored.selected_branch,
|
||||
|
||||
Reference in New Issue
Block a user