Rohit Malhotra eb616dfae4
Refactor: rename user secrets table to custom secrets (#11525)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-10-27 16:58:07 +00:00

227 lines
9.0 KiB
Python

from dataclasses import dataclass
from integrations.linear.linear_types import LinearViewInterface, StartingConvoException
from integrations.models import JobContext
from integrations.utils import CONVERSATION_URL, get_final_agent_observation
from jinja2 import Environment
from storage.linear_conversation import LinearConversation
from storage.linear_integration_store import LinearIntegrationStore
from storage.linear_user import LinearUser
from storage.linear_workspace import LinearWorkspace
from openhands.core.logger import openhands_logger as logger
from openhands.core.schema.agent import AgentState
from openhands.events.action import MessageAction
from openhands.events.serialization.event import event_to_dict
from openhands.server.services.conversation_service import (
create_new_conversation,
setup_init_conversation_settings,
)
from openhands.server.shared import ConversationStoreImpl, config, conversation_manager
from openhands.server.user_auth.user_auth import UserAuth
from openhands.storage.data_models.conversation_metadata import ConversationTrigger
integration_store = LinearIntegrationStore.get_instance()
@dataclass
class LinearNewConversationView(LinearViewInterface):
job_context: JobContext
saas_user_auth: UserAuth
linear_user: LinearUser
linear_workspace: LinearWorkspace
selected_repo: str | None
conversation_id: str
def _get_instructions(self, jinja_env: Environment) -> tuple[str, str]:
"""Instructions passed when conversation is first initialized"""
instructions_template = jinja_env.get_template('linear_instructions.j2')
instructions = instructions_template.render()
user_msg_template = jinja_env.get_template('linear_new_conversation.j2')
user_msg = user_msg_template.render(
issue_key=self.job_context.issue_key,
issue_title=self.job_context.issue_title,
issue_description=self.job_context.issue_description,
user_message=self.job_context.user_msg or '',
)
return instructions, user_msg
async def create_or_update_conversation(self, jinja_env: Environment) -> str:
"""Create a new Linear conversation"""
if not self.selected_repo:
raise StartingConvoException('No repository selected for this conversation')
provider_tokens = await self.saas_user_auth.get_provider_tokens()
user_secrets = await self.saas_user_auth.get_secrets()
instructions, user_msg = self._get_instructions(jinja_env)
try:
agent_loop_info = await create_new_conversation(
user_id=self.linear_user.keycloak_user_id,
git_provider_tokens=provider_tokens,
selected_repository=self.selected_repo,
selected_branch=None,
initial_user_msg=user_msg,
conversation_instructions=instructions,
image_urls=None,
replay_json=None,
conversation_trigger=ConversationTrigger.LINEAR,
custom_secrets=user_secrets.custom_secrets if user_secrets else None,
)
self.conversation_id = agent_loop_info.conversation_id
logger.info(f'[Linear] Created conversation {self.conversation_id}')
# Store Linear conversation mapping
linear_conversation = LinearConversation(
conversation_id=self.conversation_id,
issue_id=self.job_context.issue_id,
issue_key=self.job_context.issue_key,
linear_user_id=self.linear_user.id,
)
await integration_store.create_conversation(linear_conversation)
return self.conversation_id
except Exception as e:
logger.error(
f'[Linear] Failed to create conversation: {str(e)}', exc_info=True
)
raise StartingConvoException(f'Failed to create conversation: {str(e)}')
def get_response_msg(self) -> str:
"""Get the response message to send back to Linear"""
conversation_link = CONVERSATION_URL.format(self.conversation_id)
return f"I'm on it! {self.job_context.display_name} can [track my progress here]({conversation_link})."
@dataclass
class LinearExistingConversationView(LinearViewInterface):
job_context: JobContext
saas_user_auth: UserAuth
linear_user: LinearUser
linear_workspace: LinearWorkspace
selected_repo: str | None
conversation_id: str
def _get_instructions(self, jinja_env: Environment) -> tuple[str, str]:
"""Instructions passed when conversation is first initialized"""
user_msg_template = jinja_env.get_template('linear_existing_conversation.j2')
user_msg = user_msg_template.render(
issue_key=self.job_context.issue_key,
user_message=self.job_context.user_msg or '',
issue_title=self.job_context.issue_title,
issue_description=self.job_context.issue_description,
)
return '', user_msg
async def create_or_update_conversation(self, jinja_env: Environment) -> str:
"""Update an existing Linear conversation"""
user_id = self.linear_user.keycloak_user_id
try:
conversation_store = await ConversationStoreImpl.get_instance(
config, user_id
)
try:
await conversation_store.get_metadata(self.conversation_id)
except FileNotFoundError:
raise StartingConvoException('Conversation no longer exists.')
provider_tokens = await self.saas_user_auth.get_provider_tokens()
if provider_tokens is None:
raise ValueError('Could not load provider tokens')
providers_set = list(provider_tokens.keys())
conversation_init_data = await setup_init_conversation_settings(
user_id, self.conversation_id, providers_set
)
# Either join ongoing conversation, or restart the conversation
agent_loop_info = await conversation_manager.maybe_start_agent_loop(
self.conversation_id, conversation_init_data, user_id
)
final_agent_observation = get_final_agent_observation(
agent_loop_info.event_store
)
agent_state = (
None
if len(final_agent_observation) == 0
else final_agent_observation[0].agent_state
)
if not agent_state or agent_state == AgentState.LOADING:
raise StartingConvoException('Conversation is still starting')
_, user_msg = self._get_instructions(jinja_env)
user_message_event = MessageAction(content=user_msg)
await conversation_manager.send_event_to_conversation(
self.conversation_id, event_to_dict(user_message_event)
)
return self.conversation_id
except Exception as e:
logger.error(
f'[Linear] Failed to create conversation: {str(e)}', exc_info=True
)
raise StartingConvoException(f'Failed to create conversation: {str(e)}')
def get_response_msg(self) -> str:
"""Get the response message to send back to Linear"""
conversation_link = CONVERSATION_URL.format(self.conversation_id)
return f"I'm on it! {self.job_context.display_name} can [continue tracking my progress here]({conversation_link})."
class LinearFactory:
"""Factory for creating Linear views based on message content"""
@staticmethod
async def create_linear_view_from_payload(
job_context: JobContext,
saas_user_auth: UserAuth,
linear_user: LinearUser,
linear_workspace: LinearWorkspace,
) -> LinearViewInterface:
"""Create appropriate Linear view based on the message and user state"""
if not linear_user or not saas_user_auth or not linear_workspace:
raise StartingConvoException(
'User not authenticated with Linear integration'
)
conversation = await integration_store.get_user_conversations_by_issue_id(
job_context.issue_id, linear_user.id
)
if conversation:
logger.info(
f'[Linear] Found existing conversation for issue {job_context.issue_id}'
)
return LinearExistingConversationView(
job_context=job_context,
saas_user_auth=saas_user_auth,
linear_user=linear_user,
linear_workspace=linear_workspace,
selected_repo=None,
conversation_id=conversation.conversation_id,
)
return LinearNewConversationView(
job_context=job_context,
saas_user_auth=saas_user_auth,
linear_user=linear_user,
linear_workspace=linear_workspace,
selected_repo=None, # Will be set later after repo inference
conversation_id='', # Will be set when conversation is created
)