mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
524 lines
19 KiB
Python
524 lines
19 KiB
Python
import hashlib
|
|
import hmac
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
import httpx
|
|
from fastapi import Request
|
|
from integrations.linear.linear_types import LinearViewInterface
|
|
from integrations.linear.linear_view import (
|
|
LinearExistingConversationView,
|
|
LinearFactory,
|
|
LinearNewConversationView,
|
|
)
|
|
from integrations.manager import Manager
|
|
from integrations.models import JobContext, Message
|
|
from integrations.utils import (
|
|
HOST_URL,
|
|
OPENHANDS_RESOLVER_TEMPLATES_DIR,
|
|
filter_potential_repos_by_user_msg,
|
|
)
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from server.auth.saas_user_auth import get_user_auth_from_keycloak_id
|
|
from server.auth.token_manager import TokenManager
|
|
from server.utils.conversation_callback_utils import register_callback_processor
|
|
from storage.linear_integration_store import LinearIntegrationStore
|
|
from storage.linear_user import LinearUser
|
|
from storage.linear_workspace import LinearWorkspace
|
|
|
|
from openhands.core.logger import openhands_logger as logger
|
|
from openhands.integrations.provider import ProviderHandler
|
|
from openhands.integrations.service_types import Repository
|
|
from openhands.server.shared import server_config
|
|
from openhands.server.types import LLMAuthenticationError, MissingSettingsError
|
|
from openhands.server.user_auth.user_auth import UserAuth
|
|
from openhands.utils.http_session import httpx_verify_option
|
|
|
|
|
|
class LinearManager(Manager):
|
|
def __init__(self, token_manager: TokenManager):
|
|
self.token_manager = token_manager
|
|
self.integration_store = LinearIntegrationStore.get_instance()
|
|
self.api_url = 'https://api.linear.app/graphql'
|
|
self.jinja_env = Environment(
|
|
loader=FileSystemLoader(OPENHANDS_RESOLVER_TEMPLATES_DIR + 'linear')
|
|
)
|
|
|
|
async def authenticate_user(
|
|
self, linear_user_id: str, workspace_id: int
|
|
) -> tuple[LinearUser | None, UserAuth | None]:
|
|
"""Authenticate Linear user and get their OpenHands user auth."""
|
|
|
|
# Find active Linear user by Linear user ID and workspace ID
|
|
linear_user = await self.integration_store.get_active_user(
|
|
linear_user_id, workspace_id
|
|
)
|
|
|
|
if not linear_user:
|
|
logger.warning(
|
|
f'[Linear] No active Linear user found for {linear_user_id} in workspace {workspace_id}'
|
|
)
|
|
return None, None
|
|
|
|
saas_user_auth = await get_user_auth_from_keycloak_id(
|
|
linear_user.keycloak_user_id
|
|
)
|
|
return linear_user, saas_user_auth
|
|
|
|
async def _get_repositories(self, user_auth: UserAuth) -> list[Repository]:
|
|
"""Get repositories that the user has access to."""
|
|
provider_tokens = await user_auth.get_provider_tokens()
|
|
if provider_tokens is None:
|
|
return []
|
|
access_token = await user_auth.get_access_token()
|
|
user_id = await user_auth.get_user_id()
|
|
client = ProviderHandler(
|
|
provider_tokens=provider_tokens,
|
|
external_auth_token=access_token,
|
|
external_auth_id=user_id,
|
|
)
|
|
repos: list[Repository] = await client.get_repositories(
|
|
'pushed', server_config.app_mode, None, None, None, None
|
|
)
|
|
return repos
|
|
|
|
async def validate_request(
|
|
self, request: Request
|
|
) -> Tuple[bool, Optional[str], Optional[Dict]]:
|
|
"""Verify Linear webhook signature."""
|
|
signature = request.headers.get('linear-signature')
|
|
body = await request.body()
|
|
payload = await request.json()
|
|
actor_url = payload.get('actor', {}).get('url', '')
|
|
workspace_name = ''
|
|
|
|
# Extract workspace name from actor URL
|
|
# Format: https://linear.app/{workspace}/profiles/{user}
|
|
if actor_url.startswith('https://linear.app/'):
|
|
url_parts = actor_url.split('/')
|
|
if len(url_parts) >= 4:
|
|
workspace_name = url_parts[3] # Extract workspace name
|
|
else:
|
|
logger.warning(f'[Linear] Invalid actor URL format: {actor_url}')
|
|
return False, None, None
|
|
else:
|
|
logger.warning(
|
|
f'[Linear] Actor URL does not match expected format: {actor_url}'
|
|
)
|
|
return False, None, None
|
|
|
|
if not workspace_name:
|
|
logger.warning('[Linear] No workspace name found in webhook payload')
|
|
return False, None, None
|
|
|
|
if not signature:
|
|
logger.warning('[Linear] No signature found in webhook headers')
|
|
return False, None, None
|
|
|
|
workspace = await self.integration_store.get_workspace_by_name(workspace_name)
|
|
|
|
if not workspace:
|
|
logger.warning('[Linear] Could not identify workspace for webhook')
|
|
return False, None, None
|
|
|
|
if workspace.status != 'active':
|
|
logger.warning(f'[Linear] Workspace {workspace.id} is not active')
|
|
return False, None, None
|
|
|
|
webhook_secret = self.token_manager.decrypt_text(workspace.webhook_secret)
|
|
digest = hmac.new(webhook_secret.encode(), body, hashlib.sha256).hexdigest()
|
|
|
|
if hmac.compare_digest(signature, digest):
|
|
logger.info('[Linear] Webhook signature verified successfully')
|
|
return True, signature, payload
|
|
|
|
return False, None, None
|
|
|
|
def parse_webhook(self, payload: Dict) -> JobContext | None:
|
|
action = payload.get('action')
|
|
type = payload.get('type')
|
|
|
|
if action == 'create' and type == 'Comment':
|
|
data = payload.get('data', {})
|
|
comment = data.get('body', '')
|
|
|
|
if '@openhands' not in comment:
|
|
return None
|
|
|
|
issue_data = data.get('issue', {})
|
|
issue_id = issue_data.get('id', '')
|
|
issue_key = issue_data.get('identifier', '')
|
|
elif action == 'update' and type == 'Issue':
|
|
data = payload.get('data', {})
|
|
labels = data.get('labels', [])
|
|
|
|
has_openhands_label = False
|
|
label_id = ''
|
|
for label in labels:
|
|
if label.get('name') == 'openhands':
|
|
label_id = label.get('id', '')
|
|
has_openhands_label = True
|
|
break
|
|
|
|
if not has_openhands_label and not label_id:
|
|
return None
|
|
|
|
labelIdChanges = data.get('updatedFrom', {}).get('labelIds', [])
|
|
|
|
if labelIdChanges and label_id in labelIdChanges:
|
|
return None # Label was added previously, ignore this webhook
|
|
|
|
issue_id = data.get('id', '')
|
|
issue_key = data.get('identifier', '')
|
|
comment = ''
|
|
|
|
else:
|
|
return None
|
|
|
|
actor = payload.get('actor', {})
|
|
display_name = actor.get('name', '')
|
|
user_email = actor.get('email', '')
|
|
actor_url = actor.get('url', '')
|
|
actor_id = actor.get('id', '')
|
|
workspace_name = ''
|
|
|
|
if actor_url.startswith('https://linear.app/'):
|
|
url_parts = actor_url.split('/')
|
|
if len(url_parts) >= 4:
|
|
workspace_name = url_parts[3] # Extract workspace name
|
|
else:
|
|
logger.warning(f'[Linear] Invalid actor URL format: {actor_url}')
|
|
return None
|
|
else:
|
|
logger.warning(
|
|
f'[Linear] Actor URL does not match expected format: {actor_url}'
|
|
)
|
|
return None
|
|
|
|
if not all(
|
|
[issue_id, issue_key, display_name, user_email, actor_id, workspace_name]
|
|
):
|
|
logger.warning('[Linear] Missing required fields in webhook payload')
|
|
return None
|
|
|
|
return JobContext(
|
|
issue_id=issue_id,
|
|
issue_key=issue_key,
|
|
user_msg=comment,
|
|
user_email=user_email,
|
|
platform_user_id=actor_id,
|
|
workspace_name=workspace_name,
|
|
display_name=display_name,
|
|
)
|
|
|
|
async def receive_message(self, message: Message):
|
|
"""Process incoming Linear webhook message."""
|
|
payload = message.message.get('payload', {})
|
|
job_context = self.parse_webhook(payload)
|
|
|
|
if not job_context:
|
|
logger.info('[Linear] Webhook does not match trigger conditions')
|
|
return
|
|
|
|
# Get workspace by user email domain
|
|
workspace = await self.integration_store.get_workspace_by_name(
|
|
job_context.workspace_name
|
|
)
|
|
if not workspace:
|
|
logger.warning(
|
|
f'[Linear] No workspace found for email domain: {job_context.workspace_name}'
|
|
)
|
|
await self._send_error_comment(
|
|
job_context.issue_id,
|
|
'Your workspace is not configured with Linear integration.',
|
|
None,
|
|
)
|
|
return
|
|
|
|
# Prevent any recursive triggers from the service account
|
|
if job_context.user_email == workspace.svc_acc_email:
|
|
return
|
|
|
|
if workspace.status != 'active':
|
|
logger.warning(f'[Linear] Workspace {workspace.id} is not active')
|
|
await self._send_error_comment(
|
|
job_context.issue_id,
|
|
'Linear integration is not active for your workspace.',
|
|
workspace,
|
|
)
|
|
return
|
|
|
|
# Authenticate user
|
|
linear_user, saas_user_auth = await self.authenticate_user(
|
|
job_context.platform_user_id, workspace.id
|
|
)
|
|
if not linear_user or not saas_user_auth:
|
|
logger.warning(
|
|
f'[Linear] User authentication failed for {job_context.user_email}'
|
|
)
|
|
await self._send_error_comment(
|
|
job_context.issue_id,
|
|
f'User {job_context.user_email} is not authenticated or active in the Linear integration.',
|
|
workspace,
|
|
)
|
|
return
|
|
|
|
# Get issue details
|
|
try:
|
|
api_key = self.token_manager.decrypt_text(workspace.svc_acc_api_key)
|
|
issue_title, issue_description = await self.get_issue_details(
|
|
job_context.issue_id, api_key
|
|
)
|
|
job_context.issue_title = issue_title
|
|
job_context.issue_description = issue_description
|
|
except Exception as e:
|
|
logger.error(f'[Linear] Failed to get issue context: {str(e)}')
|
|
await self._send_error_comment(
|
|
job_context.issue_id,
|
|
'Failed to retrieve issue details. Please check the issue ID and try again.',
|
|
workspace,
|
|
)
|
|
return
|
|
|
|
try:
|
|
# Create Linear view
|
|
linear_view = await LinearFactory.create_linear_view_from_payload(
|
|
job_context,
|
|
saas_user_auth,
|
|
linear_user,
|
|
workspace,
|
|
)
|
|
except Exception as e:
|
|
logger.error(
|
|
f'[Linear] Failed to create linear view: {str(e)}', exc_info=True
|
|
)
|
|
await self._send_error_comment(
|
|
job_context.issue_id,
|
|
'Failed to initialize conversation. Please try again.',
|
|
workspace,
|
|
)
|
|
return
|
|
|
|
if not await self.is_job_requested(message, linear_view):
|
|
return
|
|
|
|
await self.start_job(linear_view)
|
|
|
|
async def is_job_requested(
|
|
self, message: Message, linear_view: LinearViewInterface
|
|
) -> bool:
|
|
"""
|
|
Check if a job is requested and handle repository selection.
|
|
"""
|
|
|
|
if isinstance(linear_view, LinearExistingConversationView):
|
|
return True
|
|
|
|
try:
|
|
# Get user repositories
|
|
user_repos: list[Repository] = await self._get_repositories(
|
|
linear_view.saas_user_auth
|
|
)
|
|
|
|
target_str = f'{linear_view.job_context.issue_description}\n{linear_view.job_context.user_msg}'
|
|
|
|
# Try to infer repository from issue description
|
|
match, repos = filter_potential_repos_by_user_msg(target_str, user_repos)
|
|
|
|
if match:
|
|
# Found exact repository match
|
|
linear_view.selected_repo = repos[0].full_name
|
|
logger.info(f'[Linear] Inferred repository: {repos[0].full_name}')
|
|
return True
|
|
else:
|
|
# No clear match - send repository selection comment
|
|
await self._send_repo_selection_comment(linear_view)
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f'[Linear] Error in is_job_requested: {str(e)}')
|
|
return False
|
|
|
|
async def start_job(self, linear_view: LinearViewInterface):
|
|
"""Start a Linear job/conversation."""
|
|
# Import here to prevent circular import
|
|
from server.conversation_callback_processor.linear_callback_processor import (
|
|
LinearCallbackProcessor,
|
|
)
|
|
|
|
try:
|
|
user_info: LinearUser = linear_view.linear_user
|
|
logger.info(
|
|
f'[Linear] Starting job for user {user_info.keycloak_user_id} '
|
|
f'issue {linear_view.job_context.issue_key}',
|
|
)
|
|
|
|
# Create conversation
|
|
conversation_id = await linear_view.create_or_update_conversation(
|
|
self.jinja_env
|
|
)
|
|
|
|
logger.info(
|
|
f'[Linear] Created/Updated conversation {conversation_id} for issue {linear_view.job_context.issue_key}'
|
|
)
|
|
|
|
if isinstance(linear_view, LinearNewConversationView):
|
|
# Register callback processor for updates
|
|
processor = LinearCallbackProcessor(
|
|
issue_id=linear_view.job_context.issue_id,
|
|
issue_key=linear_view.job_context.issue_key,
|
|
workspace_name=linear_view.linear_workspace.name,
|
|
)
|
|
|
|
# Register the callback processor
|
|
register_callback_processor(conversation_id, processor)
|
|
|
|
logger.info(
|
|
f'[Linear] Created callback processor for conversation {conversation_id}'
|
|
)
|
|
|
|
# Send initial response
|
|
msg_info = linear_view.get_response_msg()
|
|
|
|
except MissingSettingsError as e:
|
|
logger.warning(f'[Linear] Missing settings error: {str(e)}')
|
|
msg_info = f'Please re-login into [OpenHands Cloud]({HOST_URL}) before starting a job.'
|
|
|
|
except LLMAuthenticationError as e:
|
|
logger.warning(f'[Linear] LLM authentication error: {str(e)}')
|
|
msg_info = f'Please set a valid LLM API key in [OpenHands Cloud]({HOST_URL}) before starting a job.'
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f'[Linear] Unexpected error starting job: {str(e)}', exc_info=True
|
|
)
|
|
msg_info = 'Sorry, there was an unexpected error starting the job. Please try again.'
|
|
|
|
# Send response comment
|
|
try:
|
|
api_key = self.token_manager.decrypt_text(
|
|
linear_view.linear_workspace.svc_acc_api_key
|
|
)
|
|
await self.send_message(
|
|
self.create_outgoing_message(msg=msg_info),
|
|
linear_view.job_context.issue_id,
|
|
api_key,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f'[Linear] Failed to send response message: {str(e)}')
|
|
|
|
async def _query_api(self, query: str, variables: Dict, api_key: str) -> Dict:
|
|
"""Query Linear GraphQL API."""
|
|
headers = {'Authorization': api_key}
|
|
async with httpx.AsyncClient(verify=httpx_verify_option()) as client:
|
|
response = await client.post(
|
|
self.api_url,
|
|
headers=headers,
|
|
json={'query': query, 'variables': variables},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
async def get_issue_details(self, issue_id: str, api_key: str) -> Tuple[str, str]:
|
|
"""Get issue details from Linear API."""
|
|
query = """
|
|
query Issue($issueId: String!) {
|
|
issue(id: $issueId) {
|
|
id
|
|
identifier
|
|
title
|
|
description
|
|
syncedWith {
|
|
metadata {
|
|
... on ExternalEntityInfoGithubMetadata {
|
|
owner
|
|
repo
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
issue_payload = await self._query_api(query, {'issueId': issue_id}, api_key)
|
|
|
|
if not issue_payload:
|
|
raise ValueError(f'Issue with ID {issue_id} not found.')
|
|
|
|
issue_data = issue_payload.get('data', {}).get('issue', {})
|
|
title = issue_data.get('title', '')
|
|
description = issue_data.get('description', '')
|
|
synced_with = issue_data.get('syncedWith', [])
|
|
owner = ''
|
|
repo = ''
|
|
if synced_with:
|
|
owner = synced_with[0].get('metadata', {}).get('owner', '')
|
|
repo = synced_with[0].get('metadata', {}).get('repo', '')
|
|
|
|
if not title:
|
|
raise ValueError(f'Issue with ID {issue_id} does not have a title.')
|
|
|
|
if not description:
|
|
raise ValueError(f'Issue with ID {issue_id} does not have a description.')
|
|
|
|
if owner and repo:
|
|
description += f'\n\nGit Repo: {owner}/{repo}'
|
|
|
|
return title, description
|
|
|
|
async def send_message(self, message: Message, issue_id: str, api_key: str):
|
|
"""Send message/comment to Linear issue."""
|
|
query = """
|
|
mutation CommentCreate($input: CommentCreateInput!) {
|
|
commentCreate(input: $input) {
|
|
success
|
|
comment {
|
|
id
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
variables = {'input': {'issueId': issue_id, 'body': message.message}}
|
|
return await self._query_api(query, variables, api_key)
|
|
|
|
async def _send_error_comment(
|
|
self, issue_id: str, error_msg: str, workspace: LinearWorkspace | None
|
|
):
|
|
"""Send error comment to Linear issue."""
|
|
if not workspace:
|
|
logger.error('[Linear] Cannot send error comment - no workspace available')
|
|
return
|
|
|
|
try:
|
|
api_key = self.token_manager.decrypt_text(workspace.svc_acc_api_key)
|
|
await self.send_message(
|
|
self.create_outgoing_message(msg=error_msg), issue_id, api_key
|
|
)
|
|
except Exception as e:
|
|
logger.error(f'[Linear] Failed to send error comment: {str(e)}')
|
|
|
|
async def _send_repo_selection_comment(self, linear_view: LinearViewInterface):
|
|
"""Send a comment with repository options for the user to choose."""
|
|
try:
|
|
comment_msg = (
|
|
'I need to know which repository to work with. '
|
|
'Please add it to your issue description or send a followup comment.'
|
|
)
|
|
|
|
api_key = self.token_manager.decrypt_text(
|
|
linear_view.linear_workspace.svc_acc_api_key
|
|
)
|
|
|
|
await self.send_message(
|
|
self.create_outgoing_message(msg=comment_msg),
|
|
linear_view.job_context.issue_id,
|
|
api_key,
|
|
)
|
|
|
|
logger.info(
|
|
f'[Linear] Sent repository selection comment for issue {linear_view.job_context.issue_key}'
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f'[Linear] Failed to send repository selection comment: {str(e)}'
|
|
)
|