mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
227 lines
9.0 KiB
Python
227 lines
9.0 KiB
Python
from dataclasses import dataclass
|
|
|
|
from integrations.linear.linear_types import LinearViewInterface, StartingConvoException
|
|
from integrations.models import JobContext
|
|
from integrations.utils import CONVERSATION_URL, get_final_agent_observation
|
|
from jinja2 import Environment
|
|
from storage.linear_conversation import LinearConversation
|
|
from storage.linear_integration_store import LinearIntegrationStore
|
|
from storage.linear_user import LinearUser
|
|
from storage.linear_workspace import LinearWorkspace
|
|
|
|
from openhands.core.logger import openhands_logger as logger
|
|
from openhands.core.schema.agent import AgentState
|
|
from openhands.events.action import MessageAction
|
|
from openhands.events.serialization.event import event_to_dict
|
|
from openhands.server.services.conversation_service import (
|
|
create_new_conversation,
|
|
setup_init_conversation_settings,
|
|
)
|
|
from openhands.server.shared import ConversationStoreImpl, config, conversation_manager
|
|
from openhands.server.user_auth.user_auth import UserAuth
|
|
from openhands.storage.data_models.conversation_metadata import ConversationTrigger
|
|
|
|
integration_store = LinearIntegrationStore.get_instance()
|
|
|
|
|
|
@dataclass
|
|
class LinearNewConversationView(LinearViewInterface):
|
|
job_context: JobContext
|
|
saas_user_auth: UserAuth
|
|
linear_user: LinearUser
|
|
linear_workspace: LinearWorkspace
|
|
selected_repo: str | None
|
|
conversation_id: str
|
|
|
|
def _get_instructions(self, jinja_env: Environment) -> tuple[str, str]:
|
|
"""Instructions passed when conversation is first initialized"""
|
|
|
|
instructions_template = jinja_env.get_template('linear_instructions.j2')
|
|
instructions = instructions_template.render()
|
|
|
|
user_msg_template = jinja_env.get_template('linear_new_conversation.j2')
|
|
|
|
user_msg = user_msg_template.render(
|
|
issue_key=self.job_context.issue_key,
|
|
issue_title=self.job_context.issue_title,
|
|
issue_description=self.job_context.issue_description,
|
|
user_message=self.job_context.user_msg or '',
|
|
)
|
|
|
|
return instructions, user_msg
|
|
|
|
async def create_or_update_conversation(self, jinja_env: Environment) -> str:
|
|
"""Create a new Linear conversation"""
|
|
|
|
if not self.selected_repo:
|
|
raise StartingConvoException('No repository selected for this conversation')
|
|
|
|
provider_tokens = await self.saas_user_auth.get_provider_tokens()
|
|
user_secrets = await self.saas_user_auth.get_secrets()
|
|
instructions, user_msg = self._get_instructions(jinja_env)
|
|
|
|
try:
|
|
agent_loop_info = await create_new_conversation(
|
|
user_id=self.linear_user.keycloak_user_id,
|
|
git_provider_tokens=provider_tokens,
|
|
selected_repository=self.selected_repo,
|
|
selected_branch=None,
|
|
initial_user_msg=user_msg,
|
|
conversation_instructions=instructions,
|
|
image_urls=None,
|
|
replay_json=None,
|
|
conversation_trigger=ConversationTrigger.LINEAR,
|
|
custom_secrets=user_secrets.custom_secrets if user_secrets else None,
|
|
)
|
|
|
|
self.conversation_id = agent_loop_info.conversation_id
|
|
|
|
logger.info(f'[Linear] Created conversation {self.conversation_id}')
|
|
|
|
# Store Linear conversation mapping
|
|
linear_conversation = LinearConversation(
|
|
conversation_id=self.conversation_id,
|
|
issue_id=self.job_context.issue_id,
|
|
issue_key=self.job_context.issue_key,
|
|
linear_user_id=self.linear_user.id,
|
|
)
|
|
|
|
await integration_store.create_conversation(linear_conversation)
|
|
|
|
return self.conversation_id
|
|
except Exception as e:
|
|
logger.error(
|
|
f'[Linear] Failed to create conversation: {str(e)}', exc_info=True
|
|
)
|
|
raise StartingConvoException(f'Failed to create conversation: {str(e)}')
|
|
|
|
def get_response_msg(self) -> str:
|
|
"""Get the response message to send back to Linear"""
|
|
conversation_link = CONVERSATION_URL.format(self.conversation_id)
|
|
return f"I'm on it! {self.job_context.display_name} can [track my progress here]({conversation_link})."
|
|
|
|
|
|
@dataclass
|
|
class LinearExistingConversationView(LinearViewInterface):
|
|
job_context: JobContext
|
|
saas_user_auth: UserAuth
|
|
linear_user: LinearUser
|
|
linear_workspace: LinearWorkspace
|
|
selected_repo: str | None
|
|
conversation_id: str
|
|
|
|
def _get_instructions(self, jinja_env: Environment) -> tuple[str, str]:
|
|
"""Instructions passed when conversation is first initialized"""
|
|
|
|
user_msg_template = jinja_env.get_template('linear_existing_conversation.j2')
|
|
user_msg = user_msg_template.render(
|
|
issue_key=self.job_context.issue_key,
|
|
user_message=self.job_context.user_msg or '',
|
|
issue_title=self.job_context.issue_title,
|
|
issue_description=self.job_context.issue_description,
|
|
)
|
|
|
|
return '', user_msg
|
|
|
|
async def create_or_update_conversation(self, jinja_env: Environment) -> str:
|
|
"""Update an existing Linear conversation"""
|
|
|
|
user_id = self.linear_user.keycloak_user_id
|
|
|
|
try:
|
|
conversation_store = await ConversationStoreImpl.get_instance(
|
|
config, user_id
|
|
)
|
|
|
|
try:
|
|
await conversation_store.get_metadata(self.conversation_id)
|
|
except FileNotFoundError:
|
|
raise StartingConvoException('Conversation no longer exists.')
|
|
|
|
provider_tokens = await self.saas_user_auth.get_provider_tokens()
|
|
if provider_tokens is None:
|
|
raise ValueError('Could not load provider tokens')
|
|
providers_set = list(provider_tokens.keys())
|
|
|
|
conversation_init_data = await setup_init_conversation_settings(
|
|
user_id, self.conversation_id, providers_set
|
|
)
|
|
|
|
# Either join ongoing conversation, or restart the conversation
|
|
agent_loop_info = await conversation_manager.maybe_start_agent_loop(
|
|
self.conversation_id, conversation_init_data, user_id
|
|
)
|
|
|
|
final_agent_observation = get_final_agent_observation(
|
|
agent_loop_info.event_store
|
|
)
|
|
agent_state = (
|
|
None
|
|
if len(final_agent_observation) == 0
|
|
else final_agent_observation[0].agent_state
|
|
)
|
|
|
|
if not agent_state or agent_state == AgentState.LOADING:
|
|
raise StartingConvoException('Conversation is still starting')
|
|
|
|
_, user_msg = self._get_instructions(jinja_env)
|
|
user_message_event = MessageAction(content=user_msg)
|
|
await conversation_manager.send_event_to_conversation(
|
|
self.conversation_id, event_to_dict(user_message_event)
|
|
)
|
|
|
|
return self.conversation_id
|
|
except Exception as e:
|
|
logger.error(
|
|
f'[Linear] Failed to create conversation: {str(e)}', exc_info=True
|
|
)
|
|
raise StartingConvoException(f'Failed to create conversation: {str(e)}')
|
|
|
|
def get_response_msg(self) -> str:
|
|
"""Get the response message to send back to Linear"""
|
|
conversation_link = CONVERSATION_URL.format(self.conversation_id)
|
|
return f"I'm on it! {self.job_context.display_name} can [continue tracking my progress here]({conversation_link})."
|
|
|
|
|
|
class LinearFactory:
|
|
"""Factory for creating Linear views based on message content"""
|
|
|
|
@staticmethod
|
|
async def create_linear_view_from_payload(
|
|
job_context: JobContext,
|
|
saas_user_auth: UserAuth,
|
|
linear_user: LinearUser,
|
|
linear_workspace: LinearWorkspace,
|
|
) -> LinearViewInterface:
|
|
"""Create appropriate Linear view based on the message and user state"""
|
|
|
|
if not linear_user or not saas_user_auth or not linear_workspace:
|
|
raise StartingConvoException(
|
|
'User not authenticated with Linear integration'
|
|
)
|
|
|
|
conversation = await integration_store.get_user_conversations_by_issue_id(
|
|
job_context.issue_id, linear_user.id
|
|
)
|
|
if conversation:
|
|
logger.info(
|
|
f'[Linear] Found existing conversation for issue {job_context.issue_id}'
|
|
)
|
|
return LinearExistingConversationView(
|
|
job_context=job_context,
|
|
saas_user_auth=saas_user_auth,
|
|
linear_user=linear_user,
|
|
linear_workspace=linear_workspace,
|
|
selected_repo=None,
|
|
conversation_id=conversation.conversation_id,
|
|
)
|
|
|
|
return LinearNewConversationView(
|
|
job_context=job_context,
|
|
saas_user_auth=saas_user_auth,
|
|
linear_user=linear_user,
|
|
linear_workspace=linear_workspace,
|
|
selected_repo=None, # Will be set later after repo inference
|
|
conversation_id='', # Will be set when conversation is created
|
|
)
|