Add v1 column to SlackConversation and conditionally add SlackCallbackProcessor

- Add migration 084 to add nullable boolean 'v1' column to slack_conversation table
- Update SlackConversation model to include v1 column
- Modify save_slack_convo method to record v1 conversation status
- Add get_slack_conversation_by_id method to SlackConversationStore
- Update slack manager to only add SlackCallbackProcessor for non-v1 conversations

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
openhands 2025-12-02 15:33:26 +00:00
parent 5e0d103f07
commit ae0d75975d
5 changed files with 80 additions and 28 deletions

View File

@ -244,13 +244,11 @@ class SlackManager(Manager):
async def is_job_requested(
self, message: Message, slack_view: SlackViewInterface
) -> bool:
"""
A job is always request we only receive webhooks for events associated with the slack bot
"""A job is always request we only receive webhooks for events associated with the slack bot
This method really just checks
1. Is the user is authenticated
2. Do we have the necessary information to start a job (either by inferring the selected repo, otherwise asking the user)
"""
# Infer repo from user message is not needed; user selected repo from the form or is updating existing convo
if isinstance(slack_view, SlackUpdateExistingConversationView):
return True
@ -321,20 +319,32 @@ class SlackManager(Manager):
# We don't re-subscribe for follow up messages from slack.
# Summaries are generated for every messages anyways, we only need to do
# this subscription once for the event which kicked off the job.
processor = SlackCallbackProcessor(
slack_user_id=slack_view.slack_user_id,
channel_id=slack_view.channel_id,
message_ts=slack_view.message_ts,
thread_ts=slack_view.thread_ts,
team_id=slack_view.team_id,
)
# Check if this is a v1 conversation - only add SlackCallbackProcessor for non-v1 conversations
from storage.slack_conversation_store import SlackConversationStore
slack_conversation_store = SlackConversationStore.get_instance()
slack_conversation = await slack_conversation_store.get_slack_conversation_by_id(conversation_id)
# Only add SlackCallbackProcessor if the conversation is not v1
if slack_conversation and not slack_conversation.v1:
processor = SlackCallbackProcessor(
slack_user_id=slack_view.slack_user_id,
channel_id=slack_view.channel_id,
message_ts=slack_view.message_ts,
thread_ts=slack_view.thread_ts,
team_id=slack_view.team_id,
)
# Register the callback processor
register_callback_processor(conversation_id, processor)
# Register the callback processor
register_callback_processor(conversation_id, processor)
logger.info(
f'[Slack] Created callback processor for conversation {conversation_id}'
)
logger.info(
f'[Slack] Created callback processor for conversation {conversation_id}'
)
else:
logger.info(
f'[Slack] Skipping callback processor for v1 conversation {conversation_id}'
)
msg_info = slack_view.get_response_msg()

View File

@ -113,8 +113,7 @@ class SlackNewConversationView(SlackViewInterface):
return ''
def _get_instructions(self, jinja_env: Environment) -> tuple[str, str]:
"Instructions passed when conversation is first initialized"
"""Instructions passed when conversation is first initialized"""
user_info: SlackUser = self.slack_to_openhands_user
messages = []
@ -174,7 +173,7 @@ class SlackNewConversationView(SlackViewInterface):
'Attempting to start conversation without confirming selected repo from user'
)
async def save_slack_convo(self):
async def save_slack_convo(self, is_v1: bool = False):
if self.slack_to_openhands_user:
user_info: SlackUser = self.slack_to_openhands_user
@ -185,6 +184,7 @@ class SlackNewConversationView(SlackViewInterface):
'conversation_id': self.conversation_id,
'keycloak_user_id': user_info.keycloak_user_id,
'parent_id': self.thread_ts or self.message_ts,
'v1': is_v1,
},
)
slack_conversation = SlackConversation(
@ -193,6 +193,7 @@ class SlackNewConversationView(SlackViewInterface):
keycloak_user_id=user_info.keycloak_user_id,
parent_id=self.thread_ts
or self.message_ts, # conversations can start in a thread reply as well; we should always references the parent's (root level msg's) message ID
v1=is_v1,
)
await slack_conversation_store.create_slack_conversation(slack_conversation)
@ -211,8 +212,7 @@ class SlackNewConversationView(SlackViewInterface):
)
async def create_or_update_conversation(self, jinja: Environment) -> str:
"""
Only creates a new conversation
"""Only creates a new conversation
"""
self._verify_necessary_values_are_set()
@ -283,13 +283,12 @@ class SlackNewConversationView(SlackViewInterface):
self.conversation_id = agent_loop_info.conversation_id
logger.info(f'[Slack]: Created V0 conversation: {self.conversation_id}')
await self.save_slack_convo()
await self.save_slack_convo(is_v1=False)
async def _create_v1_conversation(
self, jinja: Environment, provider_tokens, user_secrets
) -> None:
"""Create conversation using the new V1 app conversation system."""
await self.initialize_new_conversation()
user_instructions, conversation_instructions = self._get_instructions(jinja)
@ -346,7 +345,7 @@ class SlackNewConversationView(SlackViewInterface):
)
logger.info(f'[Slack V1]: Created new conversation: {self.conversation_id}')
await self.save_slack_convo()
await self.save_slack_convo(is_v1=True)
def get_callback_id(self) -> str:
return f'slack_{self.channel_id}_{self.message_ts}'
@ -387,8 +386,7 @@ class SlackUpdateExistingConversationView(SlackNewConversationView):
return user_message, ''
async def create_or_update_conversation(self, jinja: Environment) -> str:
"""
Send new user message to converation
"""Send new user message to converation
"""
user_info: SlackUser = self.slack_to_openhands_user
saas_user_auth: UserAuth = self.saas_user_auth

View File

@ -0,0 +1,30 @@
"""add v1 column to slack conversation table
Revision ID: 084
Revises: 083
Create Date: 2025-12-02 15:30:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '084'
down_revision: Union[str, None] = '083'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add v1 column
op.add_column(
'slack_conversation', sa.Column('v1', sa.Boolean(), nullable=True)
)
def downgrade() -> None:
# Drop v1 column
op.drop_column('slack_conversation', 'v1')

View File

@ -1,4 +1,4 @@
from sqlalchemy import Column, Identity, Integer, String
from sqlalchemy import Boolean, Column, Identity, Integer, String
from storage.base import Base
@ -9,3 +9,4 @@ class SlackConversation(Base): # type: ignore
channel_id = Column(String, nullable=False)
keycloak_user_id = Column(String, nullable=False)
parent_id = Column(String, nullable=True, index=True)
v1 = Column(Boolean, nullable=True)

View File

@ -14,8 +14,7 @@ class SlackConversationStore:
async def get_slack_conversation(
self, channel_id: str, parent_id: str
) -> SlackConversation | None:
"""
Get a slack conversation by channel_id and message_ts.
"""Get a slack conversation by channel_id and message_ts.
Both parameters are required to match for a conversation to be returned.
"""
with session_maker() as session:
@ -28,6 +27,20 @@ class SlackConversationStore:
return conversation
async def get_slack_conversation_by_id(
self, conversation_id: str
) -> SlackConversation | None:
"""Get a slack conversation by conversation_id.
"""
with session_maker() as session:
conversation = (
session.query(SlackConversation)
.filter(SlackConversation.conversation_id == conversation_id)
.first()
)
return conversation
async def create_slack_conversation(
self, slack_converstion: SlackConversation
) -> None: