feat(bitbucket): supports cloud and server APIs (#11052)

Co-authored-by: Ray Myers <ray.myers@gmail.com>
Co-authored-by: Chris Bagwell <chris@cnpbagwell.com>
Co-authored-by: Joe Laverty <joe.laverty@openhands.dev>
Co-authored-by: Joe Laverty <jlav@users.noreply.github.com>
This commit is contained in:
Pierrick Hymbert
2026-03-03 16:51:43 +01:00
committed by GitHub
parent a927b9dc73
commit e7934ea6e5
43 changed files with 3514 additions and 12 deletions

View File

@@ -0,0 +1,107 @@
import os
from pydantic import SecretStr
from openhands.integrations.bitbucket_data_center.service import (
BitbucketDCBranchesMixin,
BitbucketDCFeaturesMixin,
BitbucketDCPRsMixin,
BitbucketDCReposMixin,
BitbucketDCResolverMixin,
)
from openhands.integrations.service_types import (
GitService,
InstallationsService,
ProviderType,
)
from openhands.utils.import_utils import get_impl
class BitbucketDCService(
    BitbucketDCResolverMixin,
    BitbucketDCBranchesMixin,
    BitbucketDCFeaturesMixin,
    BitbucketDCPRsMixin,
    BitbucketDCReposMixin,
    GitService,
    InstallationsService,
):
    """Default implementation of GitService for Bitbucket data center integration.

    This is an extension point in OpenHands that allows applications to customize
    Bitbucket data center integration behavior. Applications can substitute their
    own implementation by:
        1. Creating a class that inherits from GitService
        2. Implementing all required methods
        3. Setting server_config.bitbucket_service_class to the fully qualified
           name of the class

    The class is instantiated via get_impl() in openhands.server.shared.py.
    """

    def __init__(
        self,
        user_id: str | None = None,
        external_auth_id: str | None = None,
        external_auth_token: SecretStr | None = None,
        token: SecretStr | None = None,
        external_token_manager: bool = False,
        base_domain: str | None = None,
    ) -> None:
        self.external_auth_id = external_auth_id
        self.external_auth_token = external_auth_token
        self.external_token_manager = external_token_manager
        self.user_id = user_id
        self.base_domain = base_domain
        # REST API root lives under /rest/api/1.0 on the configured host.
        self.BASE_URL = '' if not base_domain else f'https://{base_domain}/rest/api/1.0'
        # A bare token (no ':' separator) is an HTTP access token; prefix it
        # with the fixed 'x-token-auth' username so it works as Basic auth.
        if token:
            raw = token.get_secret_value()
            if ':' not in raw:
                token = SecretStr(f'x-token-auth:{raw}')
        self.token = token
        # When no explicit user_id was given, derive it from a
        # 'username:password' style token (x-token-auth tokens carry no user).
        if token and not user_id:
            raw = token.get_secret_value()
            if not raw.startswith('x-token-auth:'):
                self.user_id = raw.split(':', 1)[0]

    @property
    def provider(self) -> str:
        # Identifies this service in provider registries/maps.
        return ProviderType.BITBUCKET_DATA_CENTER.value
# Fully qualified class name of the service implementation to instantiate.
# Deployments may override it via the environment variable below; it defaults
# to the in-repo BitbucketDCService.
bitbucket_dc_service_cls = os.environ.get(
    'OPENHANDS_BITBUCKET_DATA_CENTER_SERVICE_CLS',
    'openhands.integrations.bitbucket_data_center.bitbucket_dc_service.BitbucketDCService',
)
# Lazy loading to avoid circular imports: resolved on first use and cached here.
_bitbucket_dc_service_impl = None
def get_bitbucket_dc_service_impl():
    """Resolve and memoize the configured BitbucketDCService implementation class."""
    global _bitbucket_dc_service_impl
    # Fast path: already resolved on a previous call.
    if _bitbucket_dc_service_impl is not None:
        return _bitbucket_dc_service_impl
    _bitbucket_dc_service_impl = get_impl(BitbucketDCService, bitbucket_dc_service_cls)
    return _bitbucket_dc_service_impl
# For backward compatibility, provide the implementation as a property
class _BitbucketDCServiceImplProxy:
    """Stand-in that resolves the real implementation class lazily.

    Attribute access and instantiation are both forwarded to whatever class
    get_bitbucket_dc_service_impl() resolves, deferring the import until the
    proxy is first used.
    """

    def __getattr__(self, name):
        # Forward attribute lookups to the resolved implementation class.
        return getattr(get_bitbucket_dc_service_impl(), name)

    def __call__(self, *args, **kwargs):
        # Calling the proxy constructs an instance of the resolved class.
        return get_bitbucket_dc_service_impl()(*args, **kwargs)
BitbucketDCServiceImpl: type[BitbucketDCService] = _BitbucketDCServiceImplProxy()  # type: ignore[assignment]

View File

@@ -0,0 +1,15 @@
from .base import BitbucketDCMixinBase
from .branches import BitbucketDCBranchesMixin
from .features import BitbucketDCFeaturesMixin
from .prs import BitbucketDCPRsMixin
from .repos import BitbucketDCReposMixin
from .resolver import BitbucketDCResolverMixin
# Public API of the Bitbucket Data Center service mixin package.
__all__ = [
    'BitbucketDCMixinBase',
    'BitbucketDCBranchesMixin',
    'BitbucketDCFeaturesMixin',
    'BitbucketDCPRsMixin',
    'BitbucketDCReposMixin',
    'BitbucketDCResolverMixin',
]

View File

@@ -0,0 +1,333 @@
import base64
from typing import Any
import httpx
from pydantic import SecretStr
from openhands.core.logger import openhands_logger as logger
from openhands.integrations.protocols.http_client import HTTPClient
from openhands.integrations.service_types import (
AuthenticationError,
BaseGitService,
OwnerType,
ProviderType,
Repository,
RequestMethod,
ResourceNotFoundError,
User,
)
from openhands.utils.http_session import httpx_verify_option
class BitbucketDCMixinBase(BaseGitService, HTTPClient):
    """
    Base mixin for BitBucket data center service containing common functionality:
    auth headers, request execution, pagination, and user/repository parsing.
    """

    # REST API root, e.g. 'https://<host>/rest/api/1.0'.
    BASE_URL: str = ''  # Set dynamically from domain in __init__
    # Username used for Basic auth lookups; None for x-token-auth tokens.
    user_id: str | None

    def _repo_api_base(self, owner: str, repo: str) -> str:
        # Canonical repo endpoint: /projects/{project_key}/repos/{repo_slug}.
        return f'{self.BASE_URL}/projects/{owner}/repos/{repo}'

    @staticmethod
    def _resolve_primary_email(emails: list[dict]) -> str | None:
        """Find the primary confirmed email from a list of Bitbucket data center email objects.

        Bitbucket data center's /user/emails endpoint returns objects with
        'email', 'is_primary', and 'is_confirmed' keys.

        Returns:
            The first address that is both primary and confirmed, or None.
        """
        for entry in emails:
            if entry.get('is_primary') and entry.get('is_confirmed'):
                return entry.get('email')
        return None

    def _extract_owner_and_repo(self, repository: str) -> tuple[str, str]:
        """Extract owner and repo from repository string.

        Args:
            repository: Repository name in format 'project/repo_slug'

        Returns:
            Tuple of (owner, repo)

        Raises:
            ValueError: If repository format is invalid
        """
        # Only the last two path segments matter, so inputs with extra
        # leading segments still resolve to (project, slug).
        parts = repository.split('/')
        if len(parts) < 2:
            raise ValueError(f'Invalid repository name: {repository}')
        return parts[-2], parts[-1]

    async def get_latest_token(self) -> SecretStr | None:
        """Get latest working token of the user."""
        return self.token

    def _has_token_expired(self, status_code: int) -> bool:
        return False  # DC tokens cannot be refreshed programmatically

    async def _get_headers(self) -> dict[str, str]:
        """Get headers for Bitbucket data center API requests.

        The stored token is 'user:password' or 'x-token-auth:<token>' form,
        which maps directly onto HTTP Basic authentication.
        """
        token_value = self.token.get_secret_value()
        auth_str = base64.b64encode(token_value.encode()).decode()
        return {
            'Authorization': f'Basic {auth_str}',
            'Accept': 'application/json',
        }

    async def _make_request(
        self,
        url: str,
        params: dict | None = None,
        method: RequestMethod = RequestMethod.GET,
    ) -> tuple[Any, dict]:
        """Make a request to the Bitbucket data center API.

        Args:
            url: The URL to request
            params: Optional parameters for the request
            method: The HTTP method to use

        Returns:
            A tuple of (response_data, response_headers)

        Raises:
            Whatever handle_http_status_error / handle_http_error map
            httpx errors to.
        """
        try:
            async with httpx.AsyncClient(verify=httpx_verify_option()) as client:
                bitbucket_headers = await self._get_headers()
                response = await self.execute_request(
                    client, url, bitbucket_headers, params, method
                )
                # Retry once with refreshed credentials on token expiry
                # (a no-op for DC since _has_token_expired is always False).
                if self.refresh and self._has_token_expired(response.status_code):
                    await self.get_latest_token()
                    bitbucket_headers = await self._get_headers()
                    response = await self.execute_request(
                        client=client,
                        url=url,
                        headers=bitbucket_headers,
                        params=params,
                        method=method,
                    )
                response.raise_for_status()
                # Some endpoints return non-JSON bodies; fall back to raw text.
                try:
                    data = response.json()
                except ValueError:
                    data = response.text
                return data, dict(response.headers)
        except httpx.HTTPStatusError as e:
            raise self.handle_http_status_error(e)
        except httpx.HTTPError as e:
            raise self.handle_http_error(e)

    async def verify_access(self) -> None:
        """Verify that the token and host are valid by making a lightweight API call.

        Raises an exception if the token is invalid or the host is unreachable.
        """
        url = f'{self.BASE_URL}/repos'
        await self._make_request(url, {'limit': '1'})

    async def _fetch_paginated_data(
        self, url: str, params: dict, max_items: int
    ) -> list[dict]:
        """Fetch data with pagination support for Bitbucket data center API.

        Args:
            url: The API endpoint URL
            params: Query parameters for the request
            max_items: Maximum number of items to fetch

        Returns:
            List of data items from all pages
        """
        all_items: list[dict] = []
        current_url = url
        base_endpoint = url
        while current_url and len(all_items) < max_items:
            response, _ = await self._make_request(current_url, params)
            # Extract items from response
            page_items = response.get('values', [])
            all_items.extend(page_items)
            # Bitbucket Server paging contract: 'isLastPage' + 'nextPageStart'.
            if response.get('isLastPage', True):
                break
            next_start = response.get('nextPageStart')
            if next_start is None:
                break
            # Copy params so the caller's dict is not mutated between pages.
            params = params or {}
            params = dict(params)
            params['start'] = next_start
            current_url = base_endpoint
        return all_items[:max_items]

    async def get_user_emails(self) -> list[dict]:
        """Fetch the authenticated user's email addresses from Bitbucket data center.

        Calls GET /user/emails which returns a paginated response with a
        'values' list of email objects containing 'email', 'is_primary',
        and 'is_confirmed' fields.
        """
        url = f'{self.BASE_URL}/user/emails'
        response, _ = await self._make_request(url)
        return response.get('values', [])

    async def get_user(self) -> User:
        """Get the authenticated user's information.

        Returns an empty User when no user_id is known (HTTP access tokens),
        otherwise resolves the user via the /users filter endpoint.

        Raises:
            AuthenticationError: If no user matches self.user_id.
        """
        if not self.user_id:
            # HTTP Access tokens (x-token-auth) don't have user info.
            # For OAuth, the user_id should be set.
            return User(
                id='',
                login='',
                avatar_url='',
                name=None,
                email=None,
            )
        # Basic auth - extract username and query users API to get slug
        users_url = f'{self.BASE_URL}/users'
        data, _ = await self._make_request(
            users_url, {'filter': self.user_id, 'avatarSize': 64}
        )
        users = data.get('values', [])
        if not users:
            raise AuthenticationError(f'User not found: {self.user_id}')
        # NOTE(review): assumes the first filter match is the authenticated
        # user — confirm for hosts where the filter matches multiple users.
        user_data = users[0]
        avatar = user_data.get('avatarUrl', '')
        # Handle relative avatar URLs (Server returns /users/... paths)
        if avatar.startswith('/users'):
            # Strip /rest/api/1.0 from BASE_URL to get the base server URL
            base_server_url = self.BASE_URL.rsplit('/rest/api/1.0', 1)[0]
            avatar = f'{base_server_url}{avatar}'
        display_name = user_data.get('displayName')
        email = user_data.get('emailAddress')
        return User(
            id=str(user_data.get('id') or user_data.get('slug') or self.user_id),
            login=user_data.get('name') or self.user_id,
            avatar_url=avatar,
            name=display_name,
            email=email,
        )

    async def _parse_repository(
        self, repo: dict, link_header: str | None = None
    ) -> Repository:
        """Parse a Bitbucket data center API repository response into a Repository object.

        Args:
            repo: Repository data from Bitbucket data center API
            link_header: Optional link header for pagination

        Returns:
            Repository object

        Raises:
            ValueError: If the payload lacks a project key or repo slug.
        """
        project_key = repo.get('project', {}).get('key', '')
        repo_slug = repo.get('slug', '')
        if not project_key or not repo_slug:
            raise ValueError(
                f'Cannot parse repository: missing project key or slug. '
                f'Got project_key={project_key!r}, repo_slug={repo_slug!r}'
            )
        full_name = f'{project_key}/{repo_slug}'
        is_public = repo.get('public', False)
        main_branch: str | None = None
        # Default branch requires an extra API call; failures are tolerated
        # (main_branch stays None) so listing repos never breaks on one repo.
        try:
            default_branch_url = (
                f'{self._repo_api_base(project_key, repo_slug)}/default-branch'
            )
            default_branch_data, _ = await self._make_request(default_branch_url)
            main_branch = default_branch_data.get('displayId') or None
        except Exception as e:
            logger.debug(f'Could not fetch default branch for {full_name}: {e}')
        return Repository(
            id=str(repo.get('id', '')),
            full_name=full_name,
            git_provider=ProviderType.BITBUCKET_DATA_CENTER,
            is_public=is_public,
            stargazers_count=None,
            pushed_at=None,
            owner_type=OwnerType.ORGANIZATION,
            link_header=link_header,
            main_branch=main_branch,
        )

    async def get_repository_details_from_repo_name(
        self, repository: str
    ) -> Repository:
        """Get repository details from repository name.

        Args:
            repository: Repository name in format 'project/repo_slug'

        Returns:
            Repository object with details
        """
        owner, repo = self._extract_owner_and_repo(repository)
        url = self._repo_api_base(owner, repo)
        data, _ = await self._make_request(url)
        return await self._parse_repository(data)

    async def _get_cursorrules_url(self, repository: str) -> str:
        """Get the URL for checking .cursorrules file.

        Raises:
            ResourceNotFoundError: If the repository has no main branch.
        """
        # Get repository details to get the main branch
        repo_details = await self.get_repository_details_from_repo_name(repository)
        if not repo_details.main_branch:
            raise ResourceNotFoundError(
                f'Main branch not found for repository {repository}. '
                f'This repository may be empty or have no default branch configured.'
            )
        owner, repo = self._extract_owner_and_repo(repository)
        return (
            f'{self.BASE_URL}/projects/{owner}/repos/{repo}/browse/.cursorrules'
            f'?at=refs/heads/{repo_details.main_branch}'
        )

    async def _get_microagents_directory_url(
        self, repository: str, microagents_path: str
    ) -> str:
        """Get the URL for checking microagents directory.

        Raises:
            ResourceNotFoundError: If the repository has no main branch.
        """
        # Get repository details to get the main branch
        repo_details = await self.get_repository_details_from_repo_name(repository)
        if not repo_details.main_branch:
            raise ResourceNotFoundError(
                f'Main branch not found for repository {repository}. '
                f'This repository may be empty or have no default branch configured.'
            )
        owner, repo = self._extract_owner_and_repo(repository)
        return (
            f'{self.BASE_URL}/projects/{owner}/repos/{repo}/browse/{microagents_path}'
            f'?at=refs/heads/{repo_details.main_branch}'
        )

    def _get_microagents_directory_params(self, microagents_path: str) -> dict | None:
        """Get parameters for the microagents directory request. Return None if no parameters needed."""
        return None

    def _is_valid_microagent_file(self, item: dict) -> bool:
        """Check if an item represents a valid microagent file (a .md file other than README.md)."""
        file_name = item.get('path', {}).get('name', '')
        return (
            item.get('type') == 'FILE'
            and file_name.endswith('.md')
            and file_name != 'README.md'
        )

    def _get_file_name_from_item(self, item: dict) -> str:
        """Extract file name from directory item."""
        return item.get('path', {}).get('name', '')

    def _get_file_path_from_item(self, item: dict, microagents_path: str) -> str:
        """Extract file path from directory item."""
        file_name = self._get_file_name_from_item(item)
        return f'{microagents_path}/{file_name}'

View File

@@ -0,0 +1,136 @@
from datetime import datetime, timezone
from openhands.integrations.bitbucket_data_center.service.base import (
BitbucketDCMixinBase,
)
from openhands.integrations.service_types import Branch, PaginatedBranchesResponse
class BitbucketDCBranchesMixin(BitbucketDCMixinBase):
    """
    Mixin for BitBucket data center branch-related operations
    """

    async def get_branches(self, repository: str) -> list[Branch]:
        """Get branches for a repository.

        Args:
            repository: Repository name in format 'project/repo_slug'

        Returns:
            List of Branch objects, most recently modified first.
        """
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self._repo_api_base(owner, repo)}/branches'
        # Set maximum branches to fetch (similar to GitHub/GitLab implementations)
        MAX_BRANCHES = 1000
        PER_PAGE = 100
        params = {
            'limit': PER_PAGE,
            'orderBy': 'MODIFICATION',
        }
        # Fetch all branches with pagination
        branch_data = await self._fetch_paginated_data(url, params, MAX_BRANCHES)
        return [self._parse_branch(branch) for branch in branch_data]

    async def get_paginated_branches(
        self, repository: str, page: int = 1, per_page: int = 30
    ) -> PaginatedBranchesResponse:
        """Get branches for a repository with pagination.

        Args:
            repository: Repository name in format 'project/repo_slug'
            page: 1-based page number
            per_page: Number of branches per page

        Returns:
            PaginatedBranchesResponse for the requested page.
        """
        # Fix: owner/repo were previously parsed twice (once via the helper,
        # once by hand); the helper alone is sufficient and validates format.
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self._repo_api_base(owner, repo)}/branches'
        # Bitbucket Server pagination uses a 0-based 'start' offset.
        start = max((page - 1) * per_page, 0)
        params = {
            'limit': per_page,
            'start': start,
            'orderBy': 'MODIFICATION',
        }
        response, _ = await self._make_request(url, params)
        branches = [self._parse_branch(branch) for branch in response.get('values', [])]
        has_next_page = not response.get('isLastPage', True)
        total_count = response.get('size')
        return PaginatedBranchesResponse(
            branches=branches,
            has_next_page=has_next_page,
            current_page=page,
            per_page=per_page,
            total_count=total_count,
        )

    async def search_branches(
        self, repository: str, query: str, per_page: int = 30
    ) -> list[Branch]:
        """Search branches by name using the Bitbucket DC `filterText` parameter."""
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self._repo_api_base(owner, repo)}/branches'
        params = {
            'limit': per_page,
            'filterText': query,
            'orderBy': 'MODIFICATION',
        }
        response, _ = await self._make_request(url, params)
        return [self._parse_branch(branch) for branch in response.get('values', [])]

    def _parse_branch(self, branch: dict) -> Branch:
        """Normalize Bitbucket branch representations across Cloud and Server."""
        # Prefer 'displayId'; otherwise derive the short name from the full
        # ref id ('refs/heads/<name>') when present.
        name = branch.get('displayId') or ''
        if not name:
            branch_id = branch.get('id', '')
            if isinstance(branch_id, str) and branch_id.startswith('refs/heads/'):
                name = branch_id.split('refs/heads/', 1)[-1]
            elif isinstance(branch_id, str):
                name = branch_id
        commit_sha = branch.get('latestCommit', '')
        last_push_date = self._extract_server_branch_last_modified(branch)
        return Branch(
            name=name,
            commit_sha=commit_sha,
            protected=False,  # Bitbucket doesn't expose branch protection via these endpoints
            last_push_date=last_push_date,
        )

    def _extract_server_branch_last_modified(self, branch: dict) -> str | None:
        """Extract the last modified timestamp from a Bitbucket Server branch payload.

        Scans the branch 'metadata' plugin entries for any timestamp-like key;
        numeric values are treated as epoch milliseconds and converted to an
        ISO 8601 UTC string, string values are returned as-is.
        """
        metadata = branch.get('metadata')
        if not isinstance(metadata, dict):
            return None
        for value in metadata.values():
            if not isinstance(value, list):
                continue
            for entry in value:
                if not isinstance(entry, dict):
                    continue
                timestamp = (
                    entry.get('authorTimestamp')
                    or entry.get('committerTimestamp')
                    or entry.get('timestamp')
                    or entry.get('lastModified')
                )
                if isinstance(timestamp, (int, float)):
                    return datetime.fromtimestamp(
                        timestamp / 1000, tz=timezone.utc
                    ).isoformat()
                if isinstance(timestamp, str):
                    # Some Bitbucket instances might already return ISO 8601 strings
                    return timestamp
        return None

View File

@@ -0,0 +1,96 @@
from openhands.core.logger import openhands_logger as logger
from openhands.integrations.bitbucket_data_center.service.base import (
BitbucketDCMixinBase,
)
from openhands.integrations.service_types import ResourceNotFoundError
from openhands.microagent.types import MicroagentContentResponse, MicroagentResponse
class BitbucketDCFeaturesMixin(BitbucketDCMixinBase):
    """
    Mixin for BitBucket data center feature operations (microagents, cursor rules, etc.)
    """

    async def get_microagent_content(
        self, repository: str, file_path: str
    ) -> MicroagentContentResponse:
        """Fetch individual file content from Bitbucket data center repository.

        Args:
            repository: Repository name in format 'project/repo_slug'
            file_path: Path to the file within the repository

        Returns:
            MicroagentContentResponse with parsed content and triggers

        Raises:
            ResourceNotFoundError: If the repository has no main branch.
        """
        # The browse endpoint needs a ref, so resolve the default branch first.
        repo_details = await self.get_repository_details_from_repo_name(repository)
        if not repo_details.main_branch:
            logger.warning(
                f'No main branch found in repository info for {repository}. '
                f'Repository response: mainbranch field missing'
            )
            raise ResourceNotFoundError(
                f'Main branch not found for repository {repository}. '
                f'This repository may be empty or have no default branch configured.'
            )
        owner, repo = self._extract_owner_and_repo(repository)
        file_url = f'{self._repo_api_base(owner, repo)}/browse/{file_path}'
        payload, _ = await self._make_request(
            file_url, params={'at': f'refs/heads/{repo_details.main_branch}'}
        )
        # The browse endpoint returns {'lines': [{'text': ...}, ...]} on
        # Server; other shapes fall back to 'content' or the raw body.
        if not isinstance(payload, dict):
            content = str(payload)
        else:
            raw_lines = payload.get('lines')
            if isinstance(raw_lines, list):
                texts = [
                    entry.get('text', '')
                    for entry in raw_lines
                    if isinstance(entry, dict)
                ]
                content = '\n'.join(texts)
            else:
                content = payload.get('content', '')
        # Parse frontmatter triggers out of the assembled content.
        return self._parse_microagent_content(content, file_path)

    async def _process_microagents_directory(
        self, repository: str, microagents_path: str
    ) -> list[MicroagentResponse]:
        """List microagent files under the given directory, tolerating failures."""
        microagents = []
        try:
            directory_url = await self._get_microagents_directory_url(
                repository, microagents_path
            )
            directory_params = self._get_microagents_directory_params(microagents_path)
            response, _ = await self._make_request(directory_url, directory_params)
            # Bitbucket DC browse endpoint nests items under response['children']['values']
            items = response.get('children', {}).get('values', [])
            for item in items:
                if not self._is_valid_microagent_file(item):
                    continue
                try:
                    name = self._get_file_name_from_item(item)
                    path = self._get_file_path_from_item(item, microagents_path)
                    microagents.append(self._create_microagent_response(name, path))
                except Exception as e:
                    logger.warning(f'Error processing microagent {item}: {str(e)}')
        except ResourceNotFoundError:
            logger.info(
                f'No microagents directory found in {repository} at {microagents_path}'
            )
        except Exception as e:
            logger.warning(f'Error fetching microagents directory: {str(e)}')
        return microagents

View File

@@ -0,0 +1,134 @@
from typing import Any
from openhands.core.logger import openhands_logger as logger
from openhands.integrations.bitbucket_data_center.service.base import (
BitbucketDCMixinBase,
)
from openhands.integrations.service_types import RequestMethod
class BitbucketDCPRsMixin(BitbucketDCMixinBase):
    """
    Mixin for BitBucket data center pull request operations
    """

    @staticmethod
    def _href_from_link(link: Any) -> str | None:
        """Return the 'href' of a Bitbucket link entry, or None.

        A link entry may be a dict ({'href': ...}) or a list of such dicts
        (Cloud-style); empty/missing hrefs yield None.
        """
        if isinstance(link, dict):
            return link.get('href') or None
        if isinstance(link, list) and link:
            return link[0].get('href') or None
        return None

    async def create_pr(
        self,
        repo_name: str,
        source_branch: str,
        target_branch: str,
        title: str,
        body: str | None = None,
        draft: bool = False,
    ) -> str:
        """Creates a pull request in Bitbucket data center.

        Args:
            repo_name: The repository name in the format "project/repo"
            source_branch: The source branch name
            target_branch: The target branch name
            title: The title of the pull request
            body: The description of the pull request
            draft: Whether to create a draft pull request
                (accepted for interface parity; Bitbucket DC has no draft flag)

        Returns:
            The URL of the created pull request, or '' if no link is returned.
        """
        owner, repo = self._extract_owner_and_repo(repo_name)
        url = f'{self._repo_api_base(owner, repo)}/pull-requests'
        payload: dict[str, Any] = {
            'title': title,
            'description': body or '',
            'fromRef': {
                'id': f'refs/heads/{source_branch}',
                'repository': {'slug': repo, 'project': {'key': owner}},
            },
            'toRef': {
                'id': f'refs/heads/{target_branch}',
                'repository': {'slug': repo, 'project': {'key': owner}},
            },
        }
        data, _ = await self._make_request(
            url=url, params=payload, method=RequestMethod.POST
        )
        # Prefer the human-facing 'html' link, then fall back to 'self'.
        links = data.get('links', {}) if isinstance(data, dict) else {}
        if isinstance(links, dict):
            for key in ('html', 'self'):
                href = self._href_from_link(links.get(key))
                if href:
                    return href
        return ''

    async def get_pr_details(self, repository: str, pr_number: int) -> dict:
        """Get detailed information about a specific pull request

        Args:
            repository: Repository name in format 'owner/repo'
            pr_number: The pull request number

        Returns:
            Raw Bitbucket data center API response for the pull request
        """
        owner, repo = self._extract_owner_and_repo(repository)
        url = f'{self._repo_api_base(owner, repo)}/pull-requests/{pr_number}'
        pr_data, _ = await self._make_request(url)
        return pr_data

    async def is_pr_open(self, repository: str, pr_number: int) -> bool:
        """Check if a Bitbucket data center pull request is still active (not closed/merged).

        Args:
            repository: Repository name in format 'owner/repo'
            pr_number: The PR number to check

        Returns:
            True if PR is active (OPEN), False if closed/merged
        """
        try:
            pr_details = await self.get_pr_details(repository, pr_number)
            # Bitbucket data center API response structure
            if 'state' in pr_details:
                # Bitbucket data center state values: OPEN, MERGED, DECLINED, SUPERSEDED
                return pr_details['state'] == 'OPEN'
            # If we can't determine the state, assume it's active (safer default)
            logger.warning(
                f'Could not determine Bitbucket PR status for {repository}#{pr_number}. '
                f'Response keys: {list(pr_details.keys())}. Assuming PR is active.'
            )
            return True
        except Exception as e:
            logger.warning(
                f'Could not determine Bitbucket PR status for {repository}#{pr_number}: {e}. '
                f'Including conversation to be safe.'
            )
            # If we can't determine the PR status, include the conversation to be safe
            return True

View File

@@ -0,0 +1,203 @@
from typing import Any
from urllib.parse import urlparse
from openhands.integrations.bitbucket_data_center.service.base import (
BitbucketDCMixinBase,
)
from openhands.integrations.service_types import Repository, SuggestedTask
from openhands.server.types import AppMode
class BitbucketDCReposMixin(BitbucketDCMixinBase):
    """
    Mixin for BitBucket data center repository-related operations
    """

    async def search_repositories(
        self,
        query: str,
        per_page: int,
        sort: str,
        order: str,
        public: bool,
        app_mode: AppMode,
    ) -> list[Repository]:
        """Search for repositories.

        With public=True the query is treated as a repository URL/path and
        resolved to a single repository. Otherwise, 'project/name' queries
        search within one project and bare queries search across all
        accessible projects.
        """
        repositories = []
        if public:
            # Best-effort URL parsing; any parse failure yields an empty result.
            try:
                parsed_url = urlparse(query)
                path_segments = [
                    segment for segment in parsed_url.path.split('/') if segment
                ]
                if 'projects' in path_segments:
                    idx = path_segments.index('projects')
                    # Web UI form: .../projects/<KEY>/repos/<slug>
                    if (
                        len(path_segments) > idx + 2
                        and path_segments[idx + 1]
                        and path_segments[idx + 2] == 'repos'
                    ):
                        project_key = path_segments[idx + 1]
                        repo_name = (
                            path_segments[idx + 3]
                            if len(path_segments) > idx + 3
                            else ''
                        )
                    # Shorter form: .../projects/<KEY>/<slug>
                    elif len(path_segments) > idx + 2:
                        project_key = path_segments[idx + 1]
                        repo_name = path_segments[idx + 2]
                    else:
                        project_key = ''
                        repo_name = ''
                else:
                    # Plain '<KEY>/<slug>' path with no 'projects' segment.
                    project_key = path_segments[0] if len(path_segments) >= 1 else ''
                    repo_name = path_segments[1] if len(path_segments) >= 2 else ''
                if project_key and repo_name:
                    repo = await self.get_repository_details_from_repo_name(
                        f'{project_key}/{repo_name}'
                    )
                    repositories.append(repo)
            except (ValueError, IndexError):
                pass
            return repositories
        MAX_REPOS = 1000
        # Search for repos once project prefix exists
        if '/' in query:
            project_slug, repo_query = query.split('/', 1)
            project_repos_url = f'{self.BASE_URL}/projects/{project_slug}/repos'
            raw_repos = await self._fetch_paginated_data(
                project_repos_url, {'limit': per_page}, MAX_REPOS
            )
            # Client-side substring filter on slug/name.
            if repo_query:
                raw_repos = [
                    r
                    for r in raw_repos
                    if repo_query.lower() in r.get('slug', '').lower()
                    or repo_query.lower() in r.get('name', '').lower()
                ]
            return [await self._parse_repository(repo) for repo in raw_repos]
        # No '/' in query, search across all projects
        all_projects = await self.get_installations()
        for project_key in all_projects:
            # Per-project failures are skipped so one bad project doesn't
            # abort the whole search.
            try:
                repos = await self.get_paginated_repos(
                    1, per_page, sort, project_key, query
                )
                repositories.extend(repos)
            except Exception:
                continue
        return repositories

    async def _get_user_projects(self) -> list[dict[str, Any]]:
        """Get all projects the user has access to (capped at 100)."""
        projects_url = f'{self.BASE_URL}/projects'
        projects = await self._fetch_paginated_data(projects_url, {}, 100)
        return projects

    async def get_installations(
        self, query: str | None = None, limit: int = 100
    ) -> list[str]:
        """Return accessible project keys, optionally filtered by substring match
        against the concatenated key+name."""
        projects_url = f'{self.BASE_URL}/projects'
        params: dict[str, Any] = {'limit': limit}
        projects = await self._fetch_paginated_data(projects_url, params, limit)
        project_keys: list[str] = []
        for project in projects:
            key = project.get('key')
            name = project.get('name', '')
            if not key:
                continue
            if query and query.lower() not in f'{key}{name}'.lower():
                continue
            project_keys.append(key)
        return project_keys

    async def get_paginated_repos(
        self,
        page: int,
        per_page: int,
        sort: str,
        installation_id: str | None,
        query: str | None = None,
    ) -> list[Repository]:
        """Get paginated repositories for a specific project.

        Args:
            page: The page number to fetch (1-based)
            per_page: The number of repositories per page
            sort: The sort field (accepted for interface parity; not passed to the API here)
            installation_id: The project key/slug to fetch repositories from
            query: Optional substring filter applied to slug/name client-side

        Returns:
            A list of Repository objects
        """
        if not installation_id:
            return []
        # Convert installation_id to string for use as project_slug
        project_slug = installation_id
        project_repos_url = f'{self.BASE_URL}/projects/{project_slug}/repos'
        # Calculate start offset from page number (Bitbucket Server uses 0-based start index)
        start = (page - 1) * per_page
        params: dict[str, Any] = {'limit': per_page, 'start': start}
        response, _ = await self._make_request(project_repos_url, params)
        repos = response.get('values', [])
        if query:
            repos = [
                repo
                for repo in repos
                if query.lower() in repo.get('slug', '').lower()
                or query.lower() in repo.get('name', '').lower()
            ]
        formatted_link_header = ''
        if not response.get('isLastPage', True):
            next_page = page + 1
            # Use 'page=' format for frontend compatibility with extractNextPageFromLink
            formatted_link_header = (
                f'<{project_repos_url}?page={next_page}>; rel="next"'
            )
        return [
            await self._parse_repository(repo, link_header=formatted_link_header)
            for repo in repos
        ]

    async def get_all_repositories(
        self, sort: str, app_mode: AppMode
    ) -> list[Repository]:
        """Get repositories for the authenticated user using workspaces endpoint.

        This method gets all repositories (both public and private) that the user has access to
        by iterating through their projects and fetching repositories from each project,
        capped at MAX_REPOS total.
        """
        MAX_REPOS = 1000
        PER_PAGE = 100  # Maximum allowed by Bitbucket data center API
        repositories: list[Repository] = []
        projects = await self.get_installations(limit=MAX_REPOS)
        for project_key in projects:
            project_repos_url = f'{self.BASE_URL}/projects/{project_key}/repos'
            project_repos = await self._fetch_paginated_data(
                project_repos_url,
                {'limit': PER_PAGE},
                MAX_REPOS - len(repositories),
            )
            for repo in project_repos:
                repositories.append(await self._parse_repository(repo))
                if len(repositories) >= MAX_REPOS:
                    break
            if len(repositories) >= MAX_REPOS:
                break
        return repositories

    async def get_suggested_tasks(self) -> list[SuggestedTask]:
        """Get suggested tasks for the authenticated user across all repositories."""
        # TODO: implement suggested tasks
        return []

View File

@@ -0,0 +1,113 @@
from datetime import datetime, timezone
from openhands.integrations.bitbucket_data_center.service.base import (
BitbucketDCMixinBase,
)
from openhands.integrations.service_types import Comment
class BitbucketDCResolverMixin(BitbucketDCMixinBase):
    """
    Helper methods used for the Bitbucket Data Center Resolver
    """

    async def get_pr_title_and_body(
        self, owner: str, repo_slug: str, pr_id: int
    ) -> tuple[str, str]:
        """Get the title and body of a pull request.

        Args:
            owner: Project key (e.g. 'PROJ')
            repo_slug: Repository slug
            pr_id: Pull request ID

        Returns:
            A tuple of (title, body)
        """
        pr_url = (
            f'{self.BASE_URL}/projects/{owner}/repos/{repo_slug}/pull-requests/{pr_id}'
        )
        data, _ = await self._make_request(pr_url)
        return data.get('title') or '', data.get('description') or ''

    async def get_pr_comments(
        self, owner: str, repo_slug: str, pr_id: int, max_comments: int = 10
    ) -> list[Comment]:
        """Get comments for a pull request.

        Uses the pull-requests/{id}/activities endpoint, filtering for
        COMMENTED actions — the same approach used by the resolver interface.

        Args:
            owner: Project key (e.g. 'PROJ')
            repo_slug: Repository slug
            pr_id: Pull request ID
            max_comments: Maximum number of comments to retrieve

        Returns:
            List of Comment objects ordered by creation date
        """
        endpoint = f'{self.BASE_URL}/projects/{owner}/repos/{repo_slug}/pull-requests/{pr_id}/activities'
        collected: list[dict] = []
        query: dict = {'limit': 100, 'start': 0}
        while len(collected) < max_comments:
            page, _ = await self._make_request(endpoint, query)
            # Keep only comment payloads from COMMENTED activities.
            page_comments = [
                activity.get('comment', {})
                for activity in page.get('values', [])
                if activity.get('action') == 'COMMENTED'
            ]
            collected.extend(c for c in page_comments if c)
            if page.get('isLastPage', True):
                break
            next_start = page.get('nextPageStart')
            if next_start is None:
                break
            query = {'limit': 100, 'start': next_start}
        return self._process_raw_comments(collected, max_comments)

    def _process_raw_comments(
        self, comments: list, max_comments: int = 10
    ) -> list[Comment]:
        """Convert raw Bitbucket DC comment dicts to Comment objects."""

        def _ms_to_dt(ms):
            # createdDate/updatedDate are epoch milliseconds; missing -> epoch 0.
            if ms is None:
                return datetime.fromtimestamp(0, tz=timezone.utc)
            return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

        converted: list[Comment] = []
        for raw in comments:
            author_info = raw.get('author', {})
            author = author_info.get('slug') or author_info.get('name') or 'unknown'
            converted.append(
                Comment(
                    id=str(raw.get('id', 'unknown')),
                    body=self._truncate_comment(raw.get('text', '')),
                    author=author,
                    created_at=_ms_to_dt(raw.get('createdDate')),
                    updated_at=_ms_to_dt(raw.get('updatedDate')),
                    system=False,
                )
            )
        converted.sort(key=lambda c: c.created_at)
        # Keep only the most recent max_comments after sorting chronologically.
        return converted[-max_comments:]

View File

@@ -22,6 +22,9 @@ from openhands.integrations.azure_devops.azure_devops_service import (
AzureDevOpsServiceImpl,
)
from openhands.integrations.bitbucket.bitbucket_service import BitBucketServiceImpl
from openhands.integrations.bitbucket_data_center.bitbucket_dc_service import (
BitbucketDCServiceImpl,
)
from openhands.integrations.forgejo.forgejo_service import ForgejoServiceImpl
from openhands.integrations.github.github_service import GithubServiceImpl
from openhands.integrations.gitlab.gitlab_service import GitLabServiceImpl
@@ -128,6 +131,7 @@ class ProviderHandler:
ProviderType.GITHUB: GithubServiceImpl,
ProviderType.GITLAB: GitLabServiceImpl,
ProviderType.BITBUCKET: BitBucketServiceImpl,
ProviderType.BITBUCKET_DATA_CENTER: BitbucketDCServiceImpl,
ProviderType.FORGEJO: ForgejoServiceImpl,
ProviderType.AZURE_DEVOPS: AzureDevOpsServiceImpl,
}
@@ -222,6 +226,18 @@ class ProviderHandler:
return []
async def get_bitbucket_dc_projects(self) -> list[str]:
service = cast(
InstallationsService,
self.get_service(ProviderType.BITBUCKET_DATA_CENTER),
)
try:
return await service.get_installations()
except Exception as e:
logger.warning(f'Failed to get bitbucket data center projects {e}')
return []
async def get_azure_devops_organizations(self) -> list[str]:
service = cast(
InstallationsService, self.get_service(ProviderType.AZURE_DEVOPS)
@@ -341,8 +357,9 @@ class ProviderHandler:
def _is_repository_url(self, query: str, provider: ProviderType) -> bool:
"""Check if the query is a repository URL."""
custom_host = self.provider_tokens[provider].host
custom_host_exists = custom_host and custom_host in query
default_host_exists = self.PROVIDER_DOMAINS[provider] in query
custom_host_exists = bool(custom_host and custom_host in query)
default_domain = self.PROVIDER_DOMAINS.get(provider)
default_host_exists = default_domain is not None and default_domain in query
return query.startswith(('http://', 'https://')) and (
custom_host_exists or default_host_exists
@@ -673,7 +690,7 @@ class ProviderHandler:
provider = repository.git_provider
repo_name = repository.full_name
domain = self.PROVIDER_DOMAINS[provider]
domain = self.PROVIDER_DOMAINS.get(provider, '')
# If provider tokens are provided, use the host from the token if available
# Note: For Azure DevOps, don't use the host field as it may contain org/project path
@@ -724,6 +741,24 @@ class ProviderHandler:
else:
# Access token format: use x-token-auth
remote_url = f'{protocol}://x-token-auth:{token_value}@{domain}/{repo_name}.git'
elif provider == ProviderType.BITBUCKET_DATA_CENTER:
# DC uses HTTP Basic auth — token must be in username:token format
project, repo_slug = (
repo_name.split('/', 1)
if '/' in repo_name
else (repo_name, repo_name)
)
scm_path = f'scm/{project.lower()}/{repo_slug}.git'
# Percent-encode each credential part so special characters
# (e.g. @, #, /) don't break the URL.
if ':' in token_value:
dc_user, dc_pass = token_value.split(':', 1)
url_creds = (
f'{quote(dc_user, safe="")}:{quote(dc_pass, safe="")}'
)
else:
url_creds = f'x-token-auth:{quote(token_value, safe="")}'
remote_url = f'{protocol}://{url_creds}@{domain}/{scm_path}'
elif provider == ProviderType.AZURE_DEVOPS:
# Azure DevOps uses PAT with Basic auth
# Format: https://{anything}:{PAT}@dev.azure.com/{org}/{project}/_git/{repo}

View File

@@ -21,6 +21,7 @@ class ProviderType(Enum):
GITHUB = 'github'
GITLAB = 'gitlab'
BITBUCKET = 'bitbucket'
BITBUCKET_DATA_CENTER = 'bitbucket_data_center'
FORGEJO = 'forgejo'
AZURE_DEVOPS = 'azure_devops'
ENTERPRISE_SSO = 'enterprise_sso'
@@ -78,6 +79,16 @@ class SuggestedTask(BaseModel):
'ciProvider': 'Bitbucket',
'requestVerb': 'pull request',
}
elif self.git_provider == ProviderType.BITBUCKET_DATA_CENTER:
return {
'requestType': 'Pull Request',
'requestTypeShort': 'PR',
'apiName': 'Bitbucket Data Center API',
'tokenEnvVar': 'BITBUCKET_DATA_CENTER_TOKEN',
'ciSystem': 'Bitbucket Pipelines',
'ciProvider': 'Bitbucket Data Center',
'requestVerb': 'pull request',
}
raise ValueError(f'Provider {self.git_provider} for suggested task prompts')

View File

@@ -5,6 +5,9 @@ from openhands.integrations.azure_devops.azure_devops_service import (
AzureDevOpsServiceImpl as AzureDevOpsService,
)
from openhands.integrations.bitbucket.bitbucket_service import BitBucketService
from openhands.integrations.bitbucket_data_center.bitbucket_dc_service import (
BitbucketDCService,
)
from openhands.integrations.forgejo.forgejo_service import ForgejoService
from openhands.integrations.github.github_service import GitHubService
from openhands.integrations.gitlab.gitlab_service import GitLabService
@@ -14,7 +17,7 @@ from openhands.integrations.provider import ProviderType
async def validate_provider_token(
token: SecretStr, base_domain: str | None = None
) -> ProviderType | None:
"""Determine whether a token is for GitHub, GitLab, Bitbucket, or Azure DevOps by attempting to get user info from the services.
"""Determine whether a token is for GitHub, GitLab, Bitbucket, Bitbucket Data Center, or Azure DevOps by attempting to get user info from the services.
Args:
token: The token to check
@@ -69,6 +72,18 @@ async def validate_provider_token(
except Exception as e:
bitbucket_error = e
# Try Bitbucket Data Center if a base_domain was provided (always self-hosted)
bitbucket_dc_error = None
if base_domain:
try:
bitbucket_dc_service = BitbucketDCService(
token=token, base_domain=base_domain
)
await bitbucket_dc_service.verify_access()
return ProviderType.BITBUCKET_DATA_CENTER
except Exception as e:
bitbucket_dc_error = e
# Try Azure DevOps last
azure_devops_error = None
try:
@@ -79,7 +94,7 @@ async def validate_provider_token(
azure_devops_error = e
logger.debug(
f'Failed to validate token: {github_error} \n {gitlab_error} \n {forgejo_error} \n {bitbucket_error} \n {azure_devops_error}'
f'Failed to validate token: {github_error} \n {gitlab_error} \n {forgejo_error} \n {bitbucket_error} \n {bitbucket_dc_error} \n {azure_devops_error}'
)
return None