OpenHands/openhands/integrations/bitbucket/bitbucket_service.py
Rohit Malhotra edc95141f7
Implement branch pagination for repository selection and improve UI async dropdown behaviour (#10588)
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: amanape <83104063+amanape@users.noreply.github.com>
2025-08-29 03:38:42 +00:00

764 lines
28 KiB
Python

import base64
import os
import re
from typing import Any
import httpx
from pydantic import SecretStr
from openhands.core.logger import openhands_logger as logger
from openhands.integrations.service_types import (
BaseGitService,
Branch,
GitService,
InstallationsService,
OwnerType,
PaginatedBranchesResponse,
ProviderType,
Repository,
RequestMethod,
ResourceNotFoundError,
SuggestedTask,
User,
)
from openhands.microagent.types import MicroagentContentResponse
from openhands.server.types import AppMode
from openhands.utils.import_utils import get_impl
class BitBucketService(BaseGitService, GitService, InstallationsService):
    """Default implementation of GitService for Bitbucket integration.

    This is an extension point in OpenHands that allows applications to customize Bitbucket
    integration behavior. Applications can substitute their own implementation by:
    1. Creating a class that inherits from GitService
    2. Implementing all required methods
    3. Setting server_config.bitbucket_service_class to the fully qualified name of the class

    The class is instantiated via get_impl() in openhands.server.shared.py.
    """

    BASE_URL = 'https://api.bitbucket.org/2.0'
    token: SecretStr = SecretStr('')
    refresh = False

    # Translation of the provider-agnostic sort keys accepted by the repository
    # listing methods into Bitbucket ``sort`` values ('-' prefix = descending).
    # Bitbucket has no 'pushed' field, so 'updated_on' stands in for it, and it
    # sorts by 'name' rather than 'full_name'.
    _BITBUCKET_SORT_MAP = {
        'pushed': '-updated_on',
        'updated': '-updated_on',
        'created': '-created_on',
        'full_name': 'name',
    }
    # Fallback for unrecognised sort keys: most recently updated first.
    _DEFAULT_BITBUCKET_SORT = '-updated_on'

    def __init__(
        self,
        user_id: str | None = None,
        external_auth_id: str | None = None,
        external_auth_token: SecretStr | None = None,
        token: SecretStr | None = None,
        external_token_manager: bool = False,
        base_domain: str | None = None,
    ):
        """Store credentials and configuration for talking to Bitbucket.

        Args:
            user_id: Stored as ``self.user_id``.
            external_auth_id: Stored as ``self.external_auth_id``.
            external_auth_token: Stored as ``self.external_auth_token``.
            token: API credential; either a bearer token or a
                ``username:password`` pair (see ``_get_bitbucket_headers``).
                When omitted the class-level empty token is used.
            external_token_manager: Stored as ``self.external_token_manager``.
            base_domain: Bitbucket host (default ``bitbucket.org``). When
                given, API calls go to ``https://api.{base_domain}/2.0``.
        """
        self.user_id = user_id
        self.external_token_manager = external_token_manager
        self.external_auth_id = external_auth_id
        self.external_auth_token = external_auth_token
        self.base_domain = base_domain or 'bitbucket.org'
        if token:
            self.token = token
        if base_domain:
            self.BASE_URL = f'https://api.{base_domain}/2.0'

    @property
    def provider(self) -> str:
        """Return the provider identifier for Bitbucket."""
        return ProviderType.BITBUCKET.value

    def _extract_owner_and_repo(self, repository: str) -> tuple[str, str]:
        """Extract owner and repo from repository string.

        Args:
            repository: Repository name in format 'workspace/repo_slug'

        Returns:
            Tuple of (owner, repo)

        Raises:
            ValueError: If repository format is invalid
        """
        parts = repository.split('/')
        if len(parts) < 2:
            raise ValueError(f'Invalid repository name: {repository}')
        return parts[-2], parts[-1]

    async def get_latest_token(self) -> SecretStr | None:
        """Get latest working token of the user."""
        return self.token

    async def _get_cursorrules_url(self, repository: str) -> str:
        """Get the URL for checking .cursorrules file.

        Raises:
            ResourceNotFoundError: If the repository has no main branch.
        """
        # Get repository details to get the main branch
        repo_details = await self.get_repository_details_from_repo_name(repository)
        if not repo_details.main_branch:
            raise ResourceNotFoundError(
                f'Main branch not found for repository {repository}. '
                f'This repository may be empty or have no default branch configured.'
            )
        return f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/.cursorrules'

    async def _get_microagents_directory_url(
        self, repository: str, microagents_path: str
    ) -> str:
        """Get the URL for checking microagents directory.

        Raises:
            ResourceNotFoundError: If the repository has no main branch.
        """
        # Get repository details to get the main branch
        repo_details = await self.get_repository_details_from_repo_name(repository)
        if not repo_details.main_branch:
            raise ResourceNotFoundError(
                f'Main branch not found for repository {repository}. '
                f'This repository may be empty or have no default branch configured.'
            )
        return f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/{microagents_path}'

    def _get_microagents_directory_params(self, microagents_path: str) -> dict | None:
        """Get parameters for the microagents directory request. Return None if no parameters needed."""
        return None

    def _is_valid_microagent_file(self, item: dict) -> bool:
        """Check if an item represents a valid microagent file.

        Valid items are committed Markdown files other than README.md.
        """
        return (
            item['type'] == 'commit_file'
            and item['path'].endswith('.md')
            and not item['path'].endswith('README.md')
        )

    def _get_file_name_from_item(self, item: dict) -> str:
        """Extract file name from directory item."""
        return item['path'].split('/')[-1]

    def _get_file_path_from_item(self, item: dict, microagents_path: str) -> str:
        """Extract file path from directory item."""
        return item['path']

    def _has_token_expired(self, status_code: int) -> bool:
        """Return True when the HTTP status indicates rejected credentials (401)."""
        return status_code == 401

    async def _get_bitbucket_headers(self) -> dict[str, str]:
        """Get headers for Bitbucket API requests.

        A token containing ':' is treated as ``username:password`` and sent
        with Basic auth; any other token is sent as a Bearer token.
        """
        token_value = self.token.get_secret_value()

        # Check if the token contains a colon, which indicates it's in username:password format
        if ':' in token_value:
            auth_str = base64.b64encode(token_value.encode()).decode()
            return {
                'Authorization': f'Basic {auth_str}',
                'Accept': 'application/json',
            }
        else:
            return {
                'Authorization': f'Bearer {token_value}',
                'Accept': 'application/json',
            }

    async def _make_request(
        self,
        url: str,
        params: dict | None = None,
        method: RequestMethod = RequestMethod.GET,
    ) -> tuple[Any, dict]:
        """Make a request to the Bitbucket API.

        When ``self.refresh`` is set and the first attempt is rejected with
        401, the request is retried once using the token returned by
        ``get_latest_token()``.

        Args:
            url: The URL to request
            params: Optional parameters for the request
            method: The HTTP method to use

        Returns:
            A tuple of (response_data, response_headers)

        Raises:
            The exception produced by ``handle_http_status_error`` for non-2xx
            responses, or by ``handle_http_error`` for transport errors.
        """
        try:
            async with httpx.AsyncClient() as client:
                bitbucket_headers = await self._get_bitbucket_headers()
                response = await self.execute_request(
                    client, url, bitbucket_headers, params, method
                )
                if self.refresh and self._has_token_expired(response.status_code):
                    # Adopt the refreshed token so the retry does not resend the
                    # expired credentials; keep the current one if none came back.
                    latest_token = await self.get_latest_token()
                    if latest_token:
                        self.token = latest_token
                    bitbucket_headers = await self._get_bitbucket_headers()
                    response = await self.execute_request(
                        client=client,
                        url=url,
                        headers=bitbucket_headers,
                        params=params,
                        method=method,
                    )
                response.raise_for_status()
                return response.json(), dict(response.headers)
        except httpx.HTTPStatusError as e:
            raise self.handle_http_status_error(e)
        except httpx.HTTPError as e:
            raise self.handle_http_error(e)

    async def get_user(self) -> User:
        """Get the authenticated user's information."""
        url = f'{self.BASE_URL}/user'
        data, _ = await self._make_request(url)

        account_id = data.get('account_id', '')

        return User(
            id=account_id,
            login=data.get('username', ''),
            avatar_url=data.get('links', {}).get('avatar', {}).get('href', ''),
            name=data.get('display_name'),
            email=None,  # Bitbucket API doesn't return email in this endpoint
        )

    @staticmethod
    def _name_contains_filter(query: str) -> str:
        """Build a BBQL ``name~"..."`` filter with ``query`` safely quoted.

        Backslashes and double quotes are escaped so that user-supplied text
        cannot terminate the string literal and corrupt the filter expression.
        """
        escaped = query.replace('\\', '\\\\').replace('"', '\\"')
        return f'name~"{escaped}"'

    @classmethod
    def _to_bitbucket_sort(cls, sort: str) -> str:
        """Translate a provider-agnostic sort key into a Bitbucket ``sort`` value."""
        return cls._BITBUCKET_SORT_MAP.get(sort, cls._DEFAULT_BITBUCKET_SORT)

    @staticmethod
    def _parse_branch(branch: dict) -> Branch:
        """Convert a Bitbucket branch payload into a Branch object."""
        # 'target' may be absent or null; treat both as "no commit info".
        target = branch.get('target') or {}
        return Branch(
            name=branch.get('name', ''),
            commit_sha=target.get('hash', ''),
            protected=False,  # Bitbucket doesn't expose this in the API
            last_push_date=target.get('date', None),
        )

    def _parse_repository(
        self, repo: dict, link_header: str | None = None
    ) -> Repository:
        """Parse a Bitbucket API repository response into a Repository object.

        Args:
            repo: Repository data from Bitbucket API
            link_header: Optional link header for pagination

        Returns:
            Repository object
        """
        repo_id = repo.get('uuid', '')

        # Nested objects may be missing or explicitly null; fall back to {} so a
        # repository without a default branch yields main_branch=None (which
        # callers check) instead of raising AttributeError here.
        workspace_slug = (repo.get('workspace') or {}).get('slug', '')
        repo_slug = repo.get('slug', '')
        full_name = (
            f'{workspace_slug}/{repo_slug}' if workspace_slug and repo_slug else ''
        )

        is_public = not repo.get('is_private', True)
        owner_type = OwnerType.ORGANIZATION
        main_branch = (repo.get('mainbranch') or {}).get('name')

        return Repository(
            id=repo_id,
            full_name=full_name,  # type: ignore[arg-type]
            git_provider=ProviderType.BITBUCKET,
            is_public=is_public,
            stargazers_count=None,  # Bitbucket doesn't have stars
            pushed_at=repo.get('updated_on'),
            owner_type=owner_type,
            link_header=link_header,
            main_branch=main_branch,
        )

    async def search_repositories(
        self, query: str, per_page: int, sort: str, order: str, public: bool
    ) -> list[Repository]:
        """Search for repositories.

        Args:
            query: When ``public`` is True, a repository URL of the form
                ``https://{domain}/{workspace}/{repo}/...``. Otherwise either a
                ``workspace/repo`` prefix or a fragment matched against
                workspace slugs and repository names.
            per_page: Page size used for each per-workspace repository query.
            sort: Provider-agnostic sort key (see ``_BITBUCKET_SORT_MAP``).
            order: Not used by this implementation.
            public: Whether ``query`` is a public repository URL.

        Returns:
            Matching repositories, each listed once, in discovery order.
        """
        repositories: list[Repository] = []

        if public:
            # Extract workspace and repo from URL
            # URL format: https://{domain}/{workspace}/{repo}/{additional_params}
            # Split by '/' and find workspace and repo parts
            url_parts = query.split('/')
            if len(url_parts) >= 5:  # https:, '', domain, workspace, repo
                workspace_slug = url_parts[3]
                repo_name = url_parts[4]
                repo = await self.get_repository_details_from_repo_name(
                    f'{workspace_slug}/{repo_name}'
                )
                repositories.append(repo)
            return repositories

        # Search for repos once workspace prefix exists
        if '/' in query:
            workspace_slug, repo_query = query.split('/', 1)
            return await self.get_paginated_repos(
                1, per_page, sort, workspace_slug, repo_query
            )

        all_installations = await self.get_installations()

        # Workspace prefix isn't complete. Search workspace names and repos underneath each workspace
        matching_workspace_slugs = [
            installation for installation in all_installations if query in installation
        ]

        for workspace_slug in matching_workspace_slugs:
            # Get repositories where query matches workspace name
            try:
                repos = await self.get_paginated_repos(
                    1, per_page, sort, workspace_slug
                )
                repositories.extend(repos)
            except Exception as e:
                # Best effort: one failing workspace must not abort the search.
                logger.debug(
                    f'Skipping workspace {workspace_slug} in repository search: {e}'
                )
                continue

        for workspace_slug in all_installations:
            # Get repositories in all workspaces where query matches repo name
            try:
                repos = await self.get_paginated_repos(
                    1, per_page, sort, workspace_slug, query
                )
                repositories.extend(repos)
            except Exception as e:
                # Best effort: one failing workspace must not abort the search.
                logger.debug(
                    f'Skipping workspace {workspace_slug} in repository search: {e}'
                )
                continue

        # A repository can be found by both passes (its workspace slug and its
        # name both match), so keep only the first occurrence of each.
        unique: dict[tuple[str, str], Repository] = {}
        for repository in repositories:
            unique.setdefault((repository.id, repository.full_name), repository)
        return list(unique.values())

    async def _get_user_workspaces(self) -> list[dict[str, Any]]:
        """Get all workspaces the user has access to"""
        url = f'{self.BASE_URL}/workspaces'
        data, _ = await self._make_request(url)
        return data.get('values', [])

    async def _fetch_paginated_data(
        self, url: str, params: dict, max_items: int
    ) -> list[dict]:
        """Fetch data with pagination support for Bitbucket API.

        Follows the ``next`` URL of each page until it is absent, a page comes
        back empty, or ``max_items`` items have been collected.

        Args:
            url: The API endpoint URL
            params: Query parameters for the request
            max_items: Maximum number of items to fetch

        Returns:
            List of data items from all pages
        """
        all_items: list[dict] = []
        current_url = url

        while current_url and len(all_items) < max_items:
            response, _ = await self._make_request(current_url, params)

            # Extract items from response
            page_items = response.get('values', [])
            if not page_items:  # No more items
                break

            all_items.extend(page_items)

            # Get the next page URL from the response
            current_url = response.get('next')

            # Clear params for subsequent requests since the next URL already contains all parameters
            params = {}

        return all_items[:max_items]  # Trim to max_items if needed

    async def get_installations(
        self, query: str | None = None, limit: int = 100
    ) -> list[str]:
        """Return the slugs of workspaces the authenticated user can access.

        Args:
            query: Optional fragment matched against workspace names with
                Bitbucket's ``~`` filter operator.
            limit: Maximum number of workspaces to return.

        Returns:
            Workspace slugs.
        """
        workspaces_url = f'{self.BASE_URL}/workspaces'
        params: dict[str, Any] = {}
        if query:
            params['q'] = self._name_contains_filter(query)
        workspaces = await self._fetch_paginated_data(workspaces_url, params, limit)

        return [workspace['slug'] for workspace in workspaces]

    async def get_paginated_repos(
        self,
        page: int,
        per_page: int,
        sort: str,
        installation_id: str | None,
        query: str | None = None,
    ) -> list[Repository]:
        """Get paginated repositories for a specific workspace.

        Args:
            page: The page number to fetch
            per_page: The number of repositories per page
            sort: The sort field ('pushed', 'updated', 'created', 'full_name');
                any other value sorts by most recently updated
            installation_id: The workspace slug to fetch repositories from; an
                empty or None value yields an empty list
            query: Optional fragment matched against repository names

        Returns:
            A list of Repository objects. Each carries a ``link_header`` of the
            form ``<url>; rel="next"`` when Bitbucket reports a next page.
        """
        if not installation_id:
            return []

        workspace_slug = installation_id
        workspace_repos_url = f'{self.BASE_URL}/repositories/{workspace_slug}'

        params: dict[str, Any] = {
            'pagelen': per_page,
            'page': page,
            'sort': self._to_bitbucket_sort(sort),
        }
        if query:
            params['q'] = self._name_contains_filter(query)

        response, _ = await self._make_request(workspace_repos_url, params)

        # Extract repositories from the response
        repos = response.get('values', [])

        # Extract next URL from response
        next_link = response.get('next', '')

        # Format the link header in a way that the frontend can understand
        # The frontend expects a format like: <url>; rel="next"
        # where the URL contains a page parameter
        formatted_link_header = ''
        if next_link:
            # Extract the page number from the next URL if possible
            page_match = re.search(r'[?&]page=(\d+)', next_link)
            if page_match:
                next_page = page_match.group(1)
                # Format it in a way that extractNextPageFromLink in frontend can parse
                formatted_link_header = (
                    f'<{workspace_repos_url}?page={next_page}>; rel="next"'
                )
            else:
                # If we can't extract the page, just use the next URL as is
                formatted_link_header = f'<{next_link}>; rel="next"'

        return [
            self._parse_repository(repo, link_header=formatted_link_header)
            for repo in repos
        ]

    async def get_all_repositories(
        self, sort: str, app_mode: AppMode
    ) -> list[Repository]:
        """Get repositories for the authenticated user using workspaces endpoint.

        This method gets all repositories (both public and private) that the user has access to
        by iterating through their workspaces and fetching repositories from each workspace.
        At most 1000 repositories are returned. ``app_mode`` is not used here.
        """
        MAX_REPOS = 1000
        PER_PAGE = 100  # Maximum allowed by Bitbucket API
        repositories: list[Repository] = []

        # The sort order is the same for every workspace, so compute it once.
        params = {
            'pagelen': PER_PAGE,
            'sort': self._to_bitbucket_sort(sort),
        }

        # Get user's workspaces with pagination
        workspaces_url = f'{self.BASE_URL}/workspaces'
        workspaces = await self._fetch_paginated_data(workspaces_url, {}, MAX_REPOS)

        for workspace in workspaces:
            workspace_slug = workspace.get('slug')
            if not workspace_slug:
                continue

            # Fetch this workspace's repositories, capped at the remaining budget
            workspace_repos_url = f'{self.BASE_URL}/repositories/{workspace_slug}'
            workspace_repos = await self._fetch_paginated_data(
                workspace_repos_url, params, MAX_REPOS - len(repositories)
            )
            repositories.extend(self._parse_repository(repo) for repo in workspace_repos)

            # Stop if we've reached the maximum number of repositories
            if len(repositories) >= MAX_REPOS:
                break

        return repositories[:MAX_REPOS]

    async def get_suggested_tasks(self) -> list[SuggestedTask]:
        """Get suggested tasks for the authenticated user across all repositories."""
        # TODO: implemented suggested tasks
        return []

    async def get_repository_details_from_repo_name(
        self, repository: str
    ) -> Repository:
        """Gets all repository details from repository name."""
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self.BASE_URL}/repositories/{owner}/{repo}'
        data, _ = await self._make_request(url)
        return self._parse_repository(data)

    async def get_branches(self, repository: str) -> list[Branch]:
        """Get up to 1000 branches for a repository, most recent commit first."""
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches'

        # Set maximum branches to fetch (similar to GitHub/GitLab implementations)
        MAX_BRANCHES = 1000
        PER_PAGE = 100

        params = {
            'pagelen': PER_PAGE,
            'sort': '-target.date',  # Sort by most recent commit date, descending
        }

        # Fetch all branches with pagination
        branch_data = await self._fetch_paginated_data(url, params, MAX_BRANCHES)
        return [self._parse_branch(branch) for branch in branch_data]

    async def get_paginated_branches(
        self, repository: str, page: int = 1, per_page: int = 30
    ) -> PaginatedBranchesResponse:
        """Get branches for a repository with pagination.

        Raises:
            ValueError: If ``repository`` is not in 'owner/repo' form.
        """
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches'

        params = {
            'pagelen': per_page,
            'page': page,
            'sort': '-target.date',  # Sort by most recent commit date, descending
        }

        response, _ = await self._make_request(url, params)
        branches = [self._parse_branch(branch) for branch in response.get('values', [])]

        # Bitbucket provides pagination info in the response
        has_next_page = response.get('next') is not None
        total_count = response.get('size')  # Total number of items

        return PaginatedBranchesResponse(
            branches=branches,
            has_next_page=has_next_page,
            current_page=page,
            per_page=per_page,
            total_count=total_count,
        )

    async def search_branches(
        self, repository: str, query: str, per_page: int = 30
    ) -> list[Branch]:
        """Search branches by name using Bitbucket API with `q` param.

        Raises:
            ValueError: If ``repository`` is not in 'owner/repo' form.
        """
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches'

        # Bitbucket filtering: name ~ "query" (query is escaped for BBQL)
        params = {
            'pagelen': per_page,
            'q': self._name_contains_filter(query),
            'sort': '-target.date',
        }

        response, _ = await self._make_request(url, params)
        return [self._parse_branch(branch) for branch in response.get('values', [])]

    async def create_pr(
        self,
        repo_name: str,
        source_branch: str,
        target_branch: str,
        title: str,
        body: str | None = None,
        draft: bool = False,
    ) -> str:
        """Creates a pull request in Bitbucket.

        Args:
            repo_name: The repository name in the format "workspace/repo"
            source_branch: The source branch name
            target_branch: The target branch name
            title: The title of the pull request
            body: The description of the pull request
            draft: Whether to create a draft pull request

        Returns:
            The URL of the created pull request
        """
        owner, repo = self._extract_owner_and_repo(repo_name)

        url = f'{self.BASE_URL}/repositories/{owner}/{repo}/pullrequests'

        payload = {
            'title': title,
            'description': body or '',
            'source': {'branch': {'name': source_branch}},
            'destination': {'branch': {'name': target_branch}},
            'close_source_branch': False,
            'draft': draft,
        }

        data, _ = await self._make_request(
            url=url, params=payload, method=RequestMethod.POST
        )

        # Return the URL to the pull request
        return data.get('links', {}).get('html', {}).get('href', '')

    async def get_pr_details(self, repository: str, pr_number: int) -> dict:
        """Get detailed information about a specific pull request

        Args:
            repository: Repository name in format 'owner/repo'
            pr_number: The pull request number

        Returns:
            Raw Bitbucket API response for the pull request
        """
        url = f'{self.BASE_URL}/repositories/{repository}/pullrequests/{pr_number}'
        pr_data, _ = await self._make_request(url)

        return pr_data

    async def get_microagent_content(
        self, repository: str, file_path: str
    ) -> MicroagentContentResponse:
        """Fetch individual file content from Bitbucket repository.

        Args:
            repository: Repository name in format 'workspace/repo_slug'
            file_path: Path to the file within the repository

        Returns:
            MicroagentContentResponse with parsed content and triggers

        Raises:
            ResourceNotFoundError: If the repository has no main branch
            RuntimeError: If file cannot be fetched or doesn't exist
        """
        # Step 1: Get repository details using existing method
        repo_details = await self.get_repository_details_from_repo_name(repository)

        if not repo_details.main_branch:
            logger.warning(
                f'No main branch found in repository info for {repository}. '
                f'Repository response: mainbranch field missing'
            )
            raise ResourceNotFoundError(
                f'Main branch not found for repository {repository}. '
                f'This repository may be empty or have no default branch configured.'
            )

        # Step 2: Get file content using the main branch
        file_url = f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/{file_path}'
        response, _ = await self._make_request(file_url)

        # Parse the content to extract triggers from frontmatter
        return self._parse_microagent_content(response, file_path)

    async def is_pr_open(self, repository: str, pr_number: int) -> bool:
        """Check if a Bitbucket pull request is still active (not closed/merged).

        Args:
            repository: Repository name in format 'owner/repo'
            pr_number: The PR number to check

        Returns:
            True if PR is active (OPEN), False if closed/merged. Also True when
            the state cannot be determined.
        """
        try:
            pr_details = await self.get_pr_details(repository, pr_number)

            # Bitbucket API response structure
            # https://developer.atlassian.com/cloud/bitbucket/rest/api-group-pullrequests/#api-repositories-workspace-repo-slug-pullrequests-pull-request-id-get
            if 'state' in pr_details:
                # Bitbucket state values: OPEN, MERGED, DECLINED, SUPERSEDED
                return pr_details['state'] == 'OPEN'

            # If we can't determine the state, assume it's active (safer default)
            logger.warning(
                f'Could not determine Bitbucket PR status for {repository}#{pr_number}. '
                f'Response keys: {list(pr_details.keys())}. Assuming PR is active.'
            )
            return True

        except Exception as e:
            logger.warning(
                f'Could not determine Bitbucket PR status for {repository}#{pr_number}: {e}. '
                f'Including conversation to be safe.'
            )
            # If we can't determine the PR status, include the conversation to be safe
            return True
# Deployments may swap in their own Bitbucket service by naming a class in the
# OPENHANDS_BITBUCKET_SERVICE_CLS environment variable; otherwise the
# BitBucketService class from this module is used. The chosen name is handed to
# get_impl together with BitBucketService as the expected base.
bitbucket_service_cls = os.getenv(
    'OPENHANDS_BITBUCKET_SERVICE_CLS',
    'openhands.integrations.bitbucket.bitbucket_service.BitBucketService',
)
BitBucketServiceImpl = get_impl(BitBucketService, bitbucket_service_cls)